百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

基于Redis实现简单的延时消息队列

bigegpt 2025-03-19 10:41 9 浏览


说到消息队列相信作为开发人员的大家都不陌生,在实际的工作中我们可能在很多场景下都会用到消息队列,消息队列不仅仅是用于收发消息,而且也可以用于解耦我们的应用系统设计,在大型的应用系统或者分布式应用系统中,我们必然会用到消息队列。

总结下,消息队列的应用场景一般有以下几种场景:

  1. 异步处理任务;
  2. 应用系统解耦;
  3. 大流量削峰;
  4. 日志处理系统;
  5. 消息通讯;


目前主流的消息队列框架有:

  • Apache的ActiveMQ;
  • Erlang语言实现的RabbitMQ;
  • Apache的RocketMQ;
  • Apache的Kafka;

这几种主流的消息队列框架各有各的优势,也有略微的不同。

消息队列比较



当然,消息队列也有一些特殊的使用场景,比如:一些电商系统中,当用户下单后,需要在规定的时间内对订单发起支付,如果在规定的时间内没有支付订单,那么该订单将自动取消。这个问题的常规解决方案有两个:

  1. 使用定时任务定时去扫描表,修改过期订单的状态。这种方案当然存在许多局限性,当订单数量不多,而且对系统性能要求不高的情况下可以考虑使用。这种方案也不够优雅;
  2. 使用基于消息队列的延时消息队列,这种方案可以对整个消息队列设置一个消息过期时间,也可以给每一个消息设置一个过期时间,这个时候消息的过期时间取决于设置的最小时间;

延时消息队列我们可以采用上面所说的消息队列框架去实现,也可以采用比较简单的基于Redis的方式去实现,众所周知Redis并不是一个消息队列框架,但是Redis在某些应用场景下可以采用其高级特性为我们提供消息队列的特性。


Rdis在常规的应用场景下,我们使用它作为高速缓存框架,搜索百度百科我们可以发现,对Redis的定义如下:

Redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

Redis的最基本用法是我们使用它的Key-Value特性,存储我们的热点数据或者高频访问数据,来达到提高整个应用系统的吞吐量。

就像上面对Redis的定义,zset是对有序集合的操作,我们可以利用这一特性来实现我们的延时消息队列功能。大致实现的原理如下:

Zset本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加修改元素时候可以指定,每次指定后,Zset会自动重新按新的值调整顺序。可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,那么对延时队列又有何用呢?试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它便会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个Zset集合,以免造成性能浪费。



有了实现原理,下面我们通过一个简单的例子来演示下具体的操作过程,可能对基于Redis实现延时消息队列这一主题有更好的理解,演示的源代码链接在文章末尾附上。

新建工程:delay-message-queue-redis

pom.xml



    4.0.0
    
        com.delay.message.queue
        delay-message-queue
        0.0.1
    
    com.delay.message.queue
    delay-message-queue-redis
    0.0.1
    delay-message-queue-redis
    Demo project for Spring Boot

    
        1.8
    

    
        
            org.springframework.boot
            spring-boot-starter-data-redis
        
        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.springframework.boot
            spring-boot-devtools
            runtime
            true
        
        
            org.springframework.boot
            spring-boot-configuration-processor
            true
        
        
            org.projectlombok
            lombok
            true
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    


操作Redis我们使用RedisTemplate,简单的配置如下:

@Configuration
public class RedisConfig {
    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration("10.0.0.50", 6379));
    }

    @Bean
    public RedisTemplate redisTemplate() {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        return redisTemplate;
    }
}

定义我们的消息生产者:

@Slf4j
@Service
public class ProducerService {
    private static final String QUEUE_NAME = "delay_order_queue";
    @Autowired
    private RedisTemplate redisTemplate;

    public void produce(String orderId, long expiredTime) {
        redisTemplate.opsForZSet().add(QUEUE_NAME, orderId, expiredTime);
        //log.info("order id:{} set success", orderId);
        long length = redisTemplate.opsForZSet().size(QUEUE_NAME);
        //log.info("[produce]{} length:{}", QUEUE_NAME, length);
    }
}

消息生产者主要完成的功能:以订单的过期时间为元素(value)的得分,将数据添加到队列中去;

定义我们的消息消费者:

@Slf4j
@Service
public class ConsumerService {
    private static final String QUEUE_NAME = "delay_order_queue";
    @Autowired
    private RedisTemplate redisTemplate;

    public void consume() {
        while (true) {
            Set set = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis(), 0, 1);
            if (set == null || set.isEmpty()) {
                try {
                    //log.info("no data will sleep");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.error("InterruptedException", e);
                }
                continue;
            }
            String orderId = set.iterator().next();
            if (redisTemplate.opsForZSet().remove(QUEUE_NAME, orderId) > 0) {
                log.info("order id:{} handle success", orderId);
                long length = redisTemplate.opsForZSet().size(QUEUE_NAME);
                log.info("[consume]{} length:{}", QUEUE_NAME, length);
            }
        }
    }
}

消息消费者的主要功能如下:

  • 循环从Redis的Zset中拿取一个0<=score<=当前时间的元素(value);
  • 如果没有取到值,则整个线程休眠一秒钟;
  • 如果取到值,则从该队列(key)删除取到的元素(value),删除的目的是防止一个数据被重复的消费;然后对取到的值进行后续的数据处理,这里是将数据打印出来。

整个生产者和消费者我们已经实现了,下面我们通过Junit来生产消息和消费消息;

生产消息的过程:

@Slf4j
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducerServiceTest {
    @Autowired
    private ProducerService producerService;

    @Test
    public void produce() {
        Random random = new Random(1);
        for (int i = 0; i < 10; i++) {
            Calendar calendar = Calendar.getInstance();
            // 产生一个随机数
            int time = random.nextInt(100);
            calendar.add(Calendar.SECOND, time);
            String orderId = "order-id-" + i;
            long expired = calendar.getTimeInMillis();
            log.info("order id:{}, expired:{}", orderId, time);
            producerService.produce(orderId, expired);
            try {
                //log.info("no data will sleep");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error("InterruptedException", e);
            }
        }
    }
}

为了方便我们观察,我们采用在当前时间的基础上加上一个随机的时间作为订单的过期时间,每产生一个订单线程休眠0.5秒,总共产生10个订单。

消息的消费过程:

@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ConsumerServiceTest {
    @Autowired
    private ConsumerService consumerService;

    @Test
    public void consume() {
        consumerService.consume();
    }
}

该过程比较简单,它主要是启动我们的循环函数从Redis中取数据。

先启动我们的消息消费过程ConsumerServiceTest,然后再启动我们的消息生产过程ProducerServiceTest,观察日志的打印输出。

消息生产过程:


消息生产过程

消息消费过程:


消息消费过程

仔细对比我们可以很容易发现:

  1. order-id-5的过期时间最短,它也是最先被消费掉的,其次是order-id-7;
  2. 当生产消息的过程完成后共产生了10个消息,消息消费过程中,每消费一个消息,又没有产生新的消息的时候,整个消息队列的长度在变小;


通过上面的演示,我们实现了基于Redis实现简单的延时消息队列,最主要我们使用了Redis的Zset特性来完成延时消息队列的功能。

上面的演示在高并发场景下,可能会存在问题:

  • 高并发场景下对Redis的操作不够原子性;
  • 不适合分布式应用场景下使用;
  • 适合单体应用中延时消息队列的使用;

基于上面所提到的问题,最好的解决方案是采用消息队列框架去实现,例如:RabbitMQ给延时队列统一设置消息过期时间,过期的消息将被路由到另一个队列中;也可以给每一个消息设置一个过期时间,过期的消息同样路由到另一个队列中。


源代码GitHub地址:

https://github.com/bq-xiao/delay-message-queue


不积跬步,无以至千里;不积小流,无以成江海!

相关推荐

最全的MySQL总结,助你向阿里“开炮”(面试题+笔记+思维图)

前言作为一名编程人员,对MySQL一定不会陌生,尤其是互联网行业,对MySQL的使用是比较多的。对于求职者来说,MySQL又是面试中一定会问到的重点,很多人拥有大厂梦,却因为MySQL败下阵来。实际上...

Redis数据库从入门到精通(redis数据库设计)

目录一、常见的非关系型数据库NOSQL分类二、了解Redis三、Redis的单节点安装教程四、Redis的常用命令1、Help帮助命令2、SET命令3、过期命令4、查找键命令5、操作键命令6、GET命...

netcore 急速接入第三方登录,不看后悔

新年新气象,趁着新年的喜庆,肝了十来天,终于发了第一版,希望大家喜欢。如果有不喜欢看文字的童鞋,可以直接看下面的地址体验一下:https://oauthlogin.net/前言此次带来得这个小项目是...

精选 30 个 C++ 面试题(含解析)(c++面试题和答案汇总)

大家好,我是柠檬哥,专注编程知识分享。欢迎关注@程序员柠檬橙,编程路上不迷路,私信发送以下关键字获取编程资源:发送1024打包下载10个G编程资源学习资料发送001获取阿里大神LeetCode...

Oracle 12c系列(一)|多租户容器数据库

作者杨禹航出品沃趣技术Oracle12.1发布至今已有多年,但国内Oracle12C的用户并不多,随着12.2在去年的发布,选择安装Oracle12c的客户量明显增加,在接下来的几年中,Or...

flutter系列之:UI layout简介(flutter-ui-nice)

简介对于一个前端框架来说,除了各个组件之外,最重要的就是将这些组件进行连接的布局了。布局的英文名叫做layout,就是用来描述如何将组件进行摆放的一个约束。在flutter中,基本上所有的对象都是wi...

Flutter 分页功能表格控件(flutter 列表)

老孟导读:前2天有读者问到是否有带分页功能的表格控件,今天分页功能的表格控件详细解析来来。PaginatedDataTablePaginatedDataTable是一个带分页功能的DataTable,...

Flutter | 使用BottomNavigationBar快速构建底部导航

平时我们在使用app时经常会看到底部导航栏,而在flutter中它的实现也较为简单.需要用到的组件:BottomNavigationBar导航栏的主体BottomNavigationBarI...

Android中的数据库和本地存储在Flutter中是怎样实现的

如何使用SharedPreferences?在Android中,你可以使用SharedPreferencesAPI来存储少量的键值对。在Flutter中,使用Shared_Pref...

Flet,一个Flutter应用的实用Python库!

▼Flet:用Python轻松构建跨平台应用!在纷繁复杂的Python框架中,Flet宛如一缕清风,为开发者带来极致的跨平台应用开发体验。它用最简单的Python代码,帮你实现移动端、桌面端...

flutter系列之:做一个图像滤镜(flutter photo)

简介很多时候,我们需要一些特效功能,比如给图片做个滤镜什么的,如果是h5页面,那么我们可以很容易的通过css滤镜来实现这个功能。那么如果在flutter中,如果要实现这样的滤镜功能应该怎么处理呢?一起...

flutter软件开发笔记20-flutter web开发

flutterweb开发优势比较多,采用统一的语言,就能开发不同类型的软件,在web开发中,特别是后台式软件中,相比传统的html5开发,更高效,有点像c++编程的方式,把web设计出来了。一...

Flutter实战-请求封装(五)之设置抓包Proxy

用了两年的flutter,有了一些心得,不虚头巴脑,只求实战有用,以供学习或使用flutter的小伙伴参考,学习尚浅,如有不正确的地方还望各路大神指正,以免误人子弟,在此拜谢~(原创不易,转发请标注来...

为什么不在 Flutter 中使用全局变量来管理状态

我相信没有人用全局变量来管理Flutter应用程序的状态。毫无疑问,我们的Flutter应用程序需要状态管理包或Flutter的基本小部件(例如InheritedWidget或St...

Flutter 攻略(Dart基本数据类型,变量 整理 2)

代码运行从main方法开始voidmain(){print("hellodart");}变量与常量var声明变量未初始化变量为nullvarc;//未初始化print(c)...