因?yàn)楣ぷ髦行枰玫椒植际降难訒r(shí)隊(duì)列,調(diào)研了一段時(shí)間,選擇使用 Redisson DelayedQueue,為了搞清楚內(nèi)部運(yùn)行流程,特記錄下來。
總體流程大概是圖中的這個(gè)樣子,初看一眼有點(diǎn)不知從何下手,接下來我會(huì)通過以下幾點(diǎn)來分析流程,相信看完本文你能了解整個(gè)運(yùn)行流程。
圖片
發(fā)送延遲消息代碼如下,發(fā)送了一條延遲時(shí)間為 5s 的消息。
public void produce() { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); delayedQueue.offer("測(cè)試延遲消息", 5, TimeUnit.SECONDS);}
接收消息代碼如下,可以看到 delayedQueue 是沒有用到的,那么為什么要加這一行呢,這個(gè)后面總結(jié)部分回答。
public void consume() throws InterruptedException { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); String msg = blockingQueue.take(); //收到消息進(jìn)行處理...}
這兩段代碼可以寫在兩個(gè)不同的 Java 工程里,只要連接的是同一個(gè) Redis 就行。
調(diào)用 comsume() 之后,如果隊(duì)列里沒有消息,會(huì)阻塞等待隊(duì)列里有消息并且取到了才會(huì)返回。之所以這么說是因?yàn)榭赡苡袆e的 Java 進(jìn)程也在跟你一樣取同一個(gè)隊(duì)列里的消息,如果消息被另一個(gè)搶完了,那這時(shí)就還得阻塞等待。
這時(shí)看上去的原理是這樣的:
生產(chǎn)者調(diào)用 offer() 后,自己內(nèi)部開啟一個(gè)定時(shí)器,等到了時(shí)間再發(fā)送到 redis 的 list 里。
圖片
如果是這樣設(shè)計(jì)的話,相信大家都能看出來一個(gè)很簡(jiǎn)單的問題,要是延時(shí)時(shí)間還沒到,生產(chǎn)者自己掛了,那樣消息就丟了。所以還是讓我們接著往下看。
redisson 源碼里一共創(chuàng)建了三個(gè)隊(duì)列:【消息延時(shí)隊(duì)列】、【消息順序隊(duì)列】、【消息目標(biāo)隊(duì)列】。
圖片
假設(shè)在同一時(shí)間按照 msg1、msg2、msg3 的順序發(fā)消息到延時(shí)隊(duì)列,這三條消息就會(huì)被保存在【消息延時(shí)隊(duì)列】和【消息順序隊(duì)列】。
可以看到【消息延時(shí)隊(duì)列】的順序是按照到期時(shí)間升序排列的,而不是像【消息順序隊(duì)列】按照插入順序排。
消息到期后會(huì)將消息從前兩個(gè)隊(duì)列移除(怎么移?誰來移?),插入【消息目標(biāo)隊(duì)列】,也就是圖中第三個(gè)隊(duì)列。
消費(fèi)者也是阻塞在【消息目標(biāo)隊(duì)列】上取消息。
這時(shí)可以簡(jiǎn)單說明下每個(gè)隊(duì)列的作用:
其實(shí)【消息延時(shí)隊(duì)列】隊(duì)列里存的時(shí)間(也就是 zet 的 score)是到期的時(shí)間戳,為了畫圖方便,圖里就畫的是延遲的時(shí)間,不過不影響理解。
理解好這幾個(gè)隊(duì)列的名字和作用,后面還會(huì)一直用到,如果忘了可以翻回來回顧下。
因?yàn)闀鴮懤斫夥奖愫汀鞠㈨樞蜿?duì)列】在本文沒涉及到,后面部分好幾次提到的內(nèi)容:把到期的消息從【消息延時(shí)隊(duì)列】移到【消息目標(biāo)隊(duì)列】里,這句話實(shí)際的代碼邏輯是這樣:把【消息延時(shí)隊(duì)列】和【消息順序隊(duì)列】里的到期消息移除,把它們插入到【消息目標(biāo)隊(duì)列】。
知道了內(nèi)部所使用到的數(shù)據(jù)結(jié)構(gòu)后,這里可以簡(jiǎn)單說下整體的基本流程。
先說發(fā)送延遲消息,發(fā)送的延遲消息會(huì)先存在【消息延時(shí)隊(duì)列】和【消息順序隊(duì)列】,如果【消息延時(shí)隊(duì)列】原本是空的,會(huì)發(fā)布訂閱信息提醒有新的消息。
獲取延遲消息只需要從【消息目標(biāo)隊(duì)列】阻塞的取就行了,因?yàn)槔锩娑际堑狡跀?shù)據(jù)。
那么問題就只剩下怎么樣判斷時(shí)間到了,把【消息延時(shí)隊(duì)列】里的消息移動(dòng)到【消息目標(biāo)隊(duì)列】里呢?
這部分工作交給了初始化延時(shí)隊(duì)列來處理。
這里面會(huì)定時(shí)從【消息延時(shí)隊(duì)列】查詢最新到期時(shí)間,定時(shí)去把【消息延時(shí)隊(duì)列】里的消息移動(dòng)到【消息目標(biāo)隊(duì)列】里。
如果【消息延時(shí)隊(duì)列】是空的,就不會(huì)再定時(shí)查,而是等待發(fā)布訂閱信息提醒,再定時(shí)把【消息延時(shí)隊(duì)列】里的消息移動(dòng)到【消息目標(biāo)隊(duì)列】里。
剛開始看可能有點(diǎn)抽象,可以看完底下一節(jié)內(nèi)容之后,再回頭來看這里對(duì)應(yīng)的流程總結(jié),可能會(huì)比較清晰。
發(fā)送延時(shí)消息的邏輯比較簡(jiǎn)單,先看下發(fā)送的代碼。
public void produce() { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); delayedQueue.offer("測(cè)試延遲消息", 5, TimeUnit.SECONDS);}
從 delayedQueue.offer 方法開始,最終會(huì)執(zhí)行到 RedissonDelayedQueue 的 offerAsync 方法里。
offerAsync 方法的作用就是發(fā)送一段腳本給 redis 執(zhí)行,腳本內(nèi)容是:
@Overridepublic RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) { if (delay < 0) { throw new IllegalArgumentException("Delay can't be negative"); } long delayInMs = timeUnit.toMillis(delay); long timeout = System.currentTimeMillis() + delayInMs; long randomId = ThreadLocalRandom.current().nextLong(); return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID, "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" + "redis.call('zadd', KEYS[2], ARGV[1], value);" + "redis.call('rpush', KEYS[3], value);" // if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); " + "if v[1] == value then " + "redis.call('publish', KEYS[4], ARGV[1]); " + "end;", Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e));}
獲取延時(shí)消息是本文最簡(jiǎn)單的一部分。
public void consume() throws InterruptedException { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); String msg = blockingQueue.take(); //收到消息進(jìn)行處理...}
blockingQueue.take() 方法其實(shí)只是對(duì)【消息目標(biāo)隊(duì)列】執(zhí)行 blpop 阻塞的獲取到期消息
看一下初始化的代碼。
public void init() { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);}
入口就是在 redissonClient.getDelayedQueue(blockingQueue) 中,創(chuàng)建了 RedissonDelayedQueue 對(duì)象,并執(zhí)行了構(gòu)造方法里的邏輯。
那么這里面主要做了什么事呢?
主要是調(diào)用了 QueueTransferTask 的 start() 方法。
public void start() { RTopic schedulerTopic = getTopic(); statusListenerId = schedulerTopic.addListener(new BaseStatusListener() { @Override public void onSubscribe(String channel) { pushTask(); } }); messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() { @Override public void onMessage(CharSequence channel, Long startTime) { scheduleTask(startTime); } });}
這段代碼主要是設(shè)置了指定主題(主題名:redisson_delay_queue_channel:{queuename})兩個(gè)發(fā)布訂閱的監(jiān)聽器。
需要注意的是,這里會(huì)先訂閱指定主題,然后觸發(fā)執(zhí)行 onSubscribe() 方法。
所以我們主要搞懂這三個(gè)方法都是做什么的,那么整個(gè)初始化流程就明白了。
因?yàn)檫@三個(gè)方法是相互調(diào)用的,只看文字的話容易云里霧里,這里有個(gè)流程圖,看方法解釋文字的時(shí)候可以對(duì)照著流程圖看比較有印象。
圖片
private void scheduleTask(final Long startTime) { TimeoutTask oldTimeout = lastTimeout.get(); if (startTime == null) { return; } if (oldTimeout != null) { oldTimeout.getTask().cancel(); } long delay = startTime - System.currentTimeMillis(); if (delay > 10) { Timeout timeout = connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { pushTask(); TimeoutTask currentTimeout = lastTimeout.get(); if (currentTimeout.getTask() == timeout) { lastTimeout.compareAndSet(currentTimeout, null); } } }, delay, TimeUnit.MILLISECONDS); if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) { timeout.cancel(); } } else { pushTask(); }}
@Overrideprotected RFuture<Long> pushTaskAsync() { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " + "if #expiredValues > 0 then " + "for i, v in ipairs(expiredValues) do " + "local randomId, value = struct.unpack('dLc0', v);" + "redis.call('rpush', KEYS[1], value);" + "redis.call('lrem', KEYS[3], 1, v);" + "end; " + "redis.call('zrem', KEYS[2], unpack(expiredValues));" + "end; " // get startTime from scheduler queue head task + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " + "if v[1] ~= nil then " + "return v[2]; " + "end " + "return nil;", Arrays.<Object>asList(getRawName(), timeoutSetName, queueName), System.currentTimeMillis(), 100);}
看不懂也不要緊,聽我解釋下就明白了。
這里發(fā)送了一段腳本給 redis 執(zhí)行:
我的理解就是初始化的時(shí)候
1是為了處理舊的消息,比如生產(chǎn)者1發(fā)送了消息,然后時(shí)間沒到自己下線了,這時(shí)如果沒有其他客戶端在線,就沒有人能把數(shù)據(jù)從【消息目標(biāo)隊(duì)列】移到【消息目標(biāo)隊(duì)列】了。
2是返回的這個(gè)時(shí)間戳,會(huì)拿這個(gè)定時(shí),等時(shí)間到了去【消息目標(biāo)隊(duì)列】拉取到期的消息。
簡(jiǎn)單總結(jié)就是這個(gè)方法是把到期消息從【消息延時(shí)隊(duì)列】放到【消息目標(biāo)隊(duì)列】里,并且返回了最近要到期消息的時(shí)間戳。
private void pushTask() { RFuture<Long> startTimeFuture = pushTaskAsync(); startTimeFuture.whenComplete((res, e) -> { if (e != null) { if (e instanceof RedissonShutdownException) { return; } log.error(e.getMessage(), e); scheduleTask(System.currentTimeMillis() + 5 * 1000L); return; } if (res != null) { scheduleTask(res); } });}
這個(gè)代碼看起來就比較簡(jiǎn)單,調(diào)用了 pushTaskAsync() 獲取最近要到期消息的時(shí)間戳(異步封裝了一下)。
有異常的話就調(diào)用 scheduleTask() 五秒后再執(zhí)行一次 pushTask()。
沒有異常的話如果有最近要到期消息的時(shí)間戳(說明【消息延時(shí)隊(duì)列】里還有未到期消息),用這個(gè)最新到期時(shí)間調(diào)用 scheduleTask(),在這個(gè)指定的時(shí)間調(diào)用 pushTask()。
這個(gè)方法簡(jiǎn)單總結(jié)就是決定了要不要調(diào)用、什么時(shí)候再調(diào)用 pushTask(),主要操作邏輯都在 pushTaskAsync() 里(把到期的消息從【消息延時(shí)隊(duì)列】移到【消息目標(biāo)隊(duì)列】供消費(fèi)端消費(fèi))。
了解了上面幾個(gè)方法的流程和含義,還記得一開頭提到的添加了兩個(gè)發(fā)布訂閱的監(jiān)聽器嗎?
1.當(dāng)指定主題有新訂閱時(shí)調(diào)用 pushTask() 方法,里面又會(huì)調(diào)用 pushTaskAsync() 方法
2.當(dāng)指定主題有新消息時(shí)調(diào)用 scheduleTask(startTime) 方法
需要注意的是,這里會(huì)先訂閱指定主題,然后觸發(fā)執(zhí)行 onSubscribe() 方法
再放一下開頭的圖總體流程圖:
圖片
這里回答開頭部分說的問題,到這看完了本文,你可以試著自己想一想這個(gè)問題的答案。
接收消息代碼如下,可以看到 delayedQueue 是沒有用到的,那么為什么要加這一行呢,這個(gè)后面總結(jié)部分回答。
public void consume() throws InterruptedException { String queuename = "delay-queue"; RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(queuename); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); String msg = blockingQueue.take(); //收到消息進(jìn)行處理...}
其實(shí)這個(gè)問題也是我開發(fā)過程中遇到的一個(gè)奇怪的地方,接收方代碼沒有初始化延時(shí)隊(duì)列。
首先再啰嗦一句,初始化延時(shí)隊(duì)列的作用是會(huì)定時(shí)去把【消息延時(shí)隊(duì)列】里的到期數(shù)據(jù)移動(dòng)到【消息目標(biāo)隊(duì)列】。
如果只有發(fā)送方初始化延時(shí)隊(duì)列:
所以接收方代碼里也初始化延時(shí)隊(duì)列能夠避免一部分?jǐn)?shù)據(jù)丟失問題。
本文鏈接:http://www.tebozhan.com/showinfo-26-88383-0.html分布式延時(shí)消息的另外一種選擇 Redisson
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com
上一篇: 聊聊Vue.js 基礎(chǔ)語法詳解