這篇文章我們聊聊廣播消費(fèi),因?yàn)閺V播消費(fèi)在某些場(chǎng)景下真的有奇效。筆者會(huì)從基礎(chǔ)概念、實(shí)現(xiàn)機(jī)制、實(shí)戰(zhàn)案例三個(gè)方面一一展開,希望能幫助到大家。
RocketMQ 支持兩種消息模式:集群消費(fèi)( Clustering )和廣播消費(fèi)( Broadcasting )。
集群消費(fèi):
同一 Topic 下的一條消息只會(huì)被同一消費(fèi)組中的一個(gè)消費(fèi)者消費(fèi)。也就是說(shuō),消息被負(fù)載均衡到了同一個(gè)消費(fèi)組的多個(gè)消費(fèi)者實(shí)例上。
圖片
廣播消費(fèi):
當(dāng)使用廣播消費(fèi)模式時(shí),每條消息推送給集群內(nèi)所有的消費(fèi)者,保證消息至少被每個(gè)消費(fèi)者消費(fèi)一次。
圖片
首先下圖展示了廣播消費(fèi)的代碼示例。
public class PushConsumer { public static final String CONSUMER_GROUP = "myconsumerGroup"; public static final String DEFAULT_NAMESRVADDR = "localhost:9876"; public static final String TOPIC = "mytest"; public static final String SUB_EXPRESSION = "TagA || TagC || TagD"; public static void main(String[] args) throws InterruptedException, MQClientException { // 定義 DefaultPushConsumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); // 定義名字服務(wù)地址 consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); // 定義消費(fèi)讀取位點(diǎn) consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 定義消費(fèi)模式 consumer.setMessageModel(MessageModel.BROADCASTING); // 訂閱主題信息 consumer.subscribe(TOPIC, SUB_EXPRESSION); // 訂閱消息監(jiān)聽器 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try { for (MessageExt messageExt : msgs) { System.out.println(new String(messageExt.getBody())); } }catch (Exception e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); }}
和集群消費(fèi)不同的點(diǎn)在于下面的代碼:
consumer.setMessageModel(MessageModel.BROADCASTING);
接下來(lái),我們從源碼角度來(lái)看看廣播消費(fèi)和集群消費(fèi)有哪些差異點(diǎn) ?
首先進(jìn)入 DefaultMQPushConsumerImpl 類的 start 方法 , 分析啟動(dòng)流程中他們兩者的差異點(diǎn):
圖片
▍ 差異點(diǎn)1:拷貝訂閱關(guān)系
private void copySubscription() throws MQClientException { try { Map<String, String> sub = this.defaultMQPushConsumer.getSubscription(); if (sub != null) { for (final Map.Entry<String, String> entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } if (null == this.messageListenerInner) { this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); } // 注意下面的代碼 , 集群模式下自動(dòng)訂閱重試主題 switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } } catch (Exception e) { throw new MQClientException("subscription exception", e); }}
在集群模式下,會(huì)自動(dòng)訂閱重試隊(duì)列,而廣播模式下,并沒(méi)有這段代碼。也就是說(shuō)廣播模式下,不支持消息重試。
▍ 差異點(diǎn)2:本地進(jìn)度存儲(chǔ)
switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
我們可以看到消費(fèi)進(jìn)度存儲(chǔ)的對(duì)象是:LocalFileOffsetStore , 進(jìn)度文件存儲(chǔ)在如下的主目錄 /{用戶主目錄}/.rocketmq_offsets。
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty( "rocketmq.client.localOffsetStoreDir", System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
進(jìn)度文件是 /mqClientId/{consumerGroupName}/offsets.json 。
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator + this.groupName + File.separator + "offsets.json";
筆者創(chuàng)建了一個(gè)主題 mytest , 包含4個(gè)隊(duì)列,進(jìn)度文件內(nèi)容如下:
圖片
消費(fèi)者啟動(dòng)后,我們可以將整個(gè)流程簡(jiǎn)化如下圖,并繼續(xù)整理差異點(diǎn):
圖片
▍ 差異點(diǎn)3:負(fù)載均衡消費(fèi)該主題的所有 MessageQueue
進(jìn)入負(fù)載均衡抽象類 RebalanceImpl 的rebalanceByTopic方法 。
private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); // 省略代碼 } else { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } break; } case CLUSTERING: { Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); // 省略代碼 if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { // 省略日志打印代碼 return; } Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); //省略代碼 } break; } default: break; }}
從上面代碼我們可以看到消息模式為廣播消費(fèi)模式時(shí),消費(fèi)者會(huì)消費(fèi)該主題下所有的隊(duì)列,這一點(diǎn)也可以從本地的進(jìn)度文件 offsets.json 得到印證。
▍ 差異點(diǎn)4:不支持順序消息
我們知道消費(fèi)消息順序服務(wù)會(huì)向 Borker 申請(qǐng)鎖 。消費(fèi)者根據(jù)分配的隊(duì)列 messageQueue ,向 Borker 申請(qǐng)鎖 ,如果申請(qǐng)成功,則會(huì)拉取消息,如果失敗,則定時(shí)任務(wù)每隔 20 秒會(huì)重新嘗試。
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } catch (Throwable e) { log.error("scheduleAtFixedRate lockMQPeriodically exception", e); } } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);}
但是從上面的代碼,我們發(fā)現(xiàn)只有在集群消費(fèi)的時(shí)候才會(huì)定時(shí)申請(qǐng)鎖,這樣就會(huì)導(dǎo)致廣播消費(fèi)時(shí),無(wú)法為負(fù)載均衡的隊(duì)列申請(qǐng)鎖,導(dǎo)致拉取消息服務(wù)一直無(wú)法獲取消息數(shù)據(jù)。
筆者修改消費(fèi)例子,在消息模式為廣播模式的場(chǎng)景下,將消費(fèi)模式從并發(fā)消費(fèi)修改為順序消費(fèi)。
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { try { for (MessageExt messageExt : msgs) { System.out.println(new String(messageExt.getBody())); } }catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS;});
圖片
推送服務(wù)是一個(gè) TCP 服務(wù)(自定義協(xié)議),同時(shí)也是一個(gè)消費(fèi)者服務(wù),消息模式是廣播消費(fèi)。
司機(jī)打開司機(jī)端 APP 后,APP 會(huì)通過(guò)負(fù)載均衡和推送服務(wù)創(chuàng)建長(zhǎng)連接,推送服務(wù)會(huì)保存 TCP 連接引用 (比如司機(jī)編號(hào)和 TCP channel 的引用)。
派單服務(wù)是生產(chǎn)者,將派單數(shù)據(jù)發(fā)送到 MetaQ , 每個(gè)推送服務(wù)都會(huì)消費(fèi)到該消息,推送服務(wù)判斷本地內(nèi)存中是否存在該司機(jī)的 TCP channel , 若存在,則通過(guò) TCP 連接將數(shù)據(jù)推送給司機(jī)端。
肯定有同學(xué)會(huì)問(wèn):假如網(wǎng)絡(luò)原因,推送失敗怎么處理 ?有兩個(gè)要點(diǎn):
高并發(fā)場(chǎng)景下,很多應(yīng)用使用本地緩存,提升系統(tǒng)性能 。
本地緩存可以是 HashMap 、ConcurrentHashMap ,也可以是緩存框架 Guava Cache 或者 Caffeine cache 。
圖片
如上圖,應(yīng)用A啟動(dòng)后,作為一個(gè) RocketMQ 消費(fèi)者,消息模式設(shè)置為廣播消費(fèi)。為了提升接口性能,每個(gè)應(yīng)用節(jié)點(diǎn)都會(huì)將字典表加載到本地緩存里。
當(dāng)字典表數(shù)據(jù)變更時(shí),可以通過(guò)業(yè)務(wù)系統(tǒng)發(fā)送一條消息到 RocketMQ ,每個(gè)應(yīng)用節(jié)點(diǎn)都會(huì)消費(fèi)消息,刷新本地緩存。
集群消費(fèi)和廣播消費(fèi)模式下,各功能的支持情況如下:
功能 | 集群消費(fèi) | 廣播消費(fèi) |
順序消息 | 支持 | 不支持 |
重置消費(fèi)位點(diǎn) | 支持 | 不支持 |
消息重試 | 支持 | 不支持 |
消費(fèi)進(jìn)度 | 服務(wù)端維護(hù) | 客戶端維護(hù) |
廣播消費(fèi)主要用于兩種場(chǎng)景:消息推送和緩存同步。
參考資料 :
https://www.51cto.com/article/714277.html
https://ost.51cto.com/posts/21100
本文鏈接:http://www.tebozhan.com/showinfo-26-11888-0.html深入理解 RocketMQ 廣播消費(fèi)
聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com