AVt天堂网 手机版,亚洲va久久久噜噜噜久久4399,天天综合亚洲色在线精品,亚洲一级Av无码毛片久久精品

當(dāng)前位置:首頁(yè) > 科技  > 軟件

Redisson雜談,你學(xué)到了什么?

來(lái)源: 責(zé)編: 時(shí)間:2023-10-16 17:09:00 239觀看
導(dǎo)讀一.Redisson 簡(jiǎn)介Redisson 是一個(gè)基于 Netty 通信框架的高性能 Redis 客戶端, 實(shí)現(xiàn)了分布式和可擴(kuò)展的 Java 數(shù)據(jù)結(jié)構(gòu),提供很多分布式相關(guān)操作服務(wù)以及大量便利的工具方法,讓開(kāi)發(fā)者可以把精力放在開(kāi)發(fā)業(yè)務(wù),避免重復(fù)造輪子

一.Redisson 簡(jiǎn)介

Redisson 是一個(gè)基于 Netty 通信框架的高性能 Redis 客戶端, 實(shí)現(xiàn)了分布式和可擴(kuò)展的 Java 數(shù)據(jù)結(jié)構(gòu),提供很多分布式相關(guān)操作服務(wù)以及大量便利的工具方法,讓開(kāi)發(fā)者可以把精力放在開(kāi)發(fā)業(yè)務(wù),避免重復(fù)造輪子。B8o28資訊網(wǎng)——每日最新資訊28at.com

二.Redisson 優(yōu)點(diǎn)

1.通信框架基于 Netty,使用多路復(fù)用。吞吐量高。B8o28資訊網(wǎng)——每日最新資訊28at.com

2.兼容支持 Redis 集群模式,Reids 哨兵模式等,天然適配分布式服務(wù)。B8o28資訊網(wǎng)——每日最新資訊28at.com

3.提供多種分布式對(duì)象的封裝,如:Bloom Filter,Object Bucket,Bitset,AtomicLong, 和 HyperLogLog 等。B8o28資訊網(wǎng)——每日最新資訊28at.com

4.提供分布式鎖實(shí)現(xiàn)包括:B8o28資訊網(wǎng)——每日最新資訊28at.com

RedissonFairLock 公平鎖,B8o28資訊網(wǎng)——每日最新資訊28at.com

RedissonLock 非公平鎖,B8o28資訊網(wǎng)——每日最新資訊28at.com

RedissonRedLock 紅鎖(基于紅鎖算法, 當(dāng)集群中大多數(shù)( N/2 + 1 )加鎖成功了,則認(rèn)為加鎖成功,B8o28資訊網(wǎng)——每日最新資訊28at.com

目前已被棄用,Redisson 官方不再建議使用)。B8o28資訊網(wǎng)——每日最新資訊28at.com

三.RedissonLock 分布式鎖相關(guān)部分源碼解析

RedissonLock 作為分布式鎖,實(shí)現(xiàn)了可重入鎖。阻塞鎖,非阻塞鎖。并且 Redisson 存在看門(mén)狗機(jī)制,可以對(duì)未手動(dòng)設(shè)置超時(shí)時(shí)間的鎖實(shí)現(xiàn)自動(dòng)續(xù)期。B8o28資訊網(wǎng)——每日最新資訊28at.com

1.Trylock 加鎖

加鎖代碼邏輯B8o28資訊網(wǎng)——每日最新資訊28at.com

/**** @param waitTime 獲取鎖的最大等待時(shí)間,默認(rèn) -1,* @param leaseTime 鎖的過(guò)期時(shí)間,默認(rèn) -1* @param unit* @param threadId* @return*/private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {  RFuture<Boolean> acquiredFuture;  if (leaseTime > 0) {    //若手動(dòng)設(shè)置了鎖的過(guò)期時(shí)間,則加鎖時(shí)以當(dāng)前傳入過(guò)期時(shí)間為準(zhǔn)    //執(zhí)行Lua腳本,加鎖    acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit,     threadId,RedisCommands.EVAL_NULL_BOOLEAN);                                                   } else {    //若未手動(dòng)設(shè)置,則默認(rèn)過(guò)期時(shí)間等于配置的lockWatchdogTimeout,lockWatchdogTimeout默認(rèn)為30s。    //然后執(zhí)行Lua腳本,加鎖    acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);  }  CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {  //lock acquired  //若鎖成功獲取到  if (acquired) {    if (leaseTime > 0) {      internalLockLeaseTime = unit.toMillis(leaseTime);      } else {      //若未手動(dòng)設(shè)置過(guò)期時(shí)間,則執(zhí)行看門(mén)狗任務(wù),自動(dòng)續(xù)期      scheduleExpirationRenewal(threadId);    }  }  return acquired;  });  return new CompletableFutureWrapper<>(f);}

加鎖 Lua 腳本如下:B8o28資訊網(wǎng)——每日最新資訊28at.com

if (redis.call('exists', KEYS[1]) == 0) then " +  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  "redis.call('pexpire', KEYS[1], ARGV[1]); " +  "return nil; " +  "end; " +  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  "redis.call('pexpire', KEYS[1], ARGV[1]); " +  "return nil; " +  "end; " +  "return redis.call('pttl', KEYS[1]);

其中 KEYS[1] 是鎖邏輯名稱,ARGV[1] 是 key 的過(guò)期時(shí)間,ARGV[2]是鎖的線程級(jí)別名稱( uuid + 線程id ,uuid 是每個(gè) Redisson 客戶端創(chuàng)建時(shí)唯一生成的)。B8o28資訊網(wǎng)——每日最新資訊28at.com

由此可看出,鎖利用 Hash 結(jié)構(gòu)實(shí)現(xiàn),其中 Hash 的 key 是鎖的邏輯名稱,field 是鎖的線程級(jí)別名稱,value 是鎖的重入次數(shù)。B8o28資訊網(wǎng)——每日最新資訊28at.com

加鎖 Lua 腳本的含義:B8o28資訊網(wǎng)——每日最新資訊28at.com

先判斷當(dāng)前邏輯鎖名稱的 key 是否存在,B8o28資訊網(wǎng)——每日最新資訊28at.com

若不存在,在 Hash 結(jié)構(gòu)中設(shè)置這個(gè)鎖,鎖重入次數(shù)加 1,然后給 key 設(shè)置一個(gè)過(guò)期時(shí)間,最后返回 null。B8o28資訊網(wǎng)——每日最新資訊28at.com

若存在,并且已經(jīng)被當(dāng)前線程持有,就鎖可重入次數(shù)加 1,并且重新設(shè)置 key 的過(guò)期時(shí)間,最后返回 null,B8o28資訊網(wǎng)——每日最新資訊28at.com

若當(dāng)前鎖被其他線程持有,返回 key 剩余過(guò)期時(shí)間。B8o28資訊網(wǎng)——每日最新資訊28at.com

2.Lock 阻塞鎖

Lock 阻塞鎖與 Trylock 底層調(diào)用代碼基本一致。多了一個(gè)等待鎖被其他線程釋放后,重新嘗試加鎖的過(guò)程。B8o28資訊網(wǎng)——每日最新資訊28at.com

代碼如下:B8o28資訊網(wǎng)——每日最新資訊28at.com

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {  long threadId = Thread.currentThread().getId();  Long ttl = tryAcquire(-1, leaseTime, unit, threadId);  // lock acquired  if (ttl == null) {    return;  }  //訂閱釋放鎖消息  CompletableFuture<RedissonLockEntry> future = subscribe(threadId);  pubSub.timeout(future);  RedissonLockEntry entry;  if (interruptibly) {    entry = commandExecutor.getInterrupted(future);  } else {    entry = commandExecutor.get(future);  }  try {    while (true) {      //重新嘗試取鎖      ttl = tryAcquire(-1, leaseTime, unit, threadId);      // lock acquired      if (ttl == null) {        break;      }      // waiting for message,      if (ttl >= 0) {        try {          //當(dāng)鎖仍然被其他線程占有時(shí),調(diào)用          //java.util.concurrent.Semaphore#tryAcquire方法進(jìn)行信號(hào)量阻塞,          //當(dāng)線程阻塞等待時(shí)間超過(guò)最大超時(shí)時(shí)間(ttl即鎖的key的剩余存活時(shí)間)          //或者 監(jiān)聽(tīng)到鎖釋放消息后,信號(hào)量被釋放后,線程不再阻塞          entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);        } catch (InterruptedException e) {          if (interruptibly) {            throw e;          }          entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);        }      } else {        if (interruptibly) {          //嘗試從信號(hào)量獲取一個(gè)許可          entry.getLatch().acquire();        } else {          entry.getLatch().acquireUninterruptibly();        }      }    }  } finally {  //取消訂閱鎖釋放消息  unsubscribe(entry, threadId);}

大致流程如下:B8o28資訊網(wǎng)——每日最新資訊28at.com

1.先獲取鎖,若獲取鎖成功,直接返回。B8o28資訊網(wǎng)——每日最新資訊28at.com

2.若獲取失敗,訂閱釋放鎖消息。B8o28資訊網(wǎng)——每日最新資訊28at.com

3.進(jìn)入 while 循環(huán),重新嘗試獲取鎖。若獲取鎖成功,則跳出循環(huán),并不再訂閱釋放鎖消息。B8o28資訊網(wǎng)——每日最新資訊28at.com

4.若重新獲取鎖失敗,進(jìn)行信號(hào)量阻塞,直到鎖被其他占有線程釋放(監(jiān)聽(tīng)鎖釋放消息的監(jiān)聽(tīng)器中,有喚醒信號(hào)量的邏輯)或者到達(dá)阻塞超時(shí)時(shí)間,然后繼續(xù)這個(gè) while 循環(huán)。B8o28資訊網(wǎng)——每日最新資訊28at.com

3.Unlock 解鎖

代碼如下B8o28資訊網(wǎng)——每日最新資訊28at.com

public RFuture<Void> unlockAsync(long threadId) {  //執(zhí)行解鎖lua腳本  RFuture<Boolean> future = unlockInnerAsync(threadId);  CompletionStage<Void> f = future.handle((opStatus, e) -> {    //取消看門(mén)狗任務(wù)    cancelExpirationRenewal(threadId);    if (e != null) {      throw new CompletionException(e);    }    if (opStatus == null) {      IllegalMonitorStateException cause = new IllegalMonitorStateException      ("attempt to unlock lock, not locked by current thread by node id: "      + id + " thread-id: " + threadId);      throw new CompletionException(cause);    }    return null;  });  return new CompletableFutureWrapper<>(f);}

1.其中解鎖 Lua 腳本如下:B8o28資訊網(wǎng)——每日最新資訊28at.com

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +  "return nil;" +  "end; " +  "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +  "if (counter > 0) then " +  "redis.call('pexpire', KEYS[1], ARGV[2]); " +  "return 0; " +  "else " +  "redis.call('del', KEYS[1]); " +  "redis.call('publish', KEYS[2], ARGV[1]); " +  "return 1; " +  "end; " +  "return nil;

其中 KEYS[1] 為鎖的邏輯名稱,KEYS[2] 為通道名稱,ARGV[1] 為 0, ARGV[2] 為鎖的過(guò)期時(shí)間,默認(rèn) 30s,ARGV[3] 為鎖的線程級(jí)別名稱。B8o28資訊網(wǎng)——每日最新資訊28at.com

解鎖 Lua 腳本含義:B8o28資訊網(wǎng)——每日最新資訊28at.com

解鎖時(shí),先判斷當(dāng)前鎖是否被當(dāng)前線程持有,B8o28資訊網(wǎng)——每日最新資訊28at.com

若不是,則返回 null。B8o28資訊網(wǎng)——每日最新資訊28at.com

若是,鎖的可重入次數(shù) 減1。B8o28資訊網(wǎng)——每日最新資訊28at.com

然后繼續(xù)判斷鎖的可重入次數(shù)是否大于 0,若大于 0,繼續(xù)給這個(gè)鎖 key 續(xù)期 30s,并且最后返回 0。B8o28資訊網(wǎng)——每日最新資訊28at.com

若不大于 0,刪除這個(gè)鎖的 key,并向指定通道發(fā)布這個(gè)解鎖消息,并且返回 1。B8o28資訊網(wǎng)——每日最新資訊28at.com

2.如果這個(gè)鎖有看門(mén)狗任務(wù)在定時(shí)續(xù)期,當(dāng)解鎖成功時(shí)會(huì)取消這個(gè)定時(shí)續(xù)期任務(wù)。B8o28資訊網(wǎng)——每日最新資訊28at.com

4.看門(mén)狗機(jī)制

當(dāng)某個(gè)鎖內(nèi)的任務(wù)的執(zhí)行時(shí)間不可預(yù)估時(shí),可能執(zhí)行時(shí)間很長(zhǎng),也可能很短。此時(shí)若直接設(shè)置一個(gè)固定的鎖過(guò)期時(shí)間,可能會(huì)導(dǎo)致任務(wù)執(zhí)行時(shí)間遠(yuǎn)遠(yuǎn)大于鎖的過(guò)期時(shí)間,導(dǎo)致任務(wù)還未執(zhí)行完成,但是鎖已經(jīng)過(guò)期了。那其他線程又可以獲取到鎖,然后執(zhí)行該任務(wù)了,最終導(dǎo)致線程安全問(wèn)題。B8o28資訊網(wǎng)——每日最新資訊28at.com

為應(yīng)對(duì)這種情況,定期給鎖續(xù)期的看門(mén)狗機(jī)制出現(xiàn)了。B8o28資訊網(wǎng)——每日最新資訊28at.com

代碼:B8o28資訊網(wǎng)——每日最新資訊28at.com

//真正看門(mén)狗續(xù)期任務(wù)private void renewExpiration() {  ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());  if (ee == null) {    return;  }  //創(chuàng)建一個(gè)延時(shí)任務(wù),底層實(shí)現(xiàn)是netty時(shí)間輪。當(dāng)每過(guò)了lockWatchdogTimeout/3的時(shí)間,執(zhí)行該任務(wù)  Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {    @Override    public void run(Timeout timeout) throws Exception {      ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());        if (ent == null) {          return;        }        Long threadId = ent.getFirstThreadId();        //若當(dāng)前鎖已經(jīng)被當(dāng)前線程釋放,則鎖不再續(xù)期        if (threadId == null) {          return;        }        //調(diào)用Lua腳本,判斷當(dāng)前鎖是否被當(dāng)前線程占有,若是則返回true,        //并且重新設(shè)置key的過(guò)期時(shí)間,默認(rèn)30s        CompletionStage<Boolean> future = renewExpirationAsync(threadId);        future.whenComplete((res, e) -> {          if (e != null) {            log.error("Can't update lock " + getRawName() + " expiration", e);            EXPIRATION_RENEWAL_MAP.remove(getEntryName());            return;            }            //當(dāng)鎖仍然被當(dāng)前線程占有,說(shuō)明業(yè)務(wù)代碼還在執(zhí)行,則遞歸調(diào)用續(xù)期任務(wù)            if (res) {              // reschedule itself              log.info("續(xù)期任務(wù)執(zhí)行"+ "threadId:" +threadId);              renewExpiration();            } else {              //否則移除該續(xù)期任務(wù),直接在EXPIRATION_RENEWAL_MAP移除ExpirationEntry              cancelExpirationRenewal(null);            }        });    }  }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);    ee.setTimeout(task);}

當(dāng)沒(méi)有顯式指定鎖過(guò)期時(shí)間時(shí)候,就默認(rèn) key 過(guò)期時(shí)間 30s,然后定時(shí)任務(wù)每 10 秒( lockWatchdogTimeout/3 )進(jìn)行一次調(diào)用,執(zhí)行鎖續(xù)期動(dòng)作,若這個(gè)線程還持有這個(gè)鎖,就對(duì)這個(gè)線程持有的鎖進(jìn)行續(xù)期操作(通過(guò) pexpire 續(xù)期 key 30s),若途中持有鎖的線程 手動(dòng)被 unlock 或者機(jī)器宕機(jī)才會(huì)取消這個(gè)任務(wù)。否則會(huì)一直續(xù)期。B8o28資訊網(wǎng)——每日最新資訊28at.com

四.總結(jié)

Redisson 作為一個(gè) Redis 客戶端,基于 Redis、Lua 和 Netty 建立起了一套完善的分布式解決方案,比如分布式鎖的實(shí)現(xiàn),分布式對(duì)象的操作等。本文主要簡(jiǎn)單講述了在 Redisson 中分布式鎖的實(shí)現(xiàn)。其實(shí)在 Redisson 中還有很多值得深挖的點(diǎn)。比如:Redisson 中使用了大量 Netty 的特性。大家有興趣的話,可以仔細(xì)研究一下。B8o28資訊網(wǎng)——每日最新資訊28at.com

五.參考文章

https://github.com/redisson/redisson/wikiB8o28資訊網(wǎng)——每日最新資訊28at.com

https://cloud.tencent.com/developer/article/1500854B8o28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.tebozhan.com/showinfo-26-13605-0.htmlRedisson雜談,你學(xué)到了什么?

聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com

上一篇: 為什么 HTTP/3 正在吞噬世界

下一篇: 超好用的Java常用工具類StringUtils(帶代碼實(shí)例),提升開(kāi)發(fā)效率

標(biāo)簽:
  • 熱門(mén)焦點(diǎn)
Top