第6章 - Redis + Lua 脚本 ⭐
嗨,朋友!我是长安。
如果说上一章的 SpringBoot + Lua 是"锦上添花",那 Redis + Lua 就是刚需了!在分布式系统中,Redis Lua 脚本几乎是不可替代的——分布式锁、限流、原子操作,都离不开它。
🤔 为什么 Redis 需要 Lua?
Redis 是单线程的,单条命令是原子的。但多条命令组合就不是原子的了!
# 典型的并发问题:检查并扣减库存
GET stock # 读取库存 = 10
# ---- 其他线程也读取到 10 ----
DECRBY stock 1 # 扣减 → 9
# ---- 其他线程也扣减 → 9(应该是 8!)----
问题
两个请求同时读到库存为 10,各扣减 1,最终库存是 9 而不是 8。这就是竞态条件!
解决方案:Lua 脚本。Redis 会把整个 Lua 脚本作为一个原子操作执行,期间不会被其他命令打断。
🚀 Redis Lua 基础
在 Redis 中执行 Lua
# EVAL 命令
# EVAL "脚本" 键的数量 键1 键2 ... 参数1 参数2 ...
redis-cli EVAL "return 'Hello Lua!'" 0
# "Hello Lua!"
redis-cli EVAL "return 1 + 2" 0
# (integer) 3
# 使用 KEYS 和 ARGV 传递参数
redis-cli EVAL "return KEYS[1] .. ' = ' .. ARGV[1]" 1 mykey myvalue
# "mykey = myvalue"
KEYS 和 ARGV
| 变量 | 说明 | 对应参数 |
|---|---|---|
KEYS[1] | 第 1 个键名 | EVAL 后的键名参数 |
KEYS[2] | 第 2 个键名 | — |
ARGV[1] | 第 1 个额外参数 | 键名之后的参数 |
ARGV[2] | 第 2 个额外参数 | — |
注意
Lua 中数组索引从 1 开始!所以是 KEYS[1] 不是 KEYS[0]。
在 Lua 中调用 Redis 命令
-- redis.call() 调用 Redis 命令
redis.call("SET", "name", "长安")
local value = redis.call("GET", "name")
return value -- "长安"
-- redis.call vs redis.pcall
-- redis.call() 出错时会中断脚本
-- redis.pcall() 出错时返回错误信息,不中断
🔧 SpringBoot 中使用 Redis Lua
依赖配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
基本方式:直接执行脚本
@Service
public class RedisLuaService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 执行 Lua 脚本
*/
public Object executeLuaScript(String script, List<String> keys, Object... args) {
DefaultRedisScript<String> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(String.class);
return redisTemplate.execute(redisScript, keys, args);
}
}
推荐方式:预加载脚本
@Configuration
public class RedisLuaConfig {
/**
* 库存扣减脚本
*/
@Bean
public DefaultRedisScript<Long> stockDeductScript() {
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setLocation(new ClassPathResource("scripts/stock_deduct.lua"));
script.setResultType(Long.class);
return script;
}
/**
* 限流脚本
*/
@Bean
public DefaultRedisScript<Long> rateLimitScript() {
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setLocation(new ClassPathResource("scripts/rate_limit.lua"));
script.setResultType(Long.class);
return script;
}
}
🌟 实战一:原子扣减库存
-- scripts/stock_deduct.lua
-- KEYS[1] = 库存 key
-- ARGV[1] = 扣减数量
local stock = tonumber(redis.call("GET", KEYS[1]))
if stock == nil then
return -1 -- 库存 key 不存在
end
local deduct = tonumber(ARGV[1])
if stock < deduct then
return -2 -- 库存不足
end
-- 扣减库存
redis.call("DECRBY", KEYS[1], deduct)
return stock - deduct -- 返回剩余库存
@Service
public class StockService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private DefaultRedisScript<Long> stockDeductScript;
/**
* 原子扣减库存
* @return 剩余库存,-1 表示 key 不存在,-2 表示库存不足
*/
public long deductStock(String productId, int quantity) {
String key = "stock:" + productId;
Long result = redisTemplate.execute(
stockDeductScript,
Collections.singletonList(key),
String.valueOf(quantity)
);
return result != null ? result : -1;
}
}
// 使用示例
@RestController
@RequestMapping("/api/stock")
public class StockController {
@Autowired
private StockService stockService;
@PostMapping("/deduct")
public Map<String, Object> deduct(
@RequestParam String productId,
@RequestParam int quantity) {
long remaining = stockService.deductStock(productId, quantity);
Map<String, Object> result = new HashMap<>();
if (remaining == -1) {
result.put("success", false);
result.put("message", "商品不存在");
} else if (remaining == -2) {
result.put("success", false);
result.put("message", "库存不足");
} else {
result.put("success", true);
result.put("remaining", remaining);
}
return result;
}
}
🌟 实战二:分布式限流器
-- scripts/rate_limit.lua
-- 滑动窗口限流
-- KEYS[1] = 限流 key
-- ARGV[1] = 窗口大小(秒)
-- ARGV[2] = 最大请求数
-- ARGV[3] = 当前时间戳(毫秒)
local key = KEYS[1]
local window = tonumber(ARGV[1]) * 1000 -- 转为毫秒
local maxRequests = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
-- 移除窗口外的数据
redis.call("ZREMRANGEBYSCORE", key, 0, now - window)
-- 获取当前窗口内的请求数
local currentRequests = redis.call("ZCARD", key)
if currentRequests < maxRequests then
-- 未达到限流阈值,允许通过
redis.call("ZADD", key, now, now .. "-" .. math.random(1000000))
redis.call("EXPIRE", key, tonumber(ARGV[1]) + 1)
return 1 -- 允许
else
return 0 -- 拒绝
end
@Service
public class RateLimiter {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private DefaultRedisScript<Long> rateLimitScript;
/**
* 检查是否允许请求
* @param key 限流 key(如 "rate:user:123" 或 "rate:api:/order")
* @param windowSec 窗口大小(秒)
* @param maxReq 窗口内最大请求数
*/
public boolean isAllowed(String key, int windowSec, int maxReq) {
Long result = redisTemplate.execute(
rateLimitScript,
Collections.singletonList(key),
String.valueOf(windowSec),
String.valueOf(maxReq),
String.valueOf(System.currentTimeMillis())
);
return result != null && result == 1;
}
}
// 在接口中使用限流
@RestController
public class OrderController {
@Autowired
private RateLimiter rateLimiter;
@PostMapping("/api/order")
public String createOrder(@RequestParam String userId) {
// 每个用户每秒最多 5 次请求
String key = "rate:user:" + userId;
if (!rateLimiter.isAllowed(key, 1, 5)) {
return "请求太频繁,请稍后再试!";
}
// 正常处理业务...
return "下单成功";
}
}
🌟 实战三:分布式锁
-- scripts/distributed_lock.lua
-- 加锁脚本
-- KEYS[1] = 锁的 key
-- ARGV[1] = 锁的值(唯一标识,防止误删)
-- ARGV[2] = 过期时间(秒)
if redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", tonumber(ARGV[2])) then
return 1 -- 加锁成功
else
return 0 -- 加锁失败
end
-- scripts/distributed_unlock.lua
-- 解锁脚本(只有持有者才能解锁)
-- KEYS[1] = 锁的 key
-- ARGV[1] = 锁的值(唯一标识)
if redis.call("GET", KEYS[1]) == ARGV[1] then
redis.call("DEL", KEYS[1])
return 1 -- 解锁成功
else
return 0 -- 不是自己的锁,不能解
end
@Service
public class DistributedLockService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 加锁
*/
public boolean lock(String lockKey, String requestId, int expireSeconds) {
String script = "if redis.call('SET',KEYS[1],ARGV[1],'NX','EX',tonumber(ARGV[2])) then return 1 else return 0 end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
Long result = redisTemplate.execute(
redisScript,
Collections.singletonList(lockKey),
requestId,
String.valueOf(expireSeconds)
);
return result != null && result == 1;
}
/**
* 解锁(原子操作:检查 + 删除)
*/
public boolean unlock(String lockKey, String requestId) {
String script = "if redis.call('GET',KEYS[1])==ARGV[1] then return redis.call('DEL',KEYS[1]) else return 0 end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
Long result = redisTemplate.execute(
redisScript,
Collections.singletonList(lockKey),
requestId
);
return result != null && result == 1;
}
}
// 使用分布式锁
String lockKey = "lock:order:" + orderId;
String requestId = UUID.randomUUID().toString();
if (lockService.lock(lockKey, requestId, 30)) {
try {
// 业务逻辑...
processOrder(orderId);
} finally {
lockService.unlock(lockKey, requestId);
}
} else {
System.out.println("获取锁失败,请稍后重试");
}
📋 Redis Lua 注意事项
重要限制
- 不要写太长的脚本 — Redis 是单线程的,Lua 执行期间会阻塞其他命令
- 不要使用随机/时间函数 —
math.random()、os.time()在 Redis 集群中可能不一致 - KEYS 必须显式传入 — 不要在脚本中硬编码 key,否则 Redis 集群无法正确路由
- 脚本要幂等 — 网络超时重试时,脚本可能被执行多次
📝 小结
- Redis Lua 脚本保证原子性,解决并发竞态问题
- 核心命令:
EVAL "脚本" 键数量 KEY... ARGV... - 在脚本中用
redis.call()调用 Redis 命令 - SpringBoot 中用
DefaultRedisScript+RedisTemplate.execute()执行 - 三大实战场景:库存扣减、分布式限流、分布式锁
- 脚本要短小精悍,避免阻塞 Redis
➡️ 下一步
Redis + Lua 掌握了!下一章来了解 OpenResty 入门,看看 Nginx + Lua 的高性能网关方案。
💪 练习题
- 写一个 Redis Lua 脚本,实现"如果 key 不存在就设置值并设置过期时间"。
- 实现一个基于 Redis Lua 的计数器,每次调用 +1,超过阈值返回失败。
- 为什么 Redis 分布式锁的解锁必须用 Lua 脚本?
- Redis Lua 脚本中为什么不能硬编码 key?
答案提示
if not redis.call('EXISTS',KEYS[1]) then redis.call('SET',KEYS[1],ARGV[1]) redis.call('EXPIRE',KEYS[1],ARGV[2]) return 1 else return 0 endlocal c=redis.call('INCR',KEYS[1]) if c==1 then redis.call('EXPIRE',KEYS[1],ARGV[2]) end if c>tonumber(ARGV[1]) then return 0 end return 1- 因为"检查锁的持有者"和"删除锁"必须是原子操作,否则可能误删别人的锁
- Redis 集群根据 KEYS 参数做路由分片,硬编码 key 会导致路由失败
