延迟消息的方案探索(消息延迟搞笑)
bigegpt 2025-03-19 10:42 10 浏览
一、背景
日常开发中,我们经常会遇到这一类的业务场景例如:订单超时、定时结算、通知提醒、失败重试等等。这类场景的需求大体上都可以通过延迟消息来实现,今天和大家一起来探索几种延迟消息的方案
延迟队列&延迟消息
- 延迟队列:是一种特殊的队列。它里面的每个元素都有一个过期时间,当元素还没到过期时间的时候,如果你试图从队列里面获取一个元素,这个时候就会被阻塞。当有元素过期的时候,我们首先拿到的永远是最先过期的那个元素。很多语言本身就提供了延迟队列的实现,比如: Java 里面的 DelayQueue。
- 延迟消息:是一种特殊形态的延迟队列,或者说是基于消息队列的延迟队列。具体来说,延迟消息是指消息不是立刻被消费的,而是在经过一段时间之后,才会被消费。在此之前,这个消息一直都被存储在消息队列的服务器上。比如:我们订单超时取消 就用到了延迟消息。
假如我们需要利用 延迟消息来实现 订单超时取消的 这个场景,方案有哪些?
方案1:利用定时任务调度
利用定时任务来实现延迟消息最简单的办法。对于一个延迟到 30 分钟后才可以被消费的消息,我们是不是也可以认为是 30 分钟后才需要发送?也就是说,我们可以设定一个定时任务,这个任务会在 30 分钟后把消息发送到消息服务器上。
如上图所示:生产者在定时任务平台上注册一个任务,这个任务就是在 30 分钟之后发送一条消息到 Kafka 上。之后消费者就能直接消费kafka的消息。
优点:
- 简单、链路短
缺点:
- 无法支撑高并发
- 无法处理大量数据
- 依赖定时任务平台的可靠性
支撑不住高并发。这是因为绝大多数定时任务中间件都没办法支撑住高并发、大数据的定时任务调度,所以只有应用规模小,延迟消息也不多的话,可以考虑使用这个方案。如果想要支持高并发、大数据的场景,还是要考虑利用消息队列。
既然我们想到了用消息队列,那么消息中间件那么多,有现成的插件能用吗?
方案2:RabbitMQ延迟消息的插件
目前,大部分云厂商版本的消息队列都支持了延迟消息,RabbitMQ 有插件支持延迟消息功能,而 Kafka 则只能自己研发。
RabbitMQ 有一个延迟消息的插件
rabbitmq_delayed_message_exchange,只需要启用这个插件就可以使用延迟消息。这个插件的基本原理也比较简单,它实现了一个 exchange。这个 exchange 控制住了消息什么时候会被真正被投递到队列里。
如上图所示,消息会先暂时存储在 exchange 里面。它使用的是 Mnesia 来存储, Mnesia 是什么?可以直观地把它看作一个基于文件的数据库。
当延迟的时间满足条件之后,这些存储的数据就会被投递到真正的消息队列里面。紧接着消费者就可以消费到这个消息了。
这个方案也足够简单,
但是..但是.. 这个插件本身也是有很多限制的,在它的官网主页里面就有说明,其中有两个最突出的限制。
- 消息在真的被投递到目标消息队列之前,是存放在接收到了这个消息的服务端本地的 Mnesia 里面。也就是说,如果这个时候还没有刷新磁盘,那么消息就会丢失;如果这个节点不可用了,那么消息也同样会丢失
- 不支持高并发、大数据量。显然,现实中很多时候都是要在高并发、大数据量场景下使用延迟消息的。因此这个缺点也限制了这个插件被广泛使用。
除了这个插件,可以自己手动实现延迟消息吗?
方案3: RabbitMQ 手动实现延迟消息
手动实现延迟消息。这就要利用到 RabbitMQ 的 ttl 功能和死信队列
死信队列:是一种逻辑上的概念,也就是说它本身只是一个普通的队列。而死信的意思是指过期的无法被消费的消息,这些消息会被投送到这个死信队列。
简单来说,就是我们准备一个队列 delay_queue,为这个 delay_queue 设置过期时间,这个 delay_queue 不需要消费者。然后把真实的业务队列biz_queue 绑定到这个 delay_queue,作为它的死信队列。
如上图所示:生产者发先送消息到 delay_queue,因为没有消费者,所以消息会过期。过期之后的消息被转发到死信队列,也就是 biz_queue 里面。这时候消费者就能拿到消息了
这种方案并没有上面插件的那两个缺点。但是 ttl 的设置是在队列级别上,也就是一个 delay_queue 延迟多长时间是固定的,不能做到随机。比如我们有一条消息需要延后3分钟,另外一条消息需要延后5分钟,这种情况下单个queue就无法满足了。因此,我们可能需要创建很多不同过期时间的 delay_queue 才能满足业务需要。
上面第2、3方案都是基于RabbitMQ来实现的;我们可以用性能更优越的Kafka来能实现延迟消息吗?
方案4: kafka 借助定时任务job
思路类似于方案1,增加一个检测服务,这个检测服务job,每分钟从延迟队列中获取消息,然后判断这些延迟消息是否过期,如果到时间,就把这些消息发送到业务队列,如果没到期,继续放到延迟队列里面,这样也能简单的实现延迟消息的功能
这个方案的和方案1 一样无法支撑高并发、大量数据处理;并且定时任务调度,还有1分钟的延迟。
如果我们 不借助定时任务调度,kafka中还能实现延迟消息吗?
方案5: kafka 分区设置不同延迟时间
类似于上面方案3的思路,创建了一个 delay_topic,这个 topic 有 N 个分区,每个分区设定了不同的延迟时间。然后我们创建了一个消费组去消费这个 delay_topic,每个分区有一个独立的消费者。每个消费者在读取到一条消息之后,就会根据消息里面的延迟时间来等待一段时间。等待完之后,再把消息发送到业务 topic 上。
这里关键的角色是 delay_topic 和延迟消费组。
delay_topic 同一个分区接收了相同延迟时间的消息,不同的分区,可以满足不同的延迟需求。
延迟消费组 会创建出和分区数量一样的消费者,每一个消费者消费一个分区。消费者每次读取一个消息,等延迟足够长的时间之后,就会转发给 biz_topic。
因此对于生产者来说,他们需要根据自己的延迟时间来选择正确的分区。而消费者则是对整个过程是无感的,也就是说他们并不知道中间有延迟消费者在做转发的事情
这个 N 和可以分区数量相关。
- 5 个分区:延迟时间分别是 1min、5min、10min、30min、60min。
- 10 个分区:延迟时间分别是 1min、3min,5min、10min、30min、60min、90min、120min、180min、240min。
这个方案需要注意两个问题
rebalance 问题
在这个方案中,因为消费者睡眠了,睡眠期间不会消费消息,所以 Kafka 就会判定这个消费者已经崩溃了,就会触发 rebalance(再均衡)。发生 rebalance 之后,等消费者再恢复过来,就不知道又会被分配到哪个分区,那么之前的睡眠就可以认为是白睡了。
为了避免这个问题,就需要确保在睡眠期间不会触发 rebalance。因此需要利用 Kafka 的暂停(Pause)功能,在睡眠结束之后,再恢复(Resume)。注意,Kafka 的暂停功能相当于拉取 0 条数据,并不是说不拉数据,也就是说还是会发起 Poll 调用。
所以整体逻辑就是这样的:
- 拉取一条消息,假如说 offset = N,查看剩余的延迟时间 t。
- 暂停消费,睡眠一段时间 t。睡眠结束之后,
- 恢复消费,继续从 offset = N 开始消费。
关键点:Kafka 的暂停消费,并不是不再发起 Poll 请求,而是请求了,但是不会真的拉消息。这样可以让 Kafka 始终认为消费者还活着。
一致性问题
前面说的是从服务端拉取到消息,然后转发到 biz_topic 里面。这里面涉及到了一个关键问题,是先提交消息,还是先转发?
如果是先提交,那么就会出现消息提交了,但是还来不及转发 biz_topic 就宕机的情况。
但是如果先转发 biz_topic,然后提交。那如果提交之前宕机了,后面恢复过来,又会转发一次,这样就转发两次。
这里建议先转发 biz_topic,然后再提交。如果在转发 biz_topic 之后,提交失败了,下一次就还可以重试,但是 biz_topic 就可能收到两条同样的消息。在这种场景下,就只能要求消费者去做幂等。因为发送方为了确保发送成功,本身就可能会重试。
优点:
- 这个方案相对简单,对业务方的影响很小,业务方只需要根据自己的延迟时间选择正确的分区就可以了。
- 借助kafka本身优越的性能,能支持并发、大数量数据处理的场景
缺点:
- 延迟时间必须预先设定好,比如只能允许延迟 1min、3min 或者 10min 的消息,不支持随机延迟时间。虽然延迟的时间一般都是固定的,但是不排除一些需要根据具体业务数据来计算延迟时间的情况,那么这个就不适用了。
- 分区之间负载不均匀。比如很多业务可能只需要延迟 3min,那么 1min 和 10min 分区的数据就很少。这会进一步导致一个问题,就是负载高的分区会出现消息积压的问题。 在这里只能考虑多设置几个延迟时间相近的分区,比如说在 3min 附近设置 2min30s,3min30s 这种分区来分摊压力。
最后:
关于延迟消息的实现方案还有很多,技术上永远没有十分完美的方案;具体还需结合自己的业务逻辑特点、规模、技术栈、研发效能等诸多因素来选择一个最优解来实现。
相关推荐
- 最全的MySQL总结,助你向阿里“开炮”(面试题+笔记+思维图)
-
前言作为一名编程人员,对MySQL一定不会陌生,尤其是互联网行业,对MySQL的使用是比较多的。对于求职者来说,MySQL又是面试中一定会问到的重点,很多人拥有大厂梦,却因为MySQL败下阵来。实际上...
- Redis数据库从入门到精通(redis数据库设计)
-
目录一、常见的非关系型数据库NOSQL分类二、了解Redis三、Redis的单节点安装教程四、Redis的常用命令1、Help帮助命令2、SET命令3、过期命令4、查找键命令5、操作键命令6、GET命...
- netcore 急速接入第三方登录,不看后悔
-
新年新气象,趁着新年的喜庆,肝了十来天,终于发了第一版,希望大家喜欢。如果有不喜欢看文字的童鞋,可以直接看下面的地址体验一下:https://oauthlogin.net/前言此次带来得这个小项目是...
- 精选 30 个 C++ 面试题(含解析)(c++面试题和答案汇总)
-
大家好,我是柠檬哥,专注编程知识分享。欢迎关注@程序员柠檬橙,编程路上不迷路,私信发送以下关键字获取编程资源:发送1024打包下载10个G编程资源学习资料发送001获取阿里大神LeetCode...
- Oracle 12c系列(一)|多租户容器数据库
-
作者杨禹航出品沃趣技术Oracle12.1发布至今已有多年,但国内Oracle12C的用户并不多,随着12.2在去年的发布,选择安装Oracle12c的客户量明显增加,在接下来的几年中,Or...
- flutter系列之:UI layout简介(flutter-ui-nice)
-
简介对于一个前端框架来说,除了各个组件之外,最重要的就是将这些组件进行连接的布局了。布局的英文名叫做layout,就是用来描述如何将组件进行摆放的一个约束。在flutter中,基本上所有的对象都是wi...
- Flutter 分页功能表格控件(flutter 列表)
-
老孟导读:前2天有读者问到是否有带分页功能的表格控件,今天分页功能的表格控件详细解析来来。PaginatedDataTablePaginatedDataTable是一个带分页功能的DataTable,...
- Flutter | 使用BottomNavigationBar快速构建底部导航
-
平时我们在使用app时经常会看到底部导航栏,而在flutter中它的实现也较为简单.需要用到的组件:BottomNavigationBar导航栏的主体BottomNavigationBarI...
- Android中的数据库和本地存储在Flutter中是怎样实现的
-
如何使用SharedPreferences?在Android中,你可以使用SharedPreferencesAPI来存储少量的键值对。在Flutter中,使用Shared_Pref...
- Flet,一个Flutter应用的实用Python库!
-
▼Flet:用Python轻松构建跨平台应用!在纷繁复杂的Python框架中,Flet宛如一缕清风,为开发者带来极致的跨平台应用开发体验。它用最简单的Python代码,帮你实现移动端、桌面端...
- flutter系列之:做一个图像滤镜(flutter photo)
-
简介很多时候,我们需要一些特效功能,比如给图片做个滤镜什么的,如果是h5页面,那么我们可以很容易的通过css滤镜来实现这个功能。那么如果在flutter中,如果要实现这样的滤镜功能应该怎么处理呢?一起...
- flutter软件开发笔记20-flutter web开发
-
flutterweb开发优势比较多,采用统一的语言,就能开发不同类型的软件,在web开发中,特别是后台式软件中,相比传统的html5开发,更高效,有点像c++编程的方式,把web设计出来了。一...
- Flutter实战-请求封装(五)之设置抓包Proxy
-
用了两年的flutter,有了一些心得,不虚头巴脑,只求实战有用,以供学习或使用flutter的小伙伴参考,学习尚浅,如有不正确的地方还望各路大神指正,以免误人子弟,在此拜谢~(原创不易,转发请标注来...
- 为什么不在 Flutter 中使用全局变量来管理状态
-
我相信没有人用全局变量来管理Flutter应用程序的状态。毫无疑问,我们的Flutter应用程序需要状态管理包或Flutter的基本小部件(例如InheritedWidget或St...
- Flutter 攻略(Dart基本数据类型,变量 整理 2)
-
代码运行从main方法开始voidmain(){print("hellodart");}变量与常量var声明变量未初始化变量为nullvarc;//未初始化print(c)...
- 一周热门
- 最近发表
-
- 最全的MySQL总结,助你向阿里“开炮”(面试题+笔记+思维图)
- Redis数据库从入门到精通(redis数据库设计)
- netcore 急速接入第三方登录,不看后悔
- 精选 30 个 C++ 面试题(含解析)(c++面试题和答案汇总)
- Oracle 12c系列(一)|多租户容器数据库
- flutter系列之:UI layout简介(flutter-ui-nice)
- Flutter 分页功能表格控件(flutter 列表)
- Flutter | 使用BottomNavigationBar快速构建底部导航
- Android中的数据库和本地存储在Flutter中是怎样实现的
- Flet,一个Flutter应用的实用Python库!
- 标签列表
-
- mybatiscollection (79)
- mqtt服务器 (88)
- keyerror (78)
- c#map (65)
- xftp6 (83)
- bt搜索 (75)
- c#var (76)
- xcode-select (66)
- mysql授权 (74)
- 下载测试 (70)
- linuxlink (65)
- pythonwget (67)
- androidinclude (65)
- libcrypto.so (74)
- linux安装minio (74)
- ubuntuunzip (67)
- vscode使用技巧 (83)
- secure-file-priv (67)
- vue阻止冒泡 (67)
- jquery跨域 (68)
- php写入文件 (73)
- kafkatools (66)
- mysql导出数据库 (66)
- jquery鼠标移入移出 (71)
- 取小数点后两位的函数 (73)