百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

Python集成ActiveMQ,异步发送处理消息,详细代码手把手操作

bigegpt 2024-10-21 03:48 2 浏览

目录

1, JMS消息协议规范

2, Python集成ActiveMQ

3, 封装服务mq_service.py

4, 接收处理消息mq_listener.py

5, 启动消息监听服务mq.py

6, 单元测试test_mq_serivce.py

7, 发送消息功能调用

8, 常见问题和解决方法


ActiveMQ是一个非常流行的消息队列服务中间件,实现JMS规范,基于STOMP协议(端口为61613)支持Python访问。


STOMP:Simple(or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议

JMS:Java Message Service


一, JMS消息协议规范

JMS规范定义了2类消息发送接收模型:点对点queue,发布订阅topic,区别是能够重复消费和是否保存。


1, 点对点queue:不可重复消费,消息被消费前一直保存。

生产者发送消息到queue,一个消费者取出并消费消息。

消息被消费后,queue中不再保存,所以只有一个消费者能够取到消息。

queue支持多个消费者存在,但是一个消息只有一个消费者可以消费。

当前没有消费者时,消息一直保存,直到被消费者消费。


2, 发布订阅topic:可重复消费,发布给所有订阅者。

生产者发布消息到topic中,多个订阅者收到并消费消息。

和queue不同,发布到topic中的消息会被所有订阅者消费。

当生产者发布消息时,不管是否有订阅者,都不保存消息。


JMS规范定义的2类消息传输模型queue和topic比较:



Python集成ActiveMQ使用stomp.py,只需简单配置,本文在Django框架下进一步封装服务mq_service.py。典型系统架构示意图和消息队列:


时序图如下:


示例代码:hello_activemq

├── settings.py

├── mq

│ └── mq_service.py

│ └── mq_listener.py

├── tests

│ └── test_mq_service.py

├── management

│ └── commands

│ └── mq.py


二, Python集成ActiveMQ



1. 新建Django项目

运行命令:django-admin startproject hello_activemq


2. 进到目录hello_activemq,增加应用

运行命令:python manage.py startapp app


项目文件结构如下:


3. 安装stomp.py

pip install stomp.py >= 5.0.1


三, 封装服务mq_service.py,调用ActiveMQ发送消息

1. 增加mq_service.py



2. 打开settings.py,配置ActiveMQ信息



3. 为了增加代码的兼容和容错能力,封装get_conn(), close_conn()等辅助函数,详见代码文件mq_service.py。


四, 接收处理消息mq_listener.py

1. 增加mq_listener.py,声明消息处理类,继承stomp.ConnectionListener:



2. 在on_message()函数中,将消息字符串解析为JSON,方便业务处理。

3. 声明on_error()函数处理错误信息。


五, 启动消息监听服务mq.py

1. 将循环接收消息代码封装成函数consume_msg(),增加在服务中mq_serivce.py:



2. 调用set_listener()设置消息接收类实例,使用之前创建的MqListener

3. 调用subscribe()订阅消息,启动循环监听。

4. 将启动服务代码封装成command,在目录management/commands中增加mq.py



5. 运行命令python manage.py mq,看到消息提示,启动监听服务成功。


六, 单元测试test_mq_service.py

增加测试函数,发送消息。



运行python manage.py test,同时看到监听服务收到并处理消息。


七, 发送消息功能调用

1. 在views.py中发送消息,调用mq_servcie.py



2. 在urls.py中配置路由



3. 运行命令启动服务

python manage.py runserver 0.0.0.0:8001

4. REST接口发送消息


八, 常见问题和解决方法

1,启动服务错误:[transport.py: 787, attempt_connection] Could not connect to host 127.0.0.1, port 61613

解决:检查ActiveMQ是否正常启动,特别注意是否开启STOMP协议端口61613

原因:Python连接ActiveMQ使用STOMP协议,端口默认61613


2,发送消息时错误:TypeError: message should be a string or bytes, found <class 'dict'>

解决:将消息内容序列化为JSON,发送时调用json.dumps(),接收时调用json.loads()

原因:Python连接ActiveMQ使用的是STOMP协议,消息格式为简单文本。


注:JMS规范定义的5类消息:

字符串TextMessage,

键值对MapMessage,

序列化对象ObjectMessage

字节流BytesMessage

数据流StreamMessage

ActiveMQ支持5类JMS消息,增加了二进制大文件消息BlobMessage:


3,跨系统对接时接收到的消息类型不是TextMessage

Python开发的业务处理服务 -> Java开发的API服务,接收到的消息类型为BytesMessage,Python发送时设置conn.send('xx', msg_str, content_type="text/plain")仍然接收不到期望的类型TextMessage


解决:stomp建立连接时配置参数

conn = stomp.Connection10([("localhost", 61613)], auto_content_length=False)

原因:Python连接ActiveMQ使用STOMP协议,消息格式为简单文本,不携带类型信息,只通过header中的content-length来判断TextMessage和BytesMessage,所以发送消息时不在header中添加content-length就可以了。


#学浪斗划# #学浪计划-教育创作者扶持计划# #Python# #Python基础# #Python入门推荐#

相关推荐

Java 泛型大揭秘:类型参数、通配符与最佳实践

引言在编程世界中,代码的可重用性和可维护性是至关重要的。为了实现这些目标,Java5引入了一种名为泛型(Generics)的强大功能。本文将详细介绍Java泛型的概念、优势和局限性,以及如何在...

K8s 的标签与选择器:流畅运维的秘诀

在Kubernetes的世界里,**标签(Label)和选择器(Selector)**并不是最炫酷的技术,但却是贯穿整个集群管理与运维流程的核心机制。正是它们让复杂的资源调度、查询、自动化运维变得...

哈希Hash算法:原理、应用(哈希算法 知乎)

原作者:Linux教程,原文地址:「链接」什么是哈希算法?哈希算法(HashAlgorithm),又称为散列算法或杂凑算法,是一种将任意长度的数据输入转换为固定长度输出值的数学函数。其输出结果通常被...

C#学习:基于LLM的简历评估程序(c# 简历)

前言在pocketflow的例子中看到了一个基于LLM的简历评估程序的例子,感觉还挺好玩的,为了练习一下C#,我最近使用C#重写了一个。准备不同的简历:image-20250528183949844查...

55顺位,砍41+14+3!季后赛也成得分王,难道他也是一名球星?

雷霆队最不可思议的新星:一个55号秀的疯狂逆袭!你是不是也觉得NBA最底层的55号秀,就只能当饮水机管理员?今年的55号秀阿龙·威金斯恐怕要打破你的认知了!常规赛阶段,这位二轮秀就像开了窍的天才,直接...

5分钟读懂C#字典对象(c# 字典获取值)

什么是字典对象在C#中,使用Dictionary类来管理由键值对组成的集合,这类集合被称为字典。字典最大的特点就是能够根据键来快速查找集合中的值,其键的定义不能重复,具有唯一性,相当于数组索引值,字典...

c#窗体传值(c# 跨窗体传递数据)

在WinForm编程中我们经常需要进行俩个窗体间的传值。下面我给出了两种方法,来实现传值一、在输入数据的界面中定义一个属性,供接受数据的窗体使用1、子窗体usingSystem;usingSyst...

C#入门篇章—委托(c#委托的理解)

C#委托1.委托的定义和使用委托的作用:如果要把方法作为函数来进行传递的话,就要用到委托。委托是一个类型,这个类型可以赋值一个方法的引用。C#的委托通过delegate关键字来声明。声明委托的...

C#.NET in、out、ref详解(c#.net framework)

简介在C#中,in、ref和out是用于修改方法参数传递方式的关键字,它们决定了参数是按值传递还是按引用传递,以及参数是否必须在传递前初始化。基本语义对比修饰符传递方式可读写性必须初始化调用...

C#广义表(广义表headtail)

在C#中,广义表(GeneralizedList)是一种特殊的数据结构,它是线性表的推广。广义表可以包含单个元素(称为原子),也可以包含另一个广义表(称为子表)。以下是一个简单的C#广义表示例代...

「C#.NET 拾遗补漏」04:你必须知道的反射

阅读本文大概需要3分钟。通常,反射用于动态获取对象的类型、属性和方法等信息。今天带你玩转反射,来汇总一下反射的各种常见操作,捡漏看看有没有你不知道的。获取类型的成员Type类的GetMembe...

C#启动外部程序的问题(c#怎么启动)

IT&OT的深度融合是智能制造的基石。本公众号将聚焦于PLC编程与上位机开发。除理论知识外,也会结合我们团队在开发过程中遇到的具体问题介绍一些项目经验。在使用C#开发上位机时,有时会需要启动外部的一些...

全网最狠C#面试拷问:这20道题没答出来,别说你懂.NET!

在竞争激烈的C#开发岗位求职过程中,面试是必经的一道关卡。而一场高质量的面试,不仅能筛选出真正掌握C#和.NET技术精髓的人才,也能让求职者对自身技术水平有更清晰的认知。今天,就为大家精心准备了20道...

C#匿名方法(c#匿名方法与匿名类)

C#中的匿名方法是一种没有名称只有主体的方法,它提供了一种传递代码块作为委托参数的技术。以下是关于C#匿名方法的一些重要特点和用法:特点省略参数列表:使用匿名方法可省略参数列表,这意味着匿名方法...

C# Windows窗体(.Net Framework)知识总结

Windows窗体可大致分为Form窗体和MDI窗体,Form窗体没什么好细说的,知识点总结都在思维导图里面了,下文将围绕MDI窗体来讲述。MDI(MultipleDocumentInterfac...