个人随笔
目录
Redisson分布式锁(二):Redisson分布式锁源码分析
2021-01-10 22:28:12

一、实现分布式锁的要求

  • 确保互斥:在同一时刻,必须保证锁至多只能被一个客户端持有。
  • 不能死锁:在一个客户端在持有锁的期间崩溃而没有主动解锁情况下,也能保证后续其他客户端能加锁。
  • 避免活锁:在获取锁失败的情况下,反复进行重试操作,占用Cpu资源,影响性能。
  • 实现更多锁特性:锁中断、锁重入、锁超时等。
  • 确保客户端只能解锁自己持有的锁。

二、redission加锁、锁等待、解锁

1、加锁

  1. <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  2. internalLockLeaseTime = unit.toMillis(leaseTime);
  3. return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
  4. "if (redis.call('exists', KEYS[1]) == 0) then " +
  5. "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  6. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  7. "return nil; " +
  8. "end; " +
  9. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  10. "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  11. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  12. "return nil; " +
  13. "end; " +
  14. "return redis.call('pttl', KEYS[1]);",
  15. Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
  16. }

KEYS[1]为getName();
ARGV[1]为internalLockLeaseTime;
ARGV[2]为getLockName(threadId)。

org.redisson.RedissonLock类tryLockInnerAsync通过eval命令执行Lua代码完成加锁操作。KEYS[1]为锁在redis中的key,key对应value为map结构,ARGV[1]为锁超时时间,ARGV[2]为锁value中的key。ARGV[2]由UUID+threadId组成,用来标记锁被谁持有。

(1) 第一个If判断key是否存在,不存在完成加锁操作

  • redis.call(‘hset’, KEYS[1], ARGV[2], 1);创建key[1] map中添加key:ARGV[2] ,value:1
  • redis.call(‘pexpire’, KEYS[1], ARGV[1]);设置key[1]过期时间,避免发生死锁。

eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。可避免第一条命令执行成功第二条命令执行失败导致死锁。

(2)第二个if判断key存在且当前线程已经持有锁, 重入:

  • redis.call(‘hexists’, KEYS[1],ARGV[2]);判断redis中锁的标记值是否与当前请求的标记值相同,相同代表该线程已经获取锁。
  • redis.call(‘hincrby’, KEYS[1], ARGV[2],1);记录同一线程持有锁之后累计加锁次数实现锁重入。

(3)最后的return表示key存在被其他线程获取的锁, 等待:

  • redis.call(‘pexpire’, KEYS[1], ARGV[1]); 重制锁超时时间。

2、锁等待

  1. @Override
  2. public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
  3. long threadId = Thread.currentThread().getId();
  4. //步骤1、这里调用加锁操作,若没有返回null表名有其它线程占用锁,获得锁超时时间,执行锁等待逻辑
  5. Long ttl = tryAcquire(leaseTime, unit, threadId);
  6. // lock acquired
  7. if (ttl == null) {
  8. return;
  9. }
  10. //步骤2、步骤一中加锁操作失败,订阅消息,利用redis的pubsub提供一个通知机制来减少不断的重试,避免发生活锁。
  11. RFuture<RedissonLockEntry> future = subscribe(threadId);
  12. commandExecutor.syncSubscription(future);
  13. try {
  14. while (true) {
  15. ttl = tryAcquire(leaseTime, unit, threadId);
  16. // lock acquired
  17. if (ttl == null) {
  18. break;
  19. }
  20. //步骤3、getLath()获取RedissionLockEntry实例latch变量,由于permits为0,所以调用acquire()方法后线程阻塞。
  21. // waiting for message
  22. if (ttl >= 0) {
  23. getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  24. } else {
  25. getEntry(threadId).getLatch().acquire();
  26. }
  27. }
  28. } finally {
  29. unsubscribe(future, threadId);
  30. }
  31. // get(lockAsync(leaseTime, unit));
  32. }

其实很多这种循环阻塞的逻辑都不会使用空轮询,如果用空轮询的话太耗费CPU的性能,比如我们有定时去查询数据表中是否有数据,会使用while无线循环,但是配合Thread.sleep来防止空轮询,还有的会借助java的阻塞队列来防止空轮询,这里猜测阻塞队列是借助操作系统内核的中断来唤醒的,这个具体得去研究系统底层实现,这里就不多说,而redisssion是通过消息订阅来阻塞的,也防止了空轮询。总之,尽量避免使用空轮询。

3、解锁

  1. protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  2. return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  3. "if (redis.call('exists', KEYS[1]) == 0) then " +
  4. "redis.call('publish', KEYS[2], ARGV[1]); " +
  5. "return 1; " +
  6. "end;" +
  7. "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
  8. "return nil;" +
  9. "end; " +
  10. "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  11. "if (counter > 0) then " +
  12. "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  13. "return 0; " +
  14. "else " +
  15. "redis.call('del', KEYS[1]); " +
  16. "redis.call('publish', KEYS[2], ARGV[1]); " +
  17. "return 1; "+
  18. "end; " +
  19. "return nil;",
  20. Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
  21. }

KEYS[1]为getName();
KEYS[2]为getChannelName();
ARGV[1]为LockPubSub.unlockMessage;
ARGV[2]为internalLockLeaseTime;
ARGV[3]为getLockName(threadId);

  • (1)第一个if判断锁对应key是否存在,不存在则publish消息,将获取锁被阻塞的线程恢复重新获取锁;
  • (2)第二个if判断锁对应key存在,value中是否存在当前要释放锁的标示,不存在返回nil,确保锁只能被持有的线程释放。
  • (3)对应key存在,value中存在当前要释放锁的标示,将锁标示对应值-1,第三个if判断锁标示对应的值是否大于0,大于0,表示有锁重入情况发生,重新设置锁过期时间。
  • (4)对应key存在,value中存在当前要释放锁的标示,将锁标示对应值-1后等于0,调用del操作释放锁,并publish消息,将获取锁被阻塞的线程恢复重新获取锁;

订阅者将会收到如下消息。

  1. public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
  2. ...
  3. @Override
  4. protected void onMessage(RedissonLockEntry value, Long message) {
  5. if (message.equals(unlockMessage)) {
  6. Runnable runnableToExecute = value.getListeners().poll();
  7. if (runnableToExecute != null) {
  8. runnableToExecute.run();
  9. }
  10. value.getLatch().release();
  11. } else if (message.equals(readUnlockMessage)) {
  12. while (true) {
  13. Runnable runnableToExecute = value.getListeners().poll();
  14. if (runnableToExecute == null) {
  15. break;
  16. }
  17. runnableToExecute.run();
  18. }
  19. //订阅者接收到publish消息后,执行release操作,调用acquire被阻塞的线程将继续执行获取锁操作。
  20. value.getLatch().release(value.getLatch().getQueueLength());
  21. }
  22. }
  23. ...
  24. }

这里就跟步骤2的所等待逻辑互相呼应了

  1. ...
  2. RFuture<RedissonLockEntry> future = subscribe(threadId);
  3. ...
  4. getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  5. ...

4、watch dog自动延期机制

redisson默认超时时间是30s,假设我们的业务处理逻辑超过了30秒怎么办?redisson是这样实现的,当客户端
一获得锁,则会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

源码如下:



看门狗因为是新启动一个后台线程在运行,会对性能有一定的影响。

三、redisson实现分布式锁是否满足要求

1、确保互斥

要求:在同一时刻,必须保证锁至多只能被一个客户端持有。

redisson实现:redisson用lua脚本来保证原子操作,判断key是否存在,存在者锁等待,所以是满足于该要求的。

2、不能死锁

要求:在一个客户端在持有锁的期间崩溃而没有主动解锁情况下,也能保证后续其他客户端能加锁。

redisson实现:redisson对锁设置了超时时间,再客户端奔溃的情况下,超时后会自动被清除,所以不会死锁,满足要求。

3、避免活锁

要求:在获取锁失败的情况下,反复进行重试操作,占用Cpu资源,影响性能。
redisson实现:redisson通过消息订阅机制来防止空轮询,满足要求。

4、实现更多锁特性:锁中断、锁重入、锁超时等。

要求:确保客户端只能解锁自己持有的锁。
redisson实现:redisson通过map的结构来实现锁的重入,同一先线程进入只不过value+1,满足要求并且还有看门狗防止客户端还存在的情况下锁超时。

 926

啊!这个可能是世界上最丑的留言输入框功能~


当然,也是最丑的留言列表

有疑问发邮件到 : suibibk@qq.com 侵权立删
Copyright : 个人随笔   备案号 : 粤ICP备18099399号-2