AVt天堂网 手机版,亚洲va久久久噜噜噜久久4399,天天综合亚洲色在线精品,亚洲一级Av无码毛片久久精品

當前位置:首頁 > 科技  > 軟件

深入理解 RocketMQ 廣播消費

來源: 責編: 時間:2023-09-28 10:09:12 329觀看
導讀這篇文章我們聊聊廣播消費,因為廣播消費在某些場景下真的有奇效。筆者會從基礎概念、實現機制、實戰案例三個方面一一展開,希望能幫助到大家。1 基礎概念RocketMQ 支持兩種消息模式:集群消費( Clustering )和廣播消費( Broa

這篇文章我們聊聊廣播消費,因為廣播消費在某些場景下真的有奇效。筆者會從基礎概念、實現機制、實戰案例三個方面一一展開,希望能幫助到大家。ZpX28資訊網——每日最新資訊28at.com

1 基礎概念

RocketMQ 支持兩種消息模式:集群消費( Clustering )和廣播消費( Broadcasting )。ZpX28資訊網——每日最新資訊28at.com

集群消費:ZpX28資訊網——每日最新資訊28at.com

同一 Topic 下的一條消息只會被同一消費組中的一個消費者消費。也就是說,消息被負載均衡到了同一個消費組的多個消費者實例上。ZpX28資訊網——每日最新資訊28at.com

圖片圖片ZpX28資訊網——每日最新資訊28at.com

廣播消費:ZpX28資訊網——每日最新資訊28at.com

當使用廣播消費模式時,每條消息推送給集群內所有的消費者,保證消息至少被每個消費者消費一次。ZpX28資訊網——每日最新資訊28at.com

圖片圖片ZpX28資訊網——每日最新資訊28at.com

2 源碼解析

首先下圖展示了廣播消費的代碼示例。ZpX28資訊網——每日最新資訊28at.com

public class PushConsumer {    public static final String CONSUMER_GROUP = "myconsumerGroup";    public static final String DEFAULT_NAMESRVADDR = "localhost:9876";    public static final String TOPIC = "mytest";    public static final String SUB_EXPRESSION = "TagA || TagC || TagD";    public static void main(String[] args) throws InterruptedException, MQClientException {        // 定義 DefaultPushConsumer         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);        // 定義名字服務地址        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);        // 定義消費讀取位點        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);        // 定義消費模式        consumer.setMessageModel(MessageModel.BROADCASTING);        // 訂閱主題信息        consumer.subscribe(TOPIC, SUB_EXPRESSION);        // 訂閱消息監聽器        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {            try {                for (MessageExt messageExt : msgs) {                    System.out.println(new String(messageExt.getBody()));                }            }catch (Exception e) {                e.printStackTrace();            }            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        });        consumer.start();        System.out.printf("Broadcast Consumer Started.%n");    }}

和集群消費不同的點在于下面的代碼:ZpX28資訊網——每日最新資訊28at.com

consumer.setMessageModel(MessageModel.BROADCASTING);

接下來,我們從源碼角度來看看廣播消費和集群消費有哪些差異點 ?ZpX28資訊網——每日最新資訊28at.com

首先進入 DefaultMQPushConsumerImpl 類的 start 方法 , 分析啟動流程中他們兩者的差異點:ZpX28資訊網——每日最新資訊28at.com

圖片圖片ZpX28資訊網——每日最新資訊28at.com

▍ 差異點1:拷貝訂閱關系ZpX28資訊網——每日最新資訊28at.com

private void copySubscription() throws MQClientException {    try {       Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();       if (sub != null) {          for (final Map.Entry<String, String> entry : sub.entrySet()) {              final String topic = entry.getKey();              final String subString = entry.getValue();              SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);            }        }       if (null == this.messageListenerInner) {          this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();       }       // 注意下面的代碼 , 集群模式下自動訂閱重試主題        switch (this.defaultMQPushConsumer.getMessageModel()) {           case BROADCASTING:               break;           case CLUSTERING:                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);                break;            default:                break;        }    } catch (Exception e) {        throw new MQClientException("subscription exception", e);    }}

在集群模式下,會自動訂閱重試隊列,而廣播模式下,并沒有這段代碼。也就是說廣播模式下,不支持消息重試。ZpX28資訊網——每日最新資訊28at.com

▍ 差異點2:本地進度存儲ZpX28資訊網——每日最新資訊28at.com

switch (this.defaultMQPushConsumer.getMessageModel()) {    case BROADCASTING:        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());        break;    case CLUSTERING:        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());        break;    default:        break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

我們可以看到消費進度存儲的對象是:LocalFileOffsetStore , 進度文件存儲在如下的主目錄 /{用戶主目錄}/.rocketmq_offsets。ZpX28資訊網——每日最新資訊28at.com

public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(    "rocketmq.client.localOffsetStoreDir",    System.getProperty("user.home") + File.separator + ".rocketmq_offsets");

進度文件是 /mqClientId/{consumerGroupName}/offsets.json 。ZpX28資訊網——每日最新資訊28at.com

this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator + this.groupName + File.separator + "offsets.json";

筆者創建了一個主題 mytest , 包含4個隊列,進度文件內容如下:ZpX28資訊網——每日最新資訊28at.com

圖片圖片ZpX28資訊網——每日最新資訊28at.com

消費者啟動后,我們可以將整個流程簡化如下圖,并繼續整理差異點:ZpX28資訊網——每日最新資訊28at.com

圖片圖片ZpX28資訊網——每日最新資訊28at.com

▍ 差異點3:負載均衡消費該主題的所有 MessageQueueZpX28資訊網——每日最新資訊28at.com

進入負載均衡抽象類 RebalanceImpl 的rebalanceByTopic方法 。ZpX28資訊網——每日最新資訊28at.com

private void rebalanceByTopic(final String topic, final boolean isOrder) {    switch (messageModel) {        case BROADCASTING: {            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);            if (mqSet != null) {                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);                // 省略代碼            } else {                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);            }            break;        }        case CLUSTERING: {            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);            // 省略代碼            if (mqSet != null && cidAll != null) {                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();                mqAll.addAll(mqSet);                Collections.sort(mqAll);                Collections.sort(cidAll);                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;                List<MessageQueue> allocateResult = null;                try {                     allocateResult = strategy.allocate(                            this.consumerGroup,                            this.mQClientFactory.getClientId(),                            mqAll,                            cidAll);                    } catch (Throwable e) {                        // 省略日志打印代碼                        return;                    }                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();                if (allocateResult != null) {                    allocateResultSet.addAll(allocateResult);                }                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);                //省略代碼            }            break;        }        default:            break;    }}

從上面代碼我們可以看到消息模式為廣播消費模式時,消費者會消費該主題下所有的隊列,這一點也可以從本地的進度文件 offsets.json 得到印證。ZpX28資訊網——每日最新資訊28at.com

▍ 差異點4:不支持順序消息ZpX28資訊網——每日最新資訊28at.com

我們知道消費消息順序服務會向 Borker 申請鎖 。消費者根據分配的隊列 messageQueue ,向 Borker 申請鎖 ,如果申請成功,則會拉取消息,如果失敗,則定時任務每隔 20 秒會重新嘗試。ZpX28資訊網——每日最新資訊28at.com

if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            try {                ConsumeMessageOrderlyService.this.lockMQPeriodically();            } catch (Throwable e) {                log.error("scheduleAtFixedRate lockMQPeriodically exception", e);            }        }    }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);}

但是從上面的代碼,我們發現只有在集群消費的時候才會定時申請鎖,這樣就會導致廣播消費時,無法為負載均衡的隊列申請鎖,導致拉取消息服務一直無法獲取消息數據。ZpX28資訊網——每日最新資訊28at.com

筆者修改消費例子,在消息模式為廣播模式的場景下,將消費模式從并發消費修改為順序消費。ZpX28資訊網——每日最新資訊28at.com

consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {    try {        for (MessageExt messageExt : msgs) {            System.out.println(new String(messageExt.getBody()));        }    }catch (Exception e) {        e.printStackTrace();    }    return ConsumeOrderlyStatus.SUCCESS;});

圖片圖片ZpX28資訊網——每日最新資訊28at.com

通過 IDEA DEBUG 圖,筆者觀察到因為負載均衡后的隊列無法獲取到鎖,所以拉取消息的線程無法發起拉取消息請求到 Broker , 也就不會走到消費消息的流程。ZpX28資訊網——每日最新資訊28at.com

因此,廣播消費模式并不支持順序消息。ZpX28資訊網——每日最新資訊28at.com

▍ 差異點5:并發消費消費失敗時,沒有重試ZpX28資訊網——每日最新資訊28at.com

進入并發消息消費類ConsumeMessageConcurrentlyService 的處理消費結果方法 processConsumeResult。ZpX28資訊網——每日最新資訊28at.com

switch (this.defaultMQPushConsumer.getMessageModel()) {    case BROADCASTING:        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {            MessageExt msg = consumeRequest.getMsgs().get(i);            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());        }        break;    case CLUSTERING:        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {            MessageExt msg = consumeRequest.getMsgs().get(i);            boolean result = this.sendMessageBack(msg, context);            if (!result) {                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);                msgBackFailed.add(msg);            }        }        if (!msgBackFailed.isEmpty()) {            consumeRequest.getMsgs().removeAll(msgBackFailed);            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());        }        break;    default:        break;}

消費消息失敗后,集群消費時,消費者實例會通過 CONSUMER_SEND_MSG_BACK 請求,將失敗消息發回到 Broker 端。ZpX28資訊網——每日最新資訊28at.com

但在廣播模式下,僅僅是打印了消息信息。因此,廣播模式下,并沒有消息重試。ZpX28資訊網——每日最新資訊28at.com

3 實戰案例

廣播消費主要用于兩種場景:消息推送和緩存同步。ZpX28資訊網——每日最新資訊28at.com

3.1 消息推送

筆者第一次接觸廣播消費的業務場景是神州專車司機端的消息推送。ZpX28資訊網——每日最新資訊28at.com

用戶下單之后,訂單系統生成專車訂單,派單系統會根據相關算法將訂單派給某司機,司機端就會收到派單推送。ZpX28資訊網——每日最新資訊28at.com

圖片圖片ZpX28資訊網——每日最新資訊28at.com

推送服務是一個 TCP 服務(自定義協議),同時也是一個消費者服務,消息模式是廣播消費。ZpX28資訊網——每日最新資訊28at.com

司機打開司機端 APP 后,APP 會通過負載均衡和推送服務創建長連接,推送服務會保存 TCP 連接引用 (比如司機編號和 TCP channel 的引用)。ZpX28資訊網——每日最新資訊28at.com

派單服務是生產者,將派單數據發送到 MetaQ ,  每個推送服務都會消費到該消息,推送服務判斷本地內存中是否存在該司機的 TCP channel , 若存在,則通過 TCP 連接將數據推送給司機端。ZpX28資訊網——每日最新資訊28at.com

肯定有同學會問:假如網絡原因,推送失敗怎么處理 ?有兩個要點:ZpX28資訊網——每日最新資訊28at.com

  1. 司機端 APP 定時主動拉取派單信息;
  2. 當推送服務沒有收到司機端的 ACK 時 ,也會一定時限內再次推送,達到閾值后,不再推送。

3.2 緩存同步

高并發場景下,很多應用使用本地緩存,提升系統性能 。ZpX28資訊網——每日最新資訊28at.com

本地緩存可以是 HashMap 、ConcurrentHashMap ,也可以是緩存框架 Guava Cache 或者 Caffeine cache 。ZpX28資訊網——每日最新資訊28at.com

圖片圖片ZpX28資訊網——每日最新資訊28at.com

如上圖,應用A啟動后,作為一個 RocketMQ 消費者,消息模式設置為廣播消費。為了提升接口性能,每個應用節點都會將字典表加載到本地緩存里。ZpX28資訊網——每日最新資訊28at.com

當字典表數據變更時,可以通過業務系統發送一條消息到 RocketMQ ,每個應用節點都會消費消息,刷新本地緩存。ZpX28資訊網——每日最新資訊28at.com

4 總結

集群消費和廣播消費模式下,各功能的支持情況如下:ZpX28資訊網——每日最新資訊28at.com

功能ZpX28資訊網——每日最新資訊28at.com

集群消費ZpX28資訊網——每日最新資訊28at.com

廣播消費ZpX28資訊網——每日最新資訊28at.com

順序消息ZpX28資訊網——每日最新資訊28at.com

支持ZpX28資訊網——每日最新資訊28at.com

不支持ZpX28資訊網——每日最新資訊28at.com

重置消費位點ZpX28資訊網——每日最新資訊28at.com

支持ZpX28資訊網——每日最新資訊28at.com

不支持ZpX28資訊網——每日最新資訊28at.com

消息重試ZpX28資訊網——每日最新資訊28at.com

支持ZpX28資訊網——每日最新資訊28at.com

不支持ZpX28資訊網——每日最新資訊28at.com

消費進度ZpX28資訊網——每日最新資訊28at.com

服務端維護ZpX28資訊網——每日最新資訊28at.com

客戶端維護ZpX28資訊網——每日最新資訊28at.com

廣播消費主要用于兩種場景:消息推送和緩存同步。ZpX28資訊網——每日最新資訊28at.com

參考資料 :ZpX28資訊網——每日最新資訊28at.com

https://www.51cto.com/article/714277.htmlZpX28資訊網——每日最新資訊28at.com

https://ost.51cto.com/posts/21100ZpX28資訊網——每日最新資訊28at.com

本文鏈接:http://www.tebozhan.com/showinfo-26-11888-0.html深入理解 RocketMQ 廣播消費

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 消息隊列技術選型:這七種消息場景一定要考慮!

下一篇: 從零開發可視化大屏制作平臺

標簽:
  • 熱門焦點
  • 中興AX5400Pro+上手體驗:再升級 雙2.5G網口+USB 3.0這次全都有

    2021年11月的時候,中興先后發布了兩款路由器產品,中興AX5400和中興AX5400 Pro,從產品命名上就不難看出這是隸屬于同一系列的,但在外觀設計上這兩款產品可以說是完全沒一點關系
  • 2023 年的 Node.js 生態系統

    隨著技術的不斷演進和創新,Node.js 在 2023 年達到了一個新的高度。Node.js 擁有一個龐大的生態系統,可以幫助開發人員更快地實現復雜的應用。本文就來看看 Node.js 最新的生
  • 讓我們一起聊聊文件的操作

    文件【1】文件是什么?文件是保存數據的地方,是數據源的一種,比如大家經常使用的word文檔、txt文件、excel文件、jpg文件...都是文件。文件最主要的作用就是保存數據,它既可以保
  • 零售大模型“干中學”,攀爬數字化珠峰

    文/侯煜編輯/cc來源/華爾街科技眼對于絕大多數登山愛好者而言,攀爬珠穆朗瑪峰可謂終極目標。攀登珠峰的商業路線有兩條,一是尼泊爾境內的南坡路線,一是中國境內的北坡路線。相
  • 本地生活這塊肥肉,拼多多也想吃一口

    出品/壹覽商業 作者/李彥編輯/木魚拼多多也看上本地生活這塊蛋糕了。近期,拼多多在App首頁&ldquo;充值中心&rdquo;入口上線了本機生活界面。壹覽商業發現,該界面目前主要
  • 消費結構調整丨巨頭低價博弈,拼多多還卷得動嗎?

    來源:征探財經作者:陳香羽隨著流量紅利的退潮,電商的存量博弈越來越明顯。曾經主攻中高端與品質的淘寶天貓、京東重拾&ldquo;低價&rdquo;口號。而過去與他們錯位競爭的拼多多,靠
  • 造車兩年股價跌六成,小米的估值邏輯變了嗎?

    如果從小米官宣造車后的首個交易日起持有小米集團的股票,那么截至2023年上半年最后一個交易日,投資者將浮虧59.16%,同區間的恒生科技指數跌幅為52.78%
  • 引領旗艦級影像能力向中端機普及 OPPO K11 系列發布 1799 元起

    7月25日,OPPO正式發布K系列新品—— OPPO K11 。此次 K11 在中端手機市場長期被忽視的影像板塊發力,突破性地搭載索尼 IMX890 旗艦大底主攝,支持 OIS
  • 2022爆款:ROG魔霸6 冰川散熱系統持續護航

    喜逢開學季,各大商家開始推出自己的新產品,進行打折促銷活動。對于忠實的端游愛好者來說,能夠擁有一款夢寐以求的筆記本電腦是一件十分開心的事。但是現在的
Top