后續延遲隊列優化用Springboot整合,先理解死信隊列
<!--RabbitMQ依賴--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.12.0</version> </dependency>
由于特定原因導致隊列中的消息不能被消費,這樣的消息如果沒有后續處理就可以放入死信隊列中,例如一個訂單如果超時未被支付從而自動失效,就將這個訂單放到死信隊列中。(死信隊列中的消息是可以被消費的)
就是在規定的時間內消息沒有被消費,(和延遲隊列不同,延遲隊列時表示到達時間消息才可以被消費)
在生產者代碼中設置消息過期時間:
//生產者發送消息,將消息設置為TTL消息 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
修改隊列參數argument的特殊屬性:
arguments.put("x-dead-letter-exchange", EXCHANGE_DIRECT_DEAD);//死信交換機arguments.put("x-dead-letter-routing-key", "routingkey_direct-dead");//死信rotingkeyarguments.put("x-message-TTL", 10000);//設置過期時間(單位毫秒) //將死信交換機與死信隊列綁定
消費者1
public class Consumer01 { public static final String EXCHANGE_DIRECT = "exchange_direct";//普通交換機的名稱 public static final String EXCHANGE_DIRECT_DEAD = "exchange_direct_dead";//死信交換機的名稱 public static final String QUEUE_PLAIN = "queue_plain";//普通隊列的名稱 public static final String QUEUE_PLAIN_DEAD = "queue_plain_dead";//死信隊列的名稱 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.createChannel(); //聲明死信交換機和普通交換機 channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(EXCHANGE_DIRECT_DEAD, BuiltinExchangeType.DIRECT); //聲明普通隊列(綁定普通隊列與死信交換機的關系,在通過rotingkey綁定死信隊列 Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", EXCHANGE_DIRECT_DEAD);//死信交換機 arguments.put("x-dead-letter-routing-key", "routingkey_direct-dead");//死信rotingkey //設置過期時間(單位毫秒) arguments.put("x-message-TTL", 10000); channel.queueDeclare(QUEUE_PLAIN, false, false, false, arguments); //聲明死信隊列 channel.queueDeclare(QUEUE_PLAIN_DEAD, false, false, false, null); //普通交換機和隊列的綁定 channel.queueBind(QUEUE_PLAIN, EXCHANGE_DIRECT, "routingkey_direct"); //死信交換機和死信隊列的綁定 channel.queueBind(QUEUE_PLAIN_DEAD, EXCHANGE_DIRECT_DEAD, "routingkey_direct-dead"); //模擬超時時間消息未被消費 Thread.sleep(1000000); channel.basicConsume(QUEUE_PLAIN, true, (consumerTag, message) -> { System.out.println("Consumer01.main接受到消息:" + new String(message.getBody())); }, (consumerTag, sig) -> { }); }}
生產者
public class Produce { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.createChannel(); //生產者發送消息,將消息設置為TTL消息 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 0; i < 10; i++) { String message = i + ""; channel.basicPublish(Consumer01.EXCHANGE_DIRECT,"routingkey_direct",properties,message.getBytes(StandardCharsets.UTF_8)); } }}
消費者2
public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.createChannel(); channel.basicConsume(Consumer01.QUEUE_PLAIN_DEAD, true, (consumerTag, message) -> { System.out.println("Consumer2.main接受死信隊列的消息:" + new String(message.getBody())); }, (consumerTag, sig) -> { }); }}/**輸出結果:Consumer2.main接受死信隊列的消息:0Consumer2.main接受死信隊列的消息:1Consumer2.main接受死信隊列的消息:2Consumer2.main接受死信隊列的消息:3Consumer2.main接受死信隊列的消息:4Consumer2.main接受死信隊列的消息:5Consumer2.main接受死信隊列的消息:6Consumer2.main接受死信隊列的消息:7Consumer2.main接受死信隊列的消息:8Consumer2.main接受死信隊列的消息:9 */
將RabbiMQ的隊列的argument屬性的鍵設置為 x-max-length 表示隊列可以容納的最大條數
將自動應答設為false
在消費者調一個Channel.basicReject,設置參數requeue為false,表示不重新排隊,將消息丟到死信隊列
延遲隊列就是講一個消息延遲發送,例如消息在隊列中10s后才能被取出,可以通過RabbitMQ的插件或者死信隊列來實現
用死信隊列實現延遲隊列的思路:
在于死信隊列綁定的普通隊列不設置消費者,利用TTL延遲消息,當TTL時間過期后,到達死信隊列被消費這樣就形成一個延遲隊列。
延遲隊列的使用場景:①典型的就是流量削峰,對于不重要的消息,可以延遲消費,有助于減輕數據庫的壓力,強化分布式系統的高可用和并發性能。②還可以實現一個消息提醒,例如用戶三天未登錄發送一個消息提醒。
在實際生產中可能存在很多不同的延遲時間要求,不可能每一個延遲要求就創造一個隊列,我們可以用生產者實現延遲信息,而隊列不設置TTL就可以根據生產的延遲消息進行延遲發送。
但是此方法雖然實現了一個隊列就可以轉發不同延時時間的消息,但是有缺陷,隊列中的消息是排隊發送的,也就是說如果我第一條消息發送20s延時,接著第二條消息發送2s延時。最后卻是20s消息先消費,而2s消息后消費,因為RabbitMQ在檢測一條消息時發生了20s的阻塞。如下:
###GET http://localhost:8080/ttl/sendExpirationMessage/aaaaa/20000###GET http://localhost:8080/ttl/sendExpirationMessage/bbbbb/2000最后輸出結果是先消費aaaa后消費bbbb
可以通過RabbitMQ的插件實現延時隊列,此方法沒有這缺陷
從官網上下載對應版本的延遲插件,下載后如圖:交換機類型會多出一個 x-delayed-message
在我們自定義的交換機中,這是一種新的交換機類型,該類型消息支持延遲投遞機制,消息傳遞后并不會立即投遞到目標隊列中,而是存儲在mnesia(一個分布式數據系統)表中,當達到投遞時間時,才會投遞到目標隊列中。
代碼實例:
配置類:
@Configurationpublic class RabbitDelayedConfig { //延遲交換機 public static final String DELAYED_EXCHANGE = "delayed.exchange"; //延遲隊列b public static final String DELAYED_QUEUE = "delayed.queue"; //延遲交換機和隊列的routingkey public static final String DELAYED_ROTINGKEY = "delayed.routingkey"; //public CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) { // super(name, durable, autoDelete, arguments); // this.type = type; // } @Bean public CustomExchange delayedExchange() { Map<String, Object> arguments = new HashMap<>(); //定義延遲消息類型由那種交換機規則處置 arguments.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, arguments); } @Bean public Queue delayedQueue() { return QueueBuilder .nonDurable(DELAYED_QUEUE) .build(); } @Bean public Binding delayedBinding() { return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROTINGKEY).noargs(); }}
生產者:
/*延遲交換機發送消息*/ @GetMapping("/sendDelayedMessage/{message}/{delayedTTL}") public void sendDelayedMessage(@PathVariable String message, @PathVariable Integer delayedTTL) { log.info("當前時間:{},發送一條延遲時間為{}的延遲消息給延遲隊列:{}", new Date().toString(), delayedTTL, message); rabbitTemplate.convertAndSend(RabbitDelayedConfig.DELAYED_EXCHANGE, RabbitDelayedConfig.DELAYED_ROTINGKEY, message, msg -> { msg.getMessageProperties().setDelay(delayedTTL);//設置消息的延遲消息時間 return msg; }); }
消費者:
@Slf4j@Componentpublic class DelayedQueueConsumer { @RabbitListener(queues = RabbitDelayedConfig.DELAYED_QUEUE) public void queue(Message message) { log.info("接受到延遲隊列的消息,當前時間:{},消息:{}",new Date().toString(),new String(message.getBody())); }}
本文鏈接:http://www.tebozhan.com/showinfo-26-12687-0.htmlSpringboot實現Rabbitmq死信隊列以及延遲隊列的優化
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com