消息隊(duì)列是一種典型的發(fā)布/訂閱模式,是專(zhuān)門(mén)為異步化應(yīng)用和分布式系統(tǒng)設(shè)計(jì)的,具有高性能、穩(wěn)定性及可伸縮性的特點(diǎn),是開(kāi)發(fā)分布式系統(tǒng)和應(yīng)用系統(tǒng)必備的技術(shù)之一。目前,針對(duì)不同的業(yè)務(wù)場(chǎng)景,比較成熟可靠的消息中間件產(chǎn)品有RocketMQ、Kafka、RabbitMq等,基于Redis再去實(shí)現(xiàn)一個(gè)消息隊(duì)列少有提及,那么已經(jīng)有很成熟的產(chǎn)品可以選擇,還有必要再基于Redis自己來(lái)實(shí)現(xiàn)一個(gè)消息隊(duì)列嗎?基于Redis實(shí)現(xiàn)的消息隊(duì)列有什么特別的地方嗎?
先來(lái)回顧一個(gè)Redis有哪些特性:
總結(jié)一下:redis的特點(diǎn)就是:快、簡(jiǎn)單、穩(wěn)定;
以RocketMQ為代表,作為專(zhuān)業(yè)的消息中間件而言,有哪些特性呢:
總結(jié)一下:RocketMQ的特點(diǎn)就是除了性能非常高、系統(tǒng)本身的功能比較專(zhuān)業(yè)、完善,能適應(yīng)非常多的場(chǎng)景;
從上述分析可以看出,Redis隊(duì)列和MQ消息隊(duì)列各有優(yōu)勢(shì),Redis的最大特點(diǎn)就是快,所以基于Redis的消息隊(duì)列相比MQ消息隊(duì)列而言,更適合實(shí)時(shí)處理,但是基于Redis的消息隊(duì)列更易受服務(wù)器內(nèi)存限制;而RocketMQ消息隊(duì)列作為專(zhuān)業(yè)的消息中間件產(chǎn)品,功能更完善,更適合應(yīng)用于比較復(fù)雜的業(yè)務(wù)場(chǎng)景,可以實(shí)現(xiàn)離線消息發(fā)送、消息可靠投遞以及消息的安全性,但MQ消息隊(duì)列的讀寫(xiě)性能略低于Redis隊(duì)列。在技術(shù)選型時(shí),除了上述的因素外,還有一個(gè)需要注意:大多數(shù)系統(tǒng)都會(huì)引入Redis作為基礎(chǔ)的緩存中間件使用,如果要選用RocketMQ的話(huà),還需要額外再申請(qǐng)資源進(jìn)行部署。
很多時(shí)候,所謂的優(yōu)點(diǎn)和缺點(diǎn),只是針對(duì)特定場(chǎng)景而言,如果場(chǎng)景不一樣了,優(yōu)點(diǎn)可能會(huì)變成缺點(diǎn),缺點(diǎn)也可能會(huì)變成優(yōu)點(diǎn)。因此,除了專(zhuān)業(yè)的消息中間件外,基于Redis實(shí)現(xiàn)一個(gè)消息隊(duì)列也是有必要的,在某些特殊的業(yè)務(wù)場(chǎng)景,比如一些并發(fā)量不是很高的管理系統(tǒng),某些業(yè)務(wù)流程需要異步化處理,這時(shí)選擇基于Redis自己實(shí)現(xiàn)一個(gè)消息隊(duì)列,也是一個(gè)比較好的選擇。這也是本篇文章主要分享的內(nèi)容。
隊(duì)列(Queue)是一種數(shù)據(jù)結(jié)構(gòu),遵循先進(jìn)先出(FIFO)的原則。在隊(duì)列中,元素被添加到末尾(入隊(duì)),并從開(kāi)頭移除(出隊(duì))。
Java中有哪些隊(duì)列?
以LinkedBlockingQueue為例,其使用方法是這樣的:
創(chuàng)建了一個(gè)生產(chǎn)者線程和一個(gè)消費(fèi)者線程,生產(chǎn)者線程和消費(fèi)者線程分別對(duì)同一個(gè)LinkedBlockingQueue對(duì)象進(jìn)行操作。生產(chǎn)者線程通過(guò)調(diào)用put()方法將元素添加到隊(duì)列中,而消費(fèi)者線程通過(guò)調(diào)用take()方法從隊(duì)列中取出元素。這兩個(gè)方法都會(huì)阻塞線程,直到隊(duì)列中有元素可供取出或有空間可供添加元素。
import java.util.concurrent.LinkedBlockingQueue; public class LinkedBlockingQueueExample { public static void main(String[] args) { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); // 生產(chǎn)者線程 new Thread(() -> { for (int i = 0; i < 10; i++) { try { queue.put("Element " + i); System.out.println("Produced: Element " + i); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 消費(fèi)者線程 new Thread(() -> { for (int i = 0; i < 10; i++) { try { String element = queue.take(); System.out.println("Consumed: " + element); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
圖片
無(wú)法持久化保存消息,如果 Redis 服務(wù)器宕機(jī)或重啟,那么所有的消息將會(huì)丟失;
發(fā)布訂閱模式是“發(fā)后既忘”的工作模式,如果有訂閱者離線重連之后就不能消費(fèi)之前的歷史消息;
不支持消費(fèi)者確認(rèn)機(jī)制,穩(wěn)定性不能得到保證,例如當(dāng)消費(fèi)者獲取到消息之后,還沒(méi)來(lái)得及執(zhí)行就宕機(jī)了。因?yàn)闆](méi)有消費(fèi)者確認(rèn)機(jī)制,Redis 就會(huì)誤以為消費(fèi)者已經(jīng)執(zhí)行了,因此就不會(huì)重復(fù)發(fā)送未被正常消費(fèi)的消息了,這樣整體的 Redis 穩(wěn)定性就被沒(méi)有辦法得到保障了。
基于Stream 類(lèi)型實(shí)現(xiàn):使用 Stream 的 xadd 和 xrange 來(lái)實(shí)現(xiàn)消息的存入和讀取了,并且 Stream 提供了 xack 手動(dòng)確認(rèn)消息消費(fèi)的命令,用它我們就可以實(shí)現(xiàn)消費(fèi)者確認(rèn)的功能了,使用命令如下:
127.0.0.1:6379> xack mq group1 1580959593553-0(integer) 1
消費(fèi)確認(rèn)增加了消息的可靠性,一般在業(yè)務(wù)處理完成之后,需要執(zhí)行 ack 確認(rèn)消息已經(jīng)被消費(fèi)完成,整個(gè)流程的執(zhí)行如下圖所示:
其中“Group”為群組,消費(fèi)者也就是接收者需要訂閱到群組才能正常獲取到消息。
以上就是基于Redis實(shí)現(xiàn)消息隊(duì)列的幾種方式的簡(jiǎn)單對(duì)比介紹,下面主要是分享一下基于Redis的List數(shù)據(jù)類(lèi)型實(shí)現(xiàn),其他幾種方式,有興趣的小伙可以自己嘗試一下。
基于Redis的List數(shù)據(jù)類(lèi)型實(shí)現(xiàn)消費(fèi)隊(duì)列的工作原理是什么?
Redis基于List結(jié)構(gòu)實(shí)現(xiàn)隊(duì)列的原理主要依賴(lài)于List的push和pop操作。
在Redis中,你可以使用LPUSH命令將一個(gè)或多個(gè)元素推入列表的左邊,也就是列表頭部。同樣,你可以使用RPUSH命令將一個(gè)或多個(gè)元素推入列表的右邊,也就是列表尾部。
對(duì)于隊(duì)列來(lái)說(shuō),新元素總是從隊(duì)列的頭部進(jìn)入,而讀取操作總是從隊(duì)列的尾部開(kāi)始。因此,當(dāng)你想將一個(gè)新元素加入隊(duì)列時(shí),你可以使用LPUSH命令。當(dāng)你想從隊(duì)列中取出一個(gè)元素時(shí),你可以使用RPOP命令。
此外,Redis還提供了BRPOP命令,這是一個(gè)阻塞的RPOP版本。如果給定列表內(nèi)沒(méi)有任何元素可供彈出的話(huà),將阻塞連接直到等待超時(shí)或發(fā)現(xiàn)可彈出元素為止。
需要注意的是,雖然Redis能夠提供原子性的push和pop操作,但是在并發(fā)環(huán)境下使用隊(duì)列時(shí),仍然需要考慮線程安全和并發(fā)控制的問(wèn)題。你可能需要使用Lua腳本或者其他機(jī)制來(lái)確保并發(fā)操作的正確性。
總的來(lái)說(shuō),Redis通過(guò)提供List數(shù)據(jù)結(jié)構(gòu)以及一系列相關(guān)命令,可以很方便地實(shí)現(xiàn)隊(duì)列的功能。
下面是Redis關(guān)于List數(shù)據(jù)結(jié)構(gòu)操作的命令主要包括以下幾種:
以一個(gè)實(shí)際需求為例,演示一個(gè)基于Redis的延遲隊(duì)列是怎么使用的?
有一個(gè)XX任務(wù)管理的功能,主要的業(yè)務(wù)過(guò)程:
1、創(chuàng)建任務(wù)后;
2、不斷檢查任務(wù)的狀態(tài),任務(wù)的狀態(tài)有三種:待執(zhí)行、執(zhí)行中、執(zhí)行完成;
3、如果任務(wù)狀態(tài)是執(zhí)行完成后,主動(dòng)獲取任務(wù)執(zhí)行結(jié)果,對(duì)任務(wù)執(zhí)行結(jié)果進(jìn)行處理;如果任務(wù)狀態(tài)是待執(zhí)行、執(zhí)行中,則延遲5秒后,再次查詢(xún)?nèi)蝿?wù)執(zhí)行狀態(tài);
圖片
1、依賴(lài)引入
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-redis</artifactId> <version>1.4.7.RELEASE</version></dependency><dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.23.1</version></dependency>
2、定義三個(gè)延遲隊(duì)列BeforeQueue、RunningQueue、CompleteQueue,對(duì)隊(duì)列的任務(wù)進(jìn)行存取,BeforeQueue用于對(duì)待執(zhí)行狀態(tài)的任務(wù)的存取,Running用于對(duì)執(zhí)行中狀態(tài)的任務(wù)的存取,CompleteQueue用于對(duì)執(zhí)行完成狀態(tài)的任務(wù)的存取,在三個(gè)任務(wù)隊(duì)列中,取出元素是阻塞的,即如果隊(duì)列中沒(méi)有新的任務(wù),當(dāng)前線程會(huì)一直阻塞等待,直到有新的任務(wù)進(jìn)入;如果是隊(duì)列中還有元素,則遵循先進(jìn)先出的原則逐個(gè)取出進(jìn)行處理;
@Component@Slf4jpublic class BeforeQueue { @Autowired private RedissonClient redissonClient; /** * <p>取出元素</p> * <p>如果隊(duì)列中沒(méi)有元素,就阻塞等待,直</p> * @return */ public Object take(){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue1"); Object obj = null; try { obj = queue1.take(); log.info("從myqueue1取出元素:{}",obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } return obj; } /** * <p>放入元素</p> * @param obj */ public void offer(Object obj){ RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue1"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1); delayedQueue.offer(obj,5, TimeUnit.SECONDS); log.info("向myqueue1設(shè)置元素:{}",obj.toString()); }}
@Component@Slf4jpublic class RunningQueue { @Autowired private RedissonClient redissonClient; public Object take(){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue2"); Object obj = null; try { obj = queue1.take(); log.info("從myqueue2取出元素:{}",obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } return obj; } public void offer(Object obj){ RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue2"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1); delayedQueue.offer(obj,5, TimeUnit.SECONDS); log.info("向myqueue2設(shè)置元素:{}",obj.toString()); }}
@Component@Slf4jpublic class CompleteQueue { @Autowired private RedissonClient redissonClient; public Object take(){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue3"); Object obj = null; try { obj = queue1.take(); log.info("從CompleteQueue取出元素:{}",obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } return obj; } public void offer(Object obj){ RBlockingQueue<Object> queue1 = redissonClient.getBlockingDeque("queue3"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1); delayedQueue.offer(obj,5, TimeUnit.SECONDS); log.info("向CompleteQueue設(shè)置元素:{}",obj.toString()); }}
3、定義三個(gè)監(jiān)聽(tīng)器BeforeQueueListener、RunningQueueListener、CompleteQueueListener,監(jiān)聽(tīng)器的主要作用主要就是負(fù)責(zé)監(jiān)聽(tīng)三個(gè)隊(duì)列中是否有新的任務(wù) 元素進(jìn)入,如果有,則立即取出消費(fèi);如果沒(méi)有,則阻塞等待新的元素進(jìn)入,具體的實(shí)現(xiàn)邏輯是:新創(chuàng)建的任務(wù)會(huì)先放置到BeforeQueue中,BeforeQueueListener監(jiān)聽(tīng)到有新的任務(wù)進(jìn)入,會(huì)取出任務(wù)作一些業(yè)務(wù)處理,業(yè)務(wù)處理完一放入到RunningQueue中,RunningQueueListener監(jiān)聽(tīng)到有新的任務(wù)進(jìn)入,會(huì)取出任務(wù)再進(jìn)行處理,這里的處理主要是查詢(xún)?nèi)蝿?wù)執(zhí)行狀態(tài),查詢(xún)狀態(tài)結(jié)果主要分兩種情況:1、執(zhí)行中、待執(zhí)行狀態(tài),則把任務(wù)重新放入RunningQueue隊(duì)列中,延遲5秒;2、執(zhí)行完成狀態(tài),則把任務(wù)放置到CompleteQueue中;CompleteQueueListener監(jiān)聽(tīng)到有新的任務(wù)進(jìn)入后,會(huì)主動(dòng)獲取任務(wù)執(zhí)行結(jié)果,作最后業(yè)務(wù)處理;
4、監(jiān)聽(tīng)器在在處理隊(duì)列中的數(shù)據(jù)相關(guān)的業(yè)務(wù)時(shí),如果發(fā)生異常,則需要把取出的元素再重新入入到當(dāng)前隊(duì)列中,等待下一輪的重試;
@Component@Slf4jpublic class BeforeQueueListener implements Listener{ @Autowired private BeforeQueue beforeQueue; @Autowired private RunningQueue runningQueue; @Override public void start() { new Thread(new Runnable() { @Override public void run() { while (true){ log.info("監(jiān)聽(tīng)器進(jìn)入阻塞:BeforeQueueListener"); Object obj = beforeQueue.take(); if (ObjectUtil.isNotNull(obj)) { try { log.info("開(kāi)始休眠1s模擬業(yè)務(wù)處理:BeforeQueueListener,元素:{}",obj.toString()); Thread.currentThread().sleep(1000); log.info("業(yè)務(wù)處理完成:BeforeQueueListener,元素:{}",obj.toString()); runningQueue.offer(obj); } catch (InterruptedException e) { log.error("業(yè)務(wù)處理發(fā)生異常,重置元素到BeforeQueue隊(duì)列中"); log.error(e.getMessage()); beforeQueue.offer(obj); } } } } }).start(); }}
@Component@Slf4jpublic class RunningQueueListener implements Listener { @Autowired private RunningQueue runningQueue; @Autowired private CompleteQueue completeQueue; @Override public void start() { new Thread(new Runnable() { @Override public void run() { while (true) { log.info("監(jiān)聽(tīng)器進(jìn)入阻塞:RunningQueueListener"); Object obj = runningQueue.take(); if (ObjectUtil.isNotNull(obj)) { try { log.info("開(kāi)始休眠1s模擬業(yè)務(wù)處理:RunningQueueListener,元素:{}", obj.toString()); Thread.currentThread().sleep(1000); Random random = new Random(); int i = random.nextInt(2); if (i==0) { test(); } log.info("業(yè)務(wù)處理完成:RunningQueueListener,元素:{}", obj.toString()); completeQueue.offer(obj); } catch (Exception e) { log.error("業(yè)務(wù)處理發(fā)生異常,重置元素到RunningQueue隊(duì)列中"); log.error(e.getMessage()); runningQueue.offer(obj); } } } } }).start(); } public void test(){ try { int i=1/0; } catch (Exception e) { throw new RuntimeException("除數(shù)異常"); } }}
@Component@Slf4jpublic class CompleteQueueListener implements Listener{ @Autowired private CompleteQueue completeQueue; @Override public void start() { new Thread(new Runnable() { @Override public void run() { while (true){ log.info("監(jiān)聽(tīng)器進(jìn)入阻塞:CompleteQueueListener"); Object obj = completeQueue.take(); if (ObjectUtil.isNotNull(obj)) { try { log.info("開(kāi)始休眠1s模擬業(yè)務(wù)處理:CompleteQueueListener,元素:{}",obj.toString()); Thread.currentThread().sleep(1000); log.info("業(yè)務(wù)處理完成:listener3,元素:{}",obj.toString()); } catch (InterruptedException e) { log.error("業(yè)務(wù)處理發(fā)生異常,重置元素到CompleteQueue隊(duì)列中"); log.error(e.getMessage()); completeQueue.offer(obj); } log.info("CompleteQueueListener任務(wù)結(jié)束,元素:{}",obj.toString()); } } } }).start(); }}
5、利用Springboot的擴(kuò)展點(diǎn)ApplicationRunner,在項(xiàng)目啟動(dòng)完成后,分別啟動(dòng)BeforeQueueListener、RunningQueueListener、CompleteQueueListener,讓三個(gè)監(jiān)聽(tīng)器進(jìn)入阻塞監(jiān)聽(tīng)狀態(tài)
@Componentpublic class MyRunner implements ApplicationRunner { @Autowired private ApplicationContext applicationContext; @Override public void run(ApplicationArguments args) throws Exception { Map<String, Listener> beansOfType = applicationContext.getBeansOfType(Listener.class); for (String s : beansOfType.keySet()) { Listener listener = beansOfType.get(s); listener.start(); } }}
結(jié)果驗(yàn)證
圖片
三個(gè)任務(wù)隊(duì)列分別有三個(gè)線程來(lái)進(jìn)行阻塞監(jiān)聽(tīng),即如果任務(wù)隊(duì)列中有任務(wù)元素,則取出進(jìn)行處理;如果沒(méi)有,則阻塞等待,主線程只負(fù)責(zé)把任務(wù)設(shè)置到任務(wù)隊(duì)列中,出現(xiàn)的問(wèn)題是:控制臺(tái)的日志輸出顯示任務(wù)元素已經(jīng)放置到第一個(gè)BeforeQueue中,按照預(yù)期的結(jié)果應(yīng)該是,控制臺(tái)的日志輸出會(huì)顯示,從BeforeQueue取出元素進(jìn)行業(yè)務(wù)處理、以及業(yè)務(wù)處理的日志,然后放置到RunningQueue中,再?gòu)腞unningQueue中取出進(jìn)行業(yè)務(wù)處理,接著放置到CompleteQueue隊(duì)列中,最后從CompleteQueue中取出進(jìn)行業(yè)務(wù)處理,最后結(jié)束;實(shí)際情況是:總是缺少?gòu)腂eforeQueue取出元素進(jìn)行業(yè)務(wù)處理、以及業(yè)務(wù)處理的日志,其他的日志輸出都很正常、執(zhí)行結(jié)果也正常;
經(jīng)過(guò)排查分析,最后找到了原因:
是logback線程安全問(wèn)題, Logback 的大部分組件都是線程安全的,但某些特定的配置可能會(huì)導(dǎo)致線程安全問(wèn)題。例如,如果你在同一個(gè) Appender 中處理多個(gè)線程的日志事件,那么可能會(huì)出現(xiàn)線程安全問(wèn)題,導(dǎo)致某些日志事件丟失。
問(wèn)題原因找到了,其實(shí)解決方法也就找到,具體就是logback的異步日志,logback.xml配置如下:
<?xml versinotallow="1.0" encoding="UTF-8"?><configuration scan="true" scanPeriod="60 seconds" debug="false"> <!-- 日志存放路徑 --> <property name="log.path" value="logs/"/> <!-- 日志輸出格式 --> <property name="console.log.pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %magenta(${PID:-}) - %green([%-21thread]) %cyan(%-35logger{30}) %msg%n"/> <!-- 控制臺(tái)輸出 --> <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>${console.log.pattern}</pattern> <charset>utf-8</charset> </encoder> </appender> <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender"> <queueSize>500</queueSize> <discardingThreshold>0</discardingThreshold> <neverBlock>true</neverBlock> <appender-ref ref="console" /> </appender> <!--系統(tǒng)操作日志--> <root level="info"> <appender-ref ref="ASYNC" /> </root></configuration>
文章中展示了關(guān)鍵性代碼,示例全部代碼地址:https://gitcode.net/fox9916/redisson-demo.git
本文鏈接:http://www.tebozhan.com/showinfo-26-55111-0.html基于Redis實(shí)現(xiàn)消息隊(duì)列的實(shí)踐
聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com