redis分布式锁实现---源码分析
2021/8/2 19:10:05
本文主要是介绍redis分布式锁实现---源码分析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一、回顾一下分布式锁的基本使用方式
①、注入redissonClient客户端
②、通过redissonClient客户端获取锁对象rLock
③、通过rLock尝试获取锁
// ①、注入redissonClient客户端 @Autowired private RedissonClient redissonClient; public boolean getLock() { // ②、通过redissonClient客户端获取锁对象rLock(RedissonLock实现默认是可重入锁) rLock = redissonClient.getLock(lockInfo.getName()); try { // ③、通过rLock尝试获取锁 return rLock.tryLock(lockInfo.getWaitTime(), lockInfo.getLeaseTime(), TimeUnit.SECONDS); } catch (InterruptedException e) { log.info("获取可重入锁时线程被意外中断,锁名称:{},异常信息:{}", lockInfo.getName(), e); } return false; }
二、redisson实现分布式锁结构介绍
1、redisson锁实现的基本特性
- redisson对Redis的各种命令操作和特性都做了很好的封装和扩展,是一个非常灵活易用的Java中间件。
- 在redisson实现分布式锁这块最核心的类是
RedissonLock
。 RedissonLock
类提供了可重入锁的实现。- redisson实现了公平锁(
RedissonFairLock
)、读锁(RedissonReadLock
)、写锁(RedissonWriteLock
)、可重入锁(RedissonLock
)、联锁(RedissonMultiLock
)、红锁(RedissonRedLock
)
2、锁实现的继承关系和子类实现
上面说到redisson实现分布式锁最核心的类是RedissonLock(很多锁都是基于该类实现的)
(红锁和联锁后面介绍),那么下面看看RedissonLock
的继承关系和子类以及类的结构,以便于更好的理解源码。
①、RedissonLock继承关系
②、RedissonLock的实现类
redissonLock类作为一个通用的模板类(默认提供可重入锁实现),其他很多类型的锁都基于该类去实现不同的加锁方式。
可以看到RedissonLock
类有四个子类实现,这些子类分别覆写了加锁释放锁的核心方法。
三、加锁方法分析
1、说明
- 上面理清了
RedissonLock
锁的继承以及实现关系,下面我们就找一个类型的锁实现来分析加锁解锁源码!这里以RedissonLock
默认实现的可重入锁为例进行源码分析。 RedissonLock
实现了多种加锁的接口,比如:带等待时间和释放时间的加锁接口(tryLock(long waitTime, long leaseTime, TimeUnit unit)
)、只带等待时间的加锁接口(tryLock(long waitTime, TimeUnit unit)
)、既不带等待时间也不带锁释放时间的加锁接口(tryLock()
)、只带释放时间没有等待时间的接口(lock(long leaseTime, TimeUnit unit)
)等,这里我们挑选一个参数最多的tryLock(long waitTime, long leaseTime, TimeUnit unit)
方法来分析。- 源码分析的方式,按照先画整体流程图,对整个加锁解锁流程有大致理解后再分析源码。
2、加锁整体流程分析
3、源码分析
下面所有的源码都来自于RedssionLock
类!!!
①、tryLock方法
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // 1、将等待时间转换为毫秒、、获取当前时间、获取当前线程ID long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); // 2、尝试申请锁,返回还剩余的锁过期时间【加锁核心方法,下面(②、加锁核心方法tryAcquire分析)】 Long ttl = tryAcquire(leaseTime, unit, threadId); // 3、ttl==null 表示获取锁成功则直接返回true // lock acquired if (ttl == null) { return true; } // 4、获取还需要等待的时间,且根据还需等待的时间(time)判断是否获取锁失败 time -= (System.currentTimeMillis() - current); // 如果还需要等待的时间为0,则说明获取锁已经失败了 // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败 if (time <= 0) { acquireFailed(threadId); return false; } // 重新获取当前时间 current = System.currentTimeMillis(); // 5、上面第一次尝试获取锁失败,且还没有超出最大等待时间的基础上,基于Redis的发布订阅机制,订阅锁释放事件 final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); /* * 6、基于Redis的发布订阅机制,订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题: * 基于信号量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争 * 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败 * 当 this.await返回true,进入下面的循环再次尝试获取锁 * * await是通过CountDownLatch + 监听器机制来实现的,具体看方法内部注释,见下面【③、await加锁最大等待时间方法分析】 */ if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { // 在等待时间耗完的情况下,取消对该锁的订阅 if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { @Override public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } // 获取锁失败 acquireFailed(threadId); return false; } // 7、如果在等待时间内订阅的锁已经被释放了,则会执行这里 try { // 获取还需要等待的时间 time -= (System.currentTimeMillis() - current); // 如果还需要等待的时间小于0,则说明已经超过最大等待时间,获取锁失败 if (time <= 0) { acquireFailed(threadId); return false; } // 8、能运行到这里则说明: // 1、当前还在最大等待时间内 // 2、并且等待的锁已经被释放(即对该锁的订阅事件已经被吊起过),在这里可以再次尝试获取锁 // 这是一个死循环,循环退出条件有两个: // ①、在最大等待时间内成功获取锁,返回true // ②、超出了最大等待时间,但仍然没有成功获取到锁,返回false while (true) { // 获取当前时间 long currentTime = System.currentTimeMillis(); // 8.1、再次尝试申请锁,返回还剩余的锁过期时间 ttl = tryAcquire(leaseTime, unit, threadId); // 8.2、ttl==null 表示获取锁成功则直接返回true // lock acquired if (ttl == null) { return true; } // 再次计算还需要等待多时时间 time -= (System.currentTimeMillis() - currentTime); // 8.3、如果还需要等待的时间小于0,则说明已经超过最大等待时间,获取锁失败 if (time <= 0) { acquireFailed(threadId); return false; } // waiting for message // 更新一下当前时间,因为上面的操作可能会耗时,进而导致下面根据currentTime计算的time不准确 currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { // 8.4、如果剩余时间(ttl)小于waittime ,就在 ttl 时间内,从Entry的信号量(Semaphore)获取一个许可(除非被中断或者一直没有可用的许可)。 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { // 8.5、如果该锁剩余过期时间(ttl)大于waittime,则就在waittime 时间范围内等待可以通过信号量(Semaphore) getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } // 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间) time -= (System.currentTimeMillis() - currentTime); // 8.6、如果还需要等待的时间小于0,则说明已经超过最大等待时间,获取锁失败 if (time <= 0) { acquireFailed(threadId); return false; } } } finally { // 9、无论是否获得锁,都要取消订阅解锁消息 unsubscribe(subscribeFuture, threadId); } }
②、加锁核心方法tryAcquire分析
/** * 尝试获取锁,如果没有获取到锁则返回该锁还剩余多少毫秒过期,如果获取到了锁,则返回空 */ // 【1、尝试获取锁】 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { // tryAcquireAsync方法返回一个RFuture<Long>类型,get方法主要就是取得RFuture中的数值 // 该数值就是该锁还剩余的过期时间(如果为空,则表示已经获取到锁了,反之则表示该锁还剩多久过期) // 见下面【2、异步尝试获取锁】 return get(tryAcquireAsync(leaseTime, unit, threadId)); } // 【2、异步尝试获取锁】 private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { // 1、如果有设置锁过期时间 if (leaseTime != -1) { // 调用tryLockInnerAsync,【通过lua脚本去加锁】,见下面【3、通过调用lua脚本去真正开始加锁】 return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 2、如果获取锁时没有传递锁过期时间,则这里会给个默认过期时间30s(通过执行lua脚本去获取锁) RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); // 3、给获取锁的操作添加一个监听,当获取锁的操作返回时(不管成功还是失败),立即调用监听方法 ttlRemainingFuture.addListener(new FutureListener<Long>() { // 当获取锁的操作执行结束时,该方法被吊起 @Override public void operationComplete(Future<Long> future) throws Exception { // 3.1、如果获取锁失败,则直接返回 if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired // ttlRemaining == null 则说明获取锁成功 if (ttlRemaining == null) { // 3.2、如果获取锁成功了,则开启定时任务去定时延长锁过期时间(看门狗) scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } // 【3、通过调用lua脚本去真正开始加锁】 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { // 将锁过期时间转换为毫秒 internalLockLeaseTime = unit.toMillis(leaseTime); // 通过lua脚本去获取锁(可重入锁) // pttl命令和ttl命令类似,只是他是以毫秒为单位返回剩余过期时间,ttl是以秒为单位 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()) , internalLockLeaseTime, getLockName(threadId)); }
③、await加锁最大等待时间方法分析
protected boolean await(RFuture<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException { // 具体实现见下面 return commandExecutor.await(future, timeout, timeoutUnit); } public boolean await(RFuture<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException { // 创建一个门栓 final CountDownLatch l = new CountDownLatch(1); // 当订阅的锁被释放后会吊起这个监听方法,在监听方法内部将门栓数量减一 future.addListener(new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { // 监听方法被吊起,门栓数量减一 l.countDown(); } }); // 在这里等待门栓数量为0,或超时时间到了再继续运行 // 在等待时间内如果订阅的锁已经释放,监听方法会被吊起门栓数量为0,则这里返回true // 如果等待时间已经耗完了,订阅的锁还没被释放的话,则这里返回false return l.await(timeout, timeoutUnit); }
四、释放锁方法分析
1、释放锁流程分析
释放锁的代码逻辑比较简单,这里只描述一下释放锁的lua脚本的大体流程即可。
2、释放锁源码分析
①、unlock
public void unlock() { try { // 释放锁的核心方法unlockAsync get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException)e.getCause(); } else { throw e; } } }
②、unlockAsync
RFuture可以理解为对Future的一个增强,netty中的实现。JDK中的future只能阻塞获取子线程的返回,在netty中对future进行了增强,可以添加监听并且异步获取。
public RFuture<Void> unlockAsync(final long threadId) { final RPromise<Void> result = new RedissonPromise<Void>(); // 1、具体的释放锁的lua脚本(释放锁的动作在这里完成) RFuture<Boolean> future = unlockInnerAsync(threadId); // 2、添加一个监听器,一旦释放锁的操作完成(无论失败或成功),都会吊起监听器的operationComplete方法 future.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { // 3、如果释放锁失败了 if (!future.isSuccess()) { cancelExpirationRenewal(threadId); result.tryFailure(future.cause()); return; } Boolean opStatus = future.getNow(); if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } if (opStatus) { cancelExpirationRenewal(null); } // 4、释放锁成功 result.trySuccess(null); } }); return result; }
③、unlockInnerAsync
// 释放锁的lua脚本,这个lua脚本很简单不过多解锁 protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
五、遗留问题
1、如果自己设置了锁过期时间那么redisson就不会开启看门狗去延长过期时间了吗?
关于这个问题,从源代码中可以看到如果自己设置了锁过期时间那么会直接调用
tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG)
方法去获取锁,获取到了以后就直接返回了,并没有看到在哪里会去开启一个看门狗。如果是这样的话,那么是不是在我们调用
tryLock
方法的时候,如果自己指定了锁的过期时间,是不是就意味着没有定时线程(看门狗)去定期的延长锁的过期时间了???那这样是不是就不能保证分布式锁的安全性了????
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { // 1、如果有设置锁过期时间 if (leaseTime != -1) { // 调用tryLockInnerAsync,通过lua脚本去加锁【这里加锁成功后就直接返回了,并没有添加看门狗延长过期时间????】 return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 2、如果获取锁时没有传递锁过期时间,则这里会给个默认过期时间30s(通过执行lua脚本去获取锁) RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); // 3、给获取锁的操作添加一个监听,当获取锁的操作返回时(不管成功还是失败),立即调用监听方法 ttlRemainingFuture.addListener(new FutureListener<Long>() { // 当获取锁的操作执行结束时,该方法被吊起 @Override public void operationComplete(Future<Long> future) throws Exception { // 3.1、如果获取锁失败,则直接返回 if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // ttlRemaining == null 则说明获取锁成功 if (ttlRemaining == null) { // 3.2、如果获取锁成功了,则开启定时任务去定时延长锁过期时间(看门狗) scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
六、参考文章
1、慢谈 Redis 实现分布式锁 以及 Redisson 源码解析
2、RedissonMultiLock + RedissonLock部分源码
3、redisson-2.10.4源代码分析
4、redis客户端redisson实战
这篇关于redis分布式锁实现---源码分析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-24Redis资料:新手入门快速指南
- 2024-12-24Redis资料:新手入门教程与实践指南
- 2024-12-24Redis资料:新手入门教程与实践指南
- 2024-12-07Redis高并发入门详解
- 2024-12-07Redis缓存入门:新手必读指南
- 2024-12-07Redis缓存入门:新手必读教程
- 2024-12-07Redis入门:新手必备的简单教程
- 2024-12-07Redis入门:新手必读的简单教程
- 2024-12-06Redis入门教程:从安装到基本操作
- 2024-12-06Redis缓存入门教程:轻松掌握缓存技巧