文中的代码不完整,只是个大概,做个了解就可以了!

   

分布式锁在我们的开发过程中是十分常见的技术,通过来说我们会采用mysql、redis、zk等手段去做分布式锁,本次主要介绍mysql和redis实现的方式,因为zk我不熟。

mysql

    使用mysql做分布式锁有以下的方式:

  1. 建立一张表记录对应的方法,并且使用唯一索引

    先试图插入该表中,如果能够执行成功则代表获取到锁,执行失败则自旋直至获取锁 逻辑操作完成之后,删除对应的数据

  1. 使用Mysql自带的排它锁 select for update语句

    同样需要一张表,并且表中初始化需要锁的资源 使用select xxx from table where xxx=xxx for update查询 并且设置超时时间,避免长时间持有锁 如果获取资源则代表获取锁 无论是否成功获取到锁都要释放掉连接

 /**
     * (这里只是简单介绍一下,对于上述方案可以进行优化,如:应用主从数据库,数据之间双向同步;一旦挂掉快速切换到备库上;做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍;
     * 使用while循环,直到insert成功再返回成功;记录当前获得锁的机器的主机信息和线程信息,下次再获取锁的时候先查询数据库
     * 如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了,实现可重入锁)
     *
     * @throws InterruptedException
     */

    @RequestMapping("/lock")
    public void lock() throws InterruptedException {

        String className = Thread.currentThread().getStackTrace()[1].getClassName();
        String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
        TestLock testLock = new TestLock();
        testLock.setController(className);
        testLock.setMethod(methodName);
        testLock.setTime(new Date());
        try {
            if (testLockMapper.insert(testLock) > 0) {
                log.info("获取到锁");
                Thread.sleep(10000);
                testLockMapper.deleteByCondition(className, methodName);
                log.info("释放锁");
            } else {
                log.info("获取不到锁");
            }
        } catch (Exception e) {
            log.info("获取不到锁");
        }

    }

    /**
     * 加了排他锁之后,加了for update 的sql 将会阻塞,拿到锁之后继续执行
     * 阻塞时间是否有限制? 这个没有一个 超时时间,过了超时时间就自动释放锁
     * preparedStatement.setQueryTimeout(200);
     * 初步想法是设置一个查询超时时间,中断查询,查询不到就拿不到锁就可以执行其他的逻辑,就可以不用一直阻塞。
     * 但是不生效。
     *
     * @throws SQLException
     * @throws InterruptedException
     */

    @RequestMapping("/mysqlLock")
    public void mysqlLock() throws SQLException, InterruptedException {
        Connection connection = BeanUtils.source.getConnection();
        connection.setAutoCommit(false);
        try {
            String sql = "select * from test_lock where controller = 'lock' for update;";
            PreparedStatement preparedStatement = connection.prepareStatement(sql);
            preparedStatement.setQueryTimeout(200);

            log.info("开始执行sql");
            long l = System.currentTimeMillis();
            ResultSet resultSet = preparedStatement.executeQuery();
            long l1 = System.currentTimeMillis();
            System.out.println(l1 - l);

            log.info("sql执行完成");
            if (resultSet.next()) {
                //代表获取到锁
                log.info("获取到锁");

                Thread.sleep(10000);
                log.info("开始执行业务");
                connection.commit();
                log.info("释放");
                return;
            } else {
                log.info("获取不到锁");
            }
        } catch (Exception e) {
            connection.commit();
        }
    }

redis(演变过程)

常规做法

    我们通常使用redis+lua来做一个分布式锁,为什么会采用这种方案我们应该一步一步来探讨一下这个问题

  1. 通常逻辑来说我们是这样处理的,首先将需要做互斥操作的资源进行setnx操作,如果已经存在了这个键则不设置,就会返回0,表示当前资源已经被其他线程占用,如果设置成功,返回的是1,表示当前线程获取了锁。然后执行操作,执行操作完成之后再删除key。
  2. 如果按照上面做法的话,再执行逻辑操作过程中发生中断,key没有给删掉,那么产生死锁,所以自然而然我们就会想到,给key添加过期时间,也就会自然而言的,但是setnx不能设置key的过期时间,所以我们就会想到,在setn操作完成之后 使用expire命令给key设置过期时间。
  3. 第2点的做法如果在expire执行之前就中断了线程,依然会产生死锁,那怎么办?其实我们可以使用set命令,其中有个方法重载SET key value [expiration EX seconds|PX milliseconds] [NX|XX],该方法重新既可以实现setnx的效果也可以添加过期时间
  • EX second:设置键的过期时间为 second 秒。SET key value EX second 效果等同于 SETEX key second value。
  • PX millisecond:设置键的过期时间为毫秒。SET key value PX millisecond 效果等同于 PSETEX key millisecond value。
  • NX:只在键不存在时,才对键进行设置操作。SET key value NX 效果等同于 SETNX key value。
  • XX:只在键已经存在时,才对键进行设置操作


    所以我们使用set key value ex ttl nx 来设置值并且设置过期时间,如果设置成功则表示获取到锁。

  1. 到这里是否就没有问题了呢?其实不然 ,虽然我们设置了过期时间,但是我们忽略的业务逻辑执行的时间 与 过期时间的 之间的关系上面设置的锁的过期时间是 5s, 如果我们线程A业务逻辑操作时间是10s, 5s之后,锁过期了自动释放在 5s 后 线程B就可以拿到这把锁 ,线程B的业务逻辑操作时间也是10s, 等绝对时间到了 10s 时候,线程A执行del操作时释放锁,这时候 线程A 释放的是线程B的锁。
  2. 那么怎么解决这个问题?我们可以将是key的值设置成当前线程的信息,我们删除key之前先判断一下key的值是不是当前线程,但是这里查询和删除不是原子性,依然会有问题。所以我们需要lua脚本去解决我们的原子性问题。并且我们可以开一个子线程做守护线程,定时的隔一段时间去重置过期时间,这个做法叫做锁续租

    总结一下,分布式锁我们通常使用set命令去设置key和过期时间,使用redis+lua去释放锁,还有一种实现是基于setnx,get,getset这三个命令的去实现的,感兴趣可以自己看一下,核心思想就是避免设置过期时间,直接将时间设置为value。

 @RequestMapping("redisLock")
    public void redisLock() throws InterruptedException {

        /**
         * 方法一存在问题:
         * 1。如果在setnx与 del 操作之间 程序异常或终端,将会导致这把 ? 永远都无法释放,将会产生死锁问题。
         */


        //方法一
        if (redisOperations.setnx(LOCK_IN_REDIS_PREFIX, VALUE_LOCK_IN_REDIS)) {
            //逻辑操作
            log.info("获取到锁");
            log.info("开始执行业务");
            Thread.sleep(5000);
            log.info("业务执行结束");
            redisOperations.del(LOCK_IN_REDIS_PREFIX);
            log.info("释放锁");
        } else {
            log.info("获取不到锁");
        }

        /**
         * 针对方法一产生的死锁问题,可以通过添加过期时间来处理:
         * 1.通过expire来为这个key增加过期时间
         *
         */

        boolean setnx = redisOperations.setnx(LOCK_IN_REDIS_PREFIX, VALUE_LOCK_IN_REDIS);
        if (setnx) {
            log.info("获取到锁");
            log.info("开始设置过期时间");
            redisOperations.expire(LOCK_IN_REDIS_PREFIX, 30000);
            log.info("过期时间设置完成");
            log.info("开始执行业务");
            Thread.sleep(4000);
            log.info("业务执行完成");
            redisOperations.del(LOCK_IN_REDIS_PREFIX);
            log.info("释放锁");
        } else {
            log.info("获取不到锁");
        }
        /**
         * 上面这种方法还是会有问题的,因为 setnx和 expire 没有原子性,有可能执行了 setnx后面 系统异常了,那么依旧还是会产生死锁的现象
         * 所以这个时间就要用更换思路,要保证expire和其他的操作是原子性的
         * 2.采用redis的set命令  set命令支持在获取锁的同时设置key的过期时间
         */

        String set = redisOperations.set(LOCK_IN_REDIS_PREFIX, VALUE_LOCK_IN_REDIS, "NX""EX"5000);
        if (!StringUtils.isEmpty(set)){
            log.info("获取到锁");
            log.info("开始执行业务");
            Thread.sleep(5000);
            log.info("业务执行完成");
            redisOperations.del(LOCK_IN_REDIS_PREFIX);
            log.info("锁释放");
        }else {
            log.info("获取不到锁");
        }

        /**
         * 到这里是否就没有问题了呢? 其实不然 ,就上面这段程序而言
         * 虽然我们设置了过期时间,但是我们忽略的业务逻辑执行的时间 与 过期时间的 之间的关系
         * 上面设置的?的过期时间是 5s,  如果我们线程A业务逻辑操作时间是10s, 5s之后,锁过期了自动释放
         * 在 5s 后 线程B就可以拿到这把锁 ,线程B的业务逻辑操作时间也是10s,  等绝对时间到了 10s 时候,
         * 线程A执行del操作时释放锁,这时候 线程A 释放的是线程B的锁
         */




        /**
         * 上面的问题可以这样来解决:
         *      将当前线程的相关信息设置成值,
         *      删除之前先判断一下value里面的线程信息是否是当前线程的值
         * 但是这样get 和 del 不存在 原子性 可以通过lua脚本保证原子性
         */

        String threadMsg = Thread.currentThread().getName();
        String sets = redisOperations.set(LOCK_IN_REDIS_PREFIX, threadMsg, "NX""EX"5000);
        String script = "--KEYS[1] string 锁的keyn" +
                "--ARGV[1] string 当前线程的valuen" +
                "n" +
                "local valid_thread = ARGV[1] n" +
                "local current_thread = redis.call("get",KEYS[1])n" +
                "if n" +
                "valid_thread == current_threadn" +
                "thenn" +
                "redis.call("del",KEYS[1])n" +
                "end";
        if (!StringUtils.isEmpty(sets)){
            log.info("获取到锁");
            log.info("开始执行业务");
            Thread.sleep(5000);
            log.info("业务执行完成");
            redisOperations.eval(script, Collections.singletonList(LOCK_IN_REDIS_PREFIX),Collections.singletonList(VALUE_LOCK_IN_REDIS));
            log.info("锁释放");
        }else {
            log.info("获取不到锁");
        }


        /**
         * 还可以通过锁续租来解决 在获取得到锁的线程中开启一个守护线程没隔几秒续租一次
         */


        String localThreadMsg = Thread.currentThread().getName();
        String sets1 = redisOperations.set(LOCK_IN_REDIS_PREFIX, threadMsg, "NX""EX"5000);
        String script1 = "--KEYS[1] string 锁的keyn" +
                "--ARGV[1] string 当前线程的valuen" +
                "n" +
                "local valid_thread = ARGV[1] n" +
                "local current_thread = redis.call("get",KEYS[1])n" +
                "if n" +
                "valid_thread == current_threadn" +
                "thenn" +
                "redis.call("del",KEYS[1])n" +
                "end";
        if (!StringUtils.isEmpty(sets)){
            log.info("获取到锁");
            log.info("开始执行业务");
            Thread demon = new Thread(()->{
                long start = System.currentTimeMillis();
                while (System.currentTimeMillis()-start >= 29000L){
                    redisOperations.expire(LOCK_IN_REDIS_PREFIX,20);
                    start = System.currentTimeMillis();
                }
            });
            demon.run();
            Thread.sleep(5000);
            log.info("业务执行完成");
            redisOperations.eval(script, Collections.singletonList(LOCK_IN_REDIS_PREFIX),Collections.singletonList(VALUE_LOCK_IN_REDIS));
            log.info("锁释放");
        }else {
            log.info("获取不到锁");
        }

    }

    上面的做法很好解决的分布式锁的问题,但是我们的redis通常都是集群化,我们考虑一下这种情况 我们知道在主从架构下master节点可以读写,slave节点只能读,master通过执行bgsave()命令开启异步线程将数据同步给slave如果在这个同步过程,key还没有来得及同步给slave就发生故障,这个时候锁就会丢失。那么对应这种情况,有人提出了RedLock(红锁),他的主要原理是在多个redis节点中共享一个锁,使用多个锁来协调修改锁的状态,具体的细节可以去百度一下。我们redisson的框架提供了对redlock的实现,我们直接就可以用redisson就可以了。

redisson框架

Redisson是一个架设在redis上非常优秀的开源框架,提供了一种基于看门狗机制的分布式锁框架。

    redisson在获取锁之后,会维护一个看门狗线程,在每一个锁设置的过期时间的1/3处,如果线程还没执行完任务,则不断延长锁的有效期。看门狗的检查锁超时时间默认是30秒,可以通过 lockWactchdogTimeout 参数来改变。

    加锁的时间默认是30秒,如果加锁的业务没有执行完,那么每隔 30 ÷ 3 = 10秒,就会进行一次续期,把锁重置成30秒,保证解锁前锁不会自动失效。

    线程去获取锁,获取成功:执行lua脚本,保存数据到redis数据库。

    线程去获取锁,获取失败:一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。



    想说的很多,晚点再说


本篇文章来源于微信公众号: Hephaestuses



微信扫描下方的二维码阅读本文

此作者没有提供个人介绍
最后更新于 2023-06-19