左旗网站建设,目前网站建设用哪种语言,免费建站系统wordpress,企业推广方案范例abp 通过IDistributedEventBus接口集成自IEventBus实现分布式事件消息的发布订阅。IEventBus在什么时机触发PublishAsync?当前UnitOfWork完成时#xff0c;触发IEventBus的PublishAsync在没有事务环境下#xff0c;同步调用IEventBus的PublishAsyncabp 默认实现基于RabbitMq… abp 通过IDistributedEventBus接口集成自IEventBus实现分布式事件消息的发布订阅。IEventBus在什么时机触发PublishAsync?当前UnitOfWork完成时触发IEventBus的PublishAsync在没有事务环境下同步调用IEventBus的PublishAsyncabp 默认实现基于RabbitMq消息队列Volo.Abp.EventBus.RabbitMQ实现分布式消息的发布与订阅。消息治理核心问题生产端如何保证投递成功的消息不能丢失。Mq自身如何保证消息不丢失。消费段如何保证消费端的消息不丢失。基于abp 默认实现的DistributedEventBus不能满足以下场景Publisher 生产者无法保证消息一定能投递到MQ。Consumer 消费端在消息消费时出现异常时没有异常错误处理机制确保消费失败的消息能重新被消费。我们引入Masstransit来提升abp对消息治理能力。Masstransit提供以下开箱即用功能Publish/Send/Request-Response等几种消息投递机制。多种IOC容器支持。异常机制。Saga事务管理。事务活动补偿机制Courier消息审计消息管道处理机制Abp 框架下事件消息集成使用MassTransit重新实现IDistributedEventBus。在消费端Consumer传递用户身份信息。使用Asp.Net Core Web Host 作消费端Consumer宿主。集成MassTransit在Module初始化时注入MassTransit实例并启动。Copy/// summary
/// 配置DistributedEventBus
/// /summary
/// param namecontext/param
/// param nameconfiguration/param
/// param namehostingEnvironment/param
private void ConfigureDistributedEventBus(ServiceConfigurationContext context, IConfiguration configuration, IWebHostEnvironment hostingEnvironment)
{var options context.Services.GetConfiguration().GetSection(Rabbitmq).GetMassTransitEventBusOptions();var mqConnectionString rabbitmq:// options.ConnectionString;context.Services.AddMassTransit(mtConfig {//inject consumers into IOC from assemblymtConfig.AddConsumers(typeof(AuthCenterEventBusHostModule));mtConfig.AddBus(provider {var bus Bus.Factory.CreateUsingRabbitMq(mqConfig {var host mqConfig.Host(new Uri(mqConnectionString), h {h.Username(options.UserName);h.Password(options.Password);});// set special message serializermqConfig.UseBsonSerializer();// integrated existed logger compontentmqConfig.UseExtensionsLogging(provider.GetServiceILoggerFactory());mqConfig.ReceiveEndpoint(host, authcenter-queue, q {//set rabbitmq prefetch countq.PrefetchCount 200;//set message retry policyq.UseMessageRetry(r r.Interval(3, 100));q.ConsumerSmsTokenValidationCreatedEventConsumer(provider);EndpointConvention.MapSmsTokenValidationCreatedEvent(q.InputAddress);});mqConfig.ReceiveEndpoint(host, user-synchronization, q {//set rabbitmq prefetch countq.PrefetchCount 50;//q.UseRateLimit(100, TimeSpan.FromSeconds(1));//q.UseConcurrencyLimit(2);//set message retry policyq.UseMessageRetry(r r.Interval(3, 100));q.ConsumerUserSyncEventConsumer(provider);EndpointConvention.MapUserSyncEvent(q.InputAddress);});mqConfig.ConfigureEndpoints(provider);mqConfig.UseAuditingFilter(provider, o {o.ReplaceAuditing true;});});// set authtication middleware for user identitybus.ConnectAuthenticationObservers(provider);return bus;});});
}
在MassTransit中使用IBusControl接口 StartAsync 或 StopAsync 来启动或停止。使用IPublishEndpoint重新实现IDistributedEventBus接口实现与abp分布式事件总线集成。Copy public class MassTransitDistributedEventBus : IDistributedEventBus, ISingletonDependency{private readonly IPublishEndpoint _publishEndpoint;//protected IHybridServiceScopeFactory ServiceScopeFactory { get; }protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; }public MassTransitDistributedEventBus(IOptionsAbpDistributedEventBusOptions distributedEventBusOptions,IPublishEndpoint publishEndpoint){//ServiceScopeFactory serviceScopeFactory;_publishEndpoint publishEndpoint;DistributedEventBusOptions distributedEventBusOptions.Value;//Subscribe(distributedEventBusOptions.Value.Handlers);}/** Not Implementation*/public Task PublishAsyncTEvent(TEvent eventData)where TEvent : class{return _publishEndpoint.Publish(eventData);}public Task PublishAsync(Type eventType, object eventData){return _publishEndpoint.Publish(eventData, eventType);}}到此我们实现了MassTransit与Abp集成。事件消息传递User Claims在实际业务实现过程中我们会用消息队列实现“削峰填谷”的效果。异步消息队列中传递用户身份信息如何实现呢我们先看看abp在WebApi中如何确定当前用户ICurrentUser 提供当前User Claims抽象。而ICurrentUser依赖于ICurrentPrincipalAccessor在Asp.Net core中利用HttpContext User 来记录当前用户身份。在MassTransit中利用IPublishObserver IConsumeObserver 生产者/消费端的观察者来实现传递已认证的用户Claims。Copy /// summary/// 生产者传递当前用户Principal/// /summarypublic class AuthPublishObserver : IPublishObserver{private readonly ICurrentPrincipalAccessor _currentPrincipalAccessor;private readonly IClaimsPrincipalFactory _claimsPrincipalFactory;public AuthPublishObserver(ICurrentPrincipalAccessor currentPrincipalAccessor,IClaimsPrincipalFactory claimsPrincipalFactory){_currentPrincipalAccessor currentPrincipalAccessor;_claimsPrincipalFactory claimsPrincipalFactory;}public Task PrePublishT(PublishContextT context) where T : class{var claimsPrincipal _claimsPrincipalFactory.CreateClaimsPrincipal(_currentPrincipalAccessor.Principal);if (claimsPrincipal ! null){context.Headers.SetAuthenticationHeaders(claimsPrincipal);}return TaskUtil.Completed;}public Task PostPublishT(PublishContextT context) where T : class TaskUtil.Completed;public Task PublishFaultT(PublishContextT context, Exception exception) where T : class TaskUtil.Completed;}
Copy/// summary/// 消费端从MqMessage Heads 中获取当前用户Principal并赋值给HttpContext/// /summarypublic class AuthConsumeObserver : IConsumeObserver{private readonly IHttpContextAccessor _httpContextAccessor;private readonly IServiceScopeFactory _factory;public AuthConsumeObserver(IHttpContextAccessor httpContextAccessor, IServiceScopeFactory factory){_httpContextAccessor httpContextAccessor;_factory factory;}public Task PreConsumeT(ConsumeContextT context) where T : class{if (_httpContextAccessor.HttpContext null){_httpContextAccessor.HttpContext new DefaultHttpContext{RequestServices _factory.CreateScope().ServiceProvider};}var abpClaimsPrincipal context.Headers.TryGetAbpClaimsPrincipal();if (abpClaimsPrincipal ! null abpClaimsPrincipal.IsAuthenticated){var claimsPrincipal abpClaimsPrincipal.ToClaimsPrincipal();_httpContextAccessor.HttpContext.User claimsPrincipal;Thread.CurrentPrincipal claimsPrincipal;}return TaskUtil.Completed;}public Task PostConsumeT(ConsumeContextT context) where T : class{_httpContextAccessor.HttpContext null;return TaskUtil.Completed;}public Task ConsumeFaultT(ConsumeContextT context, Exception exception) where T : class{_httpContextAccessor.HttpContext null;return TaskUtil.Completed;}}
使用Asp.Net Core Web Host 作消费端Consumer宿主基于以下几点原因我们使用Asp.Net Core Web Host 作为消息端Consumer宿主部署在Linux环境下Asp.Net Core Web Host 通常使用守护进程来启动服务实例这样可以保证服务不被中断。根据abp vnext DDD 项目分层最大程度利用Application层应用方法复用abp vnext 框架机制。MassTransit 深入研究延迟消息限流、熔断降级批量消费SagaReferencesabp vnext disctributed event busMassTransit