分布式锁1:5种方案解决商品超卖的方案的优缺点

一 分布式锁

1.1 分布式锁的作用

在多线程高并发场景下,为了保证资源的线程安全问题,jdk为我们提供了synchronized关键字和ReentrantLock可重入锁,但是它们只能保证一个工程内的线程安全。在分布式集群、微服务、云原生横行的当下,如何保证不同进程、不同服务、不同机器的线程安全问题。jdk并没有给我们提供既有的解决方案。需要自己通过编写方案来解决,目前主流的实现有以下方式:

  1. 基于mysql关系型实现

  2. 基于redis非关系型数据实现

  3. 基于zookeeper/etcd实现

1.2  四种方案的比较

性能:一个sql > 悲观锁 > jvm锁 > 乐观锁

如果追求极致性能、业务场景简单并且不需要记录数据前后变化的情况下。 优先选择:一个sql

如果写并发量较低(多读),争抢不是很激烈的情况下优先选择:乐观锁

如果写并发量较高,一般会经常冲突,此时选择乐观锁的话,会导致业务代码不间断的重试。​ 优先选择:mysql悲观锁

不推荐jvm本地锁。

二  模拟单体超卖

2.1 工程结构

2.2 编写工程代码

1.pom文件

  <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <version>2.3.12.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

2.配置文件

server.port=9999
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/fenbu_lock?characterEncoding=utf-8&useSSL=true
spring.datasource.username=root
spring.datasource.password=cloudiip
redis.host=172.16.116.100

 3.controller

@RestController
public class StockController {

    @Autowired
    private StockService stockService;

    @GetMapping("stock/deduct")
    public String deduct(){
       // this.stockService.deduct();
        this.stockService.deductByMsqlDb();
        return "hello stock deduct!!";
    }

}

4.service

@Service
public class StockService {
    @Autowired
    private StockMapper stockMapper;
    private Stock stock = new Stock();

    private ReentrantLock lock = new ReentrantLock();

    public void deduct(){
//        lock.lock();
//        try {
//            stock.setStock(stock.getStock() - 1);
//            System.out.println("库存余量:" + stock.getStock());
//        } finally {
//            lock.unlock();
//        }
    }
    public void deductByMsqlDb(){
        // 先查询库存是否充足
        Stock stock = this.stockMapper.selectById(1L);

        // 再减库存
        if (stock != null && stock.getCount() > 0){
            stock.setCount(stock.getCount() - 1);
            this.stockMapper.updateById(stock);
        }
    }

}

5.mapper


@Mapper
public interface StockMapper extends BaseMapper<Stock> {
}

6.启动类

@SpringBootApplication
@MapperScan("com.atguigu.distributed.lock.mapper")
public class DistributedLockApplication {

    public static void main(String[] args)
    {
        SpringApplication.run(DistributedLockApplication.class, args);
        System.out.println("========================启动成功==========");
    }

}

7.pojo类

@Data
@TableName("db_stock")
public class Stock {
    @TableId
    private Long id;

    private String productCode;

    private String stockCode;

    private Integer count;
   // private Integer stock = 5000;
}

8.附件数据表

1.新建一个数据库,附件数据表,如图

2.脚本文件

CREATE TABLE `db_stock` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `product_code` varchar(255) DEFAULT NULL COMMENT '商品编号',
  `stock_code` varchar(255) DEFAULT NULL COMMENT '仓库编号',
  `count` int(11) DEFAULT NULL COMMENT '库存量',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

2.3 测试验证

http://localhost:9999/stock/deduct

查看数据库

2.4 jmeter模拟并发访问

2.4.1 启动jmeter

 2.4.2 配置jmeter

1.添加线程组

并发100循环50次,即5000次请求。  

3.给线程组添加HTTP Request请求:

4.将接口地址:http://localhost:9999/stock/deduct  配置到下面

 5.再选择你想要的测试报表,例如这里选择聚合报告:

启动测试,查看压力测试报告:

 参数api说明如下:

1.Label 取样器别名,如果勾选Include group name ,则会添加线程组的名称作为前缀

# Samples 取样器运行次数

Average 请求(事务)的平均响应时间

Median 中位数

90% Line 90%用户响应时间

95% Line 90%用户响应时间

99% Line 90%用户响应时间

Min 最小响应时间

Max 最大响应时间

Error 错误率

Throughput 吞吐率

Received KB/sec 每秒收到的千字节

Sent KB/sec 每秒收到的千字节

测试结果:请求总数5000次,平均请求时间129ms,中位数(50%)请求是在36ms内完成的,错误率0%,每秒钟平均吞吐量716.7次。

结论:此时如果还有人来下单,就会出现超卖现象(别人购买成功,而无货可发)。

三 方案1:使用jvm的本地锁解决冲突

3.1 原理

添加synchronized关键字之后,StockService就具备了对象锁,由于添加了独占的排他锁,同一时刻只有一个请求能够获取到锁,并减库存。此时,所有请求只会one-by-one执行下去,也就不会发生超卖现象。

3.2 操作

用jvm锁(synchronized关键字或者ReetrantLock)试试:

2.使用jmeter再次测试

查看数据库

并没有发生超卖现象,完美解决。  

3.3 此方案的缺点失效情况

1.多例模式  2.事务 ;3.集群

四 方案2:使用表级锁的sql解决冲突

4.1 表锁的使用范围

4.1.1 更新sql使用表锁

描述:

会话A执行: update db_stock set count=count-#{count} where product_code='1001' and count>=1

会话B执行:因为会话A执行的更新语句触发了表级锁,导致会话B无法执行插入,更新等语句。

insert into db_stock values(4,'1002','上海仓',5000);

update db_stock set count=count-1 where id=3;

1.会话A: 开启事务,执行更新语句,先不执行commit提交

2.会话B:  由于会话A执行更新语句后未提交,触发表级锁,此时自己进行更新,插入无法进行。

4.1.2 表锁变行锁

mysql悲观锁使用行级锁的条件:
1.锁的查询或者更新必须使用索引字段
2.查询或者更新必须是具体值。

1.给查询条件设置索引字段,让更新语句变为行级锁

2.会话A执行更新,让更新语句变为行级锁

3.会话B进行更新,回车后,可以看到进行提交执行了 

4.2 操作案例 

1.mapper级别

    @Update("update db_stock set count=count-#{count} where product_code=#{productCode} and count>=#{count}")
    int updateStock(@Param("productCode") String productCode,@Param("count") Integer count);

2.service

  public   void deductBySql(){
        // 先查询库存是否充足
       this.stockMapper.updateStock("1001",1);
        System.out.println("请求进来了.......");

    }

 3.查看数据库

4.并发压力测试

5.查看效果:均正确消费。

4.3 此方案缺点

优点:能够解决jvm本地锁多失效的3种情况。

缺点:1.确定锁的范围 行级锁还是表级锁;2.同一个商品有多条库存记录;

3.无法记录库存前后的变化记录。

五  方案3:使用悲观锁解决冲突

5.1 使用悲观锁原理

除了使用jvm锁之外,还可以使用数据锁:悲观锁 或者 乐观锁。

1.悲观锁:在select的时候就会加锁,采用先加锁后处理的模式,虽然保证了数据处理的安全性,但也会阻塞其他线程的写操作。在读取数据时锁住那几行,其他对这几行的更新需要等到悲观锁结束时才能继续 。select ... for update
悲观锁适用于写多读少的场景,因为拿不到锁的线程,会将线程挂起,交出CPU资源,可以把CPU给其他线程使用,提高了CPU的利用率。

会话A:select ... for update   给具体的行数据加上排他锁,也即行锁。

会话B :无法对1001进行更新,因为上了行级锁

5.2 操作案例

一个sql:直接更新时判断,在更新中判断库存是否大于0 ;

update table set surplus = (surplus - buyQuantity) where id = 1 and (surplus - buyQuantity) > 0

1.mapper:编写悲观锁语句

2.service:添加事务注解  @Transactional

3.数据表

4.jmeter压力测试

 5.查看效果:成功实现所减数据为0,均正确消费。

5.3 此方案的优缺点

1.性能问题;2.死锁问题:对多条数据加锁时,加锁顺序要一致;

3.库存操作要统一,一个会话用 select  x for update  一个会话执行select可以进行查询 ,存在数据不一致情况。

会话A:进行查询上表锁

会话B:可以进行查询查询。

六  方案4:使用乐观锁解决冲突

6.1 乐观锁原理

乐观锁:采取了更加宽松的加锁机制,大多是基于数据版本( Version )及时间戳来实现。适合于读比较多,不会阻塞读,读取数据时不上锁,更新时检查是否数据已经被更新过,如果是则取消当前更新进行重试。version 或者 时间戳(CAS思想)。

6.2 操作案例

使用数据版本(Version)记录机制实现,这是乐观锁最常用的实现 方式。一般是通过为数据库表增加一个数字类型的 “version” 字段来实现。当读取数据时,将version字段的值一同读出,数据每更新一次,对此version值加一。当我们提交更新的时候,判断数据库表对应记录 的当前版本信息与第一次取出来的version值进行比对,如果数据库表当前版本号与第一次取出来的version值相等,则予以更新。

更新sql:

select * from db_stock where product_code='1001'
update db_stock set count=4996,version=version+1 where id=1 and version=0;

1.修改service

2.数据库表

3.压力测试

4.查看消费结果: 均正确消费

6.3 乐观锁存在的缺点

1.高并发情况下,性能比较低下,并发量越小,性能越高。 2.读写情况下,乐观锁不可靠。

七 方案5:使用redis的乐观锁

7.1 redis的乐观锁的原理

利用redis监听 + 事务。

watch stock //监听一个或者多个key的值,在监听中,key的值有改动,则执行失败
multi   //开启事务
set stock 5000
exec   //执行事务

情况1:如果执行过程中stock的值没有被其他链接改变,则执行成功

情况2: 如果执行过程中stock的值被改变,则执行失败效果如下:

7.2 代码实现

1.pom文件

  <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2.配置文件

3.业务实现代码

        @Autowired
    private StringRedisTemplate  stringRedisTemplate;
public  void  deductByRedisLock(){
        //watch 监听
        this.stringRedisTemplate.execute(new SessionCallback<Object>() {
            @SneakyThrows
            public Object execute(RedisOperations operations) throws DataAccessException {
                System.out.println("redis 乐观锁....");
                operations.watch("stock");
                //1.查询库存字段
                String stock= stringRedisTemplate.opsForValue().get("stock");
                //2.判断库存
                if(stock!=null&&stock.length()!=0){
                    Integer st=Integer.parseInt(stock);
                    if(st>0){
                        //开启事务
                        operations.multi();
                        stringRedisTemplate.opsForValue().set("stock",String.valueOf(st-1));
                        //关闭事务
                     List exeList=   operations.exec();
                     if(exeList==null||exeList.size()==0){//减库存失败,重试
                         Thread.sleep(40);
                         deductByRedisLock();
                     }
                     return exeList;
                    }
                }
                return null;
            }
        });

    }

4.设置redis

127.0.0.1:6379> set stock 5000
OK
127.0.0.1:6379> get stock
"5000"
5.批量测试

 6.查看结果:正确消费无误。解决超卖现象。

127.0.0.1:6379> set stock 5000
OK
127.0.0.1:6379> get stock
"5000"
127.0.0.1:6379> get stock
"0"
127.0.0.1:6379> 


7.3 此方案的优缺点

1.性能比较低,并发很低