内存事件总线和MQ消息队列实现的分布式事件总线
发布:2023-11-30 07:08
更新:2025-04-30 10:53
作者:   0xdFFF
浏览:   1697
字数:5509

事件总线EventBus

作用

为什么要使用EventBus
Eventbus可以帮助不同的微服务之间实现解耦,使它们能够独立开发、部署和扩展。微服务可以通过发布和订阅事件的方式进行通信,而不需要显式地了解其他微服务的存在和实现细节。
MQ的eventBus底层使用了事务来保持数据的一致性,而内存总线使用UOW工作单元实现数据一致性
Eventbus使用发布/订阅模型,其中组件可以发布事件或订阅感兴趣的事件,当事件被发布到Eventbus时,Eventbus会负责将事件传播给所有订阅者;

内存事件总线和MQ事件总线的区别
内存事件总线是一种在应用程序内部实现的事件传输机制,用于在应用程序的不同组件之间进行通信和数据传递。
相比于使用消息队列等外部服务,内存事件总线可以提供更低的延迟和更高的吞吐量。
但MQ事件总线可以适用于不同的应用,适用于微服务架构之间的信息传递。

框架EventBus使用方法

先通过DI注入

  1. /// <summary>
  2. /// 事件总线
  3. /// </summary>
  4. private readonly IEventBus _eventBus;
  5. /// <summary>
  6. /// 构造
  7. /// </summary>
  8. public TestManager(IServiceProvider serviceProvider, IEventBus eventBus) : base(serviceProvider)
  9. {
  10. _repository = repository ?? throw new ArgumentNullException(nameof(repository));
  11. _eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
  12. }

发布订阅消息

  1. public async Task ValidateDeleteAsync(List<Test> entities)
  2. {
  3. await _eventBus.PublishAsync(new TestDeleteValidationEvent(entities.Select(t => t.Id).ToList()));
  4. }

创建订阅类

  1. /// <summary>
  2. /// 删除验证事件
  3. /// </summary>
  4. public record TestDeleteValidationEvent : IEvent
  5. {
  6. public IEnumerable<Guid> CodeIds { get; }
  7. /// <summary>
  8. /// 初始化缺陷代码删除验证事件
  9. /// </summary>
  10. public TestCodeDeleteValidationEvent(IEnumerable<Guid> CodeIds)
  11. {
  12. CodeIds = CodeIds;
  13. }
  14. }

接受订阅消息

  1. public class TestDeleteValidationEventHandles : EventHandlerBase<TestDeleteValidationEvent >
  2. {
  3. private readonly ITestCodeProcessRepository _repository;
  4. public TestCodeDeleteValidationEventHandles(ITestCodeProcessRepository repository)
  5. {
  6. _repository = repository;
  7. }
  8. public override async Task HandleAsync(TestCodeDeleteValidationEvent @event, CancellationToken cancellationToken)
  9. {
  10. if (@event.CodeIds != null)
  11. {
  12. if (await _repository.ExistsAsync(t => @event.TestCodeIds.Contains(t.TestId)))
  13. {
  14. throw new Warning(Util.Localization.Helper.Format("DeleteHandleWarning"));
  15. }
  16. }
  17. }
  18. }

实现原理

为保证数据的一致性(强一致性)
当我们要发出事件时,我们会把事件的存储逻辑与我们的业务逻辑的事务合并,在同一个事务里提交,只要事件存储的逻辑或者业务代码写入逻辑有一个失败就回滚到最初状态。
同理,订阅逻辑也相同。

图片

配置发布对象以及订阅响应

接下来,我们需要创建一个事件和一个事件处理程序。事件是我们希望在系统中传递的消息,而事件处理程序则负责处理这些事件。

创建一个名为 MyEvent 的类,用于表示一个简单的事件:

  1. public class MyEvent
  2. {
  3. public string Message { get; set; }
  4. }

接下来,我们创建一个名为 MyEventHandler 的类,用于处理 MyEvent 事件:

  1. public class MyEventHandler
  2. {
  3. public void Handle(MyEvent myEvent)
  4. {
  5. // 处理事件的逻辑
  6. }

配置 RabbitMQ 连接

我们需要在应用程序中配置 RabbitMQ 连接。在你的项目中创建一个名为 RabbitMQConfig 的类,用于存储连接配置信息:

  1. public class RabbitMQConfig
  2. {
  3. public string Hostname { get; set; }
  4. public string Username { get; set; }
  5. public string Password { get; set; }
  6. }

在你的应用程序的配置文件(例如 appsettings.json)中添加 RabbitMQ 的连接配置:

  1. {
  2. "RabbitMQConfig": {
  3. "Hostname": "localhost",
  4. "Username": "guest",
  5. "Password": "guest"
  6. }
  7. }

消息发布实现

消息发布的实现采用的是将发送信息序列化,发布给订阅后再由订阅反序列化。

  1. using RabbitMQ.Client;
  2. using System.Text;
  3. using Microsoft.Extensions.Configuration;
  4. using Newtonsoft.Json;
  5. // ...
  6. var factory = new ConnectionFactory
  7. {
  8. HostName = configuration.GetSection("RabbitMQConfig:Hostname").Value,
  9. UserName = configuration.GetSection("RabbitMQConfig:Username").Value,
  10. Password = configuration.GetSection("RabbitMQConfig:Password").Value
  11. };
  12. using (var connection = factory.CreateConnection())
  13. using (var channel = connection.CreateModel())
  14. {
  15. //定义mq管道
  16. channel.QueueDeclare(queue: "my_queue",
  17. durable: false,
  18. exclusive: false,
  19. autoDelete: false,
  20. arguments: null);
  21. var myEvent = new MyEvent { Message = "Hello, world!" };
  22. var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(myEvent));
  23. channel.BasicPublish(exchange: "",
  24. routingKey: "my_queue",
  25. basicProperties: null,
  26. body: body);
  27. }

消息订阅实现

  1. using RabbitMQ.Client;
  2. using RabbitMQ.Client.Events;
  3. using System;
  4. using System.Text;
  5. using Newtonsoft.Json;
  6. // ...
  7. var factory = new ConnectionFactory
  8. {
  9. HostName = configuration.GetSection("RabbitMQConfig:Hostname").Value,
  10. UserName = configuration.GetSection("RabbitMQConfig:Username").Value,
  11. Password = configuration.GetSection("RabbitMQConfig:Password").Value
  12. };
  13. using (var connection = factory.CreateConnection())
  14. using (var channel = connection.CreateModel())
  15. {
  16. //定义mq管道
  17. channel.QueueDeclare(queue: "my_queue",
  18. durable: false,
  19. exclusive: false,
  20. autoDelete: false,
  21. arguments: null);
  22. var consumer = new EventingBasicConsumer(channel);
  23. consumer.Received += (model, ea) =>
  24. {
  25. var body = ea.Body.ToArray();
  26. var message = Encoding.UTF8.GetString(body);
  27. var myEvent = JsonConvert.DeserializeObject<MyEvent>(message);
  28. // 处理接收到的事件对象
  29. Console.WriteLine("Received event: " + myEvent.Message);
  30. };
  31. channel.BasicConsume(queue: "my_queue",
  32. autoAck: true,
  33. consumer: consumer);
  34. // 保持程序运行,等待事件对象的到达
  35. Console.WriteLine("Waiting for events...");
  36. Console.ReadLine();
  37. }
-- 完结 --