Redisson分布式
2022/6/27 2:20:24
本文主要是介绍Redisson分布式,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
参考链接:https://www.cnblogs.com/jklixin/p/13212864.html
官网连接:https://redisson.org
Redisson分布式
GitHub中文文档
概念:是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务
引入依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.1</version> </dependency>
配置
@Configuration public class MyRedissonConfig { @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() throws IOException{ //1.配置连接 Config config = new Config(); config.useSingleServer() .setPassword("123456") //可以用"rediss://"来启用SSL连接 .setAddress("redis://123.56.16.54:6379"); //2.创建客户端 RedissonClient redissonClient= Redisson.create(config); return redissonClient; } }
分布式锁
1、可重入锁
@ResponseBody @GetMapping("/hello") public String hello() { //1、获取一把锁,只要锁的名字一样,就是同一把锁 RLock lock = redisson.getLock(" my-lock "); //2、加锁 lock.lock();//阻塞式等待。默认加的锁都是30s时间。 //1)、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被用特 //2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。 try{ System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId()); Thread.sleep(30000); } catch (Exception e) { } finally { //3、解锁将设解锁代码没有运行,redisson会不会出现死锁 System.out.println("释放锁..." + Thread.currentThread().getId()); lock.unlock(); } return "hello"; }
基于Redis的Redisson分布式可重入锁RLock
Java对象实现了java.util.concurrent.locks.Lock
接口
大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(future, threadId); }
另外Redisson还通过加锁的方法提供了leaseTime
的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
// 加锁以后10秒钟自动解锁,不会续期 // 无需调用unlock方法手动解锁 lock.lock(10, TimeUnit.SECONDS);//10秒自动解锁,自动解锁时间一定要大于业务的执行时间。 //问题:Lock.Lock(10,TimeUnit.SECONDS);在锁时间到了以后,不会自动续期。 //1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间 //2、如果我们未指定锁的超时时间,就使用30*1000(LockivatchdogTimeout看门狗的默认时间) //只要占锁成功,就会启动一个定时任务(重新给看门狗定义过期时间,新的过期时间是默认的30s),每个10s自动续期到30s // internalLockLeaseTime【看门狗时间】/3,10s后 // 尝试加锁,最多等待100秒,超出不等,上锁以后10秒自动解锁 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }
2、公平锁
保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
RLock fairLock = redisson.getFairLock("anyLock"); // 最常见的使用方法 fairLock.lock(); // 10秒钟以后自动解锁 // 无需调用unlock方法手动解锁 fairLock.lock(10, TimeUnit.SECONDS); // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS); ... fairLock.unlock();
3、读写锁
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock"); // 最常见的加锁使用方法 rwlock.readLock().lock(); // 或 rwlock.writeLock().lock(); // 10秒钟以后自动解锁 // 无需调用unlock方法手动解锁 rwlock.readLock().lock(10, TimeUnit.SECONDS); // 或 rwlock.writeLock().lock(10, TimeUnit.SECONDS); // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS); // 或 boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();
进行写操作之前不能有锁,进行读操作之前可以有读锁,不能有写锁
4、信号量
@ResponseBody @GetMapping("/acquire") public String AcquireSemaphore(){ RSemaphore semaphore = redisson.getSemaphore("semaphore"); //需要获取几个信号量参数就写几,默认为1 boolean b = semaphore.tryAcquire(); if(b){ //获取成功 return "获取成功:信号量-1..."+"当前信号量"+semaphore.availablePermits(); }else{ return "获取失败:等待...."+"当前信号量"+semaphore.availablePermits(); } } @ResponseBody @GetMapping("/release") public String releaseSemaphore(){ RSemaphore semaphore = redisson.getSemaphore("semaphore"); semaphore.release();//semaphore.release(4);表示一次性释放4个信号量 return "释放成功:信号量+1......"+"当前信号量"+semaphore.availablePermits(); }
5、闭锁
(CountDownLatch)
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.trySetCount(3);//redis中存在一个anyCountDownLatch=3,当其等于0时就闭锁 latch.await(); // 在其他线程或其他JVM里 RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.countDown();//redis中的anyCountDownLatch减一
基于 Redis 的 Java 分布式可重入Lock对象,并实现Lock接口。
如果获取锁的 Redisson 实例崩溃,那么这种锁可能会在获取状态下永远挂起。为了避免这种 Redisson 维护锁看门狗,它会在锁持有者 Redisson 实例处于活动状态时延长锁的过期时间。默认情况下,锁定看门狗超时为 30 秒,可以通过Config.lockWatchdogTimeout设置进行更改。
Redisson 还允许leaseTime
在获取锁时指定参数。在指定的时间间隔后锁定的锁将自动释放。
RLock
对象的行为符合 Java Lock 规范。这意味着只有锁所有者线程才能解锁它,否则IllegalMonitorStateException
会被抛出。否则考虑使用RSemaphore对象。
代码示例:
RLock lock = redisson.getLock("myLock"); // traditional lock method lock.lock(); // or acquire lock and automatically unlock it after 10 seconds lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }
Async接口使用代码示例:
RLock lock = redisson.getLock("myLock"); RFuture<Void> lockFuture = lock.lockAsync(); // or acquire lock and automatically unlock it after 10 seconds RFuture<Void> lockFuture = lock.lockAsync(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds RFuture<Boolean> lockFuture = lock.tryLockAsync(100, 10, TimeUnit.SECONDS); lockFuture.whenComplete((res, exception) -> { // ... lock.unlockAsync(); });
Reactive接口使用代码示例:
RedissonReactiveClient redisson = redissonClient.reactive(); RLockReactive lock = redisson.getLock("myLock"); Mono<Void> lockMono = lock.lock(); // or acquire lock and automatically unlock it after 10 seconds Mono<Void> lockMono = lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds Mono<Boolean> lockMono = lock.tryLock(100, 10, TimeUnit.SECONDS); lockMono.doOnNext(res -> { // ... }) .doFinally(lock.unlock()) .subscribe();
RxJava3接口使用代码示例:
RedissonRxClient redisson = redissonClient.rxJava(); RLockRx lock = redisson.getLock("myLock"); Completable lockRes = lock.lock(); // or acquire lock and automatically unlock it after 10 seconds Completable lockRes = lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds Single<Boolean> lockRes = lock.tryLock(100, 10, TimeUnit.SECONDS); lockRes.doOnSuccess(res -> { // ... }) .doFinally(lock.unlock()) .subscribe();
8.2. 公平锁
基于 Redis 的 Java 分布式可重入公平Lock对象实现了Lock接口。
公平锁保证线程将按照他们请求的顺序获取它。所有等待的线程都排队,如果某个线程已经死亡,那么 Redisson 等待它的返回 5 秒。例如,如果 5 个线程由于某种原因死亡,则延迟将是 25 秒。
如果获取锁的 Redisson 实例崩溃,那么这种锁可能会在获取状态下永远挂起。为了避免这种 Redisson 维护锁看门狗,它会在锁持有者 Redisson 实例处于活动状态时延长锁的过期时间。默认情况下,锁定看门狗超时为 30 秒,可以通过Config.lockWatchdogTimeout设置进行更改。
Redisson 还允许leaseTime
在获取锁时指定参数。在指定的时间间隔后锁定的锁将自动释放。
RLock
对象的行为符合 Java Lock 规范。这意味着只有锁所有者线程才能解锁它,否则IllegalMonitorStateException
会被抛出。否则考虑使用RSemaphore对象。
代码示例:
RLock lock = redisson.getFairLock("myLock"); // traditional lock method lock.lock(); // or acquire lock and automatically unlock it after 10 seconds lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }
Async接口使用代码示例:
RLock lock = redisson.getFairLock("myLock"); RFuture<Void> lockFuture = lock.lockAsync(); // or acquire lock and automatically unlock it after 10 seconds RFuture<Void> lockFuture = lock.lockAsync(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds RFuture<Boolean> lockFuture = lock.tryLockAsync(100, 10, TimeUnit.SECONDS); lockFuture.whenComplete((res, exception) -> { // ... lock.unlockAsync(); });
Reactive接口使用代码示例:
RedissonReactiveClient redisson = redissonClient.reactive(); RLockReactive lock = redisson.getFairLock("myLock"); Mono<Void> lockMono = lock.lock(); // or acquire lock and automatically unlock it after 10 seconds Mono<Void> lockMono = lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds Mono<Boolean> lockMono = lock.tryLock(100, 10, TimeUnit.SECONDS); lockMono.doOnNext(res -> { // ... }) .doFinally(lock.unlock()) .subscribe();
RxJava3接口使用代码示例:
RedissonRxClient redisson = redissonClient.rxJava(); RLockRx lock = redisson.getFairLock("myLock"); Completable lockRes = lock.lock(); // or acquire lock and automatically unlock it after 10 seconds Completable lockRes = lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds Single<Boolean> lockRes = lock.tryLock(100, 10, TimeUnit.SECONDS); lockRes.doOnSuccess(res -> { // ... }) .doFinally(lock.unlock()) .subscribe();
8.3. 多锁
基于 Redis 的分布式MultiLock
对象允许对Lock对象进行分组并将它们作为单个锁处理。每个RLock
对象可能属于不同的 Redisson 实例。
如果获取的 Redisson 实例MultiLock
崩溃,那么这样的实例MultiLock
可能会永远挂在获取状态。为了避免这种 Redisson 维护锁看门狗,它会在锁持有者 Redisson 实例处于活动状态时延长锁的过期时间。默认情况下,锁定看门狗超时为 30 秒,可以通过Config.lockWatchdogTimeout设置进行更改。
Redisson 还允许leaseTime
在获取锁时指定参数。在指定的时间间隔后锁定的锁将自动释放。
MultiLock
对象的行为符合 Java Lock 规范。这意味着只有锁所有者线程才能解锁它,否则IllegalMonitorStateException
会被抛出。否则考虑使用RSemaphore对象。
代码示例:
RLock lock1 = redisson1.getLock("lock1"); RLock lock2 = redisson2.getLock("lock2"); RLock lock3 = redisson3.getLock("lock3"); RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3); // traditional lock method multiLock.lock(); // or acquire lock and automatically unlock it after 10 seconds multiLock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds boolean res = multiLock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { multiLock.unlock(); } }
Async接口使用代码示例:
RLock lock1 = redisson1.getLock("lock1"); RLock lock2 = redisson2.getLock("lock2"); RLock lock3 = redisson3.getLock("lock3"); RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3); RFuture<Void> lockFuture = multiLock.lockAsync(); // or acquire lock and automatically unlock it after 10 seconds RFuture<Void> lockFuture = multiLock.lockAsync(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds RFuture<Boolean> lockFuture = multiLock.tryLockAsync(100, 10, TimeUnit.SECONDS); lockFuture.whenComplete((res, exception) -> { // ... multiLock.unlockAsync(); });
Reactive接口使用代码示例:
RedissonReactiveClient anyRedisson = redissonClient.reactive(); RLockReactive lock1 = redisson1.getLock("lock1"); RLockReactive lock2 = redisson2.getLock("lock2"); RLockReactive lock3 = redisson3.getLock("lock3"); RLockReactive multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3); Mono<Void> lockMono = multiLock.lock(); // or acquire lock and automatically unlock it after 10 seconds Mono<Void> lockMono = multiLock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds Mono<Boolean> lockMono = multiLock.tryLock(100, 10, TimeUnit.SECONDS); lockMono.doOnNext(res -> { // ... }) .doFinally(multiLock.unlock()) .subscribe();
RxJava3接口使用代码示例:
RedissonRxClient anyRedisson = redissonClient.rxJava(); RLockRx lock1 = redisson1.getLock("lock1"); RLockRx lock2 = redisson2.getLock("lock2"); RLockRx lock3 = redisson3.getLock("lock3"); RLockRx multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3); Completable lockRes = multiLock.lock(); // or acquire lock and automatically unlock it after 10 seconds Completable lockRes = multiLock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds Single<Boolean> lockRes = multiLock.tryLock(100, 10, TimeUnit.SECONDS); lockRes.doOnSuccess(res -> { // ... }) .doFinally(multiLock.unlock()) .subscribe();
8.4. 红锁
此对象已弃用。RLock 操作现在传播到所有 Redis 从站。
8.5。读写锁
基于 Redis 的 Java 分布式可重入ReadWriteLock对象实现了ReadWriteLock接口。读写锁都实现了 RLock接口。
允许多个 ReadLock 所有者和仅一个 WriteLock 所有者。
如果获取锁的 Redisson 实例崩溃,那么这种锁可能会在获取状态下永远挂起。为了避免这种 Redisson 维护锁看门狗,它会在锁持有者 Redisson 实例处于活动状态时延长锁的过期时间。默认情况下,锁定看门狗超时为 30 秒,可以通过Config.lockWatchdogTimeout设置进行更改。
Redisson 还允许leaseTime
在获取锁时指定参数。在指定的时间间隔后锁定的锁将自动释放。
RLock
对象的行为符合 Java Lock 规范。这意味着只有锁所有者线程才能解锁它,否则IllegalMonitorStateException
会被抛出。否则考虑使用RSemaphore对象。
代码示例:
RReadWriteLock rwlock = redisson.getReadWriteLock("myLock"); RLock lock = rwlock.readLock(); // or RLock lock = rwlock.writeLock(); // traditional lock method lock.lock(); // or acquire lock and automatically unlock it after 10 seconds lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }
Async接口使用代码示例:
RReadWriteLock rwlock = redisson.getReadWriteLock("myLock"); RLock lock = rwlock.readLock(); // or RLock lock = rwlock.writeLock(); RFuture<Void> lockFuture = lock.lockAsync(); // or acquire lock and automatically unlock it after 10 seconds RFuture<Void> lockFuture = lock.lockAsync(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds RFuture<Boolean> lockFuture = lock.tryLockAsync(100, 10, TimeUnit.SECONDS); lockFuture.whenComplete((res, exception) -> { // ... lock.unlockAsync(); });
Reactive接口使用代码示例:
RedissonReactiveClient redisson = redissonClient.reactive(); RReadWriteLockReactive rwlock = redisson.getReadWriteLock("myLock"); RLockReactive lock = rwlock.readLock(); // or RLockReactive lock = rwlock.writeLock(); Mono<Void> lockMono = lock.lock(); // or acquire lock and automatically unlock it after 10 seconds Mono<Void> lockMono = lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds Mono<Boolean> lockMono = lock.tryLock(100, 10, TimeUnit.SECONDS); lockMono.doOnNext(res -> { // ... }) .doFinally(lock.unlock()) .subscribe();
RxJava3接口使用代码示例:
RedissonRxClient redisson = redissonClient.rxJava(); RReadWriteLockRx rwlock = redisson.getReadWriteLock("myLock"); RLockRx lock = rwlock.readLock(); // or RLockRx lock = rwlock.writeLock(); Completable lockRes = lock.lock(); // or acquire lock and automatically unlock it after 10 seconds Completable lockRes = lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds Single<Boolean> lockRes = lock.tryLock(100, 10, TimeUnit.SECONDS); lockRes.doOnSuccess(res -> { // ... }) .doFinally(lock.unlock()) .subscribe();
8.6. 信号
基于 Redis 的 Java 分布式Semaphore对象,类似于Semaphore对象。
可以在使用前进行初始化,但这不是必需的,通过trySetPermits(permits)
方法获得可用的许可数量。
代码示例:
RSemaphore semaphore = redisson.getSemaphore("mySemaphore"); // acquire single permit semaphore.acquire(); // or acquire 10 permits semaphore.acquire(10); // or try to acquire permit boolean res = semaphore.tryAcquire(); // or try to acquire permit or wait up to 15 seconds boolean res = semaphore.tryAcquire(15, TimeUnit.SECONDS); // or try to acquire 10 permit boolean res = semaphore.tryAcquire(10); // or try to acquire 10 permits or wait up to 15 seconds boolean res = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS); if (res) { try { ... } finally { semaphore.release(); } }
Async接口使用代码示例:
RSemaphore semaphore = redisson.getSemaphore("mySemaphore"); // acquire single permit RFuture<Void> acquireFuture = semaphore.acquireAsync(); // or acquire 10 permits RFuture<Void> acquireFuture = semaphore.acquireAsync(10); // or try to acquire permit RFuture<Boolean> acquireFuture = semaphore.tryAcquireAsync(); // or try to acquire permit or wait up to 15 seconds RFuture<Boolean> acquireFuture = semaphore.tryAcquireAsync(15, TimeUnit.SECONDS); // or try to acquire 10 permit RFuture<Boolean> acquireFuture = semaphore.tryAcquireAsync(10); // or try to acquire 10 permits or wait up to 15 seconds RFuture<Boolean> acquireFuture = semaphore.tryAcquireAsync(10, 15, TimeUnit.SECONDS); acquireFuture.whenComplete((res, exception) -> { // ... semaphore.releaseAsync(); });
Reactive接口使用代码示例:
RedissonReactiveClient redisson = redissonClient.reactive(); RSemaphoreReactive semaphore = redisson.getSemaphore("mySemaphore"); // acquire single permit Mono<Void> acquireMono = semaphore.acquire(); // or acquire 10 permits Mono<Void> acquireMono = semaphore.acquire(10); // or try to acquire permit Mono<Boolean> acquireMono = semaphore.tryAcquire(); // or try to acquire permit or wait up to 15 seconds Mono<Boolean> acquireMono = semaphore.tryAcquire(15, TimeUnit.SECONDS); // or try to acquire 10 permit Mono<Boolean> acquireMono = semaphore.tryAcquire(10); // or try to acquire 10 permits or wait up to 15 seconds Mono<Boolean> acquireMono = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS); acquireMono.doOnNext(res -> { // ... }) .doFinally(semaphore.release()) .subscribe();
RxJava3接口使用代码示例:
RedissonRxClient redisson = redissonClient.rxJava(); RSemaphoreRx semaphore = redisson.getSemaphore("mySemaphore"); // acquire single permit Completable acquireRx = semaphore.acquire(); // or acquire 10 permits Completable acquireRx = semaphore.acquire(10); // or try to acquire permit Single<Boolean> acquireRx = semaphore.tryAcquire(); // or try to acquire permit or wait up to 15 seconds Single<Boolean> acquireRx = semaphore.tryAcquire(15, TimeUnit.SECONDS); // or try to acquire 10 permit Single<Boolean> acquireRx = semaphore.tryAcquire(10); // or try to acquire 10 permits or wait up to 15 seconds Single<Boolean> acquireRx = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS); acquireRx.doOnSuccess(res -> { // ... }) .doFinally(semaphore.release()) .subscribe();
8.7. PermitExpirableSemaphore
基于 Redis 的 Java 分布式信号量对象,为每个获得的许可提供租用时间参数支持。每个许可证都由自己的 ID 标识,并且只能使用其 ID 释放。
应在使用前通过方法使用可用许可数量进行初始化trySetPermits(permits)
。允许通过addPermits(permits)
方法增加/减少可用许可证的数量。
代码示例:
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); semaphore.trySetPermits(23); // acquire permit String id = semaphore.acquire(); // or acquire permit with lease time in 10 seconds String id = semaphore.acquire(10, TimeUnit.SECONDS); // or try to acquire permit String id = semaphore.tryAcquire(); // or try to acquire permit or wait up to 15 seconds String id = semaphore.tryAcquire(15, TimeUnit.SECONDS); // or try to acquire permit with least time 15 seconds or wait up to 10 seconds String id = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS); if (id != null) { try { ... } finally { semaphore.release(id); } }
Async接口使用代码示例:
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); RFuture<Boolean> setFuture = semaphore.trySetPermitsAsync(23); // acquire permit RFuture<String> acquireFuture = semaphore.acquireAsync(); // or acquire permit with lease time in 10 seconds RFuture<String> acquireFuture = semaphore.acquireAsync(10, TimeUnit.SECONDS); // or try to acquire permit RFuture<String> acquireFuture = semaphore.tryAcquireAsync(); // or try to acquire permit or wait up to 15 seconds RFuture<String> acquireFuture = semaphore.tryAcquireAsync(15, TimeUnit.SECONDS); // or try to acquire permit with least time 15 seconds or wait up to 10 seconds RFuture<String> acquireFuture = semaphore.tryAcquireAsync(10, 15, TimeUnit.SECONDS); acquireFuture.whenComplete((id, exception) -> { // ... semaphore.releaseAsync(id); });
Reactive接口使用代码示例:
RedissonReactiveClient redisson = redissonClient.reactive(); RPermitExpirableSemaphoreReactive semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); Mono<Boolean> setMono = semaphore.trySetPermits(23); // acquire permit Mono<String> acquireMono = semaphore.acquire(); // or acquire permit with lease time in 10 seconds Mono<String> acquireMono = semaphore.acquire(10, TimeUnit.SECONDS); // or try to acquire permit Mono<String> acquireMono = semaphore.tryAcquire(); // or try to acquire permit or wait up to 15 seconds Mono<String> acquireMono = semaphore.tryAcquire(15, TimeUnit.SECONDS); // or try to acquire permit with least time 15 seconds or wait up to 10 seconds Mono<String> acquireMono = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS); acquireMono.flatMap(id -> { // ... return semaphore.release(id); }).subscribe();
RxJava3接口使用代码示例:
RedissonRxClient redisson = redissonClient.rxJava(); RPermitExpirableSemaphoreRx semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); Single<Boolean> setRx = semaphore.trySetPermits(23); // acquire permit Single<String> acquireRx = semaphore.acquire(); // or acquire permit with lease time in 10 seconds Single<String> acquireRx = semaphore.acquire(10, TimeUnit.SECONDS); // or try to acquire permit Maybe<String> acquireRx = semaphore.tryAcquire(); // or try to acquire permit or wait up to 15 seconds Maybe<String> acquireRx = semaphore.tryAcquire(15, TimeUnit.SECONDS); // or try to acquire permit with least time 15 seconds or wait up to 10 seconds Maybe<String> acquireRx = semaphore.tryAcquire(10, 15, TimeUnit.SECONDS); acquireRx.flatMap(id -> { // ... return semaphore.release(id); }).subscribe();
8.8. 倒计时锁存器
基于 Redis 的 Java 分布式CountDownLatch对象的结构类似于CountDownLatch对象。
应trySetCount(count)
在使用前使用 count by method 进行初始化。
代码示例:
RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch"); latch.trySetCount(1); // await for count down latch.await(); // in other thread or JVM RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch"); latch.countDown();
Async接口使用代码示例:
RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch"); RFuture<Boolean> setFuture = lock.trySetCountAsync(1); // await for count down RFuture<Void> awaitFuture = latch.awaitAsync(); // in other thread or JVM RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch"); RFuture<Void> countFuture = latch.countDownAsync();
Reactive接口使用代码示例:
RedissonReactiveClient redisson = redissonClient.reactive(); RCountDownLatchReactive latch = redisson.getCountDownLatch("myCountDownLatch"); Mono<Boolean> setMono = latch.trySetCount(1); // await for count down Mono<Void> awaitMono = latch.await(); // in other thread or JVM RCountDownLatchReactive latch = redisson.getCountDownLatch("myCountDownLatch"); Mono<Void> countMono = latch.countDown();
RxJava3接口使用代码示例:
RedissonRxClient redisson = redissonClient.rxJava(); RCountDownLatchRx latch = redisson.getCountDownLatch("myCountDownLatch"); Single<Boolean> setRx = latch.trySetCount(1); // await for count down Completable awaitRx = latch.await(); // in other thread or JVM RCountDownLatchRx latch = redisson.getCountDownLatch("myCountDownLatch"); Completable countRx = latch.countDown();
8.9。自旋锁
基于 Redis 的 Java 分布式可重入SpinLock对象,并实现Lock接口。
由于Lock对象中的 pubsub 使用情况,每短时间间隔获取/释放数千个或更多锁可能会导致达到网络吞吐量限制和 Redis CPU 过载。这是由于 Redis pubsub 的性质而发生的 - 消息被分发到 Redis 集群中的所有节点。Spin Lock 默认使用指数退避策略来获取锁,而不是 pubsub 通道。
如果获取锁的 Redisson 实例崩溃,那么这种锁可能会在获取状态下永远挂起。为了避免这种 Redisson 维护锁看门狗,它会在锁持有者 Redisson 实例处于活动状态时延长锁的过期时间。默认情况下,锁定看门狗超时为 30 秒,可以通过Config.lockWatchdogTimeout设置进行更改。
Redisson 还允许leaseTime
在获取锁时指定参数。在指定的时间间隔后锁定的锁将自动释放。
RLock
对象的行为符合 Java Lock 规范。这意味着只有锁所有者线程才能解锁它,否则IllegalMonitorStateException
会被抛出。否则考虑使用RSemaphore对象。
代码示例:
RLock lock = redisson.getSpinLock("myLock"); // traditional lock method lock.lock(); // or acquire lock and automatically unlock it after 10 seconds lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }
Async接口使用代码示例:
RLock lock = redisson.getSpinLock("myLock"); RFuture<Void> lockFuture = lock.lockAsync(); // or acquire lock and automatically unlock it after 10 seconds RFuture<Void> lockFuture = lock.lockAsync(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds RFuture<Boolean> lockFuture = lock.tryLockAsync(100, 10, TimeUnit.SECONDS); lockFuture.whenComplete((res, exception) -> { // ... lock.unlockAsync(); });
Reactive接口使用代码示例:
RedissonReactiveClient redisson = redissonClient.reactive(); RLockReactive lock = redisson.getSpinLock("myLock"); Mono<Void> lockMono = lock.lock(); // or acquire lock and automatically unlock it after 10 seconds Mono<Void> lockMono = lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds Mono<Boolean> lockMono = lock.tryLock(100, 10, TimeUnit.SECONDS); lockMono.doOnNext(res -> { // ... }) .doFinally(lock.unlock()) .subscribe();
RxJava3接口使用代码示例:
RedissonRxClient redisson = redissonClient.rxJava(); RLockRx lock = redisson.getSpinLock("myLock"); Completable lockRes = lock.lock(); // or acquire lock and automatically unlock it after 10 seconds Completable lockRes = lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds Single<Boolean> lockRes = lock.tryLock(100, 10, TimeUnit.SECONDS); lockRes.doOnSuccess(res -> { // ... }) .doFinally(lock.unlock()) .subscribe();
这篇关于Redisson分布式的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-24Redis资料:新手入门快速指南
- 2024-12-24Redis资料:新手入门教程与实践指南
- 2024-12-24Redis资料:新手入门教程与实践指南
- 2024-12-07Redis高并发入门详解
- 2024-12-07Redis缓存入门:新手必读指南
- 2024-12-07Redis缓存入门:新手必读教程
- 2024-12-07Redis入门:新手必备的简单教程
- 2024-12-07Redis入门:新手必读的简单教程
- 2024-12-06Redis入门教程:从安装到基本操作
- 2024-12-06Redis缓存入门教程:轻松掌握缓存技巧