分布式锁1:5种方案解决商品超卖的方案的优缺点
一 分布式锁
1.1 分布式锁的作用
在多线程高并发场景下,为了保证资源的线程安全问题,jdk为我们提供了synchronized关键字和ReentrantLock可重入锁,但是它们只能保证一个工程内的线程安全。在分布式集群、微服务、云原生横行的当下,如何保证不同进程、不同服务、不同机器的线程安全问题。jdk并没有给我们提供既有的解决方案。需要自己通过编写方案来解决,目前主流的实现有以下方式:
-
基于mysql关系型实现
-
基于redis非关系型数据实现
-
基于zookeeper/etcd实现
1.2 四种方案的比较
性能:一个sql > 悲观锁 > jvm锁 > 乐观锁
如果追求极致性能、业务场景简单并且不需要记录数据前后变化的情况下。 优先选择:一个sql
如果写并发量较低(多读),争抢不是很激烈的情况下优先选择:乐观锁
如果写并发量较高,一般会经常冲突,此时选择乐观锁的话,会导致业务代码不间断的重试。 优先选择:mysql悲观锁
不推荐jvm本地锁。
二 模拟单体超卖
2.1 工程结构
2.2 编写工程代码
1.pom文件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
2.配置文件
server.port=9999
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/fenbu_lock?characterEncoding=utf-8&useSSL=true
spring.datasource.username=root
spring.datasource.password=cloudiip
redis.host=172.16.116.100
3.controller
@RestController
public class StockController {
@Autowired
private StockService stockService;
@GetMapping("stock/deduct")
public String deduct(){
// this.stockService.deduct();
this.stockService.deductByMsqlDb();
return "hello stock deduct!!";
}
}
4.service
@Service
public class StockService {
@Autowired
private StockMapper stockMapper;
private Stock stock = new Stock();
private ReentrantLock lock = new ReentrantLock();
public void deduct(){
// lock.lock();
// try {
// stock.setStock(stock.getStock() - 1);
// System.out.println("库存余量:" + stock.getStock());
// } finally {
// lock.unlock();
// }
}
public void deductByMsqlDb(){
// 先查询库存是否充足
Stock stock = this.stockMapper.selectById(1L);
// 再减库存
if (stock != null && stock.getCount() > 0){
stock.setCount(stock.getCount() - 1);
this.stockMapper.updateById(stock);
}
}
}
5.mapper
@Mapper
public interface StockMapper extends BaseMapper<Stock> {
}
6.启动类
@SpringBootApplication
@MapperScan("com.atguigu.distributed.lock.mapper")
public class DistributedLockApplication {
public static void main(String[] args)
{
SpringApplication.run(DistributedLockApplication.class, args);
System.out.println("========================启动成功==========");
}
}
7.pojo类
@Data
@TableName("db_stock")
public class Stock {
@TableId
private Long id;
private String productCode;
private String stockCode;
private Integer count;
// private Integer stock = 5000;
}
8.附件数据表
1.新建一个数据库,附件数据表,如图
2.脚本文件
CREATE TABLE `db_stock` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`product_code` varchar(255) DEFAULT NULL COMMENT '商品编号',
`stock_code` varchar(255) DEFAULT NULL COMMENT '仓库编号',
`count` int(11) DEFAULT NULL COMMENT '库存量',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
2.3 测试验证
http://localhost:9999/stock/deduct
查看数据库
2.4 jmeter模拟并发访问
2.4.1 启动jmeter
2.4.2 配置jmeter
1.添加线程组
并发100循环50次,即5000次请求。
3.给线程组添加HTTP Request请求:
4.将接口地址:http://localhost:9999/stock/deduct 配置到下面
5.再选择你想要的测试报表,例如这里选择聚合报告:
启动测试,查看压力测试报告:
参数api说明如下:
1.Label 取样器别名,如果勾选Include group name ,则会添加线程组的名称作为前缀
# Samples 取样器运行次数
Average 请求(事务)的平均响应时间
Median 中位数
90% Line 90%用户响应时间
95% Line 90%用户响应时间
99% Line 90%用户响应时间
Min 最小响应时间
Max 最大响应时间
Error 错误率
Throughput 吞吐率
Received KB/sec 每秒收到的千字节
Sent KB/sec 每秒收到的千字节
测试结果:请求总数5000次,平均请求时间129ms,中位数(50%)请求是在36ms内完成的,错误率0%,每秒钟平均吞吐量716.7次。
结论:此时如果还有人来下单,就会出现超卖现象(别人购买成功,而无货可发)。
三 方案1:使用jvm的本地锁解决冲突
3.1 原理
添加synchronized关键字之后,StockService就具备了对象锁,由于添加了独占的排他锁,同一时刻只有一个请求能够获取到锁,并减库存。此时,所有请求只会one-by-one执行下去,也就不会发生超卖现象。
3.2 操作
用jvm锁(synchronized关键字或者ReetrantLock)试试:
2.使用jmeter再次测试
查看数据库
并没有发生超卖现象,完美解决。
3.3 此方案的缺点失效情况
1.多例模式 2.事务 ;3.集群
四 方案2:使用表级锁的sql解决冲突
4.1 表锁的使用范围
4.1.1 更新sql使用表锁
描述:
会话A执行: update db_stock set count=count-#{count} where product_code='1001' and count>=1
会话B执行:因为会话A执行的更新语句触发了表级锁,导致会话B无法执行插入,更新等语句。
insert into db_stock values(4,'1002','上海仓',5000);
update db_stock set count=count-1 where id=3;
1.会话A: 开启事务,执行更新语句,先不执行commit提交
2.会话B: 由于会话A执行更新语句后未提交,触发表级锁,此时自己进行更新,插入无法进行。
4.1.2 表锁变行锁
mysql悲观锁使用行级锁的条件:
1.锁的查询或者更新必须使用索引字段
2.查询或者更新必须是具体值。
1.给查询条件设置索引字段,让更新语句变为行级锁
2.会话A执行更新,让更新语句变为行级锁
3.会话B进行更新,回车后,可以看到进行提交执行了
4.2 操作案例
1.mapper级别
@Update("update db_stock set count=count-#{count} where product_code=#{productCode} and count>=#{count}")
int updateStock(@Param("productCode") String productCode,@Param("count") Integer count);
2.service
public void deductBySql(){
// 先查询库存是否充足
this.stockMapper.updateStock("1001",1);
System.out.println("请求进来了.......");
}
3.查看数据库
4.并发压力测试
5.查看效果:均正确消费。
4.3 此方案缺点
优点:能够解决jvm本地锁多失效的3种情况。
缺点:1.确定锁的范围 行级锁还是表级锁;2.同一个商品有多条库存记录;
3.无法记录库存前后的变化记录。
五 方案3:使用悲观锁解决冲突
5.1 使用悲观锁原理
除了使用jvm锁之外,还可以使用数据锁:悲观锁 或者 乐观锁。
1.悲观锁:在select的时候就会加锁,采用先加锁后处理的模式,虽然保证了数据处理的安全性,但也会阻塞其他线程的写操作。在读取数据时锁住那几行,其他对这几行的更新需要等到悲观锁结束时才能继续 。select ... for update
悲观锁适用于写多读少的场景,因为拿不到锁的线程,会将线程挂起,交出CPU资源,可以把CPU给其他线程使用,提高了CPU的利用率。
会话A:select ... for update 给具体的行数据加上排他锁,也即行锁。
会话B :无法对1001进行更新,因为上了行级锁
5.2 操作案例
一个sql:直接更新时判断,在更新中判断库存是否大于0 ;
update table set surplus = (surplus - buyQuantity) where id = 1 and (surplus - buyQuantity) > 0
1.mapper:编写悲观锁语句
2.service:添加事务注解 @Transactional
3.数据表
4.jmeter压力测试
5.查看效果:成功实现所减数据为0,均正确消费。
5.3 此方案的优缺点
1.性能问题;2.死锁问题:对多条数据加锁时,加锁顺序要一致;
3.库存操作要统一,一个会话用 select x for update 一个会话执行select可以进行查询 ,存在数据不一致情况。
会话A:进行查询上表锁
会话B:可以进行查询查询。
六 方案4:使用乐观锁解决冲突
6.1 乐观锁原理
乐观锁:采取了更加宽松的加锁机制,大多是基于数据版本( Version )及时间戳来实现。适合于读比较多,不会阻塞读,读取数据时不上锁,更新时检查是否数据已经被更新过,如果是则取消当前更新进行重试。version 或者 时间戳(CAS思想)。
6.2 操作案例
使用数据版本(Version)记录机制实现,这是乐观锁最常用的实现 方式。一般是通过为数据库表增加一个数字类型的 “version” 字段来实现。当读取数据时,将version字段的值一同读出,数据每更新一次,对此version值加一。当我们提交更新的时候,判断数据库表对应记录 的当前版本信息与第一次取出来的version值进行比对,如果数据库表当前版本号与第一次取出来的version值相等,则予以更新。
更新sql:
select * from db_stock where product_code='1001'
update db_stock set count=4996,version=version+1 where id=1 and version=0;
1.修改service
2.数据库表
3.压力测试
4.查看消费结果: 均正确消费
6.3 乐观锁存在的缺点
1.高并发情况下,性能比较低下,并发量越小,性能越高。 2.读写情况下,乐观锁不可靠。
七 方案5:使用redis的乐观锁
7.1 redis的乐观锁的原理
利用redis监听 + 事务。
watch stock //监听一个或者多个key的值,在监听中,key的值有改动,则执行失败
multi //开启事务
set stock 5000
exec //执行事务
情况1:如果执行过程中stock的值没有被其他链接改变,则执行成功
情况2: 如果执行过程中stock的值被改变,则执行失败效果如下:
7.2 代码实现
1.pom文件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.配置文件
3.业务实现代码
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void deductByRedisLock(){
//watch 监听
this.stringRedisTemplate.execute(new SessionCallback<Object>() {
@SneakyThrows
public Object execute(RedisOperations operations) throws DataAccessException {
System.out.println("redis 乐观锁....");
operations.watch("stock");
//1.查询库存字段
String stock= stringRedisTemplate.opsForValue().get("stock");
//2.判断库存
if(stock!=null&&stock.length()!=0){
Integer st=Integer.parseInt(stock);
if(st>0){
//开启事务
operations.multi();
stringRedisTemplate.opsForValue().set("stock",String.valueOf(st-1));
//关闭事务
List exeList= operations.exec();
if(exeList==null||exeList.size()==0){//减库存失败,重试
Thread.sleep(40);
deductByRedisLock();
}
return exeList;
}
}
return null;
}
});
}
4.设置redis
127.0.0.1:6379> set stock 5000
OK
127.0.0.1:6379> get stock
"5000"
5.批量测试
6.查看结果:正确消费无误。解决超卖现象。
127.0.0.1:6379> set stock 5000
OK
127.0.0.1:6379> get stock
"5000"
127.0.0.1:6379> get stock
"0"
127.0.0.1:6379>
7.3 此方案的优缺点
1.性能比较低,并发很低