一文搞懂消息推送技术选型
bigegpt 2024-11-21 10:41 3 浏览
- Ajax短轮询
- MQ
- redis 订阅/发布
Ajax短轮询
优点:
简单高效、浏览器使用循环不断地、间隔地发送请求获取数据
缺点:
频繁创建/断开连接,每次请求都会查询一遍数据不管有无都返回,对服务器业务处理的性能有很大的需求和压力;因为请求间有间隔时间,获取的数据是伪实时的,不适应对实时性要求很高的项目。
典型运用:
扫码登录
MQ
MQ的引入虽然 会造成技术的复杂度提升,但是合理的使用会极大的提高系统的 容错能力。
- 优点:
- 一般MQ都用于 系统解耦、流量削峰、数据分发
- 缺点:
如果MQ服务挂了,导致消息发送和接收就无法使用了
复杂度提高。
MQ的对比
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
单机吞吐量 | 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 | 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 | 10万级,RocketMQ也是可以支撑高吞吐的一种MQ | 10万级别,这是kafka最大的优点,就是吞吐量高。 一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic数量对吞吐量的影响 | topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic | topic从几十个到几百个的时候,吞吐量会大幅度下降 所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源 | ||
时效性 | ms级 | 微秒级,这是rabbitmq的一大特点,延迟是最低的 | ms级 | 延迟在ms级以内 |
可用性 | 高,基于主从架构实现高可用性 | 高,基于主从架构实现高可用性 | 非常高,分布式架构 | 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 有较低的概率丢失数据 | 经过参数优化配置,可以做到0丢失 | 经过参数优化配置,消息可以做到0丢失 |
rabbitmq 基于Elang语言编写 虽然提供了天然的高并发能力,但是 不利于 深入了解与掌握。
MQ的引入 需要保证两点:可靠性、高可用、幂等性。
mq想如果需要保证可靠性、在某些 对于实时性要求较高的 业务中,那么需要对消息进行持久化、以及保证消息的不丢失。
可靠性
结合三点就是生产者丢失消息、mq自身丢失消息、消费者丢失消息
MQ 若要保障消息的不丢失,对于rabbitmq来讲,常用的有两种方式:
1、开启事务
// 开启事务
channel.txSelect
try {
// 这里发送消息
} catch (Exception e) {
channel.txRollback
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit
但是 对于rabbitmq 来说 开启事务 造成性能上的 浪费是很大的
消息数量 | 开启事务 | 未开启事务 |
10w | 320796ms | 10246ms |
开启事务 与不开启事务 对于 性能上的开销是 320倍。因为 其被 @Transaction注解 标注过,对于每条消息都会被事务拦截器拦截处理。
2、ACK机制
关闭自动ACK,使用手动ACK。RabbitMQ中有一个ACK机制,默认情况下消费者接收到到消息,RabbitMQ会自动提交ACK,之后这条消息就不会再发送给消费者了。我们可以更改为手动ACK模式,每次处理完消息之后,再手动ack一下。不过这样可能会出现刚处理完还没手动ack确认,消费者挂了,导致消息重复消费。
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
# 发送者开启 confirm 确认机制
publisher-confirm-type: correlated
# 发送者开启 return 确认机制
publisher-returns: true
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1
auto-startup: true
default-requeue-rejected: true
# 设置消费端手动 ack
acknowledge-mode: manual
# 是否支持重试
retry:
enabled: true
@RabbitHandler
public void handlerMq(String msg, Channel channel, Message message) throws IOException {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...", e);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("消息即将再次返回队列处理...", e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
rabbitq提供了两个回调方法
confirm 与return 回调
confirm 是用于生产者发送消息,保证交换机exchange能正常收到,但是无法保证 从exchange的消息 正常发送给队列去消费。
return回调是处理一些 不可正确路由的消息,如exchange 不存在,或者就是路由key 无法正确找到队列。
这两种机制 是可靠性的 重要保障,可以保证消息正常的在mq中传递。
@Component
@Slf4j
public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
/**
* confirm机制只保证消息到达exchange,不保证消息可以路由到正确的queue,如果exchange错误,就会触发confirm机制
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("rabbitmq confirm fail,cause:{}", cause);
}
}
/**
* Return 消息机制用于处理一个不可路由的消息。在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定路由 key 路由不到,这个时候我们需要监听这种不可达的消息
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("mq消息不可达,message:{},replyCode:{},replyText:{},exchange:{},routing:{}", message.toString(), replyCode, replyText, exchange, routingKey);
String messageId = message.getMessageProperties().getMessageId();
}
}
MQ的幂等性搭建:
ACK机制能保证消息一定能被消费但是无法保证消息被消息了几次,这就需要额外编码来保证幂等性,而rabbitmq没有提供额外的幂等操作需要额外代码保证。
MQ的高可用
rabbitmq的消息是储存在一个节点中,让mq的节点崩溃后 其存储的消息就会丢失,会造成服务的不可用,如果使用缓存使用一个持久化的queue,但是在message发送并写入磁盘之间会存在一个虽然短暂的时间差。
为了避免节点失效,将mq节点进行集群处理,当一个节点失效后 就有第二个节点接替前一个节点工作。单失效的那个节点上的消息无法被找回。
镜像队列的配置:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:指n明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
priority:可选参数,policy的优先级
rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
可以通过下面命令判断那些slaves已经完成同步
rabbitmqctl list_queues name slave_pids synchronised_slave_pids
镜像队列的原理:
redis 订阅/通知
优缺点:
redis的 订阅通知 与rabbitmq相比,其优势体现在不想要搭建复杂笨重的 MQ ,简单轻量。但是由于redis没有类似于mq的消息持久化与ACK的保证,所以redis实现的发布/订阅功能并不可靠,仅适用于实时、且可靠性不高的场景(因为redis的订阅/发布目前是发送即忘的形式,如果客户端短线即会丢失)。如一些列消息的弹窗通知、有效期等等。
实现方式之一:
redis的键空间通知
配置:
- 首先找到redis.conf配置文件,打开文件,查找notify-keyspace-events,将前面的#去掉便可。注意:这里配置的是notify-keyspace-events的Ex参数,即说明,当键过时的时候会触发通知,若是只须要哈希命令键触发通知则能够设置为notify-keyspace-events Eh。
- 重启redis-server。
- 配置完成。
redis:
localhost: localhost
port: 6379
database: 7
password:
# 过期事件订阅,接收7号数据库中所有key的过期事件
listen-pattern: __keyevent@7__:expired
@Configuration
public class RedisListenerConfiguration {
@Value("${spring.redis.listen-pattern}")
public String pattern;
@Bean
public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnection) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnection);
/**
* Topic是消息发布(Pub)者和订阅(Sub)者之间的传输中介
*/
Topic topic = new PatternTopic(this.pattern);
container.addMessageListener(new RedisMessageListener(), topic);
return container;
}
}
监听:
public class RedisMessageListener implements MessageListener {
/**
* Redis 事件监听回调
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
}
}
当向redis订阅一个 过期时间的时候,当key过期的时候 redis会发送一个通知高速服务器,key事件已过期,然后 服务器可以执行自己的相关逻辑,可以在key过期的时候 执行一系列操作
4】三种通信方式的优缺点
短轮询 | 长轮询 | WebSocket | |
浏览器支持 | 几乎所有现代浏览器 | 几乎所有现代浏览器 | IE 10+ Edge Firefox 4+ Chrome 4+ Safari 5+ Opera 11.5+ |
服务器负载 | 较少的CPU资源,较多的内存资源和带宽资源 | 与传统轮询相似,但是占用带宽较少 | 无需循环等待(长轮询),CPU和内存资源不以客户端数量衡量,而是以客户端事件数衡量。三种方式里性能最佳。 |
客户端负载 | 占用较多的内存资源与请求数。 | 与传统轮询相似。 | 同Server-Sent Event。 |
延迟 | 非实时,延迟取决于请求间隔。 | 同传统轮询。 | 实时。 |
实现复杂度 | 非常简单。 | 需要服务器配合,客户端实现非常简单。 | 需要Socket程序实现和额外端口,客户端实现简单。 |
技术方案的选型:
关于业务场景,如果并发量不大,请求频率不高的情况下 选用轮询难度实现上小很多,而且容错率更高。如果在频繁请求资源,一次请求无法返回所有数据的情况下 适合使用websocket。具体需要看业务场景决定。
MQ的典型应用:
哔哩哔哩的弹幕技术架构
- Kafka(第三方服务)
消息队列系统。Kafka 是一个分布式的基于发布/订阅的消息系统,它是支持水平扩展的。每条发布到 Kafka 集群的消息都会打上一个名为 Topic(逻辑上可以被认为是一个 queue)的类别,起到消息分布式分发的作用。 - Router
存储消息。Comet 将信息传送给 Logic 之后,Logic 会对所收到的信息进行存储,采用 register session 的方式在 Router 上进行存储。Router 里面会收录用户的注册信息,这样就可以知道用户是与哪个机器建立的连接。 - Logic
对消息进行逻辑处理。用户建立连接之后会将消息转发给 Logic ,在 Logic 上可以进行账号验证。当然,类似于 IP 过滤以及黑名单设置此类的操作也可以经由 Logic 进行。 - Comet
维护客户端长链接。在上面可以规定一些业务需求,比如可以规定用户传送的信息的内容、输送用户信息等。Comet 提供并维持服务端与客户端之间的链接,这里保证链接可用性的方法主要是发送链接协议(如 Socket 等)。 - Client
客户端。与 Comet 建立链接。 - Jop
消息分发。可以起多个 Jop 模块放到不同的机器上进行覆盖,将消息收录之后,分发到所有的 Comet 上,之后再由 Comet 转发出去。
MQ的实现延迟的方式
rabbitmq:
rabbitmq并没有提供 原生的 延迟队列的实现方式,如果要实现延迟的效果可以使用 死信队列的方式
“死信”是RabbitMQ中的一种消息机制,死信是当MQ出现以下情况的时候:
- 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
- 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度
如何配置死信队列
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机和路由key
- 为死信交换机配置死信队列
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";
// 声明业务Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 声明业务队列A
@Bean("businessQueueA")
public Queue businessQueueA(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
}
// 声明死信队列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 声明业务队列A绑定关系
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 声明死信队列A绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
最后总结:选用哪种推送技术需要根据具体的业务场景,一句话一切脱离业务的设计都是耍流氓,不能杀鸡用牛刀,也不能不考虑未来。
相关推荐
- 悠悠万事,吃饭为大(悠悠万事吃饭为大,什么意思)
-
新媒体编辑:杜岷赵蕾初审:程秀娟审核:汤小俊审签:周星...
- 高铁扒门事件升级版!婚宴上‘冲喜’老人团:我们抢的是社会资源
-
凌晨两点改方案时,突然收到婚庆团队发来的视频——胶东某酒店宴会厅,三个穿大红棉袄的中年妇女跟敢死队似的往前冲,眼瞅着就要扑到新娘的高额钻石项链上。要不是门口小伙及时阻拦,这婚礼造型团队熬了三个月的方案...
- 微服务架构实战:商家管理后台与sso设计,SSO客户端设计
-
SSO客户端设计下面通过模块merchant-security对SSO客户端安全认证部分的实现进行封装,以便各个接入SSO的客户端应用进行引用。安全认证的项目管理配置SSO客户端安全认证的项目管理使...
- 还在为 Spring Boot 配置类加载机制困惑?一文为你彻底解惑
-
在当今微服务架构盛行、项目复杂度不断攀升的开发环境下,SpringBoot作为Java后端开发的主流框架,无疑是我们手中的得力武器。然而,当我们在享受其自动配置带来的便捷时,是否曾被配置类加载...
- Seata源码—6.Seata AT模式的数据源代理二
-
大纲1.Seata的Resource资源接口源码2.Seata数据源连接池代理的实现源码3.Client向Server发起注册RM的源码4.Client向Server注册RM时的交互源码5.数据源连接...
- 30分钟了解K8S(30分钟了解微积分)
-
微服务演进方向o面向分布式设计(Distribution):容器、微服务、API驱动的开发;o面向配置设计(Configuration):一个镜像,多个环境配置;o面向韧性设计(Resista...
- SpringBoot条件化配置(@Conditional)全面解析与实战指南
-
一、条件化配置基础概念1.1什么是条件化配置条件化配置是Spring框架提供的一种基于特定条件来决定是否注册Bean或加载配置的机制。在SpringBoot中,这一机制通过@Conditional...
- 一招解决所有依赖冲突(克服依赖)
-
背景介绍最近遇到了这样一个问题,我们有一个jar包common-tool,作为基础工具包,被各个项目在引用。突然某一天发现日志很多报错。一看是NoSuchMethodError,意思是Dis...
- 你读过Mybatis的源码?说说它用到了几种设计模式
-
学习设计模式时,很多人都有类似的困扰——明明概念背得滚瓜烂熟,一到写代码就完全想不起来怎么用。就像学了一堆游泳技巧,却从没下过水实践,很难真正掌握。其实理解一个知识点,就像看立体模型,单角度观察总...
- golang对接阿里云私有Bucket上传图片、授权访问图片
-
1、为什么要设置私有bucket公共读写:互联网上任何用户都可以对该Bucket内的文件进行访问,并且向该Bucket写入数据。这有可能造成您数据的外泄以及费用激增,若被人恶意写入违法信息还可...
- spring中的资源的加载(spring加载原理)
-
最近在网上看到有人问@ContextConfiguration("classpath:/bean.xml")中除了classpath这种还有其他的写法么,看他的意思是想从本地文件...
- Android资源使用(android资源文件)
-
Android资源管理机制在Android的开发中,需要使用到各式各样的资源,这些资源往往是一些静态资源,比如位图,颜色,布局定义,用户界面使用到的字符串,动画等。这些资源统统放在项目的res/独立子...
- 如何深度理解mybatis?(如何深度理解康乐服务质量管理的5个维度)
-
深度自定义mybatis回顾mybatis的操作的核心步骤编写核心类SqlSessionFacotryBuild进行解析配置文件深度分析解析SqlSessionFacotryBuild干的核心工作编写...
- @Autowired与@Resource原理知识点详解
-
springIOCAOP的不多做赘述了,说下IOC:SpringIOC解决的是对象管理和对象依赖的问题,IOC容器可以理解为一个对象工厂,我们都把该对象交给工厂,工厂管理这些对象的创建以及依赖关系...
- java的redis连接工具篇(java redis client)
-
在Java里,有不少用于连接Redis的工具,下面为你介绍一些主流的工具及其特点:JedisJedis是Redis官方推荐的Java连接工具,它提供了全面的Redis命令支持,且...
- 一周热门
- 最近发表
- 标签列表
-
- mybatiscollection (79)
- mqtt服务器 (88)
- keyerror (78)
- c#map (65)
- resize函数 (64)
- xftp6 (83)
- bt搜索 (75)
- c#var (76)
- mybatis大于等于 (64)
- xcode-select (66)
- mysql授权 (74)
- 下载测试 (70)
- linuxlink (65)
- pythonwget (67)
- androidinclude (65)
- logstashinput (65)
- hadoop端口 (65)
- vue阻止冒泡 (67)
- oracle时间戳转换日期 (64)
- jquery跨域 (68)
- php写入文件 (73)
- kafkatools (66)
- mysql导出数据库 (66)
- jquery鼠标移入移出 (71)
- 取小数点后两位的函数 (73)