AVt天堂网 手机版,亚洲va久久久噜噜噜久久4399,天天综合亚洲色在线精品,亚洲一级Av无码毛片久久精品

當前位置:首頁 > 科技  > 軟件

RabbitMQ工作模式-Publish/Subscribe發布與訂閱模式

來源: 責編: 時間:2023-11-10 17:08:11 331觀看
導讀訂閱模式類型訂閱模式示例圖:前面2個案例中,只有3個角色:P:生產者,也就是要發送消息的程序C:消費者:消息的接受者,會一直等待消息到來。queue:消息隊列,圖中紅色部分而在訂閱模型中,多了一個exchange角色,而且過程略有變化:P:生產者

oet28資訊網——每日最新資訊28at.com

訂閱模式類型

訂閱模式示例圖:oet28資訊網——每日最新資訊28at.com

oet28資訊網——每日最新資訊28at.com

前面2個案例中,只有3個角色:oet28資訊網——每日最新資訊28at.com

  • P:生產者,也就是要發送消息的程序
  • C:消費者:消息的接受者,會一直等待消息到來。
  • queue:消息隊列,圖中紅色部分

而在訂閱模型中,多了一個exchange角色,而且過程略有變化:oet28資訊網——每日最新資訊28at.com

  • P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)
  • C:消費者,消息的接受者,會一直等待消息到來。
  • Queue:消息隊列,接收消息、緩存消息。
  • Exchange:交換機,圖中的X。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
  • Fanout:廣播,將消息交給所有綁定到交換機的隊列
  • Direct:定向,把消息交給符合指定routing key 的隊列
  • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列

Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!oet28資訊網——每日最新資訊28at.com

Publish/Subscribe發布與訂閱模式

1、模式說明

oet28資訊網——每日最新資訊28at.com

發布訂閱模式:oet28資訊網——每日最新資訊28at.com

每個消費者監聽自己的隊列。oet28資訊網——每日最新資訊28at.com

生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收 到消息oet28資訊網——每日最新資訊28at.com

2、案例

(1)生產者

oet28資訊網——每日最新資訊28at.com

package com.lijw.producer;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/3 8:16 */public class Producer_PubSub {    public static void main(String[] args) throws IOException, TimeoutException {        //1.創建連接工廠        ConnectionFactory factory = new ConnectionFactory();        //2. 設置參數        factory.setHost("127.0.0.1"); // ip  默認值 localhost        factory.setPort(5672); //端口  默認值 5672        factory.setVirtualHost("/test"); //虛擬機 默認值 /        factory.setUsername("libai"); // 用戶名 默認 guest        factory.setPassword("libai"); //密碼 默認值 guest        //3. 創建連接 Connection        Connection connection = factory.newConnection();        //4. 創建Channel        Channel channel = connection.createChannel();        //5. 創建交換機        /*           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)           參數:            1. exchange:交換機名稱            2. type:交換機類型                DIRECT("direct"):定向                FANOUT("fanout"):扇形(廣播),發送消息到每一個與之綁定隊列。                TOPIC("topic") 通配符的方式                HEADERS("headers") 參數匹配            3. durable:是否持久化            4. autoDelete:自動刪除            5. internal:內部使用。 一般false            6. arguments:參數        */        String exchangeName = "test_fanout";        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);        //6. 創建隊列        String queue1Name = "test_fanout_queue1";        String queue2Name = "test_fanout_queue2";        channel.queueDeclare(queue1Name, true, false, false, null);        channel.queueDeclare(queue2Name, true, false, false, null);        // 7. 綁定隊列和交換機        /*            queueBind(String queue, String exchange, String routingKey)            參數:                1. queue:隊列名稱                2. exchange:交換機名稱                3. routingKey:路由鍵,綁定規則                    如果交換機的類型為fanout ,routingKey設置為""         */        channel.queueBind(queue1Name, exchangeName, "");        channel.queueBind(queue2Name, exchangeName, "");        //8. 發送消息至交換機,由交換機分發消息        String body = "日志信息: 肥仔白調用了findAll方法...日志級別: INFO....";        channel.basicPublish(exchangeName, "", null, body.getBytes());        //9. 釋放資源        channel.close();        connection.close();            }}

執行生產者,我們可以查看一下創建的 交換機 以及 隊列信息:oet28資訊網——每日最新資訊28at.com

oet28資訊網——每日最新資訊28at.com

oet28資訊網——每日最新資訊28at.com

下面再來看看隊列,如下:oet28資訊網——每日最新資訊28at.com

oet28資訊網——每日最新資訊28at.com

oet28資訊網——每日最新資訊28at.com

下面我們繼續來寫兩個消費者接收消息。oet28資訊網——每日最新資訊28at.com

(2)消費者1:讀取隊列1的消息

oet28資訊網——每日最新資訊28at.com

package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_PubSub1 {    //定義接收隊列的名稱    final static String queueName = "test_fanout_queue1";    public static void main(String[] args) throws IOException, TimeoutException {        //1.創建連接工廠        ConnectionFactory factory = new ConnectionFactory();        //2. 設置參數        factory.setHost("127.0.0.1"); // ip  默認值 localhost        factory.setPort(5672); //端口  默認值 5672        factory.setVirtualHost("/test"); //虛擬機 默認值 /        factory.setUsername("libai"); // 用戶名 默認 guest        factory.setPassword("libai"); //密碼 默認值 guest        //3. 創建連接 Connection        Connection connection = factory.newConnection();        //4. 創建Channel        Channel channel = connection.createChannel();        //5. 創建隊列Queue        /*        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)        參數:            1. queue:隊列名稱            2. durable:是否持久化,當mq重啟之后,還在            3. exclusive:                * 是否獨占。只能有一個消費者監聽這隊列                * 當Connection關閉時,是否刪除隊列            4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉            5. arguments:參數。         */        channel.queueDeclare(queueName, true, false, false, null);        /*        basicConsume(String queue, boolean autoAck, Consumer callback)        參數:            1. queue:隊列名稱            2. autoAck:是否自動確認            3. callback:回調對象         */        // 接收消息        Consumer consumer = new DefaultConsumer(channel){            /*                回調方法,當收到消息后,會自動執行該方法                1. consumerTag:標識                2. envelope:獲取一些信息,交換機,路由key...                3. properties:配置信息                4. body:數據             */            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.out.println("接收隊列的數據 body: " + new String(body));            }        };        channel.basicConsume(queueName,true,consumer);        //不需要關閉資源,因為消費者需要持續監聽隊列信息    }}

(3)消費者2:讀取隊列2的消息

oet28資訊網——每日最新資訊28at.com

package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_PubSub2 {    //定義接收隊列的名稱    final static String queueName = "test_fanout_queue2";    public static void main(String[] args) throws IOException, TimeoutException {        //1.創建連接工廠        ConnectionFactory factory = new ConnectionFactory();        //2. 設置參數        factory.setHost("127.0.0.1"); // ip  默認值 localhost        factory.setPort(5672); //端口  默認值 5672        factory.setVirtualHost("/test"); //虛擬機 默認值 /        factory.setUsername("libai"); // 用戶名 默認 guest        factory.setPassword("libai"); //密碼 默認值 guest        //3. 創建連接 Connection        Connection connection = factory.newConnection();        //4. 創建Channel        Channel channel = connection.createChannel();        //5. 創建隊列Queue        /*        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)        參數:            1. queue:隊列名稱            2. durable:是否持久化,當mq重啟之后,還在            3. exclusive:                * 是否獨占。只能有一個消費者監聽這隊列                * 當Connection關閉時,是否刪除隊列            4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉            5. arguments:參數。         */        channel.queueDeclare(queueName, true, false, false, null);        /*        basicConsume(String queue, boolean autoAck, Consumer callback)        參數:            1. queue:隊列名稱            2. autoAck:是否自動確認            3. callback:回調對象         */        // 接收消息        Consumer consumer = new DefaultConsumer(channel){            /*                回調方法,當收到消息后,會自動執行該方法                1. consumerTag:標識                2. envelope:獲取一些信息,交換機,路由key...                3. properties:配置信息                4. body:數據             */            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.out.println("接收隊列的數據 body: " + new String(body));            }        };        channel.basicConsume(queueName,true,consumer);        //不需要關閉資源,因為消費者需要持續監聽隊列信息    }}

3、測試

啟動所有消費者,然后使用生產者發送消息;在每個消費者對應的控制臺可以查看到生產者發送的所有消息;到達廣播的效果。oet28資訊網——每日最新資訊28at.com

  • 消費者1接收到的消息:

oet28資訊網——每日最新資訊28at.com

  • 消費者2接收到的消息:

oet28資訊網——每日最新資訊28at.com

從結果來看,生產者只需要發送一條消息,其余的消費者全部收到了消息,達到了廣播的效果。oet28資訊網——每日最新資訊28at.com

4、小結

交換機需要與隊列進行綁定,綁定之后;一個消息可以被多個消費者都收到。oet28資訊網——每日最新資訊28at.com

發布訂閱模式與工作隊列模式的區別:oet28資訊網——每日最新資訊28at.com

  • 工作隊列模式不用定義交換機,而發布/訂閱模式需要定義交換機。
  • 發布/訂閱模式的生產方是面向交換機發送消息,工作隊列模式的生產方是面向隊列發送消息(底層使用默認交換機)。
  • 發布/訂閱模式需要設置隊列和交換機的綁定,工作隊列模式不需要設置,實際上工作隊列模式會將隊列綁 定到默認的交換機 。

本文鏈接:http://www.tebozhan.com/showinfo-26-20057-0.htmlRabbitMQ工作模式-Publish/Subscribe發布與訂閱模式

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: Oracle數據庫調優實戰:優化SQL查詢的黃金法則!

下一篇: 學會使用Java的遠程調試工具,解決難題

標簽:
  • 熱門焦點
  • Find N3入網:最高支持16+1TB

    OPPO將于近期登場的Find N3折疊屏目前已經正式入網,型號為PHN110。本次Find N3在外觀方面相比前兩代有很大的變化,不再是小號的橫向折疊屏,而是跟別的廠商一樣采用了較為常見的
  • 鴻蒙OS 4.0公測機型公布:甚至連nova6都支持

    華為全新的HarmonyOS 4.0操作系統將于今天下午正式登場,官方在發布會之前也已經正式給出了可升級的機型產品,這意味著這些機型會率先支持升級享用。這次的HarmonyOS 4.0支持
  • Redmi Pad評測:紅米充滿野心的一次嘗試

    從Note系列到K系列,從藍牙耳機到筆記本電腦,紅米不知不覺之間也已經形成了自己頗有競爭力的產品體系,在中端和次旗艦市場上甚至要比小米新機的表現來得更好,正所謂“大丈夫生居
  • 得物效率前端微應用推進過程與思考

    一、背景效率工程隨著業務的發展,組織規模的擴大,越來越多的企業開始意識到協作效率對于企業團隊的重要性,甚至是決定其在某個行業競爭中突圍的關鍵,是企業長久生存的根本。得物
  • 只需五步,使用start.spring.io快速入門Spring編程

    步驟1打開https://start.spring.io/,按照屏幕截圖中的內容創建項目,添加 Spring Web 依賴項,并單擊“生成”按鈕下載 .zip 文件,為下一步做準備。請在進入步驟2之前進行解壓。圖
  • 一文掌握 Golang 模糊測試(Fuzz Testing)

    模糊測試(Fuzz Testing)模糊測試(Fuzz Testing)是通過向目標系統提供非預期的輸入并監視異常結果來發現軟件漏洞的方法。可以用來發現應用程序、操作系統和網絡協議等中的漏洞或
  • “又被陳思誠騙了”

    作者|張思齊 出品|眾面(ID:ZhongMian_ZM)如今的國產懸疑電影,成了陳思誠的天下。最近大爆電影《消失的她》票房突破30億斷層奪魁暑期檔,陳思誠再度風頭無兩。你可以說陳思誠的
  • 華為Mate60標準版細節曝光:經典星環相機模組回歸

    這段時間以來,關于華為新旗艦的爆料日漸密集。據此前多方爆料,今年華為將開始恢復一年雙旗艦戰略,除上半年推出的P60系列外,往年下半年的Mate系列也將
  • 英特爾Xe HPG游戲顯卡:擁有512EU,單風扇版本

    據10 月 30 日外媒 TheVerge 消息報道,英特爾 Xe HPG Arc Alchemist 的正面實被曝光,不僅擁有 512 EU 版顯卡,還擁有 128EU 的單風扇版本。另外,這款顯卡 PCB
Top