2020年11月18日星期三

Redisson分布式锁以及其底层原理

介绍与配置

Redisson官方文档:https://github.com/redisson/redisson/wiki/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D

Springboot 自动配置类: RedissonAutoConfiguration 

pom配置:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId></dependency><dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.13.6</version></dependency>

如果什么都不配置的话,会默认使用单Redis节点模式,代码中直接就可以使用  RedissonClient 

 

具体配置可参考官方文档:https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95

分布式锁测试

@Slf4j@SpringBootTest(classes = DemoWebApplication.class)public class RedissonTest {  @Resource private RedissonClient redissonClient;  @Test public void redissonTest() throws InterruptedException {  log.info("===redissonTest====start===============");  for (int i = 0; i < 10; i++) {   new Thread(() -> {    lockTest();   }).start();  }  Thread.sleep(30000);  log.info("===redissonTest====end==============="); }
  private void tryLockTest() {
    // 见下文
  }
  private void lockTest() {
    // 见下文
  }
}
 
  private void tryLockTest() {  String threadName = Thread.currentThread().getName();  log.info("===Thread=={}===start===", threadName);  RLock lock = redisson.getLock("DistributedRedisLockTest");  // 尝试加锁,最多等待10秒,上锁以后30秒自动解锁  boolean lockFlag = false;  try {   // 尝试去加锁,10秒没获取到锁,则返回false   // res = lock.tryLock(10, TimeUnit.SECONDS);   // 尝试去加锁,10秒没获取到锁,则返回false,获取到则返回true,获取到锁后30秒自动释放   // 当waitTime设置为0时,就相当于setNx,获取不到锁直接退出   lockFlag = lock.tryLock(5, 1, TimeUnit.SECONDS);   if (!lockFlag) {    log.info("===Thread=={}==res={}==没有获取到锁,退出===", threadName, lockFlag);    return;   }   log.info("===Thread=={}============getLock===", threadName);   // 模拟业务逻辑   Thread.sleep(2000);  } catch (Exception e) {   log.error("执行异常,e:{}", ExceptionUtils.getStackTrace(e));  } finally {   log.info("===Thread=={}==========isHeldByCurrentThread={}", threadName, lock.isHeldByCurrentThread());   // 释放锁也可能出现异常,比如业务代码没执行完,锁就过期,此时进行释放会抛异常,加个当前线程是否持有所的判断   if (lock.isHeldByCurrentThread()) {    lock.unlock();   }  }  log.info("===Thread=={}==lockFlag={}=end===", threadName, lockFlag); }

 

 private void lockTest() {  String threadName = Thread.currentThread().getName();  log.info("===Thread=={}===start===", threadName);  RLock lock = redisson.getLock("DistributedRedisLockTest");  // lock表示去加锁,加锁成功,没有返回值,继续执行下面代码;加锁失败,它会一直阻塞,直到锁被释放,再继续往下执行  // lock.lock();  // 1秒自动释放时间,但是后续执行unlock操作时会报错(自己只能解锁自己的,第一个线程释放之后执行到unlock方法,但是此时锁已经是第二个线程的了)  lock.lock(1, TimeUnit.SECONDS);  log.info("===Thread=={}============getLock===", threadName);  try {   Thread.sleep(2000);  } catch (Exception e) {   log.error("执行异常,e:{}", ExceptionUtils.getStackTrace(e));  } finally {   log.info("===Thread=={}==========isHeldByCurrentThread={}", threadName, lock.isHeldByCurrentThread());   // 释放锁也可能出现异常,比如业务代码没执行完,锁就过期,此时进行释放会抛异常,加个当前线程是否持有所的判断   if (lock.isHeldByCurrentThread()) {    lock.unlock();   }  }  log.info("===Thread=={}===end===", threadName); }

特点

  • 分布式
  • 可以自动释放锁,防止死锁
  • 可重入锁 (相同线程不需要在等待锁,而是可以直接进行相应操作)
  • 防误删,当前线程只能删除当前线程的锁 (业务执行时间过长,超过锁失效时间,锁被释放,第二个线程获取锁,此时第一个线程执行到释放锁代码时,不能删除第二个线程的锁)
  • 可阻塞等待
  • 看门狗机制,延长过期时间(没有设置过期时间的情况,leaseTime=-1,默认失效时间为30秒,启动看门狗线程,定时检查是否需要延长时间scheduleExpirationRenewal)
  • 锁种类多样:可重入锁、公平锁、联锁、红锁、读写锁

存在的问题

  分布式架构中的CAP理论,分布式系统只能同时满足两个

    • 一致性(Consistency)
    • 可用性(Availability)
    • 分区容错性(Partition tolerance)
  • Redisson分布式锁是AP模式,当锁存在的redis节点宕机,可能会被误判为锁失效,或者没有加锁。(Zookeeper实现的分布式锁,是CP理论)

原理

本文中Redisson版本为 redisson-spring-boot-starter 3.13.6

先看下接口方法:

public interface RRLock extends Lock, RLockAsync{ //----------------------Lock接口方法----------------------- /**  * 加锁 锁的有效期默认30秒  */ void lock(); /**  * tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .  */ boolean tryLock(); /**  * tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,  * 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。  *  * @param time 等待时间  * @param unit 时间单位 小时、分、秒、毫秒等  */ boolean tryLock(long time, TimeUnit unit) throws InterruptedException; /**  * 解锁  */ void unlock(); /**  * 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过  * Thread.currentThread().interrupt(); 方法真正中断该线程  */ void lockInterruptibly(); //----------------------RLock接口方法----------------------- /**  * 加锁 上面是默认30秒这里可以手动设置锁的有效时间  *  * @param leaseTime 锁有效时间  * @param unit  时间单位 小时、分、秒、毫秒等  */ void lock(long leaseTime, TimeUnit unit); /**  * 这里比上面多一个参数,多添加一个锁的有效时间  *  * @param waitTime 等待时间  * @param leaseTime 锁有效时间  * @param unit  时间单位 小时、分、秒、毫秒等  */ boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; /**  * 检验该锁是否被线程使用,如果被使用返回True  */ boolean isLocked(); /**  * 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有)  * 这个比上面那个实用  */ boolean isHeldByCurrentThread(); /**  * 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间  * @param leaseTime 锁有效时间  * @param unit  时间单位 小时、分、秒、毫秒等  */ void lockInterruptibly(long leaseTime, TimeUnit unit); }
  • tryLock方法
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {  long time = unit.toMillis(waitTime);  long current = System.currentTimeMillis();  long threadId = Thread.currentThread().getId();  Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);  // 获取到锁,则直接返回true  if (ttl == null) {   return true;  }  // 没获取到锁,校验是否超过等待时长,超过则返回false  time -= System.currentTimeMillis() - current;  if (time <= 0) {   acquireFailed(waitTime, unit, threadId);   return false;  }    current = System.currentTimeMillis();
     // 订阅监听redis消息,并且创建RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象,用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。 RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。
     // 只有await返回true,才进入循环尝试获取锁
     if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; }   // 如果没有超过尝试获取锁的等待时间,那么通过While一直获取锁。最终只会有两种结果
       // 1) 在等待时间内获取锁成功 返回true 2)等待时间结束了还没有获取到锁那么返回false。 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // 获取锁成功 if (ttl == null) { return true; }          // 获取锁失败 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); }           time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); }// return get(tryLockAsync(waitTime, leaseTime, unit)); }

 

 private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
     // 如果指定了失效时间,就按指定的失效时间执行,然后返回 if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); }
     // 如果没有指定失效时间(leaseTime=-1),则默认配置30秒 (getLockWatchdogTimeOut()=30) RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); // 加锁完毕之后,启动看门狗线程,定时的延期失效时间(定时任务为 internalLockLeaseTime / 3 毫秒之后执行)
     ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {    return;   }   // lock acquired   if (ttlRemaining == null) {
          // 启动看门狗任务 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {  internalLockLeaseTime = unit.toMillis(leaseTime);     // 通过lua脚本访问Redis,保证操作的原子性, 以及达到批量操作的效果,提升性能
     // KEYS[1] :需要加锁的key,这里需要是字符串类型。
     // ARGV[1] :锁的超时时间,防止死锁
     // ARGV[2] :锁的唯一标识,id(UUID.randomUUID()) + ":" + threadId
     return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              // 检查是否key已经被占用,如果没有则设置超时时间和唯一标识,初始化value=1
     "if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
              "end; " +
               // 如果锁重入,需要判断锁的key field 都一直情况下 value 加一
               "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
               "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " +
               // 返回剩余的过期时间 "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }

注:tryLock一般用于特定满足需求的场合,但不建议作为一般需求的分布式锁,一般分布式锁建议用void lock(long leaseTime, TimeUnit unit)。因为从性能上考虑,在高并发情况下后者效率是前者的好几倍

  • unlock方法
  @Override public RFuture<Void> unlockAsync(long threadId) {  RPromise<Void> result = new RedissonPromise<Void>();  RFuture<Boolean> future = unlockInnerAsync(threadId);  future.onComplete((opStatus, e) -> {
       // 释放锁后取消刷新锁失效时间的调度任务 cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); return; }       // 非锁的持有者释放锁时抛出异常 if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result; }  // 通过 Lua 脚本执行 Redis 命令释放锁

   // KEYS[1] :需要加锁的key,这里需要是字符串类型。

   // KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:"redisson_lock__channel__{" + getName() + "}"

   // ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。=

   // ARGV[2] :锁的超时时间,防止死锁

   // ARGV[3] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + ":" + threadId

  protected RFuture<Boolean> unlockInnerAsync(long threadId) {  return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
               // key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。      "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
               // 如果counter>0说明锁在重入,不能删除key "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " +
               // 删除key并且publish 解锁消息 "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }

使用 EVAL 命令执行 Lua 脚本来释放锁:

  1. key 不存在,说明锁已释放,直接执行 publish 命令发布释放锁消息并返回 1
  2. key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回 nil
  3. 因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 hincrby 对锁的值减一
  4. 释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回 0;如果刚才释放的已经是最后一把锁,则执行 del 命令删除锁的 key,并发布锁释放消息,返回 1

注意这里有个实际开发过程中,容易出现很容易出现上面第二步异常,非锁的持有者释放锁时抛出异常。

 

 

参考:

Redisson实现分布式锁(1)---原理

Redisson实现分布式锁(2)—RedissonLock

利用Redisson实现分布式锁及其底层原理解析

原文转载:http://www.shaoqun.com/a/490357.html

ifttt:https://www.ikjzd.com/w/956

olive:https://www.ikjzd.com/w/2025

贝贝官网:https://www.ikjzd.com/w/1321


介绍与配置Redisson官方文档:https://github.com/redisson/redisson/wiki/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8DSpringboot自动配置类:RedissonAutoConfigurationpom配置:<dependency><groupId>org.springframew
pocket:pocket
卖家精灵:卖家精灵
深圳哪里卖衣服最时尚最便宜款式又好看又多的??:深圳哪里卖衣服最时尚最便宜款式又好看又多的??
哪些口岸可以去澳门?珠海机场哪个口岸最近啊?:哪些口岸可以去澳门?珠海机场哪个口岸最近啊?
干货:亚马逊品牌备案操作流程(图文详解):干货:亚马逊品牌备案操作流程(图文详解)

没有评论:

发表评论