百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

基于Redis实现简单的延时消息队列

bigegpt 2025-03-19 10:41 7 浏览


说到消息队列相信作为开发人员的大家都不陌生,在实际的工作中我们可能在很多场景下都会用到消息队列,消息队列不仅仅是用于收发消息,而且也可以用于解耦我们的应用系统设计,在大型的应用系统或者分布式应用系统中,我们必然会用到消息队列。

总结下,消息队列的应用场景一般有以下几种场景:

  1. 异步处理任务;
  2. 应用系统解耦;
  3. 大流量削峰;
  4. 日志处理系统;
  5. 消息通讯;


目前主流的消息队列框架有:

  • Apache的ActiveMQ;
  • Erlang语言实现的RabbitMQ;
  • Apache的RocketMQ;
  • Apache的Kafka;

这几种主流的消息队列框架各有各的优势,也有略微的不同。

消息队列比较



当然,消息队列也有一些特殊的使用场景,比如:一些电商系统中,当用户下单后,需要在规定的时间内对订单发起支付,如果在规定的时间内没有支付订单,那么该订单将自动取消。这个问题的常规解决方案有两个:

  1. 使用定时任务定时去扫描表,修改过期订单的状态。这种方案当然存在许多局限性,当订单数量不多,而且对系统性能要求不高的情况下可以考虑使用。这种方案也不够优雅;
  2. 使用基于消息队列的延时消息队列,这种方案可以对整个消息队列设置一个消息过期时间,也可以给每一个消息设置一个过期时间,这个时候消息的过期时间取决于设置的最小时间;

延时消息队列我们可以采用上面所说的消息队列框架去实现,也可以采用比较简单的基于Redis的方式去实现,众所周知Redis并不是一个消息队列框架,但是Redis在某些应用场景下可以采用其高级特性为我们提供消息队列的特性。


Rdis在常规的应用场景下,我们使用它作为高速缓存框架,搜索百度百科我们可以发现,对Redis的定义如下:

Redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

Redis的最基本用法是我们使用它的Key-Value特性,存储我们的热点数据或者高频访问数据,来达到提高整个应用系统的吞吐量。

就像上面对Redis的定义,zset是对有序集合的操作,我们可以利用这一特性来实现我们的延时消息队列功能。大致实现的原理如下:

Zset本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加修改元素时候可以指定,每次指定后,Zset会自动重新按新的值调整顺序。可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,那么对延时队列又有何用呢?试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它便会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个Zset集合,以免造成性能浪费。



有了实现原理,下面我们通过一个简单的例子来演示下具体的操作过程,可能对基于Redis实现延时消息队列这一主题有更好的理解,演示的源代码链接在文章末尾附上。

新建工程:delay-message-queue-redis

pom.xml



    4.0.0
    
        com.delay.message.queue
        delay-message-queue
        0.0.1
    
    com.delay.message.queue
    delay-message-queue-redis
    0.0.1
    delay-message-queue-redis
    Demo project for Spring Boot

    
        1.8
    

    
        
            org.springframework.boot
            spring-boot-starter-data-redis
        
        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.springframework.boot
            spring-boot-devtools
            runtime
            true
        
        
            org.springframework.boot
            spring-boot-configuration-processor
            true
        
        
            org.projectlombok
            lombok
            true
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    


操作Redis我们使用RedisTemplate,简单的配置如下:

@Configuration
public class RedisConfig {
    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration("10.0.0.50", 6379));
    }

    @Bean
    public RedisTemplate redisTemplate() {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        return redisTemplate;
    }
}

定义我们的消息生产者:

@Slf4j
@Service
public class ProducerService {
    private static final String QUEUE_NAME = "delay_order_queue";
    @Autowired
    private RedisTemplate redisTemplate;

    public void produce(String orderId, long expiredTime) {
        redisTemplate.opsForZSet().add(QUEUE_NAME, orderId, expiredTime);
        //log.info("order id:{} set success", orderId);
        long length = redisTemplate.opsForZSet().size(QUEUE_NAME);
        //log.info("[produce]{} length:{}", QUEUE_NAME, length);
    }
}

消息生产者主要完成的功能:以订单的过期时间为元素(value)的得分,将数据添加到队列中去;

定义我们的消息消费者:

@Slf4j
@Service
public class ConsumerService {
    private static final String QUEUE_NAME = "delay_order_queue";
    @Autowired
    private RedisTemplate redisTemplate;

    public void consume() {
        while (true) {
            Set set = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis(), 0, 1);
            if (set == null || set.isEmpty()) {
                try {
                    //log.info("no data will sleep");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.error("InterruptedException", e);
                }
                continue;
            }
            String orderId = set.iterator().next();
            if (redisTemplate.opsForZSet().remove(QUEUE_NAME, orderId) > 0) {
                log.info("order id:{} handle success", orderId);
                long length = redisTemplate.opsForZSet().size(QUEUE_NAME);
                log.info("[consume]{} length:{}", QUEUE_NAME, length);
            }
        }
    }
}

消息消费者的主要功能如下:

  • 循环从Redis的Zset中拿取一个0<=score<=当前时间的元素(value);
  • 如果没有取到值,则整个线程休眠一秒钟;
  • 如果取到值,则从该队列(key)删除取到的元素(value),删除的目的是防止一个数据被重复的消费;然后对取到的值进行后续的数据处理,这里是将数据打印出来。

整个生产者和消费者我们已经实现了,下面我们通过Junit来生产消息和消费消息;

生产消息的过程:

@Slf4j
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducerServiceTest {
    @Autowired
    private ProducerService producerService;

    @Test
    public void produce() {
        Random random = new Random(1);
        for (int i = 0; i < 10; i++) {
            Calendar calendar = Calendar.getInstance();
            // 产生一个随机数
            int time = random.nextInt(100);
            calendar.add(Calendar.SECOND, time);
            String orderId = "order-id-" + i;
            long expired = calendar.getTimeInMillis();
            log.info("order id:{}, expired:{}", orderId, time);
            producerService.produce(orderId, expired);
            try {
                //log.info("no data will sleep");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error("InterruptedException", e);
            }
        }
    }
}

为了方便我们观察,我们采用在当前时间的基础上加上一个随机的时间作为订单的过期时间,每产生一个订单线程休眠0.5秒,总共产生10个订单。

消息的消费过程:

@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ConsumerServiceTest {
    @Autowired
    private ConsumerService consumerService;

    @Test
    public void consume() {
        consumerService.consume();
    }
}

该过程比较简单,它主要是启动我们的循环函数从Redis中取数据。

先启动我们的消息消费过程ConsumerServiceTest,然后再启动我们的消息生产过程ProducerServiceTest,观察日志的打印输出。

消息生产过程:


消息生产过程

消息消费过程:


消息消费过程

仔细对比我们可以很容易发现:

  1. order-id-5的过期时间最短,它也是最先被消费掉的,其次是order-id-7;
  2. 当生产消息的过程完成后共产生了10个消息,消息消费过程中,每消费一个消息,又没有产生新的消息的时候,整个消息队列的长度在变小;


通过上面的演示,我们实现了基于Redis实现简单的延时消息队列,最主要我们使用了Redis的Zset特性来完成延时消息队列的功能。

上面的演示在高并发场景下,可能会存在问题:

  • 高并发场景下对Redis的操作不够原子性;
  • 不适合分布式应用场景下使用;
  • 适合单体应用中延时消息队列的使用;

基于上面所提到的问题,最好的解决方案是采用消息队列框架去实现,例如:RabbitMQ给延时队列统一设置消息过期时间,过期的消息将被路由到另一个队列中;也可以给每一个消息设置一个过期时间,过期的消息同样路由到另一个队列中。


源代码GitHub地址:

https://github.com/bq-xiao/delay-message-queue


不积跬步,无以至千里;不积小流,无以成江海!

相关推荐

得物可观测平台架构升级:基于GreptimeDB的全新监控体系实践

一、摘要在前端可观测分析场景中,需要实时观测并处理多地、多环境的运行情况,以保障Web应用和移动端的可用性与性能。传统方案往往依赖代理Agent→消息队列→流计算引擎→OLAP存储...

warm-flow新春版:网关直连和流程图重构

本期主要解决了网关直连和流程图重构,可以自此之后可支持各种复杂的网关混合、多网关直连使用。-新增Ruoyi-Vue-Plus优秀开源集成案例更新日志[feat]导入、导出和保存等新增json格式支持...

扣子空间体验报告

在数字化时代,智能工具的应用正不断拓展到我们工作和生活的各个角落。从任务规划到项目执行,再到任务管理,作者深入探讨了这款工具在不同场景下的表现和潜力。通过具体的应用实例,文章展示了扣子空间如何帮助用户...

spider-flow:开源的可视化方式定义爬虫方案

spider-flow简介spider-flow是一个爬虫平台,以可视化推拽方式定义爬取流程,无需代码即可实现一个爬虫服务。spider-flow特性支持css选择器、正则提取支持JSON/XML格式...

solon-flow 你好世界!

solon-flow是一个基础级的流处理引擎(可用于业务规则、决策处理、计算编排、流程审批等......)。提供有“开放式”驱动定制支持,像jdbc有mysql或pgsql等驱动,可...

新一代开源爬虫平台:SpiderFlow

SpiderFlow:新一代爬虫平台,以图形化方式定义爬虫流程,不写代码即可完成爬虫。-精选真开源,释放新价值。概览Spider-Flow是一个开源的、面向所有用户的Web端爬虫构建平台,它使用Ja...

通过 SQL 训练机器学习模型的引擎

关注薪资待遇的同学应该知道,机器学习相关的岗位工资普遍偏高啊。同时随着各种通用机器学习框架的出现,机器学习的门槛也在逐渐降低,训练一个简单的机器学习模型变得不那么难。但是不得不承认对于一些数据相关的工...

鼠须管输入法rime for Mac

鼠须管输入法forMac是一款十分新颖的跨平台输入法软件,全名是中州韵输入法引擎,鼠须管输入法mac版不仅仅是一个输入法,而是一个输入法算法框架。Rime的基础架构十分精良,一套算法支持了拼音、...

Go语言 1.20 版本正式发布:新版详细介绍

Go1.20简介最新的Go版本1.20在Go1.19发布六个月后发布。它的大部分更改都在工具链、运行时和库的实现中。一如既往,该版本保持了Go1的兼容性承诺。我们期望几乎所...

iOS 10平台SpriteKit新特性之Tile Maps(上)

简介苹果公司在WWDC2016大会上向人们展示了一大批新的好东西。其中之一就是SpriteKitTileEditor。这款工具易于上手,而且看起来速度特别快。在本教程中,你将了解关于TileE...

程序员简历例句—范例Java、Python、C++模板

个人简介通用简介:有良好的代码风格,通过添加注释提高代码可读性,注重代码质量,研读过XXX,XXX等多个开源项目源码从而学习增强代码的健壮性与扩展性。具备良好的代码编程习惯及文档编写能力,参与多个高...

Telerik UI for iOS Q3 2015正式发布

近日,TelerikUIforiOS正式发布了Q32015。新版本新增对XCode7、Swift2.0和iOS9的支持,同时还新增了对数轴、不连续的日期时间轴等;改进TKDataPoin...

ios使用ijkplayer+nginx进行视频直播

上两节,我们讲到使用nginx和ngixn的rtmp模块搭建直播的服务器,接着我们讲解了在Android使用ijkplayer来作为我们的视频直播播放器,整个过程中,需要注意的就是ijlplayer编...

IOS技术分享|iOS快速生成开发文档(一)

前言对于开发人员而言,文档的作用不言而喻。文档不仅可以提高软件开发效率,还能便于以后的软件开发、使用和维护。本文主要讲述Objective-C快速生成开发文档工具appledoc。简介apple...

macOS下配置VS Code C++开发环境

本文介绍在苹果macOS操作系统下,配置VisualStudioCode的C/C++开发环境的过程,本环境使用Clang/LLVM编译器和调试器。一、前置条件本文默认前置条件是,您的开发设备已...