AVt天堂网 手机版,亚洲va久久久噜噜噜久久4399,天天综合亚洲色在线精品,亚洲一级Av无码毛片久久精品

當前位置:首頁 > 科技  > 軟件

1.5萬字 + 25張圖盤點RocketMQ 11種消息類型,你知道幾種?

來源: 責編: 時間:2023-11-08 09:11:20 325觀看
導讀大家好,我是三友~~故事的開頭是這樣的最近有個兄弟私信了我一張截圖圖片我一看截圖內容,好家伙,原來是我一年多前立的flag倒不是我忘了這件事,我后來也的確寫了一篇的關于RocketMQ運行的原理的文章只不過這篇文章是從上帝

大家好,我是三友~~LH928資訊網——每日最新資訊28at.com

故事的開頭是這樣的LH928資訊網——每日最新資訊28at.com

最近有個兄弟私信了我一張截圖LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

我一看截圖內容,好家伙,原來是我一年多前立的flagLH928資訊網——每日最新資訊28at.com

倒不是我忘了這件事,我后來也的確寫了一篇的關于RocketMQ運行的原理的文章LH928資訊網——每日最新資訊28at.com

只不過這篇文章是從上帝的視角去看待RocektMQ一條消息整個生命周期的過程LH928資訊網——每日最新資訊28at.com

所以就沒有具體的分析事務和延遲消息的實現原理,也算是留下了一個小小的坑吧LH928資訊網——每日最新資訊28at.com

不過,既然現在有兄弟問了,那么今天我這就來把這個坑填上LH928資訊網——每日最新資訊28at.com

并且,索性咱就直接把這個坑填得滿滿的,直接盤點RocketMQ支持的11種消息類型以及背后的實現原理LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

本文是基于RocketMQ 4.9版本講解LH928資訊網——每日最新資訊28at.com

前置知識

為了幫助大家更好地理解這些消息底層的實現原理,這里我就通過三個問題來講一講RocketMQ最最基本的原理LH928資訊網——每日最新資訊28at.com

1、生產者如何發送消息

在RocketMQ中有兩個重要的角色LH928資訊網——每日最新資訊28at.com

  • NameServer:就相當于一個注冊中心
  • Broker:RocketMQ服務端

當RocketMQ服務端,也就是Broker在啟動的時候,會往NameServer注冊自己的信息LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

這些信息其中就包括LH928資訊網——每日最新資訊28at.com

  • 當前Broker所在機器的ip和端口
  • 當前Broker管理的Topic的名稱以及每個Topic有幾個隊列

當生產者和消費者啟動的時候,就會從NameServer拉取這些信息,這樣生產者和消費者就可以通過NameServer中獲取到Broker的ip和端口,跟Broker通信了LH928資訊網——每日最新資訊28at.com

而Topic我們也都知道,是消息隊列中一個很重要的概念,代表了一類消息的集合LH928資訊網——每日最新資訊28at.com

在RocketMQ中,每個Topic默認都會有4個隊列,并且每個隊列都有一個id,默認從0開始,依次遞增LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

當生產者發送消息的時候,就會從消息所在Topic的隊列中,根據一定的算法選擇一個,然后攜帶這個隊列的id(queueId),再發送給BrokerLH928資訊網——每日最新資訊28at.com

攜帶的隊列的id就代表了這條消息屬于這個隊列的LH928資訊網——每日最新資訊28at.com

所以從更細化的來說,消息雖然是在Topic底下,但是真正是分布在不同的隊列上的,每個隊列會有這個Topic下的部分消息。LH928資訊網——每日最新資訊28at.com

2、消息存在哪

當消息被Broker接收到的時候,Broker會將消息存到本地的磁盤文件中,保證Broker重啟之后消息也不丟失LH928資訊網——每日最新資訊28at.com

RocketMQ給這個存消息的文件起了一個高大上的名字:CommitLogLH928資訊網——每日最新資訊28at.com

由于消息會很多,所以為了防止文件過大,CommitLog在物理磁盤文件上被分為多個磁盤文件,每個文件默認的固定大小是1GLH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

消息在寫入到文件時,除了包含消息本身的內容數據,也還會包含其它信息,比如LH928資訊網——每日最新資訊28at.com

  • 消息的Topic
  • 消息所在隊列的id,前面提到過
  • 消息生產者的ip和端口
  • ...

這些數據會和消息本身按照一定的順序同時寫到CommitLog文件中LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

上圖中黃色排列順序和實際的存的內容并非實際情況,我只是舉個例子LH928資訊網——每日最新資訊28at.com

3、消費者如何消費消息

消費者是如何拉取消息的

在RocketMQ中,消息的消費單元是以隊列來的LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

所以RocketMQ為了方便快速的查找和消費消息,會為每個Topic的每個隊列也單獨創建一個文件LH928資訊網——每日最新資訊28at.com

RocketMQ給這個文件也起了一個高大上的名字:ConsumeQueueLH928資訊網——每日最新資訊28at.com

當消息被存到CommitLog之后,其實還會往這條消息所在隊列的ConsumeQueue文件中插一條數據LH928資訊網——每日最新資訊28at.com

每個隊列的ConsumeQueue也是由多個文件組成,每個文件默認是存30萬條數據LH928資訊網——每日最新資訊28at.com

插入ConsumeQueue中的每條數據由20個字節組成,包含3部分信息LH928資訊網——每日最新資訊28at.com

  • 消息在CommitLog的起始位置(8個字節)
  • 消息在CommitLog存儲的長度(8個字節)
  • 消息tag的hashCode(4個字節)

圖片圖片LH928資訊網——每日最新資訊28at.com

每條數據也有自己的編號(offset),默認從0開始,依次遞增LH928資訊網——每日最新資訊28at.com

當消費者拉取消息的時候,會告訴服務端自己消費哪個隊列(queueId),哪個位置的消息(offset)的消息LH928資訊網——每日最新資訊28at.com

服務端接收到消息之后,會找到queueId對應的ConsumeQueue,然后找到offset位置的數據,最后根據這條數據到CommitLog文件查找真正的消息內容LH928資訊網——每日最新資訊28at.com

所以,從這可以看出,ConsumeQueue其實就相當于是一個索引文件,方便我們快速查找在CommitLog中的消息LH928資訊網——每日最新資訊28at.com

所以,記住下面這個非常重要的結論,有助于后面的文章內容的理解LH928資訊網——每日最新資訊28at.com

要想查找到某個Topic下的消息,那么一定是先找這個Topic隊列對應的ConsumeQueue,之后再通過ConsumeQueue中的數據去CommitLog文件查找真正的消息內容LH928資訊網——每日最新資訊28at.com

消費者組和消費模式

在RocketMQ,消費者是有個消費者組的概念,在啟動消費者的時候會指定該消費者屬于哪個消費者組。LH928資訊網——每日最新資訊28at.com

//創建一個消費者,指定消費者組的名稱為sanyouConsumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");

一個消費者組中可以有多個消費者,不同消費者組之間消費消息是互不干擾的LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

在同一個消費者組中,消息消費有兩種模式LH928資訊網——每日最新資訊28at.com

  • 集群模式
  • 廣播模式

同一條消息在同一個消費者組底下只會被消費一次,這就叫集群模式LH928資訊網——每日最新資訊28at.com

集群消費的實現就是將隊列按照一定的算法分配給消費者,默認是按照平均分配的LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

廣播模式剛好相反,同一條消息能被同一個消費者組底下所有的消費者消費一次LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

RocketMQ默認是集群模式,如果你想用廣播模式,只需設置一下即可LH928資訊網——每日最新資訊28at.com

consumer.setMessageModel(MessageModel.BROADCASTING);

好了,到這就講完了前置知識,這些前置知識后面或多或少都有提到LH928資訊網——每日最新資訊28at.com

如果你覺得看的不過癮,更詳細的文章奉上RocketMQ消息短暫而又精彩的一生LH928資訊網——每日最新資訊28at.com

普通消息

普通消息其實就很簡單,如下面代碼所示,就是發送一條普通的消息LH928資訊網——每日最新資訊28at.com

public class Producer {    public static void main(String[] args) throws Exception {        //創建一個生產者,指定生產者組為 sanyouProducer        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");        // 指定NameServer的地址        producer.setNamesrvAddr("192.168.200.143:9876");        // 啟動生產者        producer.start();        //創建一條消息 topic為 sanyouTopic 消息內容為 三友的java日記        Message msg = new Message("sanyouTopic", "三友的java日記".getBytes(RemotingHelper.DEFAULT_CHARSET));        // 發送消息并得到消息的發送結果,然后打印        SendResult sendResult = producer.send(msg);        System.out.printf("%s%n", sendResult);        // 關閉生產者        producer.shutdown();    }}

構建的消息的topic為sanyouTopic,內容為三友的java日記,這就是一條很普通的消息LH928資訊網——每日最新資訊28at.com

批量消息

批量消息從名字也可以看出來,就是將多個消息同時發過去,減少網絡請求的次數LH928資訊網——每日最新資訊28at.com

public class Producer {    public static void main(String[] args) throws Exception {        //創建一個生產者,指定生產者組為 sanyouProducer        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");        // 指定NameServer的地址        producer.setNamesrvAddr("192.168.200.143:9876");        // 啟動生產者        producer.start();        //用以及集合保存多個消息        List<Message> messages = new ArrayList<>();        messages.add(new Message("sanyouTopic", "三友的java日記 0".getBytes()));        messages.add(new Message("sanyouTopic", "三友的java日記 1".getBytes()));        messages.add(new Message("sanyouTopic", "三友的java日記 2".getBytes()));        // 發送消息并得到消息的發送結果,然后打印        SendResult sendResult = producer.send(messages);        System.out.printf("%s%n", sendResult);        // 關閉生產者        producer.shutdown();    }}

多個普通消息同時發送,這就是批量消息LH928資訊網——每日最新資訊28at.com

不過在使用批量消息的時候,需要注意以下兩點LH928資訊網——每日最新資訊28at.com

  • 每條消息的Topic必須都得是一樣的
  • 不支持延遲消息和事務消息

普通消息和批量消息比較簡單,沒有復雜的邏輯,就是將消息發送過去,在ConsumeQueue和CommitLog存上對應的數據就可以了LH928資訊網——每日最新資訊28at.com

順序消息

所謂的順序消息就是指LH928資訊網——每日最新資訊28at.com

生產者發送消息的順序跟消費者消費消息的順序是一致的LH928資訊網——每日最新資訊28at.com

RocketMQ可以保證同一個隊列的消息絕對順序,先進入隊列的消息會先被消費者拉取到,但是無法保證一個Topic內消息的絕對順序LH928資訊網——每日最新資訊28at.com

所以要想通過RocketMQ實現順序消費,需要保證兩點LH928資訊網——每日最新資訊28at.com

  • 生產者將需要保證順序的消息發送到同一個隊列
  • 消費者按照順序消費拉取到的消息

圖片圖片LH928資訊網——每日最新資訊28at.com

那么,第一個問題,如何消息發送到同一個隊列LH928資訊網——每日最新資訊28at.com

前面有提到,RocketMQ發送消息的時候會選擇一個隊列進行發送LH928資訊網——每日最新資訊28at.com

而RocketMQ默認是通過輪詢算法來選擇隊列的,這就無法保證需要順序消費的消息會存到同一個隊列底下LH928資訊網——每日最新資訊28at.com

所以,默認情況下是不行了,我們需要自定義隊列的選擇算法,才能保證消息都在同一個隊列中LH928資訊網——每日最新資訊28at.com

RocketMQ提供了自定義隊列選擇的接口MessageQueueSelectorLH928資訊網——每日最新資訊28at.com

比如我們可以實現這個接口,保證相同訂單id的消息都選擇同一個隊列,在消息發送的時候指定一下就可以了LH928資訊網——每日最新資訊28at.com

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {    @Override    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {        //可以根據業務的id從mqs中選擇一個隊列        return null;    }}, new Object());

保證消息順序發送之后,第二個問題,消費者怎么按照順序消費拉取到的消息?LH928資訊網——每日最新資訊28at.com

這個問題RocketMQ已經考慮到了,看看RocketMQ多么地貼心LH928資訊網——每日最新資訊28at.com

RocketMQ在消費消息的時候,提供了兩種方式:LH928資訊網——每日最新資訊28at.com

  • 并發消費
  • 順序消費

并發消費,多個線程同時處理同一個隊列拉取到的消息LH928資訊網——每日最新資訊28at.com

順序消費,同一時間只有一個線程會處理同一個隊列拉取到的消息LH928資訊網——每日最新資訊28at.com

至于是并發消費還是順序消費,需要我們自己去指定LH928資訊網——每日最新資訊28at.com

對于順序處理,只需要實現MessageListenerOrderly接口,處理消息就可以了LH928資訊網——每日最新資訊28at.com

public class Consumer {    public static void main(String[] args) throws InterruptedException, MQClientException {        // 創建一個消費者        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");        // 指定NameServer的地址        consumer.setNamesrvAddr("192.168.200.143:9876");        // 訂閱sanyouTopic這個topic下的所有的消息        consumer.subscribe("sanyouTopic", "*");        // 注冊一個消費的監聽器,當有消息的時候,會回調這個監聽器來消費消息        consumer.registerMessageListener(new MessageListenerOrderly() {            @Override            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {                for (MessageExt msg : msgs) {                    System.out.printf("消費消息:%s", new String(msg.getBody()) + "/n");                }                return ConsumeOrderlyStatus.SUCCESS;            }        });        // 啟動消費者        consumer.start();        System.out.printf("Consumer Started.%n");    }}

如果想并發消費,換成實現MessageListenerConcurrently即可LH928資訊網——每日最新資訊28at.com

到這你可能會有一個疑問LH928資訊網——每日最新資訊28at.com

并發消費和順序消費跟前面提到的集群消費和廣播消費有什么區別?LH928資訊網——每日最新資訊28at.com

集群消費和廣播消費指的是一個消費者組里的每個消費者是去拉取全部隊列的消息還是部分隊列的消息,也就是選擇需要拉取的隊列LH928資訊網——每日最新資訊28at.com

而并發和順序消費的意思是,是對已經拉到的同一個隊列的消息,是并發處理還是按照消息的順序去處理LH928資訊網——每日最新資訊28at.com

延遲消息

延遲消息就是指生產者發送消息之后,消息不會立馬被消費,而是等待一定的時間之后再被消息LH928資訊網——每日最新資訊28at.com

RocketMQ的延遲消息用起來非常簡單,只需要在創建消息的時候指定延遲級別,之后這條消息就成為延遲消息了LH928資訊網——每日最新資訊28at.com

Message message = new Message("sanyouTopic", "三友的java日記 0".getBytes());//延遲級別message.setDelayTimeLevel(1);

雖然用起來簡單,但是背后的實現原理還是有點意思,我們接著往下看LH928資訊網——每日最新資訊28at.com

RocketMQ延遲消息的延遲時間默認有18個級別,不同的延遲級別對應的延遲時間不同LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

RocketMQ內部有一個Topic,專門用來表示是延遲消息的,叫SCHEDULE_TOPIC_XXXX,XXXX不是占位符,就是XXXXLH928資訊網——每日最新資訊28at.com

RocketMQ會根據延遲級別的個數為SCHEDULE_TOPIC_XXXX這個Topic創建相對應數量的隊列LH928資訊網——每日最新資訊28at.com

比如默認延遲級別是18,那么SCHEDULE_TOPIC_XXXX就有18個隊列,隊列的id從0開始,所以延遲級別為1時,對應的隊列id就是0,為2時對應的就是1,依次類推LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

那SCHEDULE_TOPIC_XXXX這個Topic有什么作用呢?LH928資訊網——每日最新資訊28at.com

這就得從消息存儲時的一波偷梁換柱的騷操作了說起了LH928資訊網——每日最新資訊28at.com

當服務端接收到消息的時候,判斷延遲級別大于0的時候,說明是延遲消息,此時會干下面三件事:LH928資訊網——每日最新資訊28at.com

  • 將消息的Topic改成SCHEDULE_TOPIC_XXXX
  • 將消息的隊列id設置為延遲級別對應的隊列id
  • 將消息真正的Topic和隊列id存到前面提到的消息存儲時的額外信息中

之后消息就按照正常存儲的步驟存到CommitLog文件中LH928資訊網——每日最新資訊28at.com

由于消息存到的是SCHEDULE_TOPIC_XXXX這個Topic中,而不是消息真正的目標Topic中,所以消費者此時是消費不到消息的LH928資訊網——每日最新資訊28at.com

舉個例子,比如有條消息,Topic為sanyou,所在的隊列id = 1,延遲級別 = 1,那么偷梁換柱之后的結果如下圖所示LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

代碼如下LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

所以從上分析可以得出一個結論LH928資訊網——每日最新資訊28at.com

所有RocketMQ的延遲消息,最終都會存儲到SCHEDULE_TOPIC_XXXX這個Topic中,并且同一個延遲級別的消息在同一個隊列中LH928資訊網——每日最新資訊28at.com

在存消息偷梁換柱之后,實現延遲消費的最關鍵的一個步驟來了LH928資訊網——每日最新資訊28at.com

BocketMQ在啟動的時候,除了為每個延遲級別創建一個隊列之后,還會為每個延遲級別創建一個延遲任務,也就相當于一個定時任務,每隔100ms執行一次LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

這個延遲任務會去檢查這個隊列中的消息有沒有到達延遲時間,也就是不是可以消費了LH928資訊網——每日最新資訊28at.com

前面的結論,每個隊列都有一個ConsumeQueue文件,可以通過ConsumeQueue找到這個隊列中的消息LH928資訊網——每日最新資訊28at.com

一旦發現到達延遲時間,可以消費了,此時就會從這條消息額外存儲的消息中拿到真正的Topic和隊列id,重新構建一條新的消息,將新的消息的Topic和隊列id設置成真正的Topic和隊列id,內容還是原來消息的內容LH928資訊網——每日最新資訊28at.com

之后再一次將新構建的消息存儲到CommitLog中LH928資訊網——每日最新資訊28at.com

由于新消息的Topic變成消息真正的Topic了,所以之后消費者就能夠消費到這條消息了LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

所以,從整體來說,RocketMQ延遲消息的實現本質上就是最開始消息是存在SCHEDULE_TOPIC_XXXX這個中轉的Topic中LH928資訊網——每日最新資訊28at.com

然后會有一個類似定時任務的東西,不停地去找到這個Topic中的消息LH928資訊網——每日最新資訊28at.com

一旦發現這個消息達到了延遲任務,說明可以消費了,那么就重新構建一條消息,這條消息的Topic和隊列id都是實際上的Topic和隊列id,然后存到CommitLogLH928資訊網——每日最新資訊28at.com

之后消費者就能夠在目標的Topic獲取到消息了LH928資訊網——每日最新資訊28at.com

事務消息

事務消息用起來也比較簡單,如下所示:LH928資訊網——每日最新資訊28at.com

public class TransactionMessageDemo {    public static void main(String[] args) throws Exception {        TransactionMQProducer transactionMQProducer = new TransactionMQProducer("sanyouProducer");        transactionMQProducer.setNamesrvAddr("192.168.200.143:9876");        //設置事務監聽器        transactionMQProducer.setTransactionListener(new TransactionListener() {            @Override            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {                //處理本次事務                return LocalTransactionState.COMMIT_MESSAGE;            }            @Override            public LocalTransactionState checkLocalTransaction(MessageExt msg) {                //檢查本地事務                return LocalTransactionState.COMMIT_MESSAGE;            }        });        transactionMQProducer.start();        Message message = new Message("sanyouTopic", "三友的java日記".getBytes());        //發送消息        transactionMQProducer.sendMessageInTransaction(message, new Object());    }}

事務消息發送相對于前面的例子主要有以下不同:LH928資訊網——每日最新資訊28at.com

  • 將前面的DefaultMQProducer換成TransactionMQProducer
  • 需要設置事務的監聽器TransactionListener,來執行本地事務
  • 發送方法改成 sendMessageInTransaction

為什么要這么改,接下來我們來講講背后的實現原理LH928資訊網——每日最新資訊28at.com

上一節在說延遲消息的時候提到,RocketMQ使用到了SCHEDULE_TOPIC_XXXX這個中轉Topic,來偷梁換柱實現延遲消息LH928資訊網——每日最新資訊28at.com

不僅僅是延遲消息,事務消息其實也是這么干的,它也會進行偷梁換柱,將消息先存在RMQ_SYS_TRANS_HALF_TOPIC這個Topic下,同時也會將消息真正的Topic和隊列id存到額外信息中,操作都是一樣滴LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

由于消息不在真正目標的Topic下,所以這條消息消費者也是消費不到滴LH928資訊網——每日最新資訊28at.com

當消息成功存儲之后,服務端會向生產者響應,告訴生產者我消息存儲成功了,你可以執行本地事務了LH928資訊網——每日最新資訊28at.com

之后生產者就會執行本地執行事務,也就是執行如下方法LH928資訊網——每日最新資訊28at.com

TransactionListener#executeLocalTransactionLH928資訊網——每日最新資訊28at.com

當本地事務執行完之后,會將執行的結果發送給服務端LH928資訊網——每日最新資訊28at.com

服務端會根據事務的執行狀態來執行對應的處理結果LH928資訊網——每日最新資訊28at.com

  • commit:提交事務消息,跟延遲消息一樣,重新構建一條消息,Topic和隊列id都設置成消息真正的Topic和隊列id,然后重新存到CommitLog文件,這樣消費者就可以消費到消息了
  • rollback:回滾消息,其實并沒有實際的操作,因為消息本身就不在真正的Topic下,所以消費者壓根就消費不到,什么都不做就可以了
  • unknown:本地事務執行異常時就是這個狀態,這個狀態下會干一些事,咱們后面再說

所以在正常情況下,事務消息整個運行流程如下圖所示LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

既然有正常情況下,那么就有非正常情況下LH928資訊網——每日最新資訊28at.com

比如前面提到的拋異常導致unknown,又或者什么亂七八糟的原因,導致無法正常提交本地事務的執行狀態,那么此時該怎么辦呢?LH928資訊網——每日最新資訊28at.com

RocketMQ當然也想到了,他有自己的一套補償機制LH928資訊網——每日最新資訊28at.com

RocketMQ內部會起動一個線程,默認每隔1分鐘去檢查沒有被commit或者rollback的事務消息LH928資訊網——每日最新資訊28at.com

RocketMQ內部有一套機制,可以找出哪些事務消息沒有commit或者rollback,這里就不細說了LH928資訊網——每日最新資訊28at.com

當發現這條消息超過6s沒有提交事務狀態,那么此時就會向生產者發送一個請求,讓生產者去檢查一下本地的事務執行的狀態,就是執行下面這行代碼LH928資訊網——每日最新資訊28at.com

TransactionListener#checkLocalTransactionLH928資訊網——每日最新資訊28at.com

之后會將這個方法返回的事務狀態提交給服務端,服務端就可以知道事務的執行狀態了LH928資訊網——每日最新資訊28at.com

圖片LH928資訊網——每日最新資訊28at.com

這里有一個細節需要注意,事務消息檢查次數不是無限的,默認最大為15次,一旦超過15次,那么就不會再被檢查了,而是會直接把這個消息存到TRANS_CHECK_MAX_TIME_TOPIC中LH928資訊網——每日最新資訊28at.com

所以你可以從這個Topic讀取那些無法正常提交事務的消息LH928資訊網——每日最新資訊28at.com

這就是RocketMQ事務消息的原理LH928資訊網——每日最新資訊28at.com

小總結

RocketMQ事務消息的實現主要是先將消息存到RMQ_SYS_TRANS_HALF_TOPIC這個中間Topic,有些資料會把這個消息稱為半消息(half消息),這是因為這個消息不能被消費LH928資訊網——每日最新資訊28at.com

之后會執行本地的事務,提交本地事務的執行狀態LH928資訊網——每日最新資訊28at.com

RocketMQ會根據事務的執行狀態去判斷commit或者是rollback消息,也就是是不是可以讓消費者消費這條消息的意思LH928資訊網——每日最新資訊28at.com

在一些異常情況下,生產者無法及時正確提交事務執行狀態LH928資訊網——每日最新資訊28at.com

RocketMQ會向生產者發送消息,讓生產者去檢查本地的事務,之后再提交事務狀態LH928資訊網——每日最新資訊28at.com

當然,這個檢查次數默認不超過15次,如果超過15次還未成功提交事務狀態,RocketMQ就會直接把這個消息存到TRANS_CHECK_MAX_TIME_TOPIC中LH928資訊網——每日最新資訊28at.com

請求-應答消息

這個消息類型比較有意思,類似一種RPC的模式LH928資訊網——每日最新資訊28at.com

生產者發送消息之后可以阻塞等待消費者消費這個消息的之后返回的結果LH928資訊網——每日最新資訊28at.com

生產者通過過調用request方法發送消息,接收回復消息LH928資訊網——每日最新資訊28at.com

public class Producer {    public static void main(String[] args) throws Exception {        //創建一個生產者,指定生產者組為 sanyouProducer        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");        // 指定NameServer的地址        producer.setNamesrvAddr("192.168.200.143:9876");        // 啟動生產者        producer.start();        Message message = new Message("sanyouTopic", "三友的java日記".getBytes());                //發送消息,拿到響應結果, 3000代表超時時間,3s內未拿到響應結果,就超時,會拋出RequestTimeoutException異常        Message result = producer.request(message, 3000);        System.out.println("接收到響應消息:" + result);        // 關閉生產者        producer.shutdown();    }}

而對于消費者來著,當消費完消息之后,也要作為生產者,將響應的消息發送出去LH928資訊網——每日最新資訊28at.com

public class Consumer {    public static void main(String[] args) throws InterruptedException, MQClientException {        //創建一個生產者,指定生產者組為 sanyouProducer        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");        // 指定NameServer的地址        producer.setNamesrvAddr("192.168.200.143:9876");        // 啟動生產者        producer.start();        // 通過push模式消費消息,指定消費者組        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");        // 指定NameServer的地址        consumer.setNamesrvAddr("192.168.200.143:9876");        // 訂閱這個topic下的所有的消息        consumer.subscribe("sanyouTopic", "*");        // 注冊一個消費的監聽器,當有消息的時候,會回調這個監聽器來消費消息        consumer.registerMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,                                                            ConsumeConcurrentlyContext context) {                for (MessageExt msg : msgs) {                    System.out.printf("消費消息:%s", new String(msg.getBody()) + "/n");                    try {                        // 用RocketMQ自帶的工具類創建響應消息                        Message replyMessage = MessageUtil.createReplyMessage(msg, "這是響應消息內容".getBytes(StandardCharsets.UTF_8));                        // 將響應消息發送出去,拿到發送結果                        SendResult replyResult = producer.send(replyMessage, 3000);                        System.out.println("響應消息的結果 = " + replyResult);                    } catch (Exception e) {                        e.printStackTrace();                    }                }                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }        });        // 啟動消費者        consumer.start();        System.out.printf("Consumer Started.%n");    }}

這種請求-應答消息實現原理也比較簡單,如下圖所示LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

生產者和消費者,會跟RocketMQ服務端進行網絡連接LH928資訊網——每日最新資訊28at.com

所以他們都是通過這個連接來發送和拉取消息的LH928資訊網——每日最新資訊28at.com

當服務端接收到回復消息之后,有個專門處理回復消息的類LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

這個類就會直接找到發送消息的生產者的連接,之后會通過這個連接將回復消息發送給生產者LH928資訊網——每日最新資訊28at.com

RocketMQ底層是基于Netty通信的,所以如果你有用過Netty的話,應該都知道,就是通過Channel來發送的LH928資訊網——每日最新資訊28at.com

重試消息

重試消息并不是我們業務中主動發送的,而是指當消費者消費消息失敗之后,會間隔一段時間之后再次消費這條消息LH928資訊網——每日最新資訊28at.com

重試的機制在并發消費模式和順序消費模式下實現的原理并不相同LH928資訊網——每日最新資訊28at.com

并發消費模式重試實現原理

RocetMQ會為每個消費者組創建一個重試消息所在的Topic,名字格式為LH928資訊網——每日最新資訊28at.com

%RETRY% + 消費者組名稱LH928資訊網——每日最新資訊28at.com

舉個例子,假設消費者組為sanyouConsumer,那么重試Topic的名稱為:%RETRY%sanyouConsumerLH928資訊網——每日最新資訊28at.com

當消息消費失敗后,RocketMQ會把消息存到這個Topic底下LH928資訊網——每日最新資訊28at.com

消費者在啟動的時候會主動去訂閱這個Topic,那么自然而然就能消費到消費失敗的消息了LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

為什么要為每個消費者組創建一個重試Topic呢?LH928資訊網——每日最新資訊28at.com

其實我前面已經說過,每個消費者組的消費是隔離的,互不影響LH928資訊網——每日最新資訊28at.com

所以,每個消費者組消費失敗的消息可能就不一樣,自然要放到不同的Topic下了LH928資訊網——每日最新資訊28at.com

重試消息是如何實現間隔一段時間來消費呢?LH928資訊網——每日最新資訊28at.com

說到間隔一段時間消費,你有沒有覺得似曾相識?LH928資訊網——每日最新資訊28at.com

不錯,間隔一段時間消費說白了不就是延遲消費么!LH928資訊網——每日最新資訊28at.com

所以,并發消費模式下間隔一段時間底層就是使用的延遲消息來實現的LH928資訊網——每日最新資訊28at.com

RocetMQ會為重試消息設置一個延遲級別LH928資訊網——每日最新資訊28at.com

并且延遲級別與重試次數的關系為LH928資訊網——每日最新資訊28at.com

delayLevel = 3 + 已經重試次數LH928資訊網——每日最新資訊28at.com

比如第一次消費失敗,那么已經重試次數就是0,那么此時延遲級別就是3LH928資訊網——每日最新資訊28at.com

對應的默認的延遲時間就是10s,也就是一次消息重試消費間隔時間是10sLH928資訊網——每日最新資訊28at.com

隨著重試次數越多,延遲級別也越來越高,重試的間隔也就越來越長,但是最大也是最大延遲級別的時間LH928資訊網——每日最新資訊28at.com

不過需要注意的是,在并發消費模式下,只有集群消費才支持消息重試,對于廣播消費模式來說,是不支持消息重試的,消費失敗就失敗了,不會管LH928資訊網——每日最新資訊28at.com

順序消費模式重試實現原理

順序消費模式下重試就比較簡單了LH928資訊網——每日最新資訊28at.com

當消費失敗的時候,他并不會將消息發送到服務端,而是直接在本地等1s鐘之后重試LH928資訊網——每日最新資訊28at.com

在這個等待的期間其它消息是不能被消費的LH928資訊網——每日最新資訊28at.com

這是因為保證消息消費的順序性,即使前面的消息消費失敗了,它也需要等待前面的消息處理完畢才能處理后面的消息LH928資訊網——每日最新資訊28at.com

順序消費模式下,并發消費和集群消費均支持重試消息LH928資訊網——每日最新資訊28at.com

死信消息

死信消息就是指如果消息最終無法被正常消費,那么這條消息就會成為死信消息LH928資訊網——每日最新資訊28at.com

RocketMQ中,消息會變成死信消息有兩種情況LH928資訊網——每日最新資訊28at.com

第一種就是消息重試次數已經達到了最大重試次數LH928資訊網——每日最新資訊28at.com

最大重試次數取決于并發消費還是順序消費LH928資訊網——每日最新資訊28at.com

  • 順序消費,默認最大重試次數就是 Integer.MAX_VALUE,基本上就是無限次重試,所以默認情況下順序消費的消息幾乎不可能成為死信消息
  • 并發消費的話,那么最大重試次數默認就是16次

當然可以通過如下的方法來設置最大重試次數LH928資訊網——每日最新資訊28at.com

DefaultMQPushConsumer#setMaxReconsumeTimesLH928資訊網——每日最新資訊28at.com

除了上面的情況之外,當在并發消費模式下,你可以在消息消費失敗之后手動指定,直接讓消息變成死信消息LH928資訊網——每日最新資訊28at.com

在并發消費消息的模式下,處理消息的方法有這么一個參數LH928資訊網——每日最新資訊28at.com

ConsumeConcurrentlyContextLH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

這個類中有這么一個屬性LH928資訊網——每日最新資訊28at.com

圖片圖片LH928資訊網——每日最新資訊28at.com

這個參數值有三種情況,注釋也有寫:LH928資訊網——每日最新資訊28at.com

  • 小于0,那么直接會把消息放到死信隊列,成為死信消息。注釋寫的是=-1,其實只要小于0就可以成為死信消息,不一定非得是-1
  • 0,默認就是0,這個代表消息重試消費,并且重試的時間間隔(也就是延遲級別)由服務端決定,也即是前面重試消息提到的 delayLevel = 3 + 已經重試次數
  • 大于0,此時就表示客戶端指定消息重試的時間間隔,是幾就代表延遲級別為幾,比如設置成1,那么延遲級別就為1

所以,在并發消費模式下,可以通過設置這個參數值為-1,直接讓處理失敗的消息成為死信消息LH928資訊網——每日最新資訊28at.com

當消息成為死信消息之后,消息并不會丟失LH928資訊網——每日最新資訊28at.com

RocketMQ會將死信消息保存在死信Topic底下,Topic格式為LH928資訊網——每日最新資訊28at.com

%DLQ% + 消費者組名稱LH928資訊網——每日最新資訊28at.com

跟重試Topic的格式有點像,只是將%RETRY%換成了%DLQ%LH928資訊網——每日最新資訊28at.com

如果你想知道有哪些死信消息,只需要訂閱這個Topic即可獲得LH928資訊網——每日最新資訊28at.com

小總結

所以總的來說,兩種情況會讓消息成為死信消息:LH928資訊網——每日最新資訊28at.com

  • 消息重試次數超過最大次數,跟消息的處理方式有關,默認情況下順序處理最大次數是幾乎是無限次,也就是幾乎不可能成為死信消息;并發處理的情況下,最大重試次數默認就是16次。最大重試次數是可以設置的。
  • 在并發處理的情況下,通過ConsumeConcurrentlyContext將delayLevelWhenNextConsume屬性設置成-1,讓消息直接變成死信消息

當消息成為死信消息的時候,會被存到%DLQ% + 消費者組名稱這個Topic下LH928資訊網——每日最新資訊28at.com

用戶可以通過這個Topic獲取到死信消息,手動干預處理這些消息LH928資訊網——每日最新資訊28at.com

同步消息

同步消息是指,當生產者發送消息的時候,需要阻塞等待服務端響應消息存儲的結果LH928資訊網——每日最新資訊28at.com

同步消息跟前面提到的消息類型并不是互斥的LH928資訊網——每日最新資訊28at.com

比如前面說的普通消息時舉的例子,他就是同步發送的,那么它也是一個同步消息LH928資訊網——每日最新資訊28at.com

這種模式用于對數據一致性要求較高的場景中,但是等待也會消耗一定的時間LH928資訊網——每日最新資訊28at.com

異步消息

既然有了同步消息,那么相對應的就有異步消息LH928資訊網——每日最新資訊28at.com

異步消息就是指生產者發送消息后,不需要阻塞等待服務端存儲消息的結果LH928資訊網——每日最新資訊28at.com

所以異步消息的好處就是可以減少等待響應過程消耗的時間LH928資訊網——每日最新資訊28at.com

如果你想知道有沒有發送成功,可以在發送消息的時候傳個回調的接口SendCallback的實現LH928資訊網——每日最新資訊28at.com

Message message = new Message("sanyouTopic", "三友的java日記".getBytes());//異步發送消息producer.send(message, new SendCallback() {            @Override            public void onSuccess(SendResult sendResult) {                System.out.println("消息發送結果 = " + sendResult);            }            @Override            public void onException(Throwable e) {                System.out.println("消息發送異常 = " + e.getMessage());            }        });

當消息發送之后收到發送結果或者出現異常的時候,RocektMQ就會回調這個SendCallback實現類,你就可以知道消息發送的結果了LH928資訊網——每日最新資訊28at.com

單向消息

所謂的單向消息就是指,生產者發送消息給服務端之后,就直接不管了LH928資訊網——每日最新資訊28at.com

所以對于生產者來說,他是不會去care消息發送的結果了,即使發送失敗了,對于生產者來說也是無所謂的LH928資訊網——每日最新資訊28at.com

所以這種方式的主要應用于那種能夠忍受丟消息的操作場景LH928資訊網——每日最新資訊28at.com

比如像日志收集就比較適合使用這種方式LH928資訊網——每日最新資訊28at.com

單向消息的發送是通過sendOneway來調用的LH928資訊網——每日最新資訊28at.com

Message message = new Message("sanyouTopic", "三友的java日記".getBytes());//發送單向消息producer.sendOneway(message);

總的來說,同步消息、異步消息、單向消息代表的是消息的發送方式,主要是針對消息的發送方來說,對消息的存儲之類是的沒有任何影響的LH928資訊網——每日最新資訊28at.com

最后

ok,到這本文就結束了LH928資訊網——每日最新資訊28at.com

本文又又是一篇非常非常肝的文章,不知道你是否堅持看到這里LH928資訊網——每日最新資訊28at.com

我在寫的過程中也是不斷地死磕源碼,盡可能避免出現錯誤的內容LH928資訊網——每日最新資訊28at.com

同時也在嘗試爭取把我所看到的源碼以一種最簡單的方式說出來LH928資訊網——每日最新資訊28at.com

本文鏈接:http://www.tebozhan.com/showinfo-26-17669-0.html1.5萬字 + 25張圖盤點RocketMQ 11種消息類型,你知道幾種?

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 聽說你會架構設計?來,弄一個群聊系統

下一篇: 多任務多場景問題解決方案與實踐

標簽:
  • 熱門焦點
  • Find N3入網:最高支持16+1TB

    OPPO將于近期登場的Find N3折疊屏目前已經正式入網,型號為PHN110。本次Find N3在外觀方面相比前兩代有很大的變化,不再是小號的橫向折疊屏,而是跟別的廠商一樣采用了較為常見的
  • 鴻蒙OS 4.0公測機型公布:甚至連nova6都支持

    華為全新的HarmonyOS 4.0操作系統將于今天下午正式登場,官方在發布會之前也已經正式給出了可升級的機型產品,這意味著這些機型會率先支持升級享用。這次的HarmonyOS 4.0支持
  • 小米降噪藍牙耳機Necklace分享:聽一首歌 讀懂一個故事

    在今天下午的小米Civi 2新品發布會上,小米還帶來了一款新的降噪藍牙耳機Necklace,我們也在發布結束的第一時間給大家帶來這款耳機的簡單分享。現在大家能見到最多的藍牙耳機
  • Rust中的高吞吐量流處理

    作者 | Noz編譯 | 王瑞平本篇文章主要介紹了Rust中流處理的概念、方法和優化。作者不僅介紹了流處理的基本概念以及Rust中常用的流處理庫,還使用這些庫實現了一個流處理程序
  • CSS單標簽實現轉轉logo

    轉轉品牌升級后更新了全新的Logo,今天我們用純CSS來實現轉轉的新Logo,為了有一定的挑戰性,這里我們只使用一個標簽實現,將最大化的使用CSS能力完成Logo的繪制與動畫效果。新logo
  • 三分鐘白話RocketMQ系列—— 如何發送消息

    我們知道RocketMQ主要分為消息 生產、存儲(消息堆積)、消費 三大塊領域。那接下來,我們白話一下,RocketMQ是如何發送消息的,揭秘消息生產全過程。注意,如果白話中不小心提到相關代
  • 騰訊蓋樓,字節拆墻

    來源 | 光子星球撰文 | 吳坤諺編輯 | 吳先之&ldquo;想重溫暴刷深淵、30+技能搭配暴搓到爽的游戲體驗嗎?一起上晶核,即刻暴打!&rdquo;曾憑借直播騰訊旗下代理格斗游戲《DNF》一
  • 新電商三兄弟,“抖快紅”成團!

    來源:價值研究所作 者:Hernanderz 隨著內容電商的概念興起,抖音、快手、小紅書組成的&ldquo;新電商三兄弟&rdquo;成為業內一股不可忽視的勢力,給阿里、京東、拼多多帶去了巨大壓
  • 蘋果140W USB-C充電器:采用氮化鎵技術

    據10 月 30 日 9to5 Mac 消息報道,當蘋果推出新的 MacBook Pro 2021 時,該公司還推出了新的 140W USB-C 充電器,附贈在 MacBook Pro 16 英寸機型的盒子里,也支
Top