Redisson 是一個基于 Netty 通信框架的高性能 Redis 客戶端, 實現了分布式和可擴展的 Java 數據結構,提供很多分布式相關操作服務以及大量便利的工具方法,讓開發者可以把精力放在開發業務,避免重復造輪子。
1.通信框架基于 Netty,使用多路復用。吞吐量高。
2.兼容支持 Redis 集群模式,Reids 哨兵模式等,天然適配分布式服務。
3.提供多種分布式對象的封裝,如:Bloom Filter,Object Bucket,Bitset,AtomicLong, 和 HyperLogLog 等。
4.提供分布式鎖實現包括:
RedissonFairLock 公平鎖,
RedissonLock 非公平鎖,
RedissonRedLock 紅鎖(基于紅鎖算法, 當集群中大多數( N/2 + 1 )加鎖成功了,則認為加鎖成功,
目前已被棄用,Redisson 官方不再建議使用)。
RedissonLock 作為分布式鎖,實現了可重入鎖。阻塞鎖,非阻塞鎖。并且 Redisson 存在看門狗機制,可以對未手動設置超時時間的鎖實現自動續期。
加鎖代碼邏輯
/**** @param waitTime 獲取鎖的最大等待時間,默認 -1,* @param leaseTime 鎖的過期時間,默認 -1* @param unit* @param threadId* @return*/private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Boolean> acquiredFuture; if (leaseTime > 0) { //若手動設置了鎖的過期時間,則加鎖時以當前傳入過期時間為準 //執行Lua腳本,加鎖 acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId,RedisCommands.EVAL_NULL_BOOLEAN); } else { //若未手動設置,則默認過期時間等于配置的lockWatchdogTimeout,lockWatchdogTimeout默認為30s。 //然后執行Lua腳本,加鎖 acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> { //lock acquired //若鎖成功獲取到 if (acquired) { if (leaseTime > 0) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { //若未手動設置過期時間,則執行看門狗任務,自動續期 scheduleExpirationRenewal(threadId); } } return acquired; }); return new CompletableFutureWrapper<>(f);}
加鎖 Lua 腳本如下:
if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);
其中 KEYS[1] 是鎖邏輯名稱,ARGV[1] 是 key 的過期時間,ARGV[2]是鎖的線程級別名稱( uuid + 線程id ,uuid 是每個 Redisson 客戶端創建時唯一生成的)。
由此可看出,鎖利用 Hash 結構實現,其中 Hash 的 key 是鎖的邏輯名稱,field 是鎖的線程級別名稱,value 是鎖的重入次數。
加鎖 Lua 腳本的含義:
先判斷當前邏輯鎖名稱的 key 是否存在,
若不存在,在 Hash 結構中設置這個鎖,鎖重入次數加 1,然后給 key 設置一個過期時間,最后返回 null。
若存在,并且已經被當前線程持有,就鎖可重入次數加 1,并且重新設置 key 的過期時間,最后返回 null,
若當前鎖被其他線程持有,返回 key 剩余過期時間。
Lock 阻塞鎖與 Trylock 底層調用代碼基本一致。多了一個等待鎖被其他線程釋放后,重新嘗試加鎖的過程。
代碼如下:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } //訂閱釋放鎖消息 CompletableFuture<RedissonLockEntry> future = subscribe(threadId); pubSub.timeout(future); RedissonLockEntry entry; if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { while (true) { //重新嘗試取鎖 ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message, if (ttl >= 0) { try { //當鎖仍然被其他線程占有時,調用 //java.util.concurrent.Semaphore#tryAcquire方法進行信號量阻塞, //當線程阻塞等待時間超過最大超時時間(ttl即鎖的key的剩余存活時間) //或者 監聽到鎖釋放消息后,信號量被釋放后,線程不再阻塞 entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { //嘗試從信號量獲取一個許可 entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { //取消訂閱鎖釋放消息 unsubscribe(entry, threadId);}
大致流程如下:
1.先獲取鎖,若獲取鎖成功,直接返回。
2.若獲取失敗,訂閱釋放鎖消息。
3.進入 while 循環,重新嘗試獲取鎖。若獲取鎖成功,則跳出循環,并不再訂閱釋放鎖消息。
4.若重新獲取鎖失敗,進行信號量阻塞,直到鎖被其他占有線程釋放(監聽鎖釋放消息的監聽器中,有喚醒信號量的邏輯)或者到達阻塞超時時間,然后繼續這個 while 循環。
代碼如下
public RFuture<Void> unlockAsync(long threadId) { //執行解鎖lua腳本 RFuture<Boolean> future = unlockInnerAsync(threadId); CompletionStage<Void> f = future.handle((opStatus, e) -> { //取消看門狗任務 cancelExpirationRenewal(threadId); if (e != null) { throw new CompletionException(e); } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException ("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); throw new CompletionException(cause); } return null; }); return new CompletableFutureWrapper<>(f);}
1.其中解鎖 Lua 腳本如下:
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;
其中 KEYS[1] 為鎖的邏輯名稱,KEYS[2] 為通道名稱,ARGV[1] 為 0, ARGV[2] 為鎖的過期時間,默認 30s,ARGV[3] 為鎖的線程級別名稱。
解鎖 Lua 腳本含義:
解鎖時,先判斷當前鎖是否被當前線程持有,
若不是,則返回 null。
若是,鎖的可重入次數 減1。
然后繼續判斷鎖的可重入次數是否大于 0,若大于 0,繼續給這個鎖 key 續期 30s,并且最后返回 0。
若不大于 0,刪除這個鎖的 key,并向指定通道發布這個解鎖消息,并且返回 1。
2.如果這個鎖有看門狗任務在定時續期,當解鎖成功時會取消這個定時續期任務。
當某個鎖內的任務的執行時間不可預估時,可能執行時間很長,也可能很短。此時若直接設置一個固定的鎖過期時間,可能會導致任務執行時間遠遠大于鎖的過期時間,導致任務還未執行完成,但是鎖已經過期了。那其他線程又可以獲取到鎖,然后執行該任務了,最終導致線程安全問題。
為應對這種情況,定期給鎖續期的看門狗機制出現了。
代碼:
//真正看門狗續期任務private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } //創建一個延時任務,底層實現是netty時間輪。當每過了lockWatchdogTimeout/3的時間,執行該任務 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); //若當前鎖已經被當前線程釋放,則鎖不再續期 if (threadId == null) { return; } //調用Lua腳本,判斷當前鎖是否被當前線程占有,若是則返回true, //并且重新設置key的過期時間,默認30s CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } //當鎖仍然被當前線程占有,說明業務代碼還在執行,則遞歸調用續期任務 if (res) { // reschedule itself log.info("續期任務執行"+ "threadId:" +threadId); renewExpiration(); } else { //否則移除該續期任務,直接在EXPIRATION_RENEWAL_MAP移除ExpirationEntry cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task);}
當沒有顯式指定鎖過期時間時候,就默認 key 過期時間 30s,然后定時任務每 10 秒( lockWatchdogTimeout/3 )進行一次調用,執行鎖續期動作,若這個線程還持有這個鎖,就對這個線程持有的鎖進行續期操作(通過 pexpire 續期 key 30s),若途中持有鎖的線程 手動被 unlock 或者機器宕機才會取消這個任務。否則會一直續期。
Redisson 作為一個 Redis 客戶端,基于 Redis、Lua 和 Netty 建立起了一套完善的分布式解決方案,比如分布式鎖的實現,分布式對象的操作等。本文主要簡單講述了在 Redisson 中分布式鎖的實現。其實在 Redisson 中還有很多值得深挖的點。比如:Redisson 中使用了大量 Netty 的特性。大家有興趣的話,可以仔細研究一下。
https://github.com/redisson/redisson/wiki
https://cloud.tencent.com/developer/article/1500854
本文鏈接:http://www.tebozhan.com/showinfo-26-13605-0.htmlRedisson雜談,你學到了什么?
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
上一篇: 為什么 HTTP/3 正在吞噬世界