网站推广策划书模板,网页剪辑app,建设银行网上银行网站,厦门网直播第 6 章 事件溯源与 CQRS在本章#xff0c;我们来了解一下随着云平台一同出现的设计模式我们先探讨事件溯源和命令查询职责分离#xff08;CQRS#xff09;背后的动机与哲学事件溯源简介事实由事件溯源而来我们大脑就是一种事件溯源系统#xff0c;接收感官多种形式刺激我们来了解一下随着云平台一同出现的设计模式我们先探讨事件溯源和命令查询职责分离CQRS背后的动机与哲学事件溯源简介事实由事件溯源而来我们大脑就是一种事件溯源系统接收感官多种形式刺激大脑负责对这些刺激进行合适排序大约每隔几百毫秒对刺激构成的流进行运算而运算的结果就是我们所说的事实事件溯源的定义传统应用中状态由一系列零散的数据所管理如果客户端向我们发送 PUT 或 POST 请求状态就会改变这种方式很好地给出了系统当前状态却不能指示在当前状态之前系统是如何变化的事件溯源可以解决这个问题因为它把状态管理的职责与接收导致状态变更的刺激的职责区分开来基于事件溯源的系统需要满足一系列要求有序有序事件流幂等等价多个有序事件流的操作结果相同独立不依赖外部信息过去式事件发生在过去流行的区块链技术的基础就是发生在特定私有资源上的安全、可信的事件序列拥抱最终一致性一种我们每天都在用的最终一致性的应用就是社区网络应用有时你从一个设备发出的评论要花几分钟才能展示在朋友的浏览器或者其他设备上这是因为应用的架构人员做了妥协通过放弃同步操作的即时一致性在可接受的范围内增加一定的反馈延迟就能让应用支持巨大的规模与流量CQRS 模式如果把我们讨论的模式直接套用到系统中很快会发现系统必须对输入命令和查询加以区分这也被称为命令查询职责分离CQRS我们用一个例子来说明这种模式的实际应用租户通过一个门户网站查看用电情况每当用户刷新门户页面时就调用某种数据服务并请求汇总一段时间内所有度量事件但这种对于云规模的现代软件开发来说是不可接受的如果将计算职责推卸给数据库很快会造成数据库瓶颈掌握了大多数客户的使用模式让我们能够利用事件溯源来构建一个合理的 CQRS 实现。事件处理器每次收到新事件时重新计算已缓存的度量总和利用这种机制在查询时门户上的用户所期望的结果已经存在于数据库或者缓存中不需要复制的计算也没有临时的聚合与繁杂的汇总只需要一个简单的查询事件溯源于 CQRS 实战--附件的团队成员接下来要开发的新版实例中我们将检测成员彼此相距一个较小距离的时刻系统将支持对这些接近的结果予以响应例如我们可能希望向附近的团队成员的移动设备发送推送通知以提醒他们可以约见对方为了实现这一功能我们把系统职责划分为以下四个组件位置报送服务命令事件处理器对事件进行溯源事实服务查询位置接近监控器对事件进行溯源位置报送服务收到新报送的位置后执行下列操作验证上报数据将命令转换为事件生成事件并用消息队列发送出去GitHub 链接https://github.com/microservices-aspnetcore/es-locationreporter创建位置报送控制器using System;
using Microsoft.AspNetCore.Mvc;
using StatlerWaldorfCorp.LocationReporter.Events;
using StatlerWaldorfCorp.LocationReporter.Models;
using StatlerWaldorfCorp.LocationReporter.Services;namespace StatlerWaldorfCorp.LocationReporter.Controllers
{[Route(/api/members/{memberId}/locationreports)]public class LocationReportsController : Controller{private ICommandEventConverter converter;private IEventEmitter eventEmitter;private ITeamServiceClient teamServiceClient;public LocationReportsController(ICommandEventConverter converter,IEventEmitter eventEmitter,ITeamServiceClient teamServiceClient) {this.converter converter;this.eventEmitter eventEmitter;this.teamServiceClient teamServiceClient;}[HttpPost]public ActionResult PostLocationReport(Guid memberId, [FromBody]LocationReport locationReport){MemberLocationRecordedEvent locationRecordedEvent converter.CommandToEvent(locationReport);locationRecordedEvent.TeamID teamServiceClient.GetTeamForMember(locationReport.MemberID);eventEmitter.EmitLocationRecordedEvent(locationRecordedEvent);return this.Created($/api/members/{memberId}/locationreports/{locationReport.ReportID}, locationReport);}}
}创建 AMQP 事件生成器using System;
using System.Linq;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using StatlerWaldorfCorp.LocationReporter.Models;namespace StatlerWaldorfCorp.LocationReporter.Events
{public class AMQPEventEmitter : IEventEmitter{private readonly ILogger logger;private AMQPOptions rabbitOptions;private ConnectionFactory connectionFactory;public AMQPEventEmitter(ILoggerAMQPEventEmitter logger,IOptionsAMQPOptions amqpOptions){this.logger logger;this.rabbitOptions amqpOptions.Value;connectionFactory new ConnectionFactory();connectionFactory.UserName rabbitOptions.Username;connectionFactory.Password rabbitOptions.Password;connectionFactory.VirtualHost rabbitOptions.VirtualHost;connectionFactory.HostName rabbitOptions.HostName;connectionFactory.Uri rabbitOptions.Uri;logger.LogInformation(AMQP Event Emitter configured with URI {0}, rabbitOptions.Uri);}public const string QUEUE_LOCATIONRECORDED memberlocationrecorded;public void EmitLocationRecordedEvent(MemberLocationRecordedEvent locationRecordedEvent){using (IConnection conn connectionFactory.CreateConnection()) {using (IModel channel conn.CreateModel()) {channel.QueueDeclare(queue: QUEUE_LOCATIONRECORDED,durable: false,exclusive: false,autoDelete: false,arguments: null);string jsonPayload locationRecordedEvent.toJson();var body Encoding.UTF8.GetBytes(jsonPayload);channel.BasicPublish(exchange: ,routingKey: QUEUE_LOCATIONRECORDED,basicProperties: null,body: body);}}}}
}配置并启动服务using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System;
using Microsoft.Extensions.Logging;
using System.Linq;
using StatlerWaldorfCorp.LocationReporter.Models;
using StatlerWaldorfCorp.LocationReporter.Events;
using StatlerWaldorfCorp.LocationReporter.Services;namespace StatlerWaldorfCorp.LocationReporter
{public class Startup{public Startup(IHostingEnvironment env, ILoggerFactory loggerFactory){loggerFactory.AddConsole();loggerFactory.AddDebug();var builder new ConfigurationBuilder().SetBasePath(env.ContentRootPath).AddJsonFile(appsettings.json, optional: false, reloadOnChange: false).AddEnvironmentVariables();Configuration builder.Build();}public IConfigurationRoot Configuration { get; }public void ConfigureServices(IServiceCollection services){services.AddMvc();services.AddOptions();services.ConfigureAMQPOptions(Configuration.GetSection(amqp));services.ConfigureTeamServiceOptions(Configuration.GetSection(teamservice));services.AddSingleton(typeof(IEventEmitter), typeof(AMQPEventEmitter));services.AddSingleton(typeof(ICommandEventConverter), typeof(CommandEventConverter));services.AddSingleton(typeof(ITeamServiceClient), typeof(HttpTeamServiceClient));}public void Configure(IApplicationBuilder app,IHostingEnvironment env,ILoggerFactory loggerFactory,ITeamServiceClient teamServiceClient,IEventEmitter eventEmitter){// Asked for instances of singletons during Startup// to force initialization early.app.UseMvc();}}
}对 Configure 的两次调用让配置子系统把分别从 amqp 和 teamservice 节加载的配置选项以依赖注入的方式提供出来这些配置可以由 appsettings.json 文件提供也可以用环境变量覆盖{amqp: {username: guest,password: guest,hostname: localhost,uri: amqp://localhost:5672/,virtualhost: /},teamservice: {url: http://localhost:5001}
}消费团队服务using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using Newtonsoft.Json;
using StatlerWaldorfCorp.LocationReporter.Models;namespace StatlerWaldorfCorp.LocationReporter.Services
{public class HttpTeamServiceClient : ITeamServiceClient{private readonly ILogger logger;private HttpClient httpClient;public HttpTeamServiceClient(IOptionsTeamServiceOptions serviceOptions,ILoggerHttpTeamServiceClient logger){this.logger logger;var url serviceOptions.Value.Url;logger.LogInformation(Team Service HTTP client using URL {0}, url);httpClient new HttpClient();httpClient.BaseAddress new Uri(url);}public Guid GetTeamForMember(Guid memberId){httpClient.DefaultRequestHeaders.Accept.Clear();httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue(application/json));HttpResponseMessage response httpClient.GetAsync(String.Format(/members/{0}/team, memberId)).Result;TeamIDResponse teamIdResponse;if (response.IsSuccessStatusCode) {string json response.Content.ReadAsStringAsync().Result;teamIdResponse JsonConvert.DeserializeObjectTeamIDResponse(json);return teamIdResponse.TeamID;}else {return Guid.Empty;}}}public class TeamIDResponse{public Guid TeamID { get; set; }}
}这个例子中我们使用 .Result 属性在等待异步方法响应期间强行阻塞了线程在生产级质量的代码里很可能对此进行重构确保在服务边界之内整个调用链都传递异步结果运行位置报送服务RabbitMQ 已经启动运行默认的配置也指向了本地的 RabbitMQ 实例此时可以使用以下方式启动位置报送服务确保位于 src/StatlerWaldorfCorp.LocationReporter 子目录中$ dotnet restore
$ dotnet build
$ dotnet run --server.urlshttp://0.0.0.0:9090服务运行后只要向服务提交请求就可以体验其功能了$ curl -X POST -d \
{reportID:..., \
origin:..., latitude:10, longtitude:20, \
memberID:...} \
http://...le2 \
/locationreports提交完成后应该能从服务获得一个 HTTP 201 响应事件处理器它的职责是消费来自流的事件并执行合适的操作为确保代码整洁、可测试我们把事件处理的职责划分为如下部分订阅队列并从事件流中获取新的消息将消息写入事件存储处理事件流检测附近的队友作为流的处理结果生成新的消息并发送到队列作为流的处理结果向事实服务的服务器 / 缓存提交状态变更情况GitHub 链接https://github.com/microservices-aspnetcore/es-eventprocessor检测附近队友的基于 GPS 工具类的检测器using System.Collections.Generic;
using StatlerWaldorfCorp.EventProcessor.Location;
using System.Linq;
using System;namespace StatlerWaldorfCorp.EventProcessor.Events
{public class ProximityDetector{/** This method assumes that the memberLocations collection only* applies to members applicable for proximity detection. In other words,* non-team-mates must be filtered out before using this method.* distance threshold is in Kilometers.*/public ICollectionProximityDetectedEvent DetectProximityEvents(MemberLocationRecordedEvent memberLocationEvent,ICollectionMemberLocation memberLocations,double distanceThreshold){GpsUtility gpsUtility new GpsUtility();GpsCoordinate sourceCoordinate new GpsCoordinate() {Latitude memberLocationEvent.Latitude,Longitude memberLocationEvent.Longitude};return memberLocations.Where(ml ml.MemberID ! memberLocationEvent.MemberID gpsUtility.DistanceBetweenPoints(sourceCoordinate, ml.Location) distanceThreshold).Select( ml {return new ProximityDetectedEvent() {SourceMemberID memberLocationEvent.MemberID,TargetMemberID ml.MemberID,TeamID memberLocationEvent.TeamID,DetectionTime DateTime.UtcNow.Ticks,SourceMemberLocation sourceCoordinate,TargetMemberLocation ml.Location,MemberDistance gpsUtility.DistanceBetweenPoints(sourceCoordinate, ml.Location)};}).ToList();}}
}接着我们就可以用这个方法的结果来产生对应的额外效果例如可能需要发出一个 ProximityDetectorEvent 事件并将事件写入事件存储作为主体的事件处理器代码using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using StatlerWaldorfCorp.EventProcessor.Location;
using StatlerWaldorfCorp.EventProcessor.Queues;namespace StatlerWaldorfCorp.EventProcessor.Events
{public class MemberLocationEventProcessor : IEventProcessor{private ILogger logger;private IEventSubscriber subscriber;private IEventEmitter eventEmitter;private ProximityDetector proximityDetector;private ILocationCache locationCache;public MemberLocationEventProcessor(ILoggerMemberLocationEventProcessor logger,IEventSubscriber eventSubscriber,IEventEmitter eventEmitter,ILocationCache locationCache){this.logger logger;this.subscriber eventSubscriber;this.eventEmitter eventEmitter;this.proximityDetector new ProximityDetector();this.locationCache locationCache;this.subscriber.MemberLocationRecordedEventReceived (mlre) {var memberLocations locationCache.GetMemberLocations(mlre.TeamID);ICollectionProximityDetectedEvent proximityEvents proximityDetector.DetectProximityEvents(mlre, memberLocations, 30.0f);foreach (var proximityEvent in proximityEvents) {eventEmitter.EmitProximityDetectedEvent(proximityEvent);}locationCache.Put(mlre.TeamID, new MemberLocation { MemberID mlre.MemberID, Location new GpsCoordinate {Latitude mlre.Latitude, Longitude mlre.Longitude} });};}public void Start(){this.subscriber.Subscribe();}public void Stop(){this.subscriber.Unsubscribe();}}
}事件处理服务唯一的额外职责是需要将收到的每个事件都写入事件存储这样做到原因有很多包括向其他服务提供可供搜索的历史记录如果缓存崩溃、数据丢失、事件存储也可用于重建事实缓存请记住缓存在架构里仅提供便利性我们不应该在缓存中存储任何无法从其他位置重建的数据我们要给服务里每一个团队创建一个 Redis 哈希hash在哈希中把团队成员的位置经序列化得到的 JSON 正文存储为字段团队成员的 ID 用作键这样就能轻松地并发更新多个团队成员地位置而不会覆盖数据同时也很容易查询给定的任意团队的位置列表因为团队就是一个个哈希事实服务事实服务负责维护每个团队成员的位置不过这些位置只代表最近从一些应用那里收到的位置关于事实服务的这类服务有两条重要的提醒需要记住事实服务并不是事件存储事实服务是不可依赖服务位置接近监控器位置接近监控器的代码包括基本的微服务结构一个队列消费端订阅 ProximityDetectedEvent 事件到达的消息调用一些第三方或云上的服务来发送推动通知运行示例项目下面列出运行本章示例的依赖项RabbitMQ 服务器Redis 服务器所有依赖项都启动运行后可从 GitHub 拉取 es-locationreporter 和 es-eventprocessor 两个服务的代码此外需要一份 teamservice 服务请确保获取的是 master 分支因为在测试期间只需要用到内存存储要启动团队服务在命令行中转到 src/StatlerWaldorfCorp.TeamService 目录并运行以下命令$ dotnet run --server.urlshttp://0.0.0.:5001要启动位置报送服务在命令行中转到 src/StatlerWaldorfCorp.LocationReporter 目录下并运行以下命令$ dotnet run --server.urlshttp://0.0.0:5002启动事件处理器从 src/StatlerWaldorfCorp.EventProcessor 目录运行$ dotnet run --server.urlshttp://0.0.0.:5003可用下列步骤端到端地检验整个事件溯源/CQRS系统1向 http://localhost:5001/teams 发送一个 POST 请求创建一个新团队2向 http://localhost:5001/teams//members 发送一个 POST 请求往团队中添加一个成员3向 http://localhost:5002/api/members//locationreports 发送一个 POST 请求报送团队成员位置4观察由报送的位置转换而成、被放到对应队列中的 MemberLocationReportedEvent 事件5再重复几次第 3 步添加一些相距较远的位置确保不会触发并被检测到位置接近事件6重复第 2 步往第一名测试成员所在团队添加一名新成员7为第二名成员再次重复第 3 步添加一个于第一名成员最近的位置相距几公里以内的位置8现在应该能够在 proximitydetected 队列中看到一条新消息9可用直接查询 Redis 缓存也可以利用事实服务来查看各团队成员最新的位置状态手动操作几次后大多数团队会花些时间把这一过程自动化借助 docker compose 之类的工具或者创建 Kubernetes 部署或者其他容器编排环境可自动将所有服务部署到集成测试环境接着用脚本发送 REST 请求待测试运行完成后断言出现了正确的接近检测的次数值也是正确的