一、实现分布式锁的要求
- 确保互斥:在同一时刻,必须保证锁至多只能被一个客户端持有。
- 不能死锁:在一个客户端在持有锁的期间崩溃而没有主动解锁情况下,也能保证后续其他客户端能加锁。
- 避免活锁:在获取锁失败的情况下,反复进行重试操作,占用Cpu资源,影响性能。
- 实现更多锁特性:锁中断、锁重入、锁超时等。
- 确保客户端只能解锁自己持有的锁。
二、redission加锁、锁等待、解锁
1、加锁
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
KEYS[1]为getName();
ARGV[1]为internalLockLeaseTime;
ARGV[2]为getLockName(threadId)。
org.redisson.RedissonLock类tryLockInnerAsync通过eval命令执行Lua代码完成加锁操作。KEYS[1]为锁在redis中的key,key对应value为map结构,ARGV[1]为锁超时时间,ARGV[2]为锁value中的key。ARGV[2]由UUID+threadId组成,用来标记锁被谁持有。
(1) 第一个If判断key是否存在,不存在完成加锁操作
- redis.call(‘hset’, KEYS[1], ARGV[2], 1);创建key[1] map中添加key:ARGV[2] ,value:1
- redis.call(‘pexpire’, KEYS[1], ARGV[1]);设置key[1]过期时间,避免发生死锁。
eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。可避免第一条命令执行成功第二条命令执行失败导致死锁。
(2)第二个if判断key存在且当前线程已经持有锁, 重入:
- redis.call(‘hexists’, KEYS[1],ARGV[2]);判断redis中锁的标记值是否与当前请求的标记值相同,相同代表该线程已经获取锁。
- redis.call(‘hincrby’, KEYS[1], ARGV[2],1);记录同一线程持有锁之后累计加锁次数实现锁重入。
(3)最后的return表示key存在被其他线程获取的锁, 等待:
- redis.call(‘pexpire’, KEYS[1], ARGV[1]); 重制锁超时时间。
2、锁等待
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//步骤1、这里调用加锁操作,若没有返回null表名有其它线程占用锁,获得锁超时时间,执行锁等待逻辑
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
//步骤2、步骤一中加锁操作失败,订阅消息,利用redis的pubsub提供一个通知机制来减少不断的重试,避免发生活锁。
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
//步骤3、getLath()获取RedissionLockEntry实例latch变量,由于permits为0,所以调用acquire()方法后线程阻塞。
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
其实很多这种循环阻塞的逻辑都不会使用空轮询,如果用空轮询的话太耗费CPU的性能,比如我们有定时去查询数据表中是否有数据,会使用while无线循环,但是配合Thread.sleep来防止空轮询,还有的会借助java的阻塞队列来防止空轮询,这里猜测阻塞队列是借助操作系统内核的中断来唤醒的,这个具体得去研究系统底层实现,这里就不多说,而redisssion是通过消息订阅来阻塞的,也防止了空轮询。总之,尽量避免使用空轮询。
3、解锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
KEYS[1]为getName();
KEYS[2]为getChannelName();
ARGV[1]为LockPubSub.unlockMessage;
ARGV[2]为internalLockLeaseTime;
ARGV[3]为getLockName(threadId);
- (1)第一个if判断锁对应key是否存在,不存在则publish消息,将获取锁被阻塞的线程恢复重新获取锁;
- (2)第二个if判断锁对应key存在,value中是否存在当前要释放锁的标示,不存在返回nil,确保锁只能被持有的线程释放。
- (3)对应key存在,value中存在当前要释放锁的标示,将锁标示对应值-1,第三个if判断锁标示对应的值是否大于0,大于0,表示有锁重入情况发生,重新设置锁过期时间。
- (4)对应key存在,value中存在当前要释放锁的标示,将锁标示对应值-1后等于0,调用del操作释放锁,并publish消息,将获取锁被阻塞的线程恢复重新获取锁;
订阅者将会收到如下消息。
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
...
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(unlockMessage)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
value.getLatch().release();
} else if (message.equals(readUnlockMessage)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
//订阅者接收到publish消息后,执行release操作,调用acquire被阻塞的线程将继续执行获取锁操作。
value.getLatch().release(value.getLatch().getQueueLength());
}
}
...
}
这里就跟步骤2的所等待逻辑互相呼应了
...
RFuture<RedissonLockEntry> future = subscribe(threadId);
...
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
...
4、watch dog自动延期机制
redisson默认超时时间是30s,假设我们的业务处理逻辑超过了30秒怎么办?redisson是这样实现的,当客户端
一获得锁,则会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。
源码如下:
看门狗因为是新启动一个后台线程在运行,会对性能有一定的影响。
三、redisson实现分布式锁是否满足要求
1、确保互斥
要求:在同一时刻,必须保证锁至多只能被一个客户端持有。
redisson实现:redisson用lua脚本来保证原子操作,判断key是否存在,存在者锁等待,所以是满足于该要求的。
2、不能死锁
要求:在一个客户端在持有锁的期间崩溃而没有主动解锁情况下,也能保证后续其他客户端能加锁。
redisson实现:redisson对锁设置了超时时间,再客户端奔溃的情况下,超时后会自动被清除,所以不会死锁,满足要求。
3、避免活锁
要求:在获取锁失败的情况下,反复进行重试操作,占用Cpu资源,影响性能。
redisson实现:redisson通过消息订阅机制来防止空轮询,满足要求。
4、实现更多锁特性:锁中断、锁重入、锁超时等。
要求:确保客户端只能解锁自己持有的锁。
redisson实现:redisson通过map的结构来实现锁的重入,同一先线程进入只不过value+1,满足要求并且还有看门狗防止客户端还存在的情况下锁超时。