百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

解锁RabbitMQ延迟队列:从原理到实战的秘籍

bigegpt 2025-03-19 10:42 8 浏览

延迟队列:消息世界的定时炸弹

在生活中,我们常常会遇到各种需要定时执行的任务。比如,在电商购物时,下单后如果一段时间内未支付,订单会自动取消;新用户注册成功后,系统可能会在几分钟后发送欢迎邮件。这些场景背后,都离不开延迟队列的支持。

在消息队列的世界里,延迟队列就像是一颗定时炸弹,它允许我们将消息在队列中延迟一段时间后再被消费,从而实现各种定时任务和异步处理的需求。对于开发分布式系统和高并发应用的程序员来说,掌握延迟队列的使用是一项必备技能。接下来,就让我们一起深入探索 RabbitMQ 中延迟队列的实现方式。

RabbitMQ 延迟队列的实现方案

方案一:TTL + DLX 实现延迟队列

在深入探讨这个方案之前,我们先来认识两个关键概念:TTL(Time To Live)和 DLX(Dead Letter Exchange)。

TTL 即消息的存活时间,它可以在消息发送时设置,也可以在队列声明时设置。如果一条消息设置了 TTL 属性,或者进入了设置 TTL 的队列,当这条消息在 TTL 内的时间未被消费,则该条消息会变成死信。如果同时配置了消息的 TTL 和队列的 TTL,那么较小的那个值会被使用。

DLX,也就是死信交换机,它本质上是一个普通的交换机,和创建其他交换机没有区别。当一个队列中的消息满足某些条件(如消息过期、被消费者拒收且 requeue 为 false、队列长度限制满了)时,这些消息会进入死信路由,被转发到 DLX,进而被路由到与 DLX 绑定的死信队列中。

接下来,我们通过一个实际案例来看看如何使用 TTL 和 DLX 实现延迟队列。假设我们有一个电商系统,订单下单后 15 分钟未支付,订单将自动取消。实现步骤如下:

  1. 创建队列和交换机:首先,创建一个普通队列order_queue,用于存放订单消息,并设置其死信交换机为dlx_exchange,死信路由键为dlx_routing_key。同时,创建一个死信队列dlx_queue,用于接收过期的订单消息,并将其与dlx_exchange通过dlx_routing_key进行绑定。
// 创建普通队列,设置死信交换机和死信路由键

Map args = new HashMap<>();

args.put("x-dead-letter-exchange", "dlx_exchange");

args.put("x-dead-letter-routing-key", "dlx_routing_key");

Queue orderQueue = new Queue("order_queue", true, false, false, args);

// 创建死信队列

Queue dlxQueue = new Queue("dlx_queue", true, false, false);

// 创建死信交换机

DirectExchange dlxExchange = new DirectExchange("dlx_exchange", true, false);

// 绑定死信队列到死信交换机

Binding binding = BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx_routing_key");

  1. 发送消息:在订单下单时,将订单消息发送到order_queue,并设置消息的 TTL 为 15 分钟(900000 毫秒)。
// 发送订单消息到order_queue,设置TTL为15分钟

rabbitTemplate.convertAndSend("order_exchange", "order_routing_key", orderMessage, message -> {

message.getMessageProperties().setExpiration("900000");

return message;

});

  1. 消费消息:消费者监听dlx_queue,当有消息进入时,说明订单已超时未支付,执行订单取消操作。
@RabbitListener(queues = "dlx_queue")

public void handleOrderTimeout(String orderMessage) {

// 执行订单取消操作

System.out.println("订单超时未支付,取消订单:" + orderMessage);

}

通过以上步骤,我们就利用 TTL 和 DLX 实现了订单 15 分钟未支付自动取消的功能。这种方案的优点是不需要额外安装插件,基于 RabbitMQ 的原生功能即可实现。然而,它也存在一些局限性,比如消息过期后并不会立即被投递到死信队列,只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列,这可能会导致延迟时间不够精确。

方案二:使用延迟插件实现延迟队列

为了克服 TTL + DLX 方案的局限性,我们可以使用 RabbitMQ 官方提供的延迟插件
rabbitmq-delayed-message-exchange。这个插件提供了更精确的延迟消息投递功能,它允许我们在发送消息时直接指定消息的延迟时间。

使用这个插件的优势显而易见。首先,它提供了更灵活的延迟时间设置,可以满足各种不同的业务需求。其次,延迟消息的投递更加精确,不会出现 TTL + DLX 方案中可能出现的延迟不精确问题。

接下来,我们详细介绍一下插件的安装步骤。这里以 Linux 系统为例,假设我们已经安装了 RabbitMQ 3.7.18 版本:

  1. 下载插件:访问插件下载地址https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases,根据 RabbitMQ 的版本选择对应的插件版本,下载后缀为.ez的插件文件,比如rabbitmq_delayed_message_exchange-3.9.0.ez。
  1. 放置插件目录:使用 FTP 工具或者其他文件传输方式,将下载好的插件文件上传到 RabbitMQ 的插件目录下。RabbitMQ 的默认插件目录为/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins。
  1. 启动插件:进入命令行,执行以下命令启动插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  1. 重启 RabbitMQ 服务:执行命令重启 RabbitMQ 服务,使插件生效。
systemctl restart rabbitmq-server
  1. 验收结果:打开浏览器,访问 RabbitMQ 的管理界面(通常为http://localhost:15672),在新建交换机的页面中,如果能看到类型为x-delayed-message的交换机选项,说明插件安装成功。

安装好插件后,我们来看看如何使用它实现延迟队列。同样以上述订单超时取消的场景为例,实现步骤如下:

  1. 创建延迟交换机和队列:创建一个类型为x-delayed-message的延迟交换机delayed_exchange,并设置其x-delayed-type属性为direct(也可以根据业务需求设置为topic或fanout)。同时,创建一个队列delayed_queue,并将其与delayed_exchange通过路由键delayed_routing_key进行绑定。
// 创建延迟交换机,设置x-delayed-type为direct

Map args = new HashMap<>();

args.put("x-delayed-type", "direct");

DirectExchange delayedExchange = new DirectExchange("delayed_exchange", true, false, args);

// 创建队列

Queue delayedQueue = new Queue("delayed_queue", true, false, false);

// 绑定队列到延迟交换机

Binding binding = BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed_routing_key");

  1. 发送延迟消息:在订单下单时,将订单消息发送到delayed_exchange,并通过消息属性x-delay指定延迟时间为 15 分钟(900000 毫秒)。
// 发送订单消息到delayed_exchange,设置延迟时间为15分钟

rabbitTemplate.convertAndSend("delayed_exchange", "delayed_routing_key", orderMessage, message -> {

message.getMessageProperties().setHeader("x-delay", 900000);

return message;

});

  1. 消费消息:消费者监听delayed_queue,当有消息进入时,说明订单已超时未支付,执行订单取消操作。
@RabbitListener(queues = "delayed_queue")

public void handleOrderTimeout(String orderMessage) {

// 执行订单取消操作

System.out.println("订单超时未支付,取消订单:" + orderMessage);

}

通过使用延迟插件,我们实现了更精确的订单超时取消功能。这种方案在处理对延迟时间要求较高的业务场景时,具有明显的优势。

RabbitMQ 延迟队列代码实战

环境搭建

首先,我们创建一个 Spring Boot 项目。如果你使用的是 IDEA,可以通过 Spring Initializr 快速创建项目。在创建项目时,选择引入 Web 和 RabbitMQ 依赖。如果是手动添加依赖,在pom.xml文件中添加以下内容:

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-web

接着,在application.properties文件中配置 RabbitMQ 的基本信息:

spring.rabbitmq.host=localhost

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

这样,我们的项目环境就搭建好了。

配置交换器和队列

接下来,我们创建一个配置类,用于声明交换机、队列和绑定关系。这里以使用延迟插件的方式为例:

import org.springframework.amqp.core.*;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class RabbitMQConfig {

// 声明延迟交换机

@Bean

public CustomExchange delayedExchange() {

Map args = new HashMap<>();

// 设置延迟交换机类型为direct,也可以根据业务需求设置为topic或fanout

args.put("x-delayed-type", "direct");

return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);

}

// 声明队列

@Bean

public Queue delayedQueue() {

return new Queue("delayed_queue", true, false, false);

}

// 绑定队列到延迟交换机

@Bean

public Binding binding() {

return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed_routing_key").noargs();

}

}

在上述代码中:

  • delayedExchange方法创建了一个类型为x-delayed-message的延迟交换机delayed_exchange,并通过args设置了x-delayed-type属性为direct,表示使用直连交换机的路由规则。
  • delayedQueue方法创建了一个名为delayed_queue的队列,true表示队列持久化,重启 RabbitMQ 后队列依然存在;false表示队列不是独占的,其他连接也可以访问;false表示队列在没有消费者连接时不会自动删除。
  • binding方法将delayed_queue队列与delayed_exchange交换机通过路由键delayed_routing_key进行绑定。

消息发送与接收

消息生产者代码示例:

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessageProperties;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

@Component

public class MessageSender {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String message, int delay) {

MessageProperties messageProperties = new MessageProperties();

// 设置消息延迟时间,单位为毫秒

messageProperties.setHeader("x-delay", delay);

Message msg = new Message(message.getBytes(), messageProperties);

// 发送消息到延迟交换机,指定路由键

rabbitTemplate.send("delayed_exchange", "delayed_routing_key", msg);

}

}

在send方法中,我们通过MessageProperties的setHeader方法设置了消息的x-delay属性,即延迟时间。然后将消息发送到delayed_exchange交换机,路由键为delayed_routing_key。

消息消费者代码示例:

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class MessageReceiver {

@RabbitListener(queues = "delayed_queue")

public void receive(String message) {

System.out.println("接收到延迟消息:" + message);

}

}

MessageReceiver类通过@RabbitListener注解监听delayed_queue队列,当有消息到达时,receive方法会被调用,处理接收到的消息。

通过以上代码,我们就完成了使用 RabbitMQ 延迟队列的基本功能实现。你可以在其他类中注入MessageSender,调用send方法发送延迟消息,观察MessageReceiver的输出结果。

应用案例与场景分析

外卖订单超时提醒

在外卖业务中,用户下单后,如果骑手在规定时间内未接单,系统需要发送提醒给骑手和用户。例如,设置订单 15 分钟内未被骑手接单,则触发提醒。使用 RabbitMQ 延迟队列实现步骤如下:

  1. 创建队列和交换机:创建一个普通队列take_order_queue,用于存放订单消息,并设置其死信交换机为dlx_exchange,死信路由键为dlx_routing_key。同时,创建一个死信队列dlx_take_order_queue,用于接收超时未接单的订单消息,并将其与dlx_exchange通过dlx_routing_key进行绑定。
  1. 发送消息:当用户下单时,将订单消息发送到take_order_queue,并设置消息的 TTL 为 15 分钟(900000 毫秒)。
  1. 消费消息:消费者监听dlx_take_order_queue,当有消息进入时,说明订单已超时未被接单,执行提醒操作,如发送短信或推送通知给骑手和用户。

在这个场景中,使用 RabbitMQ 延迟队列的优势在于能够高效地处理大量订单的超时提醒任务,避免了传统定时轮询方式对系统资源的浪费,提高了系统的响应速度和稳定性。

会议预定通知

在会议预定系统中,需要在会议开始前一定时间(如 30 分钟)通知参会人员。通过 RabbitMQ 延迟队列可以轻松实现这一功能:

  1. 创建队列和交换机:创建一个类型为x-delayed-message的延迟交换机meeting_notify_exchange,并设置其x-delayed-type属性为direct。同时,创建一个队列meeting_notify_queue,并将其与meeting_notify_exchange通过路由键meeting_notify_routing_key进行绑定。
  1. 发送消息:当会议预定成功时,将通知消息发送到meeting_notify_exchange,并通过消息属性x-delay指定延迟时间为 30 分钟(1800000 毫秒)。
  1. 消费消息:消费者监听meeting_notify_queue,当有消息进入时,说明会议即将开始,执行通知操作,如发送邮件或站内信给参会人员。

通过使用 RabbitMQ 延迟队列,会议预定通知的发送更加准确和及时,减少了人为疏忽导致的通知遗漏问题,提升了会议组织的效率和参会体验。

总结与展望

通过本文的介绍,我们深入了解了 RabbitMQ 延迟队列的两种主要实现方案:基于 TTL 和 DLX 的组合,以及使用延迟插件的方式。TTL + DLX 方案利用 RabbitMQ 的原生特性,无需额外插件即可实现延迟队列功能,但在延迟精度上存在一定的局限性。而延迟插件方案则提供了更精确的延迟控制,能够满足对时间精度要求较高的业务场景。

在实际应用中,RabbitMQ 延迟队列在订单处理、会议通知、外卖订单超时提醒等场景中发挥着重要作用,它能够帮助我们实现高效的异步处理和任务调度,提升系统的性能和用户体验。

希望读者通过本文的学习,能够掌握 RabbitMQ 延迟队列的实现方法,并在实际项目中灵活应用。同时,也鼓励大家进一步探索 RabbitMQ 的其他高级特性,挖掘其更多的潜力,为构建更加稳定、高效的分布式系统贡献力量。

相关推荐

得物可观测平台架构升级:基于GreptimeDB的全新监控体系实践

一、摘要在前端可观测分析场景中,需要实时观测并处理多地、多环境的运行情况,以保障Web应用和移动端的可用性与性能。传统方案往往依赖代理Agent→消息队列→流计算引擎→OLAP存储...

warm-flow新春版:网关直连和流程图重构

本期主要解决了网关直连和流程图重构,可以自此之后可支持各种复杂的网关混合、多网关直连使用。-新增Ruoyi-Vue-Plus优秀开源集成案例更新日志[feat]导入、导出和保存等新增json格式支持...

扣子空间体验报告

在数字化时代,智能工具的应用正不断拓展到我们工作和生活的各个角落。从任务规划到项目执行,再到任务管理,作者深入探讨了这款工具在不同场景下的表现和潜力。通过具体的应用实例,文章展示了扣子空间如何帮助用户...

spider-flow:开源的可视化方式定义爬虫方案

spider-flow简介spider-flow是一个爬虫平台,以可视化推拽方式定义爬取流程,无需代码即可实现一个爬虫服务。spider-flow特性支持css选择器、正则提取支持JSON/XML格式...

solon-flow 你好世界!

solon-flow是一个基础级的流处理引擎(可用于业务规则、决策处理、计算编排、流程审批等......)。提供有“开放式”驱动定制支持,像jdbc有mysql或pgsql等驱动,可...

新一代开源爬虫平台:SpiderFlow

SpiderFlow:新一代爬虫平台,以图形化方式定义爬虫流程,不写代码即可完成爬虫。-精选真开源,释放新价值。概览Spider-Flow是一个开源的、面向所有用户的Web端爬虫构建平台,它使用Ja...

通过 SQL 训练机器学习模型的引擎

关注薪资待遇的同学应该知道,机器学习相关的岗位工资普遍偏高啊。同时随着各种通用机器学习框架的出现,机器学习的门槛也在逐渐降低,训练一个简单的机器学习模型变得不那么难。但是不得不承认对于一些数据相关的工...

鼠须管输入法rime for Mac

鼠须管输入法forMac是一款十分新颖的跨平台输入法软件,全名是中州韵输入法引擎,鼠须管输入法mac版不仅仅是一个输入法,而是一个输入法算法框架。Rime的基础架构十分精良,一套算法支持了拼音、...

Go语言 1.20 版本正式发布:新版详细介绍

Go1.20简介最新的Go版本1.20在Go1.19发布六个月后发布。它的大部分更改都在工具链、运行时和库的实现中。一如既往,该版本保持了Go1的兼容性承诺。我们期望几乎所...

iOS 10平台SpriteKit新特性之Tile Maps(上)

简介苹果公司在WWDC2016大会上向人们展示了一大批新的好东西。其中之一就是SpriteKitTileEditor。这款工具易于上手,而且看起来速度特别快。在本教程中,你将了解关于TileE...

程序员简历例句—范例Java、Python、C++模板

个人简介通用简介:有良好的代码风格,通过添加注释提高代码可读性,注重代码质量,研读过XXX,XXX等多个开源项目源码从而学习增强代码的健壮性与扩展性。具备良好的代码编程习惯及文档编写能力,参与多个高...

Telerik UI for iOS Q3 2015正式发布

近日,TelerikUIforiOS正式发布了Q32015。新版本新增对XCode7、Swift2.0和iOS9的支持,同时还新增了对数轴、不连续的日期时间轴等;改进TKDataPoin...

ios使用ijkplayer+nginx进行视频直播

上两节,我们讲到使用nginx和ngixn的rtmp模块搭建直播的服务器,接着我们讲解了在Android使用ijkplayer来作为我们的视频直播播放器,整个过程中,需要注意的就是ijlplayer编...

IOS技术分享|iOS快速生成开发文档(一)

前言对于开发人员而言,文档的作用不言而喻。文档不仅可以提高软件开发效率,还能便于以后的软件开发、使用和维护。本文主要讲述Objective-C快速生成开发文档工具appledoc。简介apple...

macOS下配置VS Code C++开发环境

本文介绍在苹果macOS操作系统下,配置VisualStudioCode的C/C++开发环境的过程,本环境使用Clang/LLVM编译器和调试器。一、前置条件本文默认前置条件是,您的开发设备已...