文中的代码不完整,只是个大概,做个了解就可以了!
分布式锁在我们的开发过程中是十分常见的技术,通过来说我们会采用mysql、redis、zk等手段去做分布式锁,本次主要介绍mysql和redis实现的方式,因为zk我不熟。
mysql
使用mysql做分布式锁有以下的方式:
-
建立一张表记录对应的方法,并且使用唯一索引
先试图插入该表中,如果能够执行成功则代表获取到锁,执行失败则自旋直至获取锁 逻辑操作完成之后,删除对应的数据
-
使用Mysql自带的排它锁 select for update语句
同样需要一张表,并且表中初始化需要锁的资源 使用select xxx from table where xxx=xxx for update查询 并且设置超时时间,避免长时间持有锁 如果获取资源则代表获取锁 无论是否成功获取到锁都要释放掉连接
/**
* (这里只是简单介绍一下,对于上述方案可以进行优化,如:应用主从数据库,数据之间双向同步;一旦挂掉快速切换到备库上;做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍;
* 使用while循环,直到insert成功再返回成功;记录当前获得锁的机器的主机信息和线程信息,下次再获取锁的时候先查询数据库
* 如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了,实现可重入锁)
*
* @throws InterruptedException
*/
@RequestMapping("/lock")
public void lock() throws InterruptedException {
String className = Thread.currentThread().getStackTrace()[1].getClassName();
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
TestLock testLock = new TestLock();
testLock.setController(className);
testLock.setMethod(methodName);
testLock.setTime(new Date());
try {
if (testLockMapper.insert(testLock) > 0) {
log.info("获取到锁");
Thread.sleep(10000);
testLockMapper.deleteByCondition(className, methodName);
log.info("释放锁");
} else {
log.info("获取不到锁");
}
} catch (Exception e) {
log.info("获取不到锁");
}
}
/**
* 加了排他锁之后,加了for update 的sql 将会阻塞,拿到锁之后继续执行
* 阻塞时间是否有限制? 这个没有一个 超时时间,过了超时时间就自动释放锁
* preparedStatement.setQueryTimeout(200);
* 初步想法是设置一个查询超时时间,中断查询,查询不到就拿不到锁就可以执行其他的逻辑,就可以不用一直阻塞。
* 但是不生效。
*
* @throws SQLException
* @throws InterruptedException
*/
@RequestMapping("/mysqlLock")
public void mysqlLock() throws SQLException, InterruptedException {
Connection connection = BeanUtils.source.getConnection();
connection.setAutoCommit(false);
try {
String sql = "select * from test_lock where controller = 'lock' for update;";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.setQueryTimeout(200);
log.info("开始执行sql");
long l = System.currentTimeMillis();
ResultSet resultSet = preparedStatement.executeQuery();
long l1 = System.currentTimeMillis();
System.out.println(l1 - l);
log.info("sql执行完成");
if (resultSet.next()) {
//代表获取到锁
log.info("获取到锁");
Thread.sleep(10000);
log.info("开始执行业务");
connection.commit();
log.info("释放");
return;
} else {
log.info("获取不到锁");
}
} catch (Exception e) {
connection.commit();
}
}
redis(演变过程)
常规做法
我们通常使用redis+lua来做一个分布式锁,为什么会采用这种方案我们应该一步一步来探讨一下这个问题
-
通常逻辑来说我们是这样处理的,首先将需要做互斥操作的资源进行setnx操作,如果已经存在了这个键则不设置,就会返回0,表示当前资源已经被其他线程占用,如果设置成功,返回的是1,表示当前线程获取了锁。然后执行操作,执行操作完成之后再删除key。 -
如果按照上面做法的话,再执行逻辑操作过程中发生中断,key没有给删掉,那么产生死锁,所以自然而然我们就会想到,给key添加过期时间,也就会自然而言的,但是setnx不能设置key的过期时间,所以我们就会想到,在setn操作完成之后 使用expire命令给key设置过期时间。 -
第2点的做法如果在expire执行之前就中断了线程,依然会产生死锁,那怎么办?其实我们可以使用set命令,其中有个方法重载SET key value [expiration EX seconds|PX milliseconds] [NX|XX],该方法重新既可以实现setnx的效果也可以添加过期时间
EX second:设置键的过期时间为 second 秒。SET key value EX second 效果等同于 SETEX key second value。 PX millisecond:设置键的过期时间为毫秒。SET key value PX millisecond 效果等同于 PSETEX key millisecond value。 NX:只在键不存在时,才对键进行设置操作。SET key value NX 效果等同于 SETNX key value。 XX:只在键已经存在时,才对键进行设置操作
所以我们使用set key value ex ttl nx 来设置值并且设置过期时间,如果设置成功则表示获取到锁。
-
到这里是否就没有问题了呢?其实不然 ,虽然我们设置了过期时间,但是我们忽略的业务逻辑执行的时间 与 过期时间的 之间的关系上面设置的锁的过期时间是 5s, 如果我们线程A业务逻辑操作时间是10s, 5s之后,锁过期了自动释放在 5s 后 线程B就可以拿到这把锁 ,线程B的业务逻辑操作时间也是10s, 等绝对时间到了 10s 时候,线程A执行del操作时释放锁,这时候 线程A 释放的是线程B的锁。 -
那么怎么解决这个问题?我们可以将是key的值设置成当前线程的信息,我们删除key之前先判断一下key的值是不是当前线程,但是这里查询和删除不是原子性,依然会有问题。所以我们需要lua脚本去解决我们的原子性问题。并且我们可以开一个子线程做守护线程,定时的隔一段时间去重置过期时间,这个做法叫做锁续租
总结一下,分布式锁我们通常使用set命令去设置key和过期时间,使用redis+lua去释放锁,还有一种实现是基于setnx,get,getset这三个命令的去实现的,感兴趣可以自己看一下,核心思想就是避免设置过期时间,直接将时间设置为value。
@RequestMapping("redisLock")
public void redisLock() throws InterruptedException {
/**
* 方法一存在问题:
* 1。如果在setnx与 del 操作之间 程序异常或终端,将会导致这把 ? 永远都无法释放,将会产生死锁问题。
*/
//方法一
if (redisOperations.setnx(LOCK_IN_REDIS_PREFIX, VALUE_LOCK_IN_REDIS)) {
//逻辑操作
log.info("获取到锁");
log.info("开始执行业务");
Thread.sleep(5000);
log.info("业务执行结束");
redisOperations.del(LOCK_IN_REDIS_PREFIX);
log.info("释放锁");
} else {
log.info("获取不到锁");
}
/**
* 针对方法一产生的死锁问题,可以通过添加过期时间来处理:
* 1.通过expire来为这个key增加过期时间
*
*/
boolean setnx = redisOperations.setnx(LOCK_IN_REDIS_PREFIX, VALUE_LOCK_IN_REDIS);
if (setnx) {
log.info("获取到锁");
log.info("开始设置过期时间");
redisOperations.expire(LOCK_IN_REDIS_PREFIX, 30000);
log.info("过期时间设置完成");
log.info("开始执行业务");
Thread.sleep(4000);
log.info("业务执行完成");
redisOperations.del(LOCK_IN_REDIS_PREFIX);
log.info("释放锁");
} else {
log.info("获取不到锁");
}
/**
* 上面这种方法还是会有问题的,因为 setnx和 expire 没有原子性,有可能执行了 setnx后面 系统异常了,那么依旧还是会产生死锁的现象
* 所以这个时间就要用更换思路,要保证expire和其他的操作是原子性的
* 2.采用redis的set命令 set命令支持在获取锁的同时设置key的过期时间
*/
String set = redisOperations.set(LOCK_IN_REDIS_PREFIX, VALUE_LOCK_IN_REDIS, "NX", "EX", 5000);
if (!StringUtils.isEmpty(set)){
log.info("获取到锁");
log.info("开始执行业务");
Thread.sleep(5000);
log.info("业务执行完成");
redisOperations.del(LOCK_IN_REDIS_PREFIX);
log.info("锁释放");
}else {
log.info("获取不到锁");
}
/**
* 到这里是否就没有问题了呢? 其实不然 ,就上面这段程序而言
* 虽然我们设置了过期时间,但是我们忽略的业务逻辑执行的时间 与 过期时间的 之间的关系
* 上面设置的?的过期时间是 5s, 如果我们线程A业务逻辑操作时间是10s, 5s之后,锁过期了自动释放
* 在 5s 后 线程B就可以拿到这把锁 ,线程B的业务逻辑操作时间也是10s, 等绝对时间到了 10s 时候,
* 线程A执行del操作时释放锁,这时候 线程A 释放的是线程B的锁
*/
/**
* 上面的问题可以这样来解决:
* 将当前线程的相关信息设置成值,
* 删除之前先判断一下value里面的线程信息是否是当前线程的值
* 但是这样get 和 del 不存在 原子性 可以通过lua脚本保证原子性
*/
String threadMsg = Thread.currentThread().getName();
String sets = redisOperations.set(LOCK_IN_REDIS_PREFIX, threadMsg, "NX", "EX", 5000);
String script = "--KEYS[1] string 锁的keyn" +
"--ARGV[1] string 当前线程的valuen" +
"n" +
"local valid_thread = ARGV[1] n" +
"local current_thread = redis.call("get",KEYS[1])n" +
"if n" +
"valid_thread == current_threadn" +
"thenn" +
"redis.call("del",KEYS[1])n" +
"end";
if (!StringUtils.isEmpty(sets)){
log.info("获取到锁");
log.info("开始执行业务");
Thread.sleep(5000);
log.info("业务执行完成");
redisOperations.eval(script, Collections.singletonList(LOCK_IN_REDIS_PREFIX),Collections.singletonList(VALUE_LOCK_IN_REDIS));
log.info("锁释放");
}else {
log.info("获取不到锁");
}
/**
* 还可以通过锁续租来解决 在获取得到锁的线程中开启一个守护线程没隔几秒续租一次
*/
String localThreadMsg = Thread.currentThread().getName();
String sets1 = redisOperations.set(LOCK_IN_REDIS_PREFIX, threadMsg, "NX", "EX", 5000);
String script1 = "--KEYS[1] string 锁的keyn" +
"--ARGV[1] string 当前线程的valuen" +
"n" +
"local valid_thread = ARGV[1] n" +
"local current_thread = redis.call("get",KEYS[1])n" +
"if n" +
"valid_thread == current_threadn" +
"thenn" +
"redis.call("del",KEYS[1])n" +
"end";
if (!StringUtils.isEmpty(sets)){
log.info("获取到锁");
log.info("开始执行业务");
Thread demon = new Thread(()->{
long start = System.currentTimeMillis();
while (System.currentTimeMillis()-start >= 29000L){
redisOperations.expire(LOCK_IN_REDIS_PREFIX,20);
start = System.currentTimeMillis();
}
});
demon.run();
Thread.sleep(5000);
log.info("业务执行完成");
redisOperations.eval(script, Collections.singletonList(LOCK_IN_REDIS_PREFIX),Collections.singletonList(VALUE_LOCK_IN_REDIS));
log.info("锁释放");
}else {
log.info("获取不到锁");
}
}
上面的做法很好解决的分布式锁的问题,但是我们的redis通常都是集群化,我们考虑一下这种情况 我们知道在主从架构下master节点可以读写,slave节点只能读,master通过执行bgsave()命令开启异步线程将数据同步给slave如果在这个同步过程,key还没有来得及同步给slave就发生故障,这个时候锁就会丢失。那么对应这种情况,有人提出了RedLock(红锁),他的主要原理是在多个redis节点中共享一个锁,使用多个锁来协调修改锁的状态,具体的细节可以去百度一下。我们redisson的框架提供了对redlock的实现,我们直接就可以用redisson就可以了。
redisson框架
Redisson是一个架设在redis上非常优秀的开源框架,提供了一种基于看门狗机制的分布式锁框架。
redisson在获取锁之后,会维护一个看门狗线程,在每一个锁设置的过期时间的1/3处,如果线程还没执行完任务,则不断延长锁的有效期。看门狗的检查锁超时时间默认是30秒,可以通过 lockWactchdogTimeout 参数来改变。
加锁的时间默认是30秒,如果加锁的业务没有执行完,那么每隔 30 ÷ 3 = 10秒,就会进行一次续期,把锁重置成30秒,保证解锁前锁不会自动失效。
线程去获取锁,获取成功:执行lua脚本,保存数据到redis数据库。
线程去获取锁,获取失败:一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。
最后
想说的很多,晚点再说
本篇文章来源于微信公众号: Hephaestuses
微信扫描下方的二维码阅读本文

Comments NOTHING