延迟消息的方案探索(消息延迟搞笑)
bigegpt 2025-03-19 10:42 8 浏览
一、背景
日常开发中,我们经常会遇到这一类的业务场景例如:订单超时、定时结算、通知提醒、失败重试等等。这类场景的需求大体上都可以通过延迟消息来实现,今天和大家一起来探索几种延迟消息的方案
延迟队列&延迟消息
- 延迟队列:是一种特殊的队列。它里面的每个元素都有一个过期时间,当元素还没到过期时间的时候,如果你试图从队列里面获取一个元素,这个时候就会被阻塞。当有元素过期的时候,我们首先拿到的永远是最先过期的那个元素。很多语言本身就提供了延迟队列的实现,比如: Java 里面的 DelayQueue。
- 延迟消息:是一种特殊形态的延迟队列,或者说是基于消息队列的延迟队列。具体来说,延迟消息是指消息不是立刻被消费的,而是在经过一段时间之后,才会被消费。在此之前,这个消息一直都被存储在消息队列的服务器上。比如:我们订单超时取消 就用到了延迟消息。
假如我们需要利用 延迟消息来实现 订单超时取消的 这个场景,方案有哪些?
方案1:利用定时任务调度
利用定时任务来实现延迟消息最简单的办法。对于一个延迟到 30 分钟后才可以被消费的消息,我们是不是也可以认为是 30 分钟后才需要发送?也就是说,我们可以设定一个定时任务,这个任务会在 30 分钟后把消息发送到消息服务器上。
如上图所示:生产者在定时任务平台上注册一个任务,这个任务就是在 30 分钟之后发送一条消息到 Kafka 上。之后消费者就能直接消费kafka的消息。
优点:
- 简单、链路短
缺点:
- 无法支撑高并发
- 无法处理大量数据
- 依赖定时任务平台的可靠性
支撑不住高并发。这是因为绝大多数定时任务中间件都没办法支撑住高并发、大数据的定时任务调度,所以只有应用规模小,延迟消息也不多的话,可以考虑使用这个方案。如果想要支持高并发、大数据的场景,还是要考虑利用消息队列。
既然我们想到了用消息队列,那么消息中间件那么多,有现成的插件能用吗?
方案2:RabbitMQ延迟消息的插件
目前,大部分云厂商版本的消息队列都支持了延迟消息,RabbitMQ 有插件支持延迟消息功能,而 Kafka 则只能自己研发。
RabbitMQ 有一个延迟消息的插件
rabbitmq_delayed_message_exchange,只需要启用这个插件就可以使用延迟消息。这个插件的基本原理也比较简单,它实现了一个 exchange。这个 exchange 控制住了消息什么时候会被真正被投递到队列里。
如上图所示,消息会先暂时存储在 exchange 里面。它使用的是 Mnesia 来存储, Mnesia 是什么?可以直观地把它看作一个基于文件的数据库。
当延迟的时间满足条件之后,这些存储的数据就会被投递到真正的消息队列里面。紧接着消费者就可以消费到这个消息了。
这个方案也足够简单,
但是..但是.. 这个插件本身也是有很多限制的,在它的官网主页里面就有说明,其中有两个最突出的限制。
- 消息在真的被投递到目标消息队列之前,是存放在接收到了这个消息的服务端本地的 Mnesia 里面。也就是说,如果这个时候还没有刷新磁盘,那么消息就会丢失;如果这个节点不可用了,那么消息也同样会丢失
- 不支持高并发、大数据量。显然,现实中很多时候都是要在高并发、大数据量场景下使用延迟消息的。因此这个缺点也限制了这个插件被广泛使用。
除了这个插件,可以自己手动实现延迟消息吗?
方案3: RabbitMQ 手动实现延迟消息
手动实现延迟消息。这就要利用到 RabbitMQ 的 ttl 功能和死信队列
死信队列:是一种逻辑上的概念,也就是说它本身只是一个普通的队列。而死信的意思是指过期的无法被消费的消息,这些消息会被投送到这个死信队列。
简单来说,就是我们准备一个队列 delay_queue,为这个 delay_queue 设置过期时间,这个 delay_queue 不需要消费者。然后把真实的业务队列biz_queue 绑定到这个 delay_queue,作为它的死信队列。
如上图所示:生产者发先送消息到 delay_queue,因为没有消费者,所以消息会过期。过期之后的消息被转发到死信队列,也就是 biz_queue 里面。这时候消费者就能拿到消息了
这种方案并没有上面插件的那两个缺点。但是 ttl 的设置是在队列级别上,也就是一个 delay_queue 延迟多长时间是固定的,不能做到随机。比如我们有一条消息需要延后3分钟,另外一条消息需要延后5分钟,这种情况下单个queue就无法满足了。因此,我们可能需要创建很多不同过期时间的 delay_queue 才能满足业务需要。
上面第2、3方案都是基于RabbitMQ来实现的;我们可以用性能更优越的Kafka来能实现延迟消息吗?
方案4: kafka 借助定时任务job
思路类似于方案1,增加一个检测服务,这个检测服务job,每分钟从延迟队列中获取消息,然后判断这些延迟消息是否过期,如果到时间,就把这些消息发送到业务队列,如果没到期,继续放到延迟队列里面,这样也能简单的实现延迟消息的功能
这个方案的和方案1 一样无法支撑高并发、大量数据处理;并且定时任务调度,还有1分钟的延迟。
如果我们 不借助定时任务调度,kafka中还能实现延迟消息吗?
方案5: kafka 分区设置不同延迟时间
类似于上面方案3的思路,创建了一个 delay_topic,这个 topic 有 N 个分区,每个分区设定了不同的延迟时间。然后我们创建了一个消费组去消费这个 delay_topic,每个分区有一个独立的消费者。每个消费者在读取到一条消息之后,就会根据消息里面的延迟时间来等待一段时间。等待完之后,再把消息发送到业务 topic 上。
这里关键的角色是 delay_topic 和延迟消费组。
delay_topic 同一个分区接收了相同延迟时间的消息,不同的分区,可以满足不同的延迟需求。
延迟消费组 会创建出和分区数量一样的消费者,每一个消费者消费一个分区。消费者每次读取一个消息,等延迟足够长的时间之后,就会转发给 biz_topic。
因此对于生产者来说,他们需要根据自己的延迟时间来选择正确的分区。而消费者则是对整个过程是无感的,也就是说他们并不知道中间有延迟消费者在做转发的事情
这个 N 和可以分区数量相关。
- 5 个分区:延迟时间分别是 1min、5min、10min、30min、60min。
- 10 个分区:延迟时间分别是 1min、3min,5min、10min、30min、60min、90min、120min、180min、240min。
这个方案需要注意两个问题
rebalance 问题
在这个方案中,因为消费者睡眠了,睡眠期间不会消费消息,所以 Kafka 就会判定这个消费者已经崩溃了,就会触发 rebalance(再均衡)。发生 rebalance 之后,等消费者再恢复过来,就不知道又会被分配到哪个分区,那么之前的睡眠就可以认为是白睡了。
为了避免这个问题,就需要确保在睡眠期间不会触发 rebalance。因此需要利用 Kafka 的暂停(Pause)功能,在睡眠结束之后,再恢复(Resume)。注意,Kafka 的暂停功能相当于拉取 0 条数据,并不是说不拉数据,也就是说还是会发起 Poll 调用。
所以整体逻辑就是这样的:
- 拉取一条消息,假如说 offset = N,查看剩余的延迟时间 t。
- 暂停消费,睡眠一段时间 t。睡眠结束之后,
- 恢复消费,继续从 offset = N 开始消费。
关键点:Kafka 的暂停消费,并不是不再发起 Poll 请求,而是请求了,但是不会真的拉消息。这样可以让 Kafka 始终认为消费者还活着。
一致性问题
前面说的是从服务端拉取到消息,然后转发到 biz_topic 里面。这里面涉及到了一个关键问题,是先提交消息,还是先转发?
如果是先提交,那么就会出现消息提交了,但是还来不及转发 biz_topic 就宕机的情况。
但是如果先转发 biz_topic,然后提交。那如果提交之前宕机了,后面恢复过来,又会转发一次,这样就转发两次。
这里建议先转发 biz_topic,然后再提交。如果在转发 biz_topic 之后,提交失败了,下一次就还可以重试,但是 biz_topic 就可能收到两条同样的消息。在这种场景下,就只能要求消费者去做幂等。因为发送方为了确保发送成功,本身就可能会重试。
优点:
- 这个方案相对简单,对业务方的影响很小,业务方只需要根据自己的延迟时间选择正确的分区就可以了。
- 借助kafka本身优越的性能,能支持并发、大数量数据处理的场景
缺点:
- 延迟时间必须预先设定好,比如只能允许延迟 1min、3min 或者 10min 的消息,不支持随机延迟时间。虽然延迟的时间一般都是固定的,但是不排除一些需要根据具体业务数据来计算延迟时间的情况,那么这个就不适用了。
- 分区之间负载不均匀。比如很多业务可能只需要延迟 3min,那么 1min 和 10min 分区的数据就很少。这会进一步导致一个问题,就是负载高的分区会出现消息积压的问题。 在这里只能考虑多设置几个延迟时间相近的分区,比如说在 3min 附近设置 2min30s,3min30s 这种分区来分摊压力。
最后:
关于延迟消息的实现方案还有很多,技术上永远没有十分完美的方案;具体还需结合自己的业务逻辑特点、规模、技术栈、研发效能等诸多因素来选择一个最优解来实现。
相关推荐
- 【机器学习】数据挖掘神器LightGBM详解(附代码)
-
来源:机器学习初学者本文约11000字,建议阅读20分钟本文为你介绍数据挖掘神器LightGBM。LightGBM是微软开发的boosting集成模型,和XGBoost一样是对GBDT...
- 3分钟,用DeepSeek全自动生成语音计算器,还带括号表达式!
-
最近,大家慢慢了解到了DeepSeek的强大功能,特别是它在编程领域也同样强大。编程零基础小白,一行代码不用写,也能全自动生成一个完整的、可运行的软件来!很多程序员一直不相信小白不写代码也能编软件!下...
- python学习笔记 3.表达式
-
在Python中,表达式是由值、变量和运算符组成的组合。以下是一些常见的Python表达式:算术表达式:由数值和算术运算符组成的表达式,如加减乘除等。例如:5+3、7*2、10/3等。字符...
- 5.7 VS 8.x,为什么用户不升级MySql
-
一般来说为了更好的功能和性能,都需要将软件升级到最新的版本,然而在开源软件中,由于一些开发商变化或其他的问题(开源授权变化),致使人们不愿使用最新的版本,一个最典型的问题就是CentOS操作系统。还有...
- 大厂高频:讲一下MySQL主从复制
-
大家经常听说主从复制,那么主从复制的意义?能解决的问题有哪些?主从复制能解决的问题就是在我们平时开发的程序中操作数据库的时候,大多数的情况查询的操作大大超过了写的操作,也就说对数据库读取数据的压力比较...
- MYSQL数据库的五大安全防护措施
-
以技术为基础的企业里最有价值的资产莫过于是客户或者其数据库中的产品信息了。因此,在这样的企业中,保证数据库免受外界攻击是数据库管理的重要环节。很多数据库管理员并没有实施什么数据库保护措施,只是因为觉得...
- docker安装mysql
-
准备工作已安装Docker环境(官方安装文档)终端/命令行工具(Linux/macOS/WSL)步骤1:拉取MySQL镜像打开终端执行以下命令,拉取官方MySQL镜像(默认最新版本):d...
- Zabbix监控系统系列之六:监控 mysql
-
zabbix监控mysql1、监控规划在创建监控项之前要尽量考虑清楚要监控什么,怎么监控,监控数据如何存储,监控数据如何展现,如何处理报警等。要进行监控的系统规划需要对Zabbix很了解,这里只是...
- 详解MySQL的配置文件及优化
-
#头条创作挑战赛#在Windows系统中,MySQL服务器启动时最先读取的是my.ini这个配置文件。在Linux系统中,配置文件为my.cnf,其路径一般为/etc/my.cnf或/etc/mysq...
- Mysql 几个批处理执行脚本
-
学习mysql过程中,需要创建测试数据,并让多人每人一个数据库连接并进行作业检查。整合部分批处理创建数据批量创建数据库DELIMITER$CREATEPROCEDURECreateDatab...
- MySQL学到什么程度?才有可以在简历上写精通
-
前言如今互联网行业用的最多就是MySQL,然而对于高级Web面试者,尤其对于寻找30k下工作的求职者,很多MySQL相关知识点基本都会涉及,如果面试中,你的相关知识答的模糊和不切要点,基...
- mysql 主、从服务器配置“Slave_IO_Running: Connecting” 问题分析
-
#在进行mysql主、从服务器配置时,”SHOWSLAVESTATUS;“查看从库状态Slave_IO_Runing,出现错误:“Slave_IO_Running:Connectin...
- MYSQL数据同步
-
java开发工程师在实际的开发经常会需要实现两台不同机器上的MySQL数据库的数据同步,要解决这个问题不难,无非就是mysql数据库的数据同步问题。但要看你是一次性的数据同步需求,还是定时数据同步,亦...
- 「MySQL 8」MySQL 5.7都即将停只维护了,是时候学习一波MySQL 8了
-
MySQL8新特性选择MySQL8的背景:MySQL5.6已经停止版本更新了,对于MySQL5.7版本,其将于2023年10月31日停止支持。后续官方将不再进行后续的代码维护。另外,...
- Prometheus监控mysql
-
通过Prometheus监控Mysql,我们需要在Mysql端安装一个mysql-exporter,然后Prometheus通过mysql-exporter暴露的端口抓取数据。1.安装一个MYSQL配...
- 一周热门
- 最近发表
- 标签列表
-
- mybatiscollection (79)
- mqtt服务器 (88)
- keyerror (78)
- c#map (65)
- resize函数 (64)
- xftp6 (83)
- bt搜索 (75)
- c#var (76)
- mybatis大于等于 (64)
- xcode-select (66)
- mysql授权 (74)
- httperror403.14-forbidden (63)
- logstashinput (65)
- hadoop端口 (65)
- dockernetworkconnect (63)
- esxi7 (63)
- vue阻止冒泡 (67)
- c#for循环 (63)
- oracle时间戳转换日期 (64)
- jquery跨域 (68)
- php写入文件 (73)
- kafkatools (66)
- mysql导出数据库 (66)
- jquery鼠标移入移出 (71)
- 取小数点后两位的函数 (73)