百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

如何使用RabbitMQ实现事件总线

bigegpt 2024-08-07 17:48 3 浏览

1 前置阅读

在阅读本文章之前,你可以先阅读:

  • RabbitMQ入门
  • 什么是观察者模式
  • 什么是事件总线

2 实现

首先,事件源与事件处理的映射字典。

private static Dictionary<string, List<object>> eventHandlers = new Dictionary<string, List<object>>();

然后,初始化RabbitMQ,创建到服务器的连接,创建一个通道等

public RabbitMQEventBus(IConnectionFactory connectionFactory,
    string exchangeName,
    string exchangeType = ExchangeType.Fanout,
    string queueName = null,
    bool autoAck = false)
{
    this.connectionFactory = connectionFactory;
    this.connection = this.connectionFactory.CreateConnection();
    this.channel = this.connection.CreateModel();
    this.exchangeType = exchangeType;
    this.exchangeName = exchangeName;
    this.autoAck = autoAck;

    this.channel.ExchangeDeclare(this.exchangeName, this.exchangeType);

    this.queueName = this.InitializeEventConsumer(queueName);
}

接着,实现订阅,往字典表中添加事件处理实例,并绑定队列

public void Subscribe<TEvent>(IEventHandler<TEvent> eventHandler) where TEvent : IEvent
{
    var eventTypeName = typeof(TEvent).FullName;
    if (eventHandlers.ContainsKey(eventTypeName))
    {
        var handlers = eventHandlers[eventTypeName];
        handlers.Add(eventHandler);
    }
    else
    {
        eventHandlers.Add(eventTypeName, new List<object> { eventHandler });
    }
    this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName);
}

接着,实现取消订阅,从字典表中删除事件处理实例,并取消绑定队列

public void Unsubscribe<TEvent>(IEventHandler<TEvent> eventHandler) where TEvent : IEvent
{

    var eventType = typeof(TEvent).FullName;
    if (eventHandlers.ContainsKey(eventType))
    {
        var handlers = eventHandlers[eventType];
        if (handlers != null && handlers.Exists(s => s.GetType() == eventHandler.GetType()))
        {
            var handlerToRemove = handlers.First(s => s.GetType() == eventHandler.GetType());
            handlers.Remove(handlerToRemove);

            this.channel.QueueUnbind(this.queueName, this.exchangeName, typeof(TEvent).FullName);
        }
    }
}

接着,实现发布,往队列发布事件

public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
{
    var json = JsonConvert.SerializeObject(@event, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
    var eventBody = Encoding.UTF8.GetBytes(json);
    channel.BasicPublish(this.exchangeName,
        @event.GetType().FullName,
        null,
        eventBody);
}

接着,在EventingBasicConsumer.Received事件处理中,通过事件源找到对应的事件处理类,并执行它

private string InitializeEventConsumer(string queue)
{
    var localQueueName = queue;
    if (string.IsNullOrEmpty(localQueueName))
    {
        localQueueName = this.channel.QueueDeclare().QueueName;
    }
    else
    {
        this.channel.QueueDeclare(localQueueName, true, false, false, null);
    }

    var consumer = new EventingBasicConsumer(this.channel);
    consumer.Received += (model, eventArgument) =>
    {
        var eventBody = eventArgument.Body.ToArray();
        var json = Encoding.UTF8.GetString(eventBody);
        var @event = (IEvent)JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
        var eventTypeName = eventArgument.RoutingKey;

        if (eventHandlers.ContainsKey(eventTypeName))
        {
            var handlers = eventHandlers[eventTypeName];
            try
            {
                foreach (var handler in handlers)
                {
                    MethodInfo meth = handler.GetType().GetMethod("Handle");
                    meth.Invoke(handler, new Object[] { @event });
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        if (!autoAck)
        {
            channel.BasicAck(eventArgument.DeliveryTag, false);
        }
    };

    this.channel.BasicConsume(localQueueName, autoAck: this.autoAck, consumer: consumer);

    return localQueueName;
}

最后,创建客户端类,具体事件源类,具体事件处理类。

using Example.EventBus;
using RabbitMQ.Client;
using System;

namespace Eaxmple.EventBus.RabbitMQ.ConsoleApp01
{
    public class SendedEvent : IEvent
    {
        public string Name { get; private set; }
        public SendedEvent(string name)
        {
            Name = name;
        }
    }

    public class CustomerASendedEventHandler : IEventHandler<SendedEvent>
    {
        public void Handle(SendedEvent @event)
        {
            Console.WriteLine(#34;顾客A收到{@event.Name}通知!");
        }
    }

    public class CustomerBSendedEventHandler : IEventHandler<SendedEvent>
    {
        public void Handle(SendedEvent @event)
        {
            Console.WriteLine(#34;顾客B收到{@event.Name}通知!");
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var connectionFactory = new ConnectionFactory { HostName = "localhost" };
            var eventBus = new RabbitMQEventBus(connectionFactory, "Eaxmple.EventBus.RabbitMQ.ConsoleApp01.Exchange", queueName: "Eaxmple.EventBus.RabbitMQ.ConsoleApp01.Queue");

            var sendedEvent = new SendedEvent("优惠");

            var customerASendedEventHandler = new CustomerASendedEventHandler();
            eventBus.Subscribe<SendedEvent>(customerASendedEventHandler);
            var customerBSendedEventHandler = new CustomerBSendedEventHandler();
            eventBus.Subscribe<SendedEvent>(customerBSendedEventHandler);
            Console.WriteLine(#34;商店发了{sendedEvent.Name}通知!");
            eventBus.Publish<SendedEvent>(sendedEvent);

            Console.ReadKey();
        }
    }
}

让我们来看看输出结果:

商店发布优惠通知!
顾客A收到优惠通知。
顾客B收到优惠通知。

相关推荐

php-fpm的配置和优化

目录概述php-fpm配置php-fpm进程优化配置慢日志查询配置php7进阶到架构师相关阅读概述这是关于php进阶到架构之php7核心技术与实战学习的系列课程:php-fpm的配置和优化学习目标:理...

成功安装 Magento2.4.3最新版教程「技术干货」

外贸独立站设计公司xingbell.com经过多次的反复实验,最新版的magento2.4.3在oneinstack的环境下的详细安装教程如下:一.vps系统:LinuxCentOS7.7.19...

十分钟让你学会LNMP架构负载均衡

业务架构、应用架构、数据架构和技术架构一、几个基本概念1、pv值pv值(pageviews):页面的浏览量概念:一个网站的所有页面,在一天内,被浏览的总次数。(大型网站通常是上千万的级别)2、u...

php从远程URL获取(mp4 mp3)音视频的流媒体数据

/***从远程URL中获取媒体(如mp4mp3)的内容*@parammixed$file_url*@parammixed$media_type...

Zabbix5.0安装部署

全盘展示运行状态,减轻运维人员的重复性工作量,提高系统排错速度,加速运维知识学习积累。1.png1、环境安装关闭SELinux并重启系统2.png安装httpd、mariadb、php运行yum-...

php 常见配置详解

以下是PHP常见的配置项及其含义:error_reporting:设置错误报告级别,可以控制PHP显示哪些错误。例如,设置为E_ALL将显示所有错误,而设置为0将禁止显示任何错误。displa...

实践分享|基于基石智算 DeepSeek API + WordPress 插件自动生成访客回复

基石智算举办的DeepSeek案例大赛汇集了不少基于CoresHubDeepSeekAPI服务或模型部署服务的精彩实践。本次我们将分享个人实践:通过DeepSeekAPI+Word...

如何在Eclipse中搭建Zabbix源码的调试和开发环境

Zabbix是一款非常优秀的企业级软件,被设计用于对数万台服务器、虚拟机和网络设备的数百万个监控项进行实时监控。Zabbix是开放源码和免费的,这就意味着当出现bug时,我们可以很方便地通过调试源码来...

MySQL自我保护参数

#头条创作挑战赛#之前(MySQL自我保护工具--pt-kill)提到用pt-kill工具来kill相关的会话,来达到保护数据库的目的,本文再通过修改数据库参数的方式达到阻断长时间运行的SQL的目...

Python闭包深度解析:掌握数据封装的高级技巧

闭包作为Python高级编程特性之一,为开发者提供了一种优雅的方式来实现数据封装和状态保持。这一概念源于函数式编程理论,在现代Python开发中发挥着重要作用。理解和掌握闭包的使用不仅能够提升代码的表...

Java服务网格故障注入与熔断实战

在分布式系统的高可用性挑战中,服务网格的故障注入与熔断机制是检验系统韧性的终极试金石。以下是10道逐步升级的"地狱关卡",每个关卡都对应真实生产环境中可能遇到的致命场景,并附具体场景示...

MySQL数据库性能优化全攻略:程序员必知的七大核心策略

作为程序员,我们每天都要与数据库打交道。当系统用户量突破百万级时,数据库往往成为性能瓶颈的首要怀疑对象。本文将深入探讨MySQL优化的七大核心策略,并提供可直接落地的优化方案,助您构建高效稳定的数据库...

如何在 Windows 11 上使用单个命令安装 XAMPP

XAMPP是一种广泛使用的软件,用于在Windows操作系统上快速运行LAMP服务器包,包括Windows11。尽管LAMP通常用于Linux系统,但XAMPP并不使用Li...

uTorrent怎样将bt种子转换为磁力

如何用uTorrent把BT种子转为磁力链接?以下方法希望能帮到你。1、在uTorrent窗口里,点击工具栏的按钮,所示。2、在打开窗口里,选取要转为磁力的种子文件,然后点击打开按钮,参照图示操作...

支持向量机SVM 分类和回归的实例

支持向量机(SupportVectorMachine)是Cortes和Vapnik于1995年首先提出的,它在解决小样本、非线性及高维模式识别中表现出许多特有的优势,并能够推广应用到函数拟合等其他...