背景
随着互联网应用从单体应用部署,到分布式部署,除了有好的方面外,随之而带来的问题也不断涌现。其中,最为重要的数据一致性问题,在分布式情况下并发对共享数据源进行写操作带来的数据
安全问题,都需要优先考虑在程序设计之中。
本次将从单体应用出发,到分布式集群部署,一步一步模拟在高并发程序下,不断涌现出的数据一致性问题,并对问题逐步解决的记录。
环境
首先使用 SpringBoot 搭建一个基础的 Web 服务
pom.xml
<dependencies>
<!-- Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
application.yml
# 端口
server:
port: 8080
spring:
application:
name: redis-lock
# 缓存配置
redis:
host: localhost
port: 6379
database: 0
password:
无锁单节点
首先我们来看在单节点情况下,不使用锁进行资源保护,对订单接口进行并发访问带来对问题
@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {
public static final String ORDER_STOCK_KEY = "order:stock";
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 创建订单,使用 Redis 模拟数据源
*
*
* @return
*/
@GetMapping("/createOrder")
public String unsafeDeductOrder() {log.info(" 开始创建订单...........");
String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
int stock = Integer.parseInt(stockNum);
if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
log.info(" 扣减前库存数: {}", stock);
redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));
log.info(" 结束创建订单...........");
return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
}
}
初始化订单库存
redis> set order:stock 50
OK
redis> get order:stock
50
使用 Jmeter 模拟 200 并发,对创建订单接口进行访问,如果程序正常,应该在创建 50 个订单后,返回库存不足
查看结果,所有请求都成功了。但是我们可以看到日志中存在严重的重复扣减库存,这在真实环境是可怕的问题。同时查看最后一个请 求返回都结果,显示当前库存为 46 远比我们都预期差的多
2020-03-28 20:38:00.718 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 20:38:00.718 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 20:38:00.718 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 20:38:01.013 c.r.r.lock.controller.OrderController : 扣减前库存数: 50
2020-03-28 20:38:01.013 c.r.r.lock.controller.OrderController : 扣减前库存数: 50
2020-03-28 20:38:01.013 c.r.r.lock.controller.OrderController : 扣减前库存数: 50
...
...
2020-03-28 20:38:01.168 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 20:38:01.158 c.r.r.lock.controller.OrderController : 扣减后库存数: 49
2020-03-28 20:38:01.169 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 20:38:01.158 c.r.r.lock.controller.OrderController : 扣减前库存数: 49
2020-03-28 20:38:01.169 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 20:38:01.158 c.r.r.lock.controller.OrderController : 扣减前库存数: 49
Sync 锁单节点
在单体应用的情况下,我们可以通过使用锁的方式来避免上述问题。这里简单使用 synchronized 锁
@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {
public static final String ORDER_STOCK_KEY = "order:stock";
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 创建订单,添加 synchronized 锁
*
* @return
*/
@GetMapping("/createOrder")
public synchronized String unsafeDeductOrder() {log.info(" 开始创建订单...........");
String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
int stock = Integer.parseInt(stockNum);
if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
log.info(" 扣减前库存数: {}", stock);
redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));
log.info(" 结束创建订单...........");
return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
}
}
初始化订单库存
redis> set order:stock 50
OK
redis> get order:stock
50
使用 Jmeter 模拟 200 并发,对创建订单接口进行访问,如果程序正常,应该在创建 50 个订单后,返回库存不足
查看结果,所有请求都成功了。查看响应结果也正常,无重复扣减库存,在扣减了 50 个库存后,正常返回无库存
2020-03-28 20:26:56.227 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 20:26:56.461 c.r.r.lock.controller.OrderController : 扣减前库存数: 50
2020-03-28 20:26:56.466 c.r.r.lock.controller.OrderController : 扣减后库存数: 49
2020-03-28 20:26:56.467 c.r.r.lock.controller.OrderController : 结束创建订单...........
2020-03-28 20:26:56.469 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 20:26:56.471 c.r.r.lock.controller.OrderController : 扣减前库存数: 49
2020-03-28 20:26:56.476 c.r.r.lock.controller.OrderController : 扣减后库存数: 48
2020-03-28 20:26:56.476 c.r.r.lock.controller.OrderController : 结束创建订单...........
2020-03-28 20:26:56.478 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 20:26:56.480 c.r.r.lock.controller.OrderController : 扣减前库存数: 48
2020-03-28 20:26:56.483 c.r.r.lock.controller.OrderController : 扣减后库存数: 47
2020-03-28 20:26:56.484 c.r.r.lock.controller.OrderController : 结束创建订单...........
...
...
2020-03-28 20:26:56.831 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 20:26:56.833 c.r.r.lock.controller.OrderController : 扣减前库存数: 2
2020-03-28 20:26:56.835 c.r.r.lock.controller.OrderController : 扣减后库存数: 1
2020-03-28 20:26:56.836 c.r.r.lock.controller.OrderController : 结束创建订单...........
2020-03-28 20:26:56.837 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 20:26:56.839 c.r.r.lock.controller.OrderController : 扣减前库存数: 1
2020-03-28 20:26:56.842 c.r.r.lock.controller.OrderController : 扣减后库存数: 0
2020-03-28 20:26:56.842 c.r.r.lock.controller.OrderController : 结束创建订单...........
2020-03-28 20:26:56.843 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 20:26:56.845 c.r.r.lock.controller.OrderController : 订单创建失败,库存不足!
Sync 锁多节点
如上,在单节点部署情况下使用 synchronized 锁可以解决问题,那么我们再来看看多节点集群部署情况下,是否能正常运行
- 启动 8080 端口应用
# 端口
server:
port: 8080
- 启动 8081 端口应用
# 端口
server:
port: 8081
使用 Nginx 做负载,将流量分别打入两个应用
upstream backend {
server localhost:8080;
server localhost:8081;
}
server {
listen 80;
server_name localhost;
location / {proxy_pass http://backend;}
}
初始化订单库存
redis> set order:stock 50
OK
redis> get order:stock
50
使用 Jmeter 模拟 200 并发,访问 Nginx 入口,对两个节点进行负载均衡。如果程序正常,应该在创建 50 个订单后,返回库存不足
查看结果,所有请求都成功了。查看响应结发现存在重复扣减库存情况
- 8080 节点日志
2020-03-28 21:00:03.203 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:00:03.260 c.r.r.lock.controller.OrderController : 扣减前库存数: 50
2020-03-28 21:00:03.270 c.r.r.lock.controller.OrderController : 扣减后库存数: 49
2020-03-28 21:00:03.271 c.r.r.lock.controller.OrderController : 结束创建订单...........
...
...
2020-03-28 21:00:03.684 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:00:03.688 c.r.r.lock.controller.OrderController : 扣减前库存数: 26
2020-03-28 21:00:03.693 c.r.r.lock.controller.OrderController : 扣减后库存数: 25
2020-03-28 21:00:03.693 c.r.r.lock.controller.OrderController : 结束创建订单...........
...
2020-03-28 21:00:04.162 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:00:04.164 c.r.r.lock.controller.OrderController : 订单创建失败,库存不足!
- 8081 节点日志
2020-03-28 21:00:03.203 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:00:03.264 c.r.r.lock.controller.OrderController : 扣减前库存数: 50
2020-03-28 21:00:03.274 c.r.r.lock.controller.OrderController : 扣减后库存数: 49
2020-03-28 21:00:03.274 c.r.r.lock.controller.OrderController : 结束创建订单...........
...
...
2020-03-28 21:00:03.687 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:00:03.690 c.r.r.lock.controller.OrderController : 扣减前库存数: 26
2020-03-28 21:00:03.695 c.r.r.lock.controller.OrderController : 扣减后库存数: 25
2020-03-28 21:00:03.695 c.r.r.lock.controller.OrderController : 结束创建订单...........
...
2020-03-28 21:00:04.162 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:00:04.164 c.r.r.lock.controller.OrderController : 订单创建失败,库存不足!
Redis.setnx 多节点
我们可以看到,当在分布式情况下 synchronized 锁就无能为力了,因为集群中当每个节点都是在单独的 jvm 中运行的, 所以 synchronized 只能在当前 jvm 下保证并发安全。我们可以使用 redis 提供的 setnx 命令进行加锁,来保证集群情况下的并发安全
redis.setnx: 命令在指定的 key 不存在时,为 key 设置指定的值。如果指定的 key 存在,则不做任何操作。
@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {
public static final String ORDER_STOCK_KEY = "order:stock";
public static final String LOCK_KEY = "lock:key";
public static final String LOCK_VALUE = "lock:value";
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 创建订单
*
* @return
*/
@GetMapping("/createOrder")
public String unsafeDeductOrder() {
// 这里使用 SpringBoot 提供的 setIfAbsent api 模拟 setnx 的效果
// 如果返回 true 表示获取锁成功,正常创建订单。否则失败,不做任何操作
Boolean flag = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, LOCK_VALUE);
if (!flag) {return "";}
log.info(" 开始创建订单...........");
String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
int stock = Integer.parseInt(stockNum);
if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
// 注意,如果库存已经没有了,需要释放掉锁,不然会造成死锁
redisTemplate.delete(LOCK_KEY);
return " 订单创建失败,库存不足!";
}
log.info(" 扣减前库存数: {}", stock);
redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));
// 注意,在处理完业务后,需要释放掉锁,不然会造成死锁
redisTemplate.delete(LOCK_KEY);
log.info(" 结束创建订单...........");
return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
}
}
- 启动 8080 端口应用
# 端口
server:
port: 8080
- 启动 8081 端口应用
# 端口
server:
port: 8081
使用 Jmeter 模拟 5s 1000 并发,对创建订单接口进行访问
查看结果,所有请求都成功了。查看响应结发现没有重复扣减库存情况,说明 redis.setnx 可以保证分布式下的并发安全问题
- 8080 节点日志
2020-03-28 21:56:59.285 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:56:59.297 c.r.r.lock.controller.OrderController : 扣减前库存数: 47
2020-03-28 21:56:59.311 c.r.r.lock.controller.OrderController : 扣减后库存数: 46
2020-03-28 21:56:59.318 c.r.r.lock.controller.OrderController : 结束创建订单...........
2020-03-28 21:56:59.421 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:56:59.443 c.r.r.lock.controller.OrderController : 扣减前库存数: 45
2020-03-28 21:56:59.500 c.r.r.lock.controller.OrderController : 扣减后库存数: 44
2020-03-28 21:56:59.528 c.r.r.lock.controller.OrderController : 结束创建订单...........
...
...
2020-03-28 21:57:02.157 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:57:02.159 c.r.r.lock.controller.OrderController : 扣减前库存数: 3
2020-03-28 21:57:02.166 c.r.r.lock.controller.OrderController : 扣减后库存数: 2
2020-03-28 21:57:02.170 c.r.r.lock.controller.OrderController : 结束创建订单...........
2020-03-28 21:57:02.177 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:57:02.180 c.r.r.lock.controller.OrderController : 扣减前库存数: 2
2020-03-28 21:57:02.186 c.r.r.lock.controller.OrderController : 扣减后库存数: 1
2020-03-28 21:57:02.189 c.r.r.lock.controller.OrderController : 结束创建订单...........
2020-03-28 21:57:02.206 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:57:02.210 c.r.r.lock.controller.OrderController : 订单创建失败,库存不足!
- 8081 节点日志
2020-03-28 21:56:59.135 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:56:59.148 c.r.r.lock.controller.OrderController : 扣减前库存数: 50
2020-03-28 21:56:59.156 c.r.r.lock.controller.OrderController : 扣减后库存数: 49
2020-03-28 21:56:59.162 c.r.r.lock.controller.OrderController : 结束创建订单...........
2020-03-28 21:56:59.194 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:56:59.217 c.r.r.lock.controller.OrderController : 扣减前库存数: 49
2020-03-28 21:56:59.236 c.r.r.lock.controller.OrderController : 扣减后库存数: 48
2020-03-28 21:56:59.528 c.r.r.lock.controller.OrderController : 结束创建订单...........
...
...
2020-03-28 21:57:02.124 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:57:02.131 c.r.r.lock.controller.OrderController : 扣减前库存数: 5
2020-03-28 21:57:02.138 c.r.r.lock.controller.OrderController : 扣减后库存数: 4
2020-03-28 21:57:02.141 c.r.r.lock.controller.OrderController : 结束创建订单...........
2020-03-28 21:57:02.191 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:57:02.196 c.r.r.lock.controller.OrderController : 扣减前库存数: 1
2020-03-28 21:57:02.201 c.r.r.lock.controller.OrderController : 扣减后库存数: 0
2020-03-28 21:57:02.204 c.r.r.lock.controller.OrderController : 结束创建订单...........
2020-03-28 21:57:02.206 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:57:02.210 c.r.r.lock.controller.OrderController : 订单创建失败,库存不足!
Redis.setnx 多节点 finally
虽然以上代码已经可以保证分布式下的并发安全,但是代码还存在这问题。假设我们的程序在创建订单的过程中出现异常,那么上述代码就会因为没有及时删除锁,而造成死锁。那么解决方法也比较简单,我们在 finally 块中删除锁,就可以了
@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {
public static final String ORDER_STOCK_KEY = "order:stock";
public static final String LOCK_KEY = "lock:key";
public static final String LOCK_VALUE = "lock:value";
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 创建订单
*
* @return
*/
@GetMapping("/createOrder")
public String unsafeDeductOrder() {
// 这里使用 SpringBoot 提供的 setIfAbsent api 模拟 setnx 的效果
// 如果返回 true 表示获取锁成功,正常创建订单。否则失败,不做任何操作
Boolean flag = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, LOCK_VALUE);
if (!flag) {return "";}
log.info(" 开始创建订单...........");
try {String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
int stock = Integer.parseInt(stockNum);
if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
log.info(" 扣减前库存数: {}", stock);
redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));
} catch (Exception e) {// do something} finally {
// 注意,在处理完业务后,需要释放掉锁,不然会造成死锁
redisTemplate.delete(LOCK_KEY);
}
log.info(" 结束创建订单...........");
return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
}
}
Redis.setnx 多节点 expire 非原子
上述代码已经保证了如果业务代码出现了异常,一样可以正常释放锁,来避免发生死锁。但是并没有完,如果程序运行程中服务器宕机了,那么如上代码还是会因为没有删除锁,而造成程序的死锁。那么我们可以给 setnx 的 key 设置一个 expire 过期时间,这样如果程序在运行中服务宕机了锁也会在 expire 过期时间到达后,进行删除锁
@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {
public static final String ORDER_STOCK_KEY = "order:stock";
public static final String LOCK_KEY = "lock:key";
public static final String LOCK_VALUE = "lock:value";
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 创建订单
*
* @return
*/
@GetMapping("/createOrder")
public String unsafeDeductOrder() {
// 这里使用 SpringBoot 提供的 setIfAbsent api 模拟 setnx 的效果
// 如果返回 true 表示获取锁成功,正常创建订单。否则失败,不做任何操作
Boolean flag = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, LOCK_VALUE);
if (!flag) {return "";}
// 这里设置 5s 过期,当服务器宕机时 5s 后 redis 会自动删除锁
redisTemplate.expire(LOCK_KEY, 5, TimeUnit.SECONDS);
log.info(" 开始创建订单...........");
try {String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
int stock = Integer.parseInt(stockNum);
if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
log.info(" 扣减前库存数: {}", stock);
redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));
} catch (Exception e) {// do something} finally {
// 注意,在处理完业务后,需要释放掉锁,不然会造成死锁
redisTemplate.delete(LOCK_KEY);
}
log.info(" 结束创建订单...........");
return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
}
}
Redis.setnx 多节点 expire 原子操作
上述代码已经保证了如果服务器宕机,一样可以正常释放锁,来避免发生死锁。但是并没有完,如果服务器在 setnx 和 expire 代码中间宕机了,那么如上代码还是会因为没有删除锁,而造成程序的死锁。那么我们可以使用 setnx(key, value, timeout, unit) api 来保证获取锁和过期设置的原子性,来解决上述问题
@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {
public static final String ORDER_STOCK_KEY = "order:stock";
public static final String LOCK_KEY = "lock:key";
public static final String LOCK_VALUE = "lock:value";
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 创建订单
*
* @return
*/
@GetMapping("/createOrder")
public String unsafeDeductOrder() {
// 这里使用 SpringBoot 提供的 setIfAbsent api 模拟 setnx 的效果
// 如果返回 true 表示获取锁成功,正常创建订单。否则失败,不做任何操作
// 这里设置 5s 过期,当服务器宕机时 5s 后 redis 会自动删除锁,保证获取锁和过期设置的原子性
Boolean flag = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, LOCK_VALUE, 5, TimeUnit.SECONDS);
if (!flag) {return "";}
log.info(" 开始创建订单...........");
try {String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
int stock = Integer.parseInt(stockNum);
if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
log.info(" 扣减前库存数: {}", stock);
redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));
} catch (Exception e) {// do something} finally {
// 注意,在处理完业务后,需要释放掉锁,不然会造成死锁
redisTemplate.delete(LOCK_KEY);
}
log.info(" 结束创建订单...........");
return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
}
}
Redis.setnx 多节点 expire 原子操作超时程序未执行完,添加 clientId (uuid) 判断,避免释放其他线程锁
上述代码在一般的程序中使用基本上已经没有问题了。但是我们做设计,必须要将场景考虑全。上面我们设置了过期时间为 5s,但是在实际的业务中我们无法确定超时时间设置多少才是正确的。那么问题就来了,假如有 A、B、C 三个线程并发执行创建订单,我们设置了过期时间为 5s,A 线程首先获取到锁,但是程序执行需要 10s,那么 A 线程还没有执行完业务时,锁已经被释放,此时线程 B 获取到锁,B 线程执行业务代码过程中 A 线程业务执行完成,A 线程会释放锁,注意此时 A 线程释放的锁其实是 B 线程的锁,这种情况下如果有多个线程,那么将有大部分的线程释放的锁是不属于自己的,这样的程序是有问题的。如何解决这样的问题呢?我们可以将 lock_value 设置为每个线程独有一个值,在释放锁时判断 lock_value 是否为当前线程所有,如果是则释放锁,如果不是则跳过。这样就可以解决 C 释放 B 、 B 释放 A 锁的问题了
流程图如下:
@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {
public static final String ORDER_STOCK_KEY = "order:stock";
public static final String LOCK_KEY = "lock:key";
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 创建订单
*
* @return
*/
@GetMapping("/createOrder")
public String unsafeDeductOrder() {
// 当前线程 lock_value
String lockValue = UUID.randomUUID().toString();
// 这里使用 SpringBoot 提供的 setIfAbsent api 模拟 setnx 的效果
// 如果返回 true 表示获取锁成功,正常创建订单。否则失败,不做任何操作
// 这里设置 5s 过期,当服务器宕机时 5s 后 redis 会自动删除锁,保证获取锁和过期设置的原子性
Boolean flag = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, lockValue, 5, TimeUnit.SECONDS);
if (!flag) {return "";}
log.info(" 开始创建订单...........");
try {String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
int stock = Integer.parseInt(stockNum);
if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
log.info(" 扣减前库存数: {}", stock);
redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));
} catch (Exception e) {// do something} finally {
// 注意,在处理完业务后,需要释放掉锁,不然会造成死锁
// 如果是当前线程的锁,才进行释放
String value = redisTemplate.opsForValue().get(LOCK_KEY);
if (lockValue.equals(value)) {redisTemplate.delete(LOCK_KEY);
}
}
log.info(" 结束创建订单...........");
return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
}
}
Redisson Java Client 解决续租问题
上述代码解决掉了线程之间释放锁错误的问题,刚刚提到的无法确定锁过期时间的问题依然存在。对于这个问题的解决方案,目前用的比较多的是 ' 续期 ' 方式。简单来说就是,启动一个后台线程,定时检查业务代码执行状态,如果到达过期时间业务依然没有执行完,那么就进行 ' 续期 ' 操作,将过期时间延长,直至业务代码执行完成后,正常释放锁。' 续期 ' 操作实现起来还是相对麻烦,而且需要考虑的场景较多,那么目前为止 Redis 增强框架 Redisson 提供了较为完整的续期功能,所以大多数企业都会使用该框架进行分布式锁的使用
Redisson: Redisson 是一个 Redis Java 客户端,具有内存数据网格的特性。
它提供了更方便和最简单的方式与 Redis 的工作。
Redisson 对象提供了关注点分离,这允许您将重点放在数据建模和应用程序逻辑上。
- 添加依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.0</version>
</dependency>
- 添加配置类
@Configuration
public class RedissonConfig {@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private Integer port;
@Value("${spring.redis.database}")
private Integer database;
@Value("${spring.redis.password}")
private String password;
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
// 使用单机 Redis
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer();
serverConfig.setAddress(String.format("redis://%s:%s", host, port));
serverConfig.setTimeout(5000);
serverConfig.setDatabase(database);
serverConfig.setPassword(StringUtils.isEmpty(password) ? null : password);
return Redisson.create(config);
}
- 使用
@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {
public static final String ORDER_STOCK_KEY = "order:stock";
public static final String LOCK_KEY = "lock:key";
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private RedissonClient redissonClient;
/**
* 创建订单
*
* @return
*/
@GetMapping("/createOrder")
public String unsafeDeductOrder() {
// 获取并开启锁
RLock lock = redissonClient.getLock(LOCK_KEY);
lock.lock();
log.info(" 开始创建订单...........");
try {String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
int stock = Integer.parseInt(stockNum);
if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
return " 订单创建失败,库存不足!";
}
log.info(" 扣减前库存数: {}", stock);
redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));
} catch (Exception e) {// do something} finally {
// 释放锁
lock.unlock();}
log.info(" 结束创建订单...........");
return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
}
}
总结
以上就是我们在分布式环境下,使用锁的各种坑及解决方案。在我们日常工作中,已经有前辈封装了非常好的框架供我们解决各类问题,但在使用的同时,我们一定要知道为什么需要这类框架来解决这类问题,问题的本质是什么,这样才能做到触类旁通。
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!