Redisson——联合锁和红锁

前言

联合锁和红锁放在一起来分析,是因为这两种锁实现在Redisson中,关联密切。

/** * Returns MultiLock instance associated with specified <code>locks</code> * * @param locks - collection of locks * @return MultiLock object */ RLock getMultiLock(RLock... locks); /* * Use getLock method instead. Returned instance uses Redis Slave synchronization */ @Deprecated RLock getRedLock(RLock... locks);

红锁

上面getRedLock方法上有@Deprecated注解,说明在Redisson实现中,不推荐使用红锁。

简介

红锁是官方提出的一个锁方案,大概如下:
假设redis集群有5个节点,一个客户端申请锁时,会向所有的节点申请锁,只有大于一半的节点数量返回成功,且耗费的总时间小于锁有效时间,才会判定为拿锁成功,否则会被认定为失败,需要将已经成功的节点锁释放掉。
具体官方文档见:redis.io/topics/dist…

实现

RedissonRedLock继承了联合锁【RedissonMultiLock】,也没有重写核心方法。

/** * RedLock locking algorithm implementation for multiple locks. * It manages all locks as one. * * @see <a href="http://redis.io/topics/distlock">http://redis.io/topics/distlock</a> * * @author Nikita Koksharov * */ public class RedissonRedLock extends RedissonMultiLock { /** * Creates instance with multiple {@link RLock} objects. * Each RLock object could be created by own Redisson instance. * * @param locks - array of locks */ public RedissonRedLock(RLock... locks) { super(locks); } @Override protected int failedLocksLimit() { return locks.size() - minLocksAmount(locks); } protected int minLocksAmount(final List<RLock> locks) { return locks.size()/2 + 1; } @Override protected long calcLockWaitTime(long remainTime) { return Math.max(remainTime / locks.size(), 1); } @Override public void unlock() { unlockInner(locks); } }

联合锁

Groups multiple independent locks and manages them as one lock.
是对一个锁的集合进行操作,与红锁不同的是,集合中所有的锁,要么全部拿锁成功,要么失败,不存在部分成功。实现类:RedissonMultiLock

获取锁

不限等待时间获取
lock

@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { long baseWaitTime = locks.size() * 1500; long waitTime = -1; if (leaseTime == -1) { waitTime = baseWaitTime; } else { leaseTime = unit.toMillis(leaseTime); waitTime = leaseTime; if (waitTime <= 2000) { waitTime = 2000; } else if (waitTime <= baseWaitTime) { waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime); } else { waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime); } } while (true) { if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) { return; } } }

tryLock

限定等待时间获取

同步获取

同步获取的代码较长,这里仅截取关键代码。代表方法:tryLock(),调用后,等待锁获取成功或者失败之后,再执行后续逻辑。
下面是后续可能的实现逻辑,直接调用异步获取,然后阻塞等待结果。

try { return tryLockAsync(waitTime, leaseTime, unit).get(); } catch (ExecutionException e) { throw new IllegalStateException(e); }

异步获取

异步获取的代码更长,同样只截取关键代码

@Override public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RPromise<Boolean> result = new RedissonPromise<Boolean>(); // 这里创建锁状态,这个对象内,进行锁申请 LockState state = new LockState(waitTime, leaseTime, unit, threadId); state.tryAcquireLockAsync(locks.listIterator(), result); return result; }

异步获取主要是通过内部类【LockState】来实现

void tryAcquireLockAsync(ListIterator<RLock> iterator, RPromise<Boolean> result) { RLock lock = iterator.next(); RPromise<Boolean> lockAcquiredFuture = new RedissonPromise<Boolean>(); // lock.tryLockAsync(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS, threadId) .onComplete(new TransferListener<Boolean>(lockAcquiredFuture)); lockAcquiredFuture.onComplete((res, e) -> { boolean lockAcquired = res; if (lockAcquired) { acquiredLocks.add(lock); } else { if (failedLocksLimit == 0) { unlockInnerAsync(acquiredLocks, threadId).onComplete((r, ex) -> { failedLocksLimit = failedLocksLimit(); acquiredLocks.clear(); // reset iterator while (iterator.hasPrevious()) { iterator.previous(); } checkRemainTimeAsync(iterator, result); }); return; } else { failedLocksLimit--; } } // 如果持有时间不限制,则直接再次调用此方法,操作下一个锁;否则判断持有时间是否已经耗尽,如果时间耗尽,则解锁,并返回拿锁失败 checkRemainTimeAsync(iterator, result); }); }

执行步骤:

  1. 拿到集合中的下一个锁,尝试获取锁;
  2. 判断是否获取成功,如果成功,则将此锁添加到已成功的集合中,然后执行checkRemainTimeAsync方法,此方法的功能:如果持有时间不限制,则直接再次调用此方法,操作下一个锁;否则判断持有时间是否已经耗尽,如果时间耗尽,则解锁,并返回拿锁失败;
  3. 如果获取锁失败,则判断失败重试次数是否为零,如果不为零,则继续往后执行,操作下一个锁;
  4. 如果失败重试次数为零【failedLocksLimit】,则释放掉所有已持有的锁,成功后,调用checkRemainTimeAsync方法,再次尝试获取。

释放锁

联合锁不支持强制释放

同步无返回

联合锁释放时,阻塞执行,直到释放完成以后,才返回。

@Override public void unlock() { List<RFuture<Void>> futures = new ArrayList<>(locks.size()); for (RLock lock : locks) { // 拿到每个锁释放的Future结果 futures.add(lock.unlockAsync()); } for (RFuture<Void> future : futures) { // 遍历等待每个锁释放完成 future.syncUninterruptibly(); } }

异步有返回

将参数中的锁逐个释放,直接返回一个Future。

protected RFuture<Void> unlockInnerAsync(Collection<RLock> locks, long threadId) { if (locks.isEmpty()) { return RedissonPromise.newSucceededFuture(null); } RPromise<Void> result = new RedissonPromise<Void>(); AtomicInteger counter = new AtomicInteger(locks.size()); for (RLock lock : locks) { lock.unlockAsync(threadId).onComplete((res, e) -> { // 如果释放锁的过程中,发生异常,则直接返回错误。 if (e != null) { result.tryFailure(e); return; } // 当锁全部释放之后,返回成功 if (counter.decrementAndGet() == 0) { result.trySuccess(null); } }); } return result; }

强制释放

联合锁不支持强制释放,重写的方法内直接返回异常

@Override public boolean forceUnlock() { throw new UnsupportedOperationException(); }