百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

解锁RabbitMQ延迟队列:从原理到实战的秘籍

bigegpt 2025-03-19 10:42 10 浏览

延迟队列:消息世界的定时炸弹

在生活中,我们常常会遇到各种需要定时执行的任务。比如,在电商购物时,下单后如果一段时间内未支付,订单会自动取消;新用户注册成功后,系统可能会在几分钟后发送欢迎邮件。这些场景背后,都离不开延迟队列的支持。

在消息队列的世界里,延迟队列就像是一颗定时炸弹,它允许我们将消息在队列中延迟一段时间后再被消费,从而实现各种定时任务和异步处理的需求。对于开发分布式系统和高并发应用的程序员来说,掌握延迟队列的使用是一项必备技能。接下来,就让我们一起深入探索 RabbitMQ 中延迟队列的实现方式。

RabbitMQ 延迟队列的实现方案

方案一:TTL + DLX 实现延迟队列

在深入探讨这个方案之前,我们先来认识两个关键概念:TTL(Time To Live)和 DLX(Dead Letter Exchange)。

TTL 即消息的存活时间,它可以在消息发送时设置,也可以在队列声明时设置。如果一条消息设置了 TTL 属性,或者进入了设置 TTL 的队列,当这条消息在 TTL 内的时间未被消费,则该条消息会变成死信。如果同时配置了消息的 TTL 和队列的 TTL,那么较小的那个值会被使用。

DLX,也就是死信交换机,它本质上是一个普通的交换机,和创建其他交换机没有区别。当一个队列中的消息满足某些条件(如消息过期、被消费者拒收且 requeue 为 false、队列长度限制满了)时,这些消息会进入死信路由,被转发到 DLX,进而被路由到与 DLX 绑定的死信队列中。

接下来,我们通过一个实际案例来看看如何使用 TTL 和 DLX 实现延迟队列。假设我们有一个电商系统,订单下单后 15 分钟未支付,订单将自动取消。实现步骤如下:

  1. 创建队列和交换机:首先,创建一个普通队列order_queue,用于存放订单消息,并设置其死信交换机为dlx_exchange,死信路由键为dlx_routing_key。同时,创建一个死信队列dlx_queue,用于接收过期的订单消息,并将其与dlx_exchange通过dlx_routing_key进行绑定。
// 创建普通队列,设置死信交换机和死信路由键

Map args = new HashMap<>();

args.put("x-dead-letter-exchange", "dlx_exchange");

args.put("x-dead-letter-routing-key", "dlx_routing_key");

Queue orderQueue = new Queue("order_queue", true, false, false, args);

// 创建死信队列

Queue dlxQueue = new Queue("dlx_queue", true, false, false);

// 创建死信交换机

DirectExchange dlxExchange = new DirectExchange("dlx_exchange", true, false);

// 绑定死信队列到死信交换机

Binding binding = BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx_routing_key");

  1. 发送消息:在订单下单时,将订单消息发送到order_queue,并设置消息的 TTL 为 15 分钟(900000 毫秒)。
// 发送订单消息到order_queue,设置TTL为15分钟

rabbitTemplate.convertAndSend("order_exchange", "order_routing_key", orderMessage, message -> {

message.getMessageProperties().setExpiration("900000");

return message;

});

  1. 消费消息:消费者监听dlx_queue,当有消息进入时,说明订单已超时未支付,执行订单取消操作。
@RabbitListener(queues = "dlx_queue")

public void handleOrderTimeout(String orderMessage) {

// 执行订单取消操作

System.out.println("订单超时未支付,取消订单:" + orderMessage);

}

通过以上步骤,我们就利用 TTL 和 DLX 实现了订单 15 分钟未支付自动取消的功能。这种方案的优点是不需要额外安装插件,基于 RabbitMQ 的原生功能即可实现。然而,它也存在一些局限性,比如消息过期后并不会立即被投递到死信队列,只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列,这可能会导致延迟时间不够精确。

方案二:使用延迟插件实现延迟队列

为了克服 TTL + DLX 方案的局限性,我们可以使用 RabbitMQ 官方提供的延迟插件
rabbitmq-delayed-message-exchange。这个插件提供了更精确的延迟消息投递功能,它允许我们在发送消息时直接指定消息的延迟时间。

使用这个插件的优势显而易见。首先,它提供了更灵活的延迟时间设置,可以满足各种不同的业务需求。其次,延迟消息的投递更加精确,不会出现 TTL + DLX 方案中可能出现的延迟不精确问题。

接下来,我们详细介绍一下插件的安装步骤。这里以 Linux 系统为例,假设我们已经安装了 RabbitMQ 3.7.18 版本:

  1. 下载插件:访问插件下载地址https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases,根据 RabbitMQ 的版本选择对应的插件版本,下载后缀为.ez的插件文件,比如rabbitmq_delayed_message_exchange-3.9.0.ez。
  1. 放置插件目录:使用 FTP 工具或者其他文件传输方式,将下载好的插件文件上传到 RabbitMQ 的插件目录下。RabbitMQ 的默认插件目录为/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins。
  1. 启动插件:进入命令行,执行以下命令启动插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  1. 重启 RabbitMQ 服务:执行命令重启 RabbitMQ 服务,使插件生效。
systemctl restart rabbitmq-server
  1. 验收结果:打开浏览器,访问 RabbitMQ 的管理界面(通常为http://localhost:15672),在新建交换机的页面中,如果能看到类型为x-delayed-message的交换机选项,说明插件安装成功。

安装好插件后,我们来看看如何使用它实现延迟队列。同样以上述订单超时取消的场景为例,实现步骤如下:

  1. 创建延迟交换机和队列:创建一个类型为x-delayed-message的延迟交换机delayed_exchange,并设置其x-delayed-type属性为direct(也可以根据业务需求设置为topic或fanout)。同时,创建一个队列delayed_queue,并将其与delayed_exchange通过路由键delayed_routing_key进行绑定。
// 创建延迟交换机,设置x-delayed-type为direct

Map args = new HashMap<>();

args.put("x-delayed-type", "direct");

DirectExchange delayedExchange = new DirectExchange("delayed_exchange", true, false, args);

// 创建队列

Queue delayedQueue = new Queue("delayed_queue", true, false, false);

// 绑定队列到延迟交换机

Binding binding = BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed_routing_key");

  1. 发送延迟消息:在订单下单时,将订单消息发送到delayed_exchange,并通过消息属性x-delay指定延迟时间为 15 分钟(900000 毫秒)。
// 发送订单消息到delayed_exchange,设置延迟时间为15分钟

rabbitTemplate.convertAndSend("delayed_exchange", "delayed_routing_key", orderMessage, message -> {

message.getMessageProperties().setHeader("x-delay", 900000);

return message;

});

  1. 消费消息:消费者监听delayed_queue,当有消息进入时,说明订单已超时未支付,执行订单取消操作。
@RabbitListener(queues = "delayed_queue")

public void handleOrderTimeout(String orderMessage) {

// 执行订单取消操作

System.out.println("订单超时未支付,取消订单:" + orderMessage);

}

通过使用延迟插件,我们实现了更精确的订单超时取消功能。这种方案在处理对延迟时间要求较高的业务场景时,具有明显的优势。

RabbitMQ 延迟队列代码实战

环境搭建

首先,我们创建一个 Spring Boot 项目。如果你使用的是 IDEA,可以通过 Spring Initializr 快速创建项目。在创建项目时,选择引入 Web 和 RabbitMQ 依赖。如果是手动添加依赖,在pom.xml文件中添加以下内容:

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-web

接着,在application.properties文件中配置 RabbitMQ 的基本信息:

spring.rabbitmq.host=localhost

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

这样,我们的项目环境就搭建好了。

配置交换器和队列

接下来,我们创建一个配置类,用于声明交换机、队列和绑定关系。这里以使用延迟插件的方式为例:

import org.springframework.amqp.core.*;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class RabbitMQConfig {

// 声明延迟交换机

@Bean

public CustomExchange delayedExchange() {

Map args = new HashMap<>();

// 设置延迟交换机类型为direct,也可以根据业务需求设置为topic或fanout

args.put("x-delayed-type", "direct");

return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);

}

// 声明队列

@Bean

public Queue delayedQueue() {

return new Queue("delayed_queue", true, false, false);

}

// 绑定队列到延迟交换机

@Bean

public Binding binding() {

return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed_routing_key").noargs();

}

}

在上述代码中:

  • delayedExchange方法创建了一个类型为x-delayed-message的延迟交换机delayed_exchange,并通过args设置了x-delayed-type属性为direct,表示使用直连交换机的路由规则。
  • delayedQueue方法创建了一个名为delayed_queue的队列,true表示队列持久化,重启 RabbitMQ 后队列依然存在;false表示队列不是独占的,其他连接也可以访问;false表示队列在没有消费者连接时不会自动删除。
  • binding方法将delayed_queue队列与delayed_exchange交换机通过路由键delayed_routing_key进行绑定。

消息发送与接收

消息生产者代码示例:

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessageProperties;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

@Component

public class MessageSender {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String message, int delay) {

MessageProperties messageProperties = new MessageProperties();

// 设置消息延迟时间,单位为毫秒

messageProperties.setHeader("x-delay", delay);

Message msg = new Message(message.getBytes(), messageProperties);

// 发送消息到延迟交换机,指定路由键

rabbitTemplate.send("delayed_exchange", "delayed_routing_key", msg);

}

}

在send方法中,我们通过MessageProperties的setHeader方法设置了消息的x-delay属性,即延迟时间。然后将消息发送到delayed_exchange交换机,路由键为delayed_routing_key。

消息消费者代码示例:

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class MessageReceiver {

@RabbitListener(queues = "delayed_queue")

public void receive(String message) {

System.out.println("接收到延迟消息:" + message);

}

}

MessageReceiver类通过@RabbitListener注解监听delayed_queue队列,当有消息到达时,receive方法会被调用,处理接收到的消息。

通过以上代码,我们就完成了使用 RabbitMQ 延迟队列的基本功能实现。你可以在其他类中注入MessageSender,调用send方法发送延迟消息,观察MessageReceiver的输出结果。

应用案例与场景分析

外卖订单超时提醒

在外卖业务中,用户下单后,如果骑手在规定时间内未接单,系统需要发送提醒给骑手和用户。例如,设置订单 15 分钟内未被骑手接单,则触发提醒。使用 RabbitMQ 延迟队列实现步骤如下:

  1. 创建队列和交换机:创建一个普通队列take_order_queue,用于存放订单消息,并设置其死信交换机为dlx_exchange,死信路由键为dlx_routing_key。同时,创建一个死信队列dlx_take_order_queue,用于接收超时未接单的订单消息,并将其与dlx_exchange通过dlx_routing_key进行绑定。
  1. 发送消息:当用户下单时,将订单消息发送到take_order_queue,并设置消息的 TTL 为 15 分钟(900000 毫秒)。
  1. 消费消息:消费者监听dlx_take_order_queue,当有消息进入时,说明订单已超时未被接单,执行提醒操作,如发送短信或推送通知给骑手和用户。

在这个场景中,使用 RabbitMQ 延迟队列的优势在于能够高效地处理大量订单的超时提醒任务,避免了传统定时轮询方式对系统资源的浪费,提高了系统的响应速度和稳定性。

会议预定通知

在会议预定系统中,需要在会议开始前一定时间(如 30 分钟)通知参会人员。通过 RabbitMQ 延迟队列可以轻松实现这一功能:

  1. 创建队列和交换机:创建一个类型为x-delayed-message的延迟交换机meeting_notify_exchange,并设置其x-delayed-type属性为direct。同时,创建一个队列meeting_notify_queue,并将其与meeting_notify_exchange通过路由键meeting_notify_routing_key进行绑定。
  1. 发送消息:当会议预定成功时,将通知消息发送到meeting_notify_exchange,并通过消息属性x-delay指定延迟时间为 30 分钟(1800000 毫秒)。
  1. 消费消息:消费者监听meeting_notify_queue,当有消息进入时,说明会议即将开始,执行通知操作,如发送邮件或站内信给参会人员。

通过使用 RabbitMQ 延迟队列,会议预定通知的发送更加准确和及时,减少了人为疏忽导致的通知遗漏问题,提升了会议组织的效率和参会体验。

总结与展望

通过本文的介绍,我们深入了解了 RabbitMQ 延迟队列的两种主要实现方案:基于 TTL 和 DLX 的组合,以及使用延迟插件的方式。TTL + DLX 方案利用 RabbitMQ 的原生特性,无需额外插件即可实现延迟队列功能,但在延迟精度上存在一定的局限性。而延迟插件方案则提供了更精确的延迟控制,能够满足对时间精度要求较高的业务场景。

在实际应用中,RabbitMQ 延迟队列在订单处理、会议通知、外卖订单超时提醒等场景中发挥着重要作用,它能够帮助我们实现高效的异步处理和任务调度,提升系统的性能和用户体验。

希望读者通过本文的学习,能够掌握 RabbitMQ 延迟队列的实现方法,并在实际项目中灵活应用。同时,也鼓励大家进一步探索 RabbitMQ 的其他高级特性,挖掘其更多的潜力,为构建更加稳定、高效的分布式系统贡献力量。

相关推荐

最全的MySQL总结,助你向阿里“开炮”(面试题+笔记+思维图)

前言作为一名编程人员,对MySQL一定不会陌生,尤其是互联网行业,对MySQL的使用是比较多的。对于求职者来说,MySQL又是面试中一定会问到的重点,很多人拥有大厂梦,却因为MySQL败下阵来。实际上...

Redis数据库从入门到精通(redis数据库设计)

目录一、常见的非关系型数据库NOSQL分类二、了解Redis三、Redis的单节点安装教程四、Redis的常用命令1、Help帮助命令2、SET命令3、过期命令4、查找键命令5、操作键命令6、GET命...

netcore 急速接入第三方登录,不看后悔

新年新气象,趁着新年的喜庆,肝了十来天,终于发了第一版,希望大家喜欢。如果有不喜欢看文字的童鞋,可以直接看下面的地址体验一下:https://oauthlogin.net/前言此次带来得这个小项目是...

精选 30 个 C++ 面试题(含解析)(c++面试题和答案汇总)

大家好,我是柠檬哥,专注编程知识分享。欢迎关注@程序员柠檬橙,编程路上不迷路,私信发送以下关键字获取编程资源:发送1024打包下载10个G编程资源学习资料发送001获取阿里大神LeetCode...

Oracle 12c系列(一)|多租户容器数据库

作者杨禹航出品沃趣技术Oracle12.1发布至今已有多年,但国内Oracle12C的用户并不多,随着12.2在去年的发布,选择安装Oracle12c的客户量明显增加,在接下来的几年中,Or...

flutter系列之:UI layout简介(flutter-ui-nice)

简介对于一个前端框架来说,除了各个组件之外,最重要的就是将这些组件进行连接的布局了。布局的英文名叫做layout,就是用来描述如何将组件进行摆放的一个约束。在flutter中,基本上所有的对象都是wi...

Flutter 分页功能表格控件(flutter 列表)

老孟导读:前2天有读者问到是否有带分页功能的表格控件,今天分页功能的表格控件详细解析来来。PaginatedDataTablePaginatedDataTable是一个带分页功能的DataTable,...

Flutter | 使用BottomNavigationBar快速构建底部导航

平时我们在使用app时经常会看到底部导航栏,而在flutter中它的实现也较为简单.需要用到的组件:BottomNavigationBar导航栏的主体BottomNavigationBarI...

Android中的数据库和本地存储在Flutter中是怎样实现的

如何使用SharedPreferences?在Android中,你可以使用SharedPreferencesAPI来存储少量的键值对。在Flutter中,使用Shared_Pref...

Flet,一个Flutter应用的实用Python库!

▼Flet:用Python轻松构建跨平台应用!在纷繁复杂的Python框架中,Flet宛如一缕清风,为开发者带来极致的跨平台应用开发体验。它用最简单的Python代码,帮你实现移动端、桌面端...

flutter系列之:做一个图像滤镜(flutter photo)

简介很多时候,我们需要一些特效功能,比如给图片做个滤镜什么的,如果是h5页面,那么我们可以很容易的通过css滤镜来实现这个功能。那么如果在flutter中,如果要实现这样的滤镜功能应该怎么处理呢?一起...

flutter软件开发笔记20-flutter web开发

flutterweb开发优势比较多,采用统一的语言,就能开发不同类型的软件,在web开发中,特别是后台式软件中,相比传统的html5开发,更高效,有点像c++编程的方式,把web设计出来了。一...

Flutter实战-请求封装(五)之设置抓包Proxy

用了两年的flutter,有了一些心得,不虚头巴脑,只求实战有用,以供学习或使用flutter的小伙伴参考,学习尚浅,如有不正确的地方还望各路大神指正,以免误人子弟,在此拜谢~(原创不易,转发请标注来...

为什么不在 Flutter 中使用全局变量来管理状态

我相信没有人用全局变量来管理Flutter应用程序的状态。毫无疑问,我们的Flutter应用程序需要状态管理包或Flutter的基本小部件(例如InheritedWidget或St...

Flutter 攻略(Dart基本数据类型,变量 整理 2)

代码运行从main方法开始voidmain(){print("hellodart");}变量与常量var声明变量未初始化变量为nullvarc;//未初始化print(c)...