22 关于 RedissonLock

2021/10/2 19:10:13

本文主要是介绍22 关于 RedissonLock,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

前言 

相关介绍主要围绕着 一种常用的分布式锁的实现 RedissonLock  

呵呵 一种常见的分布式锁, 但是 从工作至今用到的还不是很多吧, 刚好最近 有一些场景需要这方面的知识 

重新回来梳理一下 这块的知识, 关于这个 RedissonLock 很久之前我是看过的, 大概是 17 年的样子吧, 不过 当时没有梳理成文档的形式 

呵呵 如今再回来看看, 还是稍微有一些成本 

以下测试用例基于 jedis 3.5.2 + redis-6.2.0 
 

测试用例

/**
 * Test20RedissonLock
 *
 * @author Jerry.X.He <970655147@qq.com>
 * @version 1.0
 * @date 2021-03-22 14:13
 */
public class Test20RedissonLock {

    // Test20RedissonLock
    public static void main(String[] args) {

        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
        RedissonClient redissonClient = Redisson.create(config);
        RLock disLock = redissonClient.getLock("sync");

        // test01NoConflict
        test01NoConflict(disLock);

        // test02TryWaitingLock
//        test02TryWaitingLock(disLock);

    }

    // case1, no conflict
    public static void test01NoConflict(RLock disLock) {
        System.out.println(" before lock : " + new Date());
        disLock.lock();
        System.out.println(" after lock : " + new Date());

        System.out.println(" do biz ");
        disLock.unlock();
        System.out.println(" the end ");
    }

    // // case2. other thread hold the lock, then lock
    public static void test02TryWaitingLock(RLock disLock) {
        // first thread hold the lock 2 seconds
        new Thread(() -> {
            disLock.lock();
            sleep(2000);
            disLock.unlock();
        }).start();

        sleep(10);
        System.out.println(" before lock : " + new Date());
        disLock.lock();
        System.out.println(" after lock : " + new Date());

        System.out.println(" do biz ");
        disLock.unlock();
        System.out.println(" the end ");
    }

}

我们这里主要是 围绕着两个常见的场景来进行分析, 大致的剖析一下 这两个场景下面 redisson 做的事情 

1. 没有竞争的情况下 RedissonLock 的 lock/unlock 做了那些事情 

2. 存在竞争的情况下 RedissonLock 的 lock/unlock 做了那些事情 

3. RedissonLock 获取锁 超时之后 unlock 会是什么情况 ? 

redis 里面 lua 脚本执行的原子性

引用 : EVAL – Redis

Atomicity of scripts

Redis uses the same Lua interpreter to run all the commands. Also Redis guarantees that a script is executed in an atomic way: no other script or Redis command will be executed while a script is being executed. This semantic is similar to the one of MULTI / EXEC. From the point of view of all the other clients the effects of a script are either still not visible or already completed.

However this also means that executing slow scripts is not a good idea. It is not hard to create fast scripts, as the script overhead is very low, but if you are going to use slow scripts you should be aware that while the script is running no other client can execute commands.

1. 没有竞争的情况下 RedissonLock 的 lock/unlock 做了那些事情 

控制台输出信息如下 

redis-server 执行的所有的命令 

首先我们看一下 redis-server 这边接受到的命令的相关日志[手动从 redis-server 打印出来, 或者抓包也能拿到数据] 

可以看到 服务器这边 只是接收到了两个命令, 那显然对应的就是我们的 lock/unlock 这两个命令了, 我们接下来看一看 

# lock 的时候 RedissonLock 向 redis-server 这边发送的命令
*6
$4
EVAL
$339
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
$1
1
$4
sync
$5
30000
$38
a43ef10a-d89d-4cda-962c-c6e370a8fee0:1

# lua script 格式化一下
if (redis.call('exists', KEYS[1]) == 0)
	then redis.call('hincrby', KEYS[1], ARGV[2], 1);
	redis.call('pexpire', KEYS[1], ARGV[1]);
	return nil;
	end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
	then redis.call('hincrby', KEYS[1], ARGV[2], 1);
	redis.call('pexpire', KEYS[1], ARGV[1]);
	return nil;
	end;
return redis.call('pttl', KEYS[1]);


# unlock 的时候 RedissonLock 向 redis-server 这边发送的命令 
*8
$4
EVAL
$305
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
$1
2
$4
sync
$29
redisson_lock__channel:{sync}
$1
0
$5
30000
$38
a43ef10a-d89d-4cda-962c-c6e370a8fee0:1

# lua script 格式化一下
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
	then return nil;
	end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0)
	then redis.call('pexpire', KEYS[1], ARGV[2]);
	return 0;
else
	redis.call('del', KEYS[1]);
	redis.call('publish', KEYS[2], ARGV[1]);
	return 1;
	end;
return nil;

lock 的 lua 脚本

传递的参数, 我们吧上面的传递的信息 稍微格式化一下, 看起来友好一些 

sync 是我这里的 RedissonLock 的 name, 来标记我这里的 Lock 

30000 是来自于 RedissonLock 的默认配置的超时时间[参见 RedissonLock.internalLockLeaseTime] 

a43ef10a-d89d-4cda-962c-c6e370a8fee0 是来自于 RedissonLock.commandExecutor 的 connectionManager 的 id 

1 是当前线程的 id 

所以这里的锁的标志[id]是有两个维度的 : name 和 "RedissonLock.commandExecutor 的 connectionManager" 

假设普通情况下我们是全局使用同一个 connectionManager, 那么 锁的标志[id]就可以根据 name 来判断了 

KEYS = [sync]
ARGV = [30000, a43ef10a-d89d-4cda-962c-c6e370a8fee0:1]

lock 对应的 lua 脚本也格式化一下

所以这里是检查 "sync" 是否存在, 如果不存在, 则尝试加锁 

        尝试加锁的方式为 "hincrby sync a43ef10a-d89d-4cda-962c-c6e370a8fee0:1 1", 标记 线程1 持有 sync 的锁, 持有 1 次

如果 "sync" 存在, 检查 是否存在 "a43ef10a-d89d-4cda-962c-c6e370a8fee0:1"[当前线程是否持有 "sync" 的锁] 

        如果持有, "hincrby sync a43ef10a-d89d-4cda-962c-c6e370a8fee0:1 1", 表示 当前线程持有 "sync" 的重入次数, 刷新 过期时间 

否则 "sync" 被其他线程持有[或者 "sync" 被其他业务占用],  返回 "sync" 的过期时间 

if (redis.call('exists', KEYS[1]) == 0)
	then redis.call('hincrby', KEYS[1], ARGV[2], 1);
	redis.call('pexpire', KEYS[1], ARGV[1]);
	return nil;
	end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
	then redis.call('hincrby', KEYS[1], ARGV[2], 1);
	redis.call('pexpire', KEYS[1], ARGV[1]);
	return nil;
	end;
return redis.call('pttl', KEYS[1]);

unlock 的脚本

传递的参数, 我们吧上面的传递的信息 稍微格式化一下, 看起来友好一些 

sync 是我这里的 RedissonLock 的 name, 来标记我这里的 Lock 

30000 是来自于 RedissonLock 的默认配置的超时时间[参见 RedissonLock.internalLockLeaseTime] 

a43ef10a-d89d-4cda-962c-c6e370a8fee0 是来自于 RedissonLock.commandExecutor 的 connectionManager 的 id 

1 是当前线程的 id 

redisson_lock__channel:{sync} : 是我们基于 redis 的 subscribe/publish 机制 与其他线程交互的一个 channel 

KEYS = [sync, redisson_lock__channel:{sync}]
ARGV = [0, 30000, a43ef10a-d89d-4cda-962c-c6e370a8fee0:1]

unlock 对应的 lua 脚本也格式化一下

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
	then return nil;
	end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0)
	then redis.call('pexpire', KEYS[1], ARGV[2]);
	return 0;
else
	redis.call('del', KEYS[1]);
	redis.call('publish', KEYS[2], ARGV[1]);
	return 1;
	end;
return nil;

这里首先检查 当前线程是否是持有 "sync" 的锁, 如果灭有 直接返回 null 

如果当前线程持有 "sync" 的锁, "hincrby sync a43ef10a-d89d-4cda-962c-c6e370a8fee0:1 -1", 释放一次 

如果释放了一次之后 counter > 0, 表示当前获取了多次锁 但是还未完全释放, 刷新 "sync" 的过期时间, 返回 0 

如果释放了一次之后 counter <= 0[异常情况下会小于0], 获取了 n 次锁, 前面 n-1 次已经释放, 当前是最后一次释放 "sync" 的锁 

        删除 "sync" 对应的 key, 并 "publish redisson_lock__channel:{sync}" 通知其他等待的线程 来尝试获取锁了, 返回 1 

其他情况 返回 null 

回到 RedissonLock.lock 

回到我们这里的场景, 我们只有一个线程 来获取 "sync" 的锁, 以及释放 "sync" 的锁, 因此走的流程相对比较简单 

我们稍微调试一下 代码 

这里 当前线程会尝试获取 “sync” 的锁, 发送给 redis-server 的是上面的 lock 对应的 lua 脚本, 因此我们这里 没有其他线程持有 "sync" 锁, 因此 lua 脚本返回的是 null 

所以这里 ttl 得到的结果为 null, 表示 当前线程 获取到了 "sync" 的锁, 直接 return 了, 后面去执行 业务代码 

上面的 lock 相关 lua 代码来自于 RedissonLock 的如下代码片段, 传递 如下 script, keys, args 给 redis-server 

然后当前线程 继续持有 future 等待 future 执行完成 

这里是 设置 promise 的结果的地方

lock 的线程等待 future 执行完成之后, 返回结果, 这里的 result 是一个 Object, 为什么 lock 里面返回的 ttl 是 null ? 我们接着往下看一下 ImmediatePromise. getNow 

当然 我们这里的调试会忽略一些细节的地方, 比如这里 promise 的 listener 里面执行了什么, 维护了什么? [这个您可以自己再看] 

为什么 lock 里面返回的 ttl 是 null ?? 

再回到  上面, 我们这里获取 "sync" 的锁, 获取成功 

处理之后我们查看一下 redis-server 里面的 "sync" 

存在一个 entry, 可以看出 "sync" 锁是被 线程1 持有, 持有1次 

内存中维护的 "sync", 这个就是上面的提到的 listener 中维护的数据 

"sync" 锁是被 线程1 持有, 持有1次  

回到 RedissonLock.unlock 

上面的 unlock 相关 lua 代码来自于 RedissonLock 的如下代码片段, 传递 如下 script, keys, args 给 redis-server 

然后当前线程 继续持有 future 等待 future 执行完成 

2. 存在竞争的情况下 RedissonLock 的 lock/unlock 做了那些事情 

接下来我们来看 test02TryWaitingLock, 我们这里 这个 case 里面的主要情况如下 

创建了一个 线程1 获取 "sync" 的锁, 处理业务花费两秒时间 

然后 外面的 main 线程在 线程1 获取了 "sync" 的锁之后, 尝试获取 "sync" 的锁 

所以 我们期望应该是 main 线程的 before lock 和 after lock 正常期望应该是在 2s 左右的时间 

关于 线程1 的流程, 其实就是和上面的 case1 的情况基本上一致, 他是没有阻碍的获取到锁 

我们这里主要关注的是 main 线程的流程 

控制台输出信息如下 

redis-server 执行的所有的命令 

首先我们看一下 redis-server 这边接受到的命令的相关日志[手动从 redis-server 打印出来, 或者抓包也能拿到数据] 

可以看到 服务器这边 只是接收到了两个命令, 那显然对应的就是我们的 lock/unlock 这两个命令了, 我们接下来看一看 

这个流程从代码上来看 是合理的, 我们待会儿结合代码 剖析一下 

# step1. 线程1 获取锁
*6
$4
EVAL
$339
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
$1
1
$4
sync
$5
30000
$39
11760fef-9de5-4304-89e4-6bbcee0c630c:47


# step2. main 尝试获取锁
*6
$4
EVAL
$339
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
$1
1
$4
sync
$5
30000
$38
11760fef-9de5-4304-89e4-6bbcee0c630c:1


# step3. main subscribe "redisson_lock__channel:{sync}"
*2
$9
SUBSCRIBE
$29
redisson_lock__channel:{sync}


# step4. main 再次尝试获取锁
*6
$4
EVAL
$339
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
$1
1
$4
sync
$5
30000
$38
11760fef-9de5-4304-89e4-6bbcee0c630c:1


# step5. 线程1 释放锁
*8
$4
EVAL
$305
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
$1
2
$4
sync
$29
redisson_lock__channel:{sync}
$1
0
$5
30000
$39
11760fef-9de5-4304-89e4-6bbcee0c630c:47


# step6. main 再次尝试获取锁
*6
$4
EVAL
$339
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
$1
1
$4
sync
$5
30000
$38
11760fef-9de5-4304-89e4-6bbcee0c630c:1


# step7. main unsubscribe "redisson_lock__channel:{sync}"
*2
$11
UNSUBSCRIBE
$29
redisson_lock__channel:{sync}


# step8. main 释放锁
*8
$4
EVAL
$305
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
$1
2
$4
sync
$29
redisson_lock__channel:{sync}
$1
0
$5
30000
$38
11760fef-9de5-4304-89e4-6bbcee0c630c:1

来到 main 线程的 RedissonLock.lock 

main 线程里面 第一次 tryAcquire 拿到的 "sync" 的 ttl 是 29801, 差不多就是 线程1 获取锁之后 200ms, main 线程开始获取锁, 这里拿到的是 "sync" 这个 key 的过期时间, 还有 29.8s 

然后 main 订阅 "redisson_lock__channel:{sync}", 并注册了一个 Listener(收到消息之后的处理, 唤醒等待的线程), 这里的 subscribe 部分的并发处理是使用 Semaphore 进行处理, 初始化 permits 为 1

 然后 同步等待订阅相关业务完成 

然后来到下面的 while(true) 

        main 线程里面 再次尝试 tryAcquire, 拿到的 "sync" 的 ttl 是 29794, 可以看出 subscribe 这部分的注册是没有多大的开销的 

        接下来是 阻塞等待 ttl ms[或者 acquire 等待/中断], 使用的是一个 Semaphore 进行处理, 初始化 permits 为 1 

        我们这里业务代码中的这个 Semaphore 需要等待 29s, 但是实际上两秒之后 lock 就被释放了, 这里的通知是怎么做的呢? 就是上面的 subscribe 的 handler[unlock 了之后 “publish redisson_lock__channel:{sync} 1” ] 

        我们来看一下这个 subscribe 的 handler, 这里面的处理是 poll RedissonLockEntry 的 listener 来执行, 我们这里是已经没得下一个 listener 了, 然后 唤醒等待的 semaphore[因此我们这里 main 线程实际上大概是 wait 了 2s]

        然后接着走下一个循环, 此时 线程1 已经 unlock 了 "sync" 的锁, main 线程再来第三次 tryAcquire 获取 "sync" 的锁成功, ttl 返回 null, 跳出循环 

获取到锁之后, unsubscribe redisson_lock__channel:{sync}, 清理资源 

第一次 tryAcquire  

第二次 tryAcquire 

subscribe redisson_lock__channel:{sync} 的 handler 

第三次 tryAcquire 

来到 main 线程的 RedissonLock.unlock 

这里的 锁的释放 就和上面的释放锁 基本一致了, 不多赘述 

梳理完整个流程之后再回到上面 redis-server 执行命令的日志再看看? 

3. RedissonLock 获取锁 超时之后 unlock 会是什么情况 ? 

常见的 IllegalMonitorStateException, 比较简单, 这里就不细说了 



这篇关于22 关于 RedissonLock的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程