seo排名技巧,站长工具seo推广,软件技术的发展趋势,seo智能优化公司#xff08;Blog.Core框架开发情况#xff0c;着色部分为本次新增#xff09;终于项目继续迭代更新了#xff0c;在开源这两年多#xff0c;也是感谢每一个支持Blog.Core项目的同学#xff0c;同时也感谢每一个在生产环境中使用#xff0c;并提出意见和建议的小伙伴Blog.Core框架开发情况着色部分为本次新增终于项目继续迭代更新了在开源这两年多也是感谢每一个支持Blog.Core项目的同学同时也感谢每一个在生产环境中使用并提出意见和建议的小伙伴2,606个Star是我们相互之间共同的努力和肯定上边的这些都是我和各位使用者提出的需求刚开始很快越是到后边开发起来越难这里先说明几点问题1、增加的东西太多有一部分使用者表示使用不到太笨重2、目前基本比较全面后期新增需求难度系数较高3、功能太多了不好抄代码????不过该更新的还是需要更新的我已经很贴心的把各部分的代码隔开了就差每个功能建立一个类库了这个我也考虑过不过那要是建立起来就是二三十个果断放弃了。因为代码已经隔开了如果自己不需要可以删除掉当然这样也方便其他不使用我框架的粘贴复制到自己项目。今年终于在年末的时候增加上了RabbitMQ消息队列和EventBus事件总线之前新增过Redis的消息队列基于Redis很方便且很简单的一个InitQ组件具体请看《【BCVP】实现基于 Redis 的消息队列》然后大家应该都知道最近我一直在录制一个系列视频教程——《eShopOnContainer微服务系列讲解》里边最重要的就是事件总线基于的也正好是RabbitMQ的分布式消息队列组件当然其中的订单微服务也用到了MediatR作为进程内的订阅发布模式这个MediatR我在DDD系列中已经讲过就不说了这次就重点说说RabbitMQ和EventBus吧也正好属于俩个系列的串烧了。这里说一下我是从eshop代码里拷贝出来做讲解的当然做了适当修改还是要多关注官方支持原作者https://github.com/dotnet-architecture/eShopOnContainers。此外热烈欢迎支付组件的合作者如果你正在开发支付相关组件可以联系我一起推广开发一起造福社区也可以入驻BCVP开发者社区。OK今天就先简单的给大家先说下思路以下每一个小节其实都可以写一篇或多篇文章的本文就当个系列文章导读吧详细讲解以后会有主要就是关于RabbitMQ消息队列和EventBus事件总线的。01消息队列Message QueuePublish/Subscribe基本概念消息队列英语Message queue是一种进程间通信或同一进程的不同线程间的通信方式软件的贮列用来处理一系列的输入通常是来自用户。消息队列提供了异步的通信协议每一个队列中的记录包含详细说明的数据包含发生的时间输入设备的种类以及特定的输入参数也就是说消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中直到接收者取回它。最终可以实现解耦的目的。下面通过一个简单的架构模型来解释Producer消息生产者负责产生和发送消息到Broker。Broker消息处理中心。负责消息存储、确认、重试等一般其中会包含多个Queue。Consumer消息消费者负责从 Broker 中获取消息并进行相应处理。消息队列的好处从上边的定义中我们可以看出来优点主要是三块异步、流量削峰与流控、解耦。这三个优点在高并发等三高场景还是很有必要的甚至说是十分必要的。系统A将userId写到消息队列中系统C和系统D从消息队列中拿数据从而实现了解耦的目的图片来源于知乎/question/54152397接下来为了提高用户体验和吞吐量其实可以异步地调用系统B、C、D的接口。所以我们可以弄成是这样的图片来源于知乎/question/54152397最后系统B和系统C根据自己的能够处理的请求数去消息队列中拿数据这样即便有每秒有8000个请求那只是把请求放在消息队列中去拿消息队列的消息由系统自己去控制这样就不会把整个系统给搞崩图片来源于知乎/question/54152397当然消息队列也有些坏处这里就先随便列几个其他的大家自行搜索即可1、高可用如果使用消息队列基本要配合集群的因为如果MQ服务器崩了那就整个服务灾难了。2、数据安全必须保证数据不能丢失也就是要考虑好最终一致性做好补偿机制。3、合理的消费。好啦基本概念先说到这里下边就简单说下代码吧因为篇幅的问题我们只统一讲解接口的设计毕竟实现类是比较复杂的当然我会抽一个实现类的核心方法说一下。02RabbitMQ持久连接IRabbitMQPersistentConnection首先说下关于RabbitMQ的连接这个是很简单的和平时我们使用SqlServer/Redis这种第三方组件是类似的通过连接字符串或者说是服务器然后配置用户名/密码就能连接上了相关的接口是这样的 /// summary/// RabbitMQ持久连接/// 接口/// /summarypublic interface IRabbitMQPersistentConnection: IDisposable{bool IsConnected { get; }bool TryConnect();IModel CreateModel();}
接口一共提供了三个方法分别是是否连接、尝试连接、创建模型。使用的时候首先需要连接nuget包PackageReference IncludeNewtonsoft.Json Version12.0.3 /
PackageReference IncludePolly Version7.2.1 /
PackageReference IncludeRabbitMQ.Client Version6.2.1 /
其实只需要第三个RabbitMQ.Client就行了前边两个是辅助作用分别是提供序列化和重试机制的如果你有一个需求是需要重试的比如连接数据库或者执行某个进程如果遇到异常重试几次可以使用组件Polly它还有其他的功能自己可以多尝试下。那说到了重试我就说一下TryConnect(); 这个核心的方法/// summary
/// 连接
/// /summary
/// returns/returns
public bool TryConnect()
{_logger.LogInformation(RabbitMQ Client is trying to connect);// 加锁lock (sync_root){// 重试策略var policy RetryPolicy.HandleSocketException().OrBrokerUnreachableException().WaitAndRetry(_retryCount,retryAttempt TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) {_logger.LogWarning(ex, RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage}), ${time.TotalSeconds:n1}, ex.Message);});// 执行策略policy.Execute(() {// 开始连接RabbitMQ_connection _connectionFactory.CreateConnection();});// 连接成功if (IsConnected){// 追加事件处理器目的是为了异常重试共3种情况_connection.ConnectionShutdown OnConnectionShutdown;_connection.CallbackException OnCallbackException;_connection.ConnectionBlocked OnConnectionBlocked;_logger.LogInformation(RabbitMQ Client acquired a persistent connection to {HostName} and is subscribed to failure events, _connection.Endpoint.HostName);return true;}else{_logger.LogCritical(FATAL ERROR: RabbitMQ connections could not be created and opened);return false;}}
}
相应的逻辑我已经在代码中增加了注释过程肯定能看得懂至于真实的底层原理这里先不说了。可以看到上边就用到了重试机制可以配置策略。这样就可以连接上RabbitMQ服务器了那如何基于这个连接做事件总线呢别着急咱们先说下什么是事件和事件处理器。03事件与处理器IntegrationEventIIntegrationEventHandlerT关于事件如果你看过我DDD领域驱动设计应该会有些印象和了解我这里再简单的说明一下吧。关于事件其实我们每天都在用而且很久之前就用过就比如说asp的时候的按钮事件void btnRegister_Click(object sender, EventArgs e)
其中object sender指代发出事件的对象这里也就是button对象EventArgs e事件参数可以理解为对事件的描述 它们可以统称为事件源。其中的代码逻辑就是对事件的处理。我们可以统称为事件处理程序。所以事件有两部分事件源对象事件处理器程序。关于总线那我们平时肯定会遇到很多很多的事件注册的时候校验成功后持久化到数据库然后发注册成功的邮件。支付的时候判断成功后修改数据库订单库存物流邮件短信等等等等这都是一个个的事件。那如何对这些事件进行统一的管理呢单体下很简单就是按照过程走就行了分布式或者微服务中多个服务已经隔离开无法按照过程一步步走那这个时候就需要一个策略常用的就是——订阅发布模式事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制允许不同的组件之间进行彼此通信而又不需要相互依赖达到一种解耦的目的。代码举例我们用代码来简单看看如何设计事件和事件处理器 /// summary/// 事件模型/// 基类/// /summarypublic class IntegrationEvent{public IntegrationEvent(){Id Guid.NewGuid();CreationDate DateTime.UtcNow;}[JsonConstructor]public IntegrationEvent(Guid id, DateTime createDate){Id id;CreationDate createDate;}[JsonProperty]public Guid Id { get; private set; }[JsonProperty]public DateTime CreationDate { get; private set; }}
事件是一个对象是一个模型那很重要的标识就是Id和Date这两个属性了当然也可以适当做其他的一些处理请注意private set; 的写法。 /// summary/// 集成事件处理程序/// 泛型接口/// /summary/// typeparam nameTIntegrationEvent/typeparampublic interface IIntegrationEventHandlerin TIntegrationEvent : IIntegrationEventHandlerwhere TIntegrationEvent : IntegrationEvent{Task Handle(TIntegrationEvent event);}/// summary/// 集成事件处理程序/// 基 接口/// /summarypublic interface IIntegrationEventHandler{}
对事件的处理比较简单的我们定义接口只需要一个Handle方法即可剩下的就是我们定义一个一个的具体的事件处理器通过继承这个接口来实现具体的业务逻辑。比如我这里定义了一个例子关于博客删除的当然可能不太贴切我只是想举个例子 /// summary/// 博客删除事件处理器/// 删除博客后触发/// /summarypublic class BlogDeletedIntegrationEventHandler : IIntegrationEventHandlerBlogDeletedIntegrationEvent{private readonly IBlogArticleServices _blogArticleServices;private readonly ILoggerBlogDeletedIntegrationEventHandler _logger;public BlogDeletedIntegrationEventHandler(IBlogArticleServices blogArticleServices,ILoggerBlogDeletedIntegrationEventHandler logger){_blogArticleServices blogArticleServices;_logger logger ?? throw new ArgumentNullException(nameof(logger));}public async Task Handle(BlogDeletedIntegrationEvent event){_logger.LogInformation(----- Handling integration event: {IntegrationEventId} at {AppName} - ({IntegrationEvent}), event.Id, Blog.Core, event);ConsoleHelper.WriteSuccessLine($----- Handling integration event: {event.Id} at Blog.Core - ({event}));await _blogArticleServices.DeleteById(event.BlogId.ToString());}}
当我执行删除的时候不去执行而是放到队列里通过订阅发布的模式每一个订阅者来消费信息从而实现解耦的目的。现在明白了事件和处理器那如何对这是事件操作怎么发布又是如何订阅呢事件总线就这么出现了请往下看。04基于RabbitMQ事件总线IEventBusEventBusRabbitMQ上边我们已经连接好了RabbitMQ服务器也明白了什么是事件和处理器现在就是需要发布和订阅了总线是一个很好的方案那设计下接口就是这样的/// summary
/// 事件总线
/// 接口
/// /summary
public interface IEventBus
{/// summary/// 发布/// /summary/// param nameevent事件模型/paramvoid Publish(IntegrationEvent event);/// summary/// 订阅/// /summary/// typeparam nameT约束事件模型/typeparam/// typeparam nameTH约束事件处理器事件模型/typeparamvoid SubscribeT, TH()where T : IntegrationEventwhere TH : IIntegrationEventHandlerT;/// summary/// 取消订阅/// /summary/// typeparam nameT/typeparam/// typeparam nameTH/typeparamvoid UnsubscribeT, TH()where TH : IIntegrationEventHandlerTwhere T : IntegrationEvent;/// summary/// 动态订阅/// /summary/// typeparam nameTH约束事件处理器/typeparam/// param nameeventName/paramvoid SubscribeDynamicTH(string eventName)where TH : IDynamicIntegrationEventHandler;/// summary/// 动态取消订阅/// /summary/// typeparam nameTH/typeparam/// param nameeventName/paramvoid UnsubscribeDynamicTH(string eventName)where TH : IDynamicIntegrationEventHandler;
}
这里定义了基本的常见操作如何实现这个接口可以针对不同的方案既然我们使用了RabbitMQ就说说它当然你也可以使用其他的比如AzureService之类的。基于RabbitMQ的事件总线实现类比较复杂我就不多说明了感兴趣的可以直接看我的代码我这里就说一下构造函数从构造函数中可以知道当前类的依赖项毕竟现在都是使用依赖注入了 /// summary/// RabbitMQ事件总线/// /summary/// param namepersistentConnectionRabbitMQ持久连接/param/// param namelogger日志/param/// param nameautofacautofac容器/param/// param namesubsManager事件总线订阅管理器/param/// param namequeueName队列名称/param/// param nameretryCount重试次数/parampublic EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILoggerEventBusRabbitMQ logger,ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName null, int retryCount 5){_persistentConnection persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));_logger logger ?? throw new ArgumentNullException(nameof(logger));_subsManager subsManager ?? new InMemoryEventBusSubscriptionsManager();_queueName queueName;_consumerChannel CreateConsumerChannel();_autofac autofac;_retryCount retryCount;_subsManager.OnEventRemoved SubsManager_OnEventRemoved;}
除了比较常规的日志、RabbitMQ连接、Autofac容器、Polly重试这几个比较基础和必要的还有一个参数是很重要的——IEventBusSubscriptionsManager subsManager 。这个是干什么的呢我们知道单体应用很简单按照过程一一步骤即可单一的订阅发布模式也比较简单就是一对一但是还是有很多复杂的那如何对这些订阅统一管理呢就是需要一个事件总线订阅管理器。05事件总线订阅管理器InMemoryEventBusSubscriptionsManager是对每一个订阅事件需要做管理比如该发布的事件不想被消费了比如需要动态的添加一个订阅者比如全部清除等可以这么设计接口 /// summary/// 事件总线订阅管理器/// 接口/// /summarypublic interface IEventBusSubscriptionsManager{bool IsEmpty { get; }event EventHandlerstring OnEventRemoved;void AddDynamicSubscriptionTH(string eventName)where TH : IDynamicIntegrationEventHandler;void AddSubscriptionT, TH()where T : IntegrationEventwhere TH : IIntegrationEventHandlerT;void RemoveSubscriptionT, TH()where TH : IIntegrationEventHandlerTwhere T : IntegrationEvent;void RemoveDynamicSubscriptionTH(string eventName)where TH : IDynamicIntegrationEventHandler;bool HasSubscriptionsForEventT() where T : IntegrationEvent;bool HasSubscriptionsForEvent(string eventName);Type GetEventTypeByName(string eventName);void Clear();IEnumerableSubscriptionInfo GetHandlersForEventT() where T : IntegrationEvent;IEnumerableSubscriptionInfo GetHandlersForEvent(string eventName);string GetEventKeyT();}
这里应该明白它的作用了吧就是控制当前项目中的总线中的各个订阅情况可以直接在内存里操作注意这个内存是管理总线中的订阅的和RabbitMQ的分布式不一样需要搞清楚二者的区别如果不是很懂可以联系我或者留言。所以可以设计这么一个实现类InMemoryEventBusSubscriptionsManager基本到这里就没啥问题了核心的几个知识点也讲完了当然仅仅是讲完了其中的知识点量要远比这个多的多剩下的可以看看效果。06服务注册和使用Service registration and usage上边的设计完接下来注册一下服务就行了首先就是注册RabbitMQpublic static void AddRabbitMQSetup(this IServiceCollection services)
{if (services null) throw new ArgumentNullException(nameof(services));if (Appsettings.app(new string[] { RabbitMQ, Enabled }).ObjToBool()){services.AddSingletonIRabbitMQPersistentConnection(sp {var logger sp.GetRequiredServiceILoggerRabbitMQPersistentConnection();var factory new ConnectionFactory(){HostName Appsettings.app(new string[] { RabbitMQ, Connection }),DispatchConsumersAsync true};if (!string.IsNullOrEmpty(Appsettings.app(new string[] { RabbitMQ, UserName }))){factory.UserName Appsettings.app(new string[] { RabbitMQ, UserName });}if (!string.IsNullOrEmpty(Appsettings.app(new string[] { RabbitMQ, Password }))){factory.Password Appsettings.app(new string[] { RabbitMQ, Password });}var retryCount 5;if (!string.IsNullOrEmpty(Appsettings.app(new string[] { RabbitMQ, RetryCount }))){retryCount int.Parse(Appsettings.app(new string[] { RabbitMQ, RetryCount }));}return new RabbitMQPersistentConnection(factory, logger, retryCount);});}
}
可以在配置文章中配置下参数。然后注册事件总线EventBus public static void AddEventBusSetup(this IServiceCollection services){if (services null) throw new ArgumentNullException(nameof(services));if (Appsettings.app(new string[] { RabbitMQ, Enabled }).ObjToBool() Appsettings.app(new string[] { EventBus, Enabled }).ObjToBool()){var subscriptionClientName Appsettings.app(new string[] { EventBus, SubscriptionClientName });services.AddSingletonIEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager();services.AddTransientBlogDeletedIntegrationEventHandler();services.AddSingletonIEventBus, EventBusRabbitMQ(sp {var rabbitMQPersistentConnection sp.GetRequiredServiceIRabbitMQPersistentConnection();var iLifetimeScope sp.GetRequiredServiceILifetimeScope();var logger sp.GetRequiredServiceILoggerEventBusRabbitMQ();var eventBusSubcriptionsManager sp.GetRequiredServiceIEventBusSubscriptionsManager();var retryCount 5;if (!string.IsNullOrEmpty(Appsettings.app(new string[] { RabbitMQ, RetryCount }))){retryCount int.Parse(Appsettings.app(new string[] { RabbitMQ, RetryCount }));}return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);});}}
最后当项目启动的时候直接订阅我们的事件处理程序 var eventBus app.ApplicationServices.GetRequiredServiceIEventBus();eventBus.SubscribeBlogDeletedIntegrationEvent, BlogDeletedIntegrationEventHandler();
我们尝试一下发送一个事件到总线里 [HttpGet][AllowAnonymous]public void EventBusTry([FromServices] IEventBus _eventBus, string blogId 1){var blogDeletedEvent new BlogDeletedIntegrationEvent(blogId);_eventBus.Publish(blogDeletedEvent);}
动图效果如下是不是很简单好啦暂时就先到这里打完手工。