如何做企业网站,怎样做企业手机网站首页,吉工之家找工作建筑工作,wordpress创建注册回顾上文 作为单体程序#xff0c;依赖的第三方服务虽不多#xff0c;但是2C的程序还是有不少内容可讲#xff1b; 作为一个常规互联网系统#xff0c;无外乎就是接受请求、处理请求#xff0c;输出响应。 由于业务渐渐增长#xff0c;数据处理的过程会越来越复杂和冗长依赖的第三方服务虽不多但是2C的程序还是有不少内容可讲 作为一个常规互联网系统无外乎就是接受请求、处理请求输出响应。 由于业务渐渐增长数据处理的过程会越来越复杂和冗长【连贯高效的处理数据】 越来越被看重 .Net 提供了TPL Dataflow组件使我们更高效的实现基于数据流和 流水线操作的代码。 下图是单体程序中 数据处理的用例图。 程序中用到的TPL Dataflow 组件Dataflow是微软前几年给出的数据处理库 是由不同的处理块组成可将这些块组装成一个处理管道块对应处理管道中的阶段 可类比AspNetCore 中Middleware 和pipeline.。 TPL Dataflow库为消息传递和并行化CPU密集型和I / O密集型应用程序提供了编程基础这些应用程序具有高吞吐量和低延迟。它还可以让您明确控制数据的缓冲方式并在系统中移动。 为了更好地理解数据流编程模型请考虑从磁盘异步加载图像并创建这些图像的应用程序。 传统的编程模型通常使用回调和同步对象如锁来协调任务和访问共享数据 从宏观看传统模型 任务是一步步紧接着完成的。 通过使用数据流编程模型您可以创建在从磁盘读取图像时处理图像的数据流对象。在数据流模型下您可以声明数据在可用时的处理方式以及数据之间的依赖关系。由于运行时管理数据之间的依赖关系因此通常可以避免同步访问共享数据的要求。此外由于运行时调度基于数据的异步到达而工作因此数据流可以通过有效地管理底层线程来提高响应性和吞吐量。 也就是说 你定义的是任务内容和任务之间的依赖不关注数据什么时候流到这个任务 。 需要注意的是:TPL Dataflow 非分布式数据流消息在进程内传递, 使用nuget引用 System.Threading.Tasks.Dataflow 包。TPL Dataflow 核心概念 1. Buffer Block TPL Dataflow 内置的Block覆盖了常见的应用场景当然如果内置块不能满足你的要求你也可以自定“块”。 Block可以划分为下面3类 Buffering Only 【Buffer不是缓存Cache的概念 而是一个缓冲区的概念】 Execution Grouping 使用以上块混搭处理管道, 大多数的块都会执行一个操作有些时候需要将消息分发到不同Block这时可使用特殊类型的缓冲块给管道“”分叉”。 2. Execution Block 可执行的块有两个核心组件 输入、输出消息的缓冲区一般称为Input,Output队列 在消息上执行动作的委托 消息在输入和输出时能够被缓冲当Func委托的运行速度比输入的消息速度慢时后续消息将在到达时进行缓冲当下一个块的输入缓冲区中没有容量时将在输出时缓冲。 每个块我们可以配置 缓冲区的总容量 默认无上限 执行操作委托的并发度 默认情况下块按照顺序处理消息一次一个。 我们将块链接在一起形成一个处理管道生产者将消息推向管道。 TPL Dataflow有一个基于pull的机制使用Receive和TryReceive方法但我们将在管道中使用块连接和推送机制。 TransformBlockExecution category-- 由输入输出缓冲区和一个FuncTInput, TOutput委托组成消费的每个消息都会输出另外一个你可以使用这个Block去执行输入消息的转换或者转发输出的消息到另外一个Block。 TransformManyBlock (Execution category) -- 由输入输出缓冲区和一个FuncTInput, IEnumerableTOutput委托组成 它为输入的每个消息输出一个 IEnumerableTOutput BroadcastBlock Buffering category-- 由只容纳1个消息的缓冲区和FuncT, T委托组成。缓冲区被每个新传入的消息所覆盖委托仅仅为了让你控制怎样克隆这个消息不做消息转换。 该块可以链接到多个块管道的分叉虽然它一次只缓冲一条消息但它一定会在该消息被覆盖之前将该消息转发到链接块链接块还有缓冲区。 ActionBlock Execution category-- 由缓冲区和ActionT委托组成他们一般是管道的结尾他们不再给其他块转发消息他们只会处理输入的消息。 BatchBlock (Grouping category)-- 告诉它你想要的每个批处理的大小它将累积消息直到它达到那个大小然后将它作为一组消息转发到下一个块。 还有一下其他的Block类型BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock我们暂时不会深入。 3. Pipeline Chain React 当输入缓冲区达到上限容量为其供货的上游块的输出缓冲区将开始填充当输出缓冲区已满时该块必须暂停处理直到缓冲区有空间这意味着一个Block的处理瓶颈可能导致所有前面的块的缓冲区被填满。 但是不是所有的块变满时都会暂停BroadcastBlock 有允许1个消息的缓冲区每个消息都会被覆盖 因此如果这个广播块不能将消息转发到下游则在下个消息到达的时候消息将丢失这在某种意义上是一种限流比较生硬. 编程实践 将按照上图实现TPL Dataflow ① 定义Dataflow pipeline public EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache, IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory){_httpClient httpClientFactory.CreateClient(bce-request);_redisDB0 redisCache[0];_redisDB redisCache;_logger loggerFactory.CreateLogger(nameof(EqidPairHandler));var option new DataflowLinkOptions { PropagateCompletion true };publisher _redisDB.RedisConnection.GetSubscriber();_eqid2ModelTransformBlock new TransformBlockEqidPair, EqidModel(// redis piublih 没有做在TransformBlock fun里面 因为publih失败可能影响后续的block传递eqidPair EqidResolverAsync(eqidPair),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism con.GetValueint(MaxDegreeOfParallelism)});// https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline_logBatchBlock new LogBatchBlockEqidModel(logConfig, loggerFactory);_logPublishBlock new ActionBlockEqidModel(x PublishAsync(x) );_broadcastBlock new BroadcastBlockEqidModel(x x); // 由只容纳一个消息的缓存区和拷贝函数组成_broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option);_broadcastBlock.LinkTo(_logPublishBlock, option);_eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option);} public class LogBatchBlockT : ILogDestinationT where T : IModelBase{private readonly string _dirPath;private readonly Timer _triggerBatchTimer;private readonly Timer _openFileTimer;private DateTime? _nextCheckpoint;private TextWriter _currentWriter;private readonly LogHead _logHead;private readonly object _syncRoot new object();private readonly ILogger _logger;private readonly BatchBlockT _packer;private readonly ActionBlockT[] batchWriterBlock;private readonly TimeSpan _logFileIntervalTimeSpan;/// summary/// Generate request log file./// /summarypublic LogBatchBlock(LogConfig logConfig, ILoggerFactory loggerFactory){_logger loggerFactory.CreateLoggerLogBatchBlockT();_dirPath logConfig.DirPath;if (!Directory.Exists(_dirPath)){Directory.CreateDirectory(_dirPath);}_logHead logConfig.LogHead;_packer new BatchBlockT(logConfig.BatchSize);batchWriterBlock new ActionBlockT[](models WriteToFile(models)); _packer.LinkTo(batchWriterBlock, new DataflowLinkOptions { PropagateCompletion true });_triggerBatchTimer new Timer(state {_packer.TriggerBatch();}, null, TimeSpan.Zero, TimeSpan.FromSeconds(logConfig.Period));_logFileIntervalTimeSpan TimeSpan.Parse(logConfig.LogFileInterval);_openFileTimer new Timer(state {AlignCurrentFileTo(DateTime.Now);}, null, TimeSpan.Zero, _logFileIntervalTimeSpan);}public ITargetBlockT InputBlock _packer;private void AlignCurrentFileTo(DateTime dt){if (!_nextCheckpoint.HasValue){OpenFile(dt);}if (dt _nextCheckpoint.Value){CloseFile();OpenFile(dt);}}private void OpenFile(DateTime now, string fileSuffix null){string filePath null;try{var currentHour now.Date.AddHours(now.Hour);_nextCheckpoint currentHour.Add(_logFileIntervalTimeSpan);int hourConfiguration _logFileIntervalTimeSpan.Hours;int minuteConfiguration _logFileIntervalTimeSpan.Minutes;filePath ${_dirPath}/u_ex{now.ToString(yyMMddHH)}{fileSuffix}.log;var appendHead !File.Exists(filePath);if (filePath ! null){var stream new FileStream(filePath, FileMode.Append, FileAccess.Write);var sw new StreamWriter(stream, Encoding.Default);if (appendHead){sw.Write(GenerateHead());}_currentWriter sw;_logger.LogDebug(${currentHour} TextWriter has been created.);}}catch (UnauthorizedAccessException ex){_logger.LogWarning(I/O error or specific type of scecurity error,{0}, ex);throw;}catch (Exception e){if (fileSuffix null){_logger.LogWarning($OpenFile failed:{e.StackTrace.ToString()}:{e.Message}., e.StackTrace);OpenFile(now, $-{Guid.NewGuid()});}else{_logger.LogError($OpenFile failed after retry: {filePath}, e);throw;}}}private void CloseFile(){if (_currentWriter ! null){_currentWriter.Flush();_currentWriter.Dispose();_currentWriter null;_logger.LogDebug(${DateTime.Now} TextWriter has been disposed.);}_nextCheckpoint null;}private string GenerateHead(){StringBuilder head new StringBuilder();head.AppendLine(#Software: _logHead.Software).AppendLine(#Version: _logHead.Version).AppendLine($#Date: {DateTime.UtcNow.ToString(yyyy-MM-dd HH:mm:ss)}).AppendLine(#Fields: _logHead.Fields);return head.ToString();}private void WriteToFile(T[] models){try{lock (_syncRoot){var flag false;foreach (var model in models){if (model null)continue;flag true;AlignCurrentFileTo(model.ServerLocalTime);_currentWriter.WriteLine(model.ToString());}if (flag)_currentWriter.Flush();}}catch (Exception ex){_logger.LogError(WriteToFile Error : {0}, ex.Message);}}public bool AcceptLogModel(T model){return _packer.Post(model);}public string GetDirPath(){return _dirPath;}public async Task CompleteAsync(){_triggerBatchTimer.Dispose();_openFileTimer.Dispose();_packer.TriggerBatch();_packer.Complete();await InputBlock.Completion;lock (_syncRoot){CloseFile();}}} 仿IIS日志存储代码 ② 异常处理 上述程序在部署时就遇到相关的坑位在测试环境_eqid2ModelTransformBlock 内Func委托稳定执行程序并未出现异样 部署到生产之后 该Pipeline会运行一段时间就停止工作一直很困惑 后来通过监测_eqid2ModelTransformBlock.Completion 属性该块提前进入“完成态” 程序在执行某次Func委托时报错Block提前进入完成态 TransfomrBlock.Completion 一个Task对象当TPL Dataflow不再处理消息并且能保证不再处理消息的时候就被定义为完成态 Task对象的TaskStatus枚举值将标记此Block进入完成态的真实原因 - TaskStatus.RanToCompletion 根据Block定义的任务成功完成 - TaskStatus.Fault 因为未处理的异常 导致过早的完成 - TaskStatus.Cancled 因为取消操作 导致 过早的完成 我们需要小心处理异常 一般情况下我们使用try、catch包含所有的执行代码以确保所有的异常都被处理。 可将TPL Dataflow 做为进程内消息队列本文只是一个入门参考更多复杂用法还是看官网 你需要记住的是 这是一个.Net 进程内数据流组件 能让你专注于流程。 作者JulianHuang 感谢您的认真阅读如有问题请大胆斧正觉得有用请下方或加关注。 本文欢迎转载但请保留此段声明且在文章页面明显位置注明本文的作者及原文链接。 转载于:https://www.cnblogs.com/JulianHuang/p/11177766.html