Redis 分布式锁

背景

随着互联网应用从单体应用部署,到分布式部署,除了有好的方面外,随之而带来的问题也不断涌现。其中,最为重要的数据一致性问题,在分布式情况下并发对共享数据源进行写操作带来的数据
安全问题,都需要优先考虑在程序设计之中。
本次将从单体应用出发,到分布式集群部署,一步一步模拟在高并发程序下,不断涌现出的数据一致性问题,并对问题逐步解决的记录。

环境

首先使用 SpringBoot 搭建一个基础的 Web 服务

pom.xml

<dependencies>
    <!-- Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
</dependencies>

application.yml

# 端口
server:
  port: 8080
spring:
  application:
    name: redis-lock
  # 缓存配置
  redis:
    host: localhost
    port: 6379
    database: 0
    password:

无锁单节点

首先我们来看在单节点情况下,不使用锁进行资源保护,对订单接口进行并发访问带来对问题

@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {

    public static final String ORDER_STOCK_KEY = "order:stock";

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 创建订单,使用 Redis 模拟数据源
     *
     *
     * @return
     */
    @GetMapping("/createOrder")
    public String unsafeDeductOrder() {log.info(" 开始创建订单...........");

        String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
        if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
            return " 订单创建失败,库存不足!";
        }
        int stock = Integer.parseInt(stockNum);
        if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
            return " 订单创建失败,库存不足!";
        }

        log.info(" 扣减前库存数: {}", stock);
        redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
        log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));

        log.info(" 结束创建订单...........");
        return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
    }

}

初始化订单库存

redis> set order:stock 50
OK
redis> get order:stock
50

使用 Jmeter 模拟 200 并发,对创建订单接口进行访问,如果程序正常,应该在创建 50 个订单后,返回库存不足

查看结果,所有请求都成功了。但是我们可以看到日志中存在严重的重复扣减库存,这在真实环境是可怕的问题。同时查看最后一个请 求返回都结果,显示当前库存为 46 远比我们都预期差的多

2020-03-28 20:38:00.718  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 20:38:00.718  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 20:38:00.718  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 20:38:01.013  c.r.r.lock.controller.OrderController    : 扣减前库存数: 50
2020-03-28 20:38:01.013  c.r.r.lock.controller.OrderController    : 扣减前库存数: 50
2020-03-28 20:38:01.013  c.r.r.lock.controller.OrderController    : 扣减前库存数: 50
...
...
2020-03-28 20:38:01.168  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 20:38:01.158  c.r.r.lock.controller.OrderController    : 扣减后库存数: 49
2020-03-28 20:38:01.169  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 20:38:01.158  c.r.r.lock.controller.OrderController    : 扣减前库存数: 49
2020-03-28 20:38:01.169  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 20:38:01.158  c.r.r.lock.controller.OrderController    : 扣减前库存数: 49

Sync 锁单节点

在单体应用的情况下,我们可以通过使用锁的方式来避免上述问题。这里简单使用 synchronized 锁

@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {

    public static final String ORDER_STOCK_KEY = "order:stock";

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 创建订单,添加 synchronized 锁
     *
     * @return
     */
    @GetMapping("/createOrder")
    public synchronized String unsafeDeductOrder() {log.info(" 开始创建订单...........");

        String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
        if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
            return " 订单创建失败,库存不足!";
        }
        int stock = Integer.parseInt(stockNum);
        if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
            return " 订单创建失败,库存不足!";
        }

        log.info(" 扣减前库存数: {}", stock);
        redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
        log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));

        log.info(" 结束创建订单...........");
        return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
    }

}

初始化订单库存

redis> set order:stock 50
OK
redis> get order:stock
50

使用 Jmeter 模拟 200 并发,对创建订单接口进行访问,如果程序正常,应该在创建 50 个订单后,返回库存不足

查看结果,所有请求都成功了。查看响应结果也正常,无重复扣减库存,在扣减了 50 个库存后,正常返回无库存

2020-03-28 20:26:56.227  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 20:26:56.461  c.r.r.lock.controller.OrderController    : 扣减前库存数: 50
2020-03-28 20:26:56.466  c.r.r.lock.controller.OrderController    : 扣减后库存数: 49
2020-03-28 20:26:56.467  c.r.r.lock.controller.OrderController    : 结束创建订单...........
2020-03-28 20:26:56.469  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 20:26:56.471  c.r.r.lock.controller.OrderController    : 扣减前库存数: 49
2020-03-28 20:26:56.476  c.r.r.lock.controller.OrderController    : 扣减后库存数: 48
2020-03-28 20:26:56.476  c.r.r.lock.controller.OrderController    : 结束创建订单...........
2020-03-28 20:26:56.478  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 20:26:56.480  c.r.r.lock.controller.OrderController    : 扣减前库存数: 48
2020-03-28 20:26:56.483  c.r.r.lock.controller.OrderController    : 扣减后库存数: 47
2020-03-28 20:26:56.484  c.r.r.lock.controller.OrderController    : 结束创建订单...........
...
...
2020-03-28 20:26:56.831  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 20:26:56.833  c.r.r.lock.controller.OrderController    : 扣减前库存数: 2
2020-03-28 20:26:56.835  c.r.r.lock.controller.OrderController    : 扣减后库存数: 1
2020-03-28 20:26:56.836  c.r.r.lock.controller.OrderController    : 结束创建订单...........
2020-03-28 20:26:56.837  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 20:26:56.839  c.r.r.lock.controller.OrderController    : 扣减前库存数: 1
2020-03-28 20:26:56.842  c.r.r.lock.controller.OrderController    : 扣减后库存数: 0
2020-03-28 20:26:56.842  c.r.r.lock.controller.OrderController    : 结束创建订单...........
2020-03-28 20:26:56.843  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 20:26:56.845  c.r.r.lock.controller.OrderController    : 订单创建失败,库存不足!

Sync 锁多节点

如上,在单节点部署情况下使用 synchronized 锁可以解决问题,那么我们再来看看多节点集群部署情况下,是否能正常运行

  • 启动 8080 端口应用
# 端口
server:
  port: 8080
  • 启动 8081 端口应用
# 端口
server:
  port: 8081

使用 Nginx 做负载,将流量分别打入两个应用

upstream backend {
    server localhost:8080;
    server localhost:8081;
}
server {
    listen 80;
    server_name localhost;

    location / {proxy_pass http://backend;}
}

初始化订单库存

redis> set order:stock 50
OK
redis> get order:stock
50

使用 Jmeter 模拟 200 并发,访问 Nginx 入口,对两个节点进行负载均衡。如果程序正常,应该在创建 50 个订单后,返回库存不足

查看结果,所有请求都成功了。查看响应结发现存在重复扣减库存情况

  • 8080 节点日志
2020-03-28 21:00:03.203  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 21:00:03.260  c.r.r.lock.controller.OrderController    : 扣减前库存数: 50
2020-03-28 21:00:03.270  c.r.r.lock.controller.OrderController    : 扣减后库存数: 49
2020-03-28 21:00:03.271  c.r.r.lock.controller.OrderController    : 结束创建订单...........
...
...
2020-03-28 21:00:03.684  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 21:00:03.688  c.r.r.lock.controller.OrderController    : 扣减前库存数: 26
2020-03-28 21:00:03.693  c.r.r.lock.controller.OrderController    : 扣减后库存数: 25
2020-03-28 21:00:03.693  c.r.r.lock.controller.OrderController    : 结束创建订单...........
...
2020-03-28 21:00:04.162  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 21:00:04.164  c.r.r.lock.controller.OrderController    : 订单创建失败,库存不足!
  • 8081 节点日志
2020-03-28 21:00:03.203  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 21:00:03.264  c.r.r.lock.controller.OrderController    : 扣减前库存数: 50
2020-03-28 21:00:03.274  c.r.r.lock.controller.OrderController    : 扣减后库存数: 49
2020-03-28 21:00:03.274  c.r.r.lock.controller.OrderController    : 结束创建订单...........
...
...
2020-03-28 21:00:03.687  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 21:00:03.690  c.r.r.lock.controller.OrderController    : 扣减前库存数: 26
2020-03-28 21:00:03.695  c.r.r.lock.controller.OrderController    : 扣减后库存数: 25
2020-03-28 21:00:03.695  c.r.r.lock.controller.OrderController    : 结束创建订单...........
...
2020-03-28 21:00:04.162  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 21:00:04.164  c.r.r.lock.controller.OrderController    : 订单创建失败,库存不足!

Redis.setnx 多节点

我们可以看到,当在分布式情况下 synchronized 锁就无能为力了,因为集群中当每个节点都是在单独的 jvm 中运行的, 所以 synchronized 只能在当前 jvm 下保证并发安全。我们可以使用 redis 提供的 setnx 命令进行加锁,来保证集群情况下的并发安全

redis.setnx: 命令在指定的 key 不存在时,为 key 设置指定的值。如果指定的 key 存在,则不做任何操作。

@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {

    public static final String ORDER_STOCK_KEY = "order:stock";
    public static final String LOCK_KEY = "lock:key";
    public static final String LOCK_VALUE = "lock:value";

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 创建订单
     *
     * @return
     */
    @GetMapping("/createOrder")
    public String unsafeDeductOrder() {
        // 这里使用 SpringBoot 提供的 setIfAbsent api 模拟 setnx 的效果
        // 如果返回 true 表示获取锁成功,正常创建订单。否则失败,不做任何操作
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, LOCK_VALUE);
        if (!flag) {return "";}

        log.info(" 开始创建订单...........");
        String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
        if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
            return " 订单创建失败,库存不足!";
        }
        int stock = Integer.parseInt(stockNum);
        if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
            // 注意,如果库存已经没有了,需要释放掉锁,不然会造成死锁
            redisTemplate.delete(LOCK_KEY);
            return " 订单创建失败,库存不足!";
        }

        log.info(" 扣减前库存数: {}", stock);
        redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
        log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));

        // 注意,在处理完业务后,需要释放掉锁,不然会造成死锁
        redisTemplate.delete(LOCK_KEY);
        log.info(" 结束创建订单...........");
        return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
    }

}
  • 启动 8080 端口应用
# 端口
server:
  port: 8080
  • 启动 8081 端口应用
# 端口
server:
  port: 8081

使用 Jmeter 模拟 5s 1000 并发,对创建订单接口进行访问

查看结果,所有请求都成功了。查看响应结发现没有重复扣减库存情况,说明 redis.setnx 可以保证分布式下的并发安全问题

  • 8080 节点日志
2020-03-28 21:56:59.285  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 21:56:59.297  c.r.r.lock.controller.OrderController    : 扣减前库存数: 47
2020-03-28 21:56:59.311  c.r.r.lock.controller.OrderController    : 扣减后库存数: 46
2020-03-28 21:56:59.318  c.r.r.lock.controller.OrderController    : 结束创建订单...........
2020-03-28 21:56:59.421  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 21:56:59.443  c.r.r.lock.controller.OrderController    : 扣减前库存数: 45
2020-03-28 21:56:59.500  c.r.r.lock.controller.OrderController    : 扣减后库存数: 44
2020-03-28 21:56:59.528  c.r.r.lock.controller.OrderController    : 结束创建订单...........
...
...
2020-03-28 21:57:02.157  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 21:57:02.159  c.r.r.lock.controller.OrderController    : 扣减前库存数: 3
2020-03-28 21:57:02.166  c.r.r.lock.controller.OrderController    : 扣减后库存数: 2
2020-03-28 21:57:02.170  c.r.r.lock.controller.OrderController    : 结束创建订单...........
2020-03-28 21:57:02.177  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 21:57:02.180  c.r.r.lock.controller.OrderController    : 扣减前库存数: 2
2020-03-28 21:57:02.186  c.r.r.lock.controller.OrderController    : 扣减后库存数: 1
2020-03-28 21:57:02.189  c.r.r.lock.controller.OrderController    : 结束创建订单...........
2020-03-28 21:57:02.206  c.r.r.lock.controller.OrderController    : 开始创建订单...........
2020-03-28 21:57:02.210  c.r.r.lock.controller.OrderController    : 订单创建失败,库存不足!
  • 8081 节点日志
2020-03-28 21:56:59.135 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:56:59.148 c.r.r.lock.controller.OrderController : 扣减前库存数: 50
2020-03-28 21:56:59.156 c.r.r.lock.controller.OrderController : 扣减后库存数: 49
2020-03-28 21:56:59.162 c.r.r.lock.controller.OrderController : 结束创建订单...........
2020-03-28 21:56:59.194 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:56:59.217 c.r.r.lock.controller.OrderController : 扣减前库存数: 49
2020-03-28 21:56:59.236 c.r.r.lock.controller.OrderController : 扣减后库存数: 48
2020-03-28 21:56:59.528 c.r.r.lock.controller.OrderController : 结束创建订单...........
...
...
2020-03-28 21:57:02.124 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:57:02.131 c.r.r.lock.controller.OrderController : 扣减前库存数: 5
2020-03-28 21:57:02.138 c.r.r.lock.controller.OrderController : 扣减后库存数: 4
2020-03-28 21:57:02.141 c.r.r.lock.controller.OrderController : 结束创建订单...........
2020-03-28 21:57:02.191 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:57:02.196 c.r.r.lock.controller.OrderController : 扣减前库存数: 1
2020-03-28 21:57:02.201 c.r.r.lock.controller.OrderController : 扣减后库存数: 0
2020-03-28 21:57:02.204 c.r.r.lock.controller.OrderController : 结束创建订单...........
2020-03-28 21:57:02.206 c.r.r.lock.controller.OrderController : 开始创建订单...........
2020-03-28 21:57:02.210 c.r.r.lock.controller.OrderController : 订单创建失败,库存不足!
Redis.setnx 多节点 finally

虽然以上代码已经可以保证分布式下的并发安全,但是代码还存在这问题。假设我们的程序在创建订单的过程中出现异常,那么上述代码就会因为没有及时删除锁,而造成死锁。那么解决方法也比较简单,我们在 finally 块中删除锁,就可以了

@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {

    public static final String ORDER_STOCK_KEY = "order:stock";
    public static final String LOCK_KEY = "lock:key";
    public static final String LOCK_VALUE = "lock:value";

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 创建订单
     *
     * @return
     */
    @GetMapping("/createOrder")
    public String unsafeDeductOrder() {
        // 这里使用 SpringBoot 提供的 setIfAbsent api 模拟 setnx 的效果
        // 如果返回 true 表示获取锁成功,正常创建订单。否则失败,不做任何操作
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, LOCK_VALUE);
        if (!flag) {return "";}

        log.info(" 开始创建订单...........");
        try {String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
            if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
                return " 订单创建失败,库存不足!";
            }
            int stock = Integer.parseInt(stockNum);
            if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
                return " 订单创建失败,库存不足!";
            }

            log.info(" 扣减前库存数: {}", stock);
            redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
            log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));
        } catch (Exception e) {// do something} finally {
            // 注意,在处理完业务后,需要释放掉锁,不然会造成死锁
            redisTemplate.delete(LOCK_KEY);
        }

        log.info(" 结束创建订单...........");
        return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
    }

}

Redis.setnx 多节点 expire 非原子

上述代码已经保证了如果业务代码出现了异常,一样可以正常释放锁,来避免发生死锁。但是并没有完,如果程序运行程中服务器宕机了,那么如上代码还是会因为没有删除锁,而造成程序的死锁。那么我们可以给 setnx 的 key 设置一个 expire 过期时间,这样如果程序在运行中服务宕机了锁也会在 expire 过期时间到达后,进行删除锁

@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {

    public static final String ORDER_STOCK_KEY = "order:stock";
    public static final String LOCK_KEY = "lock:key";
    public static final String LOCK_VALUE = "lock:value";

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 创建订单
     *
     * @return
     */
    @GetMapping("/createOrder")
    public String unsafeDeductOrder() {
        // 这里使用 SpringBoot 提供的 setIfAbsent api 模拟 setnx 的效果
        // 如果返回 true 表示获取锁成功,正常创建订单。否则失败,不做任何操作
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, LOCK_VALUE);
        if (!flag) {return "";}
        // 这里设置 5s 过期,当服务器宕机时 5s 后 redis 会自动删除锁
        redisTemplate.expire(LOCK_KEY, 5, TimeUnit.SECONDS);
        log.info(" 开始创建订单...........");
        try {String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
            if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
                return " 订单创建失败,库存不足!";
            }
            int stock = Integer.parseInt(stockNum);
            if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
                return " 订单创建失败,库存不足!";
            }

            log.info(" 扣减前库存数: {}", stock);
            redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
            log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));
        } catch (Exception e) {// do something} finally {
            // 注意,在处理完业务后,需要释放掉锁,不然会造成死锁
            redisTemplate.delete(LOCK_KEY);
        }

        log.info(" 结束创建订单...........");
        return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
    }

}

Redis.setnx 多节点 expire 原子操作

上述代码已经保证了如果服务器宕机,一样可以正常释放锁,来避免发生死锁。但是并没有完,如果服务器在 setnx 和 expire 代码中间宕机了,那么如上代码还是会因为没有删除锁,而造成程序的死锁。那么我们可以使用 setnx(key, value, timeout, unit) api 来保证获取锁和过期设置的原子性,来解决上述问题

@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {

    public static final String ORDER_STOCK_KEY = "order:stock";
    public static final String LOCK_KEY = "lock:key";
    public static final String LOCK_VALUE = "lock:value";

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 创建订单
     *
     * @return
     */
    @GetMapping("/createOrder")
    public String unsafeDeductOrder() {
        // 这里使用 SpringBoot 提供的 setIfAbsent api 模拟 setnx 的效果
        // 如果返回 true 表示获取锁成功,正常创建订单。否则失败,不做任何操作
        // 这里设置 5s 过期,当服务器宕机时 5s 后 redis 会自动删除锁,保证获取锁和过期设置的原子性
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, LOCK_VALUE, 5, TimeUnit.SECONDS);
        if (!flag) {return "";}

        log.info(" 开始创建订单...........");
        try {String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
            if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
                return " 订单创建失败,库存不足!";
            }
            int stock = Integer.parseInt(stockNum);
            if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
                return " 订单创建失败,库存不足!";
            }

            log.info(" 扣减前库存数: {}", stock);
            redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
            log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));
        } catch (Exception e) {// do something} finally {
            // 注意,在处理完业务后,需要释放掉锁,不然会造成死锁
            redisTemplate.delete(LOCK_KEY);
        }

        log.info(" 结束创建订单...........");
        return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
    }

}

Redis.setnx 多节点 expire 原子操作超时程序未执行完,添加 clientId (uuid) 判断,避免释放其他线程锁

上述代码在一般的程序中使用基本上已经没有问题了。但是我们做设计,必须要将场景考虑全。上面我们设置了过期时间为 5s,但是在实际的业务中我们无法确定超时时间设置多少才是正确的。那么问题就来了,假如有 A、B、C 三个线程并发执行创建订单,我们设置了过期时间为 5s,A 线程首先获取到锁,但是程序执行需要 10s,那么 A 线程还没有执行完业务时,锁已经被释放,此时线程 B 获取到锁,B 线程执行业务代码过程中 A 线程业务执行完成,A 线程会释放锁,注意此时 A 线程释放的锁其实是 B 线程的锁,这种情况下如果有多个线程,那么将有大部分的线程释放的锁是不属于自己的,这样的程序是有问题的。如何解决这样的问题呢?我们可以将 lock_value 设置为每个线程独有一个值,在释放锁时判断 lock_value 是否为当前线程所有,如果是则释放锁,如果不是则跳过。这样就可以解决 C 释放 B 、 B 释放 A 锁的问题了

流程图如下:

@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {

    public static final String ORDER_STOCK_KEY = "order:stock";
    public static final String LOCK_KEY = "lock:key";

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 创建订单
     *
     * @return
     */
    @GetMapping("/createOrder")
    public String unsafeDeductOrder() {
        // 当前线程 lock_value
        String lockValue = UUID.randomUUID().toString();
        // 这里使用 SpringBoot 提供的 setIfAbsent api 模拟 setnx 的效果
        // 如果返回 true 表示获取锁成功,正常创建订单。否则失败,不做任何操作
        // 这里设置 5s 过期,当服务器宕机时 5s 后 redis 会自动删除锁,保证获取锁和过期设置的原子性
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, lockValue, 5, TimeUnit.SECONDS);
        if (!flag) {return "";}

        log.info(" 开始创建订单...........");
        try {String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
            if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
                return " 订单创建失败,库存不足!";
            }
            int stock = Integer.parseInt(stockNum);
            if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
                return " 订单创建失败,库存不足!";
            }

            log.info(" 扣减前库存数: {}", stock);
            redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
            log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));
        } catch (Exception e) {// do something} finally {
            // 注意,在处理完业务后,需要释放掉锁,不然会造成死锁
            // 如果是当前线程的锁,才进行释放
            String value = redisTemplate.opsForValue().get(LOCK_KEY);
            if (lockValue.equals(value)) {redisTemplate.delete(LOCK_KEY);
            }
        }

        log.info(" 结束创建订单...........");
        return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
    }

}

Redisson Java Client 解决续租问题

上述代码解决掉了线程之间释放锁错误的问题,刚刚提到的无法确定锁过期时间的问题依然存在。对于这个问题的解决方案,目前用的比较多的是 ' 续期 ' 方式。简单来说就是,启动一个后台线程,定时检查业务代码执行状态,如果到达过期时间业务依然没有执行完,那么就进行 ' 续期 ' 操作,将过期时间延长,直至业务代码执行完成后,正常释放锁。' 续期 ' 操作实现起来还是相对麻烦,而且需要考虑的场景较多,那么目前为止 Redis 增强框架 Redisson 提供了较为完整的续期功能,所以大多数企业都会使用该框架进行分布式锁的使用

Redisson: Redisson 是一个 Redis Java 客户端,具有内存数据网格的特性。
它提供了更方便和最简单的方式与 Redis 的工作。
Redisson 对象提供了关注点分离,这允许您将重点放在数据建模和应用程序逻辑上。

  • 添加依赖
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.11.0</version>
</dependency>
  • 添加配置类
@Configuration
public class RedissonConfig {@Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private Integer port;
    @Value("${spring.redis.database}")
    private Integer database;
    @Value("${spring.redis.password}")
    private String password;

    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient() {
        // 使用单机 Redis
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress(String.format("redis://%s:%s", host, port));
        serverConfig.setTimeout(5000);
        serverConfig.setDatabase(database);
        serverConfig.setPassword(StringUtils.isEmpty(password) ? null : password);
        return Redisson.create(config);
    }
  • 使用
@Slf4j
@RestController
@RequestMapping("/orders")
public class OrderController {

    public static final String ORDER_STOCK_KEY = "order:stock";
    public static final String LOCK_KEY = "lock:key";

    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Autowired
    private RedissonClient redissonClient;

    /**
     * 创建订单
     *
     * @return
     */
    @GetMapping("/createOrder")
    public String unsafeDeductOrder() {
        // 获取并开启锁
        RLock lock = redissonClient.getLock(LOCK_KEY);
        lock.lock();
        log.info(" 开始创建订单...........");
        try {String stockNum = redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
            if (StringUtils.isEmpty(stockNum)) {log.warn(" 订单创建失败,库存不足!");
                return " 订单创建失败,库存不足!";
            }
            int stock = Integer.parseInt(stockNum);
            if (stock <= 0) {log.warn(" 订单创建失败,库存不足!");
                return " 订单创建失败,库存不足!";
            }

            log.info(" 扣减前库存数: {}", stock);
            redisTemplate.opsForValue().set(ORDER_STOCK_KEY, String.valueOf(stock - 1));
            log.info(" 扣减后库存数: {}", redisTemplate.opsForValue().get(ORDER_STOCK_KEY));
        } catch (Exception e) {// do something} finally {
            // 释放锁
            lock.unlock();}

        log.info(" 结束创建订单...........");
        return " 当前库存数:" + redisTemplate.opsForValue().get(ORDER_STOCK_KEY);
    }

}

总结

以上就是我们在分布式环境下,使用锁的各种坑及解决方案。在我们日常工作中,已经有前辈封装了非常好的框架供我们解决各类问题,但在使用的同时,我们一定要知道为什么需要这类框架来解决这类问题,问题的本质是什么,这样才能做到触类旁通。



Redis     

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!