当前位置: 首页 > news >正文

网站建设山西wordpress 404模板

网站建设山西,wordpress 404模板,手机上的应用商店,企业资源管理软件kata在第1部分#xff1a;线程池中#xff0c;我们设计并实现了相对简单的系统#xff0c;用于实时处理事件。 确保您阅读了上一部分#xff0c;因为它包含一些我们将重用的类。 以防万一这是要求#xff1a; 一个系统每秒发送大约一千个事件。 每个Event至少具有两个属性… kata 在第1部分线程池中我们设计并实现了相对简单的系统用于实时处理事件。 确保您阅读了上一部分因为它包含一些我们将重用的类。 以防万一这是要求 一个系统每秒发送大约一千个事件。 每个Event至少具有两个属性 clientId –我们期望一个客户端每秒最多可以处理几个事件 UUID –全球唯一 消耗一个事件大约需要10毫秒。 设计此类流的使用者 允许实时处理事件 与一个客户端有关的事件应按顺序进行处理即您不能并行处理同一clientId事件 如果10秒钟内出现重复的UUID 请将其删除。 假设10秒钟后不会出现重复 到目前为止我们提出的是线程池和共享缓存的组合。 这次我们将使用RxJava实现解决方案。 首先我没有透露EventStream的实现方式仅提供了API interface EventStream {void consume(EventConsumer consumer);} 实际上对于手动测试我构建了一个简单的RxJava流其行为与系统的要求类似 Slf4j class EventStream {void consume(EventConsumer consumer) {observe().subscribe(consumer::consume,e - log.error(Error emitting event, e));}ObservableEvent observe() {return Observable.interval(1, TimeUnit.MILLISECONDS).delay(x - Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS)).map(x - new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID())).flatMap(this::occasionallyDuplicate, 100).observeOn(Schedulers.io());}private ObservableEvent occasionallyDuplicate(Event x) {final ObservableEvent event Observable.just(x);if (Math.random() 0.01) {return event;}final ObservableEvent duplicated event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);return event.concatWith(duplicated);}} 了解此模拟器的工作原理不是必不可少的但很有趣。 首先我们产生的源源不断的Long值 0 1 2 ...每毫秒使用每秒千个事件 interval()操作。 然后我们使用delay()运算符将每个事件延迟0到1_000微秒之间的随机时间。 这样事件将在难以预测的时刻出现而情况会更加现实。 最后我们将每个Long值映射使用ekhem map()运算符 map()到一个随机Event其中clientId介于1_000到1_100 包含在内之间。 最后一点很有趣。 我们想模拟偶尔的重复。 为此我们将每个事件使用flatMap() 映射到自身在99的情况下。 但是在1的情况下我们两次返回此事件第二次发生在10毫秒至5秒后。 在实践中该事件的重复实例将在其他数百个事件之后出现这使流的行为逼真。 与EventStream交互的方式有两种-通过consume()回调和通过observe()流。 我们可以利用ObservableEvent来快速建立功能与第1部分非常相似但更简单的处理管道。 缺少背压 利用RxJava的第一个幼稚方法很快就失败了 EventStream es new EventStream(); EventConsumer clientProjection new ClientProjection(new ProjectionMetrics(new MetricRegistry()));es.observe().subscribe(clientProjection::consume,e - log.error(Fatal error, e)); ClientProjection ProjectionMetrics等人来自第1部分 。 我们几乎立即获得MissingBackpressureException 这是预期的。 还记得我们的第一个解决方案是如何通过处理越来越多的延迟来滞后吗 RxJava尝试避免这种情况并避免队列溢出。 由于使用者 ClientProjection 无法实时处理事件因此抛出MissingBackpressureException 。 这是快速失败的行为。 最快的解决方案是像以前一样使用RxJava的功能将消耗转移到单独的线程池中 EventStream es new EventStream(); EventConsumer clientProjection new FailOnConcurrentModification(new ClientProjection(new ProjectionMetrics(new MetricRegistry())));es.observe().flatMap(e - clientProjection.consume(e, Schedulers.io())).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c - log.info(Processed {} events/s, c),e - log.error(Fatal error, e)); EventConsumer接口具有一个辅助方法该方法可以在提供的Scheduler上异步使用事件 FunctionalInterface interface EventConsumer {Event consume(Event event);default ObservableEvent consume(Event event, Scheduler scheduler) {return Observable.fromCallable(() - this.consume(event)).subscribeOn(scheduler);}} 通过在单独的Scheduler.io()使用flatMap()使用事件可以异步调用每个使用。 这次事件几乎是实时处理的但是存在更大的问题。 由于某种原因我用FailOnConcurrentModification装饰了ClientProjection 。 事件彼此独立使用因此可能会同时处理同一clientId两个事件。 不好。 幸运的是在RxJava中解决此问题比使用普通线程要容易得多 es.observe().groupBy(Event::getClientId).flatMap(byClient - byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c - log.info(Processed {} events/s, c),e - log.error(Fatal error, e)); 有点改变了。 首先我们将事件按clientId分组。 这将单个Observable流拆分为流 。 每个名为byClient子流代表与同一clientId相关的所有事件。 现在如果我们映射到此子流我们可以确保与同一个clientId相关的事件不会同时处理。 外部流是惰性的因此我们必须订阅它。 与其单独订阅每个事件我们不每秒收集事件并进行计数。 这样我们每秒就会收到一个Integer类型的单个事件该事件表示每秒消耗的事件数。 使用全局状态的不纯非惯常容易出错不安全的重复数据删除解决方案 现在我们必须删除重复的UUID 。 丢弃重复项的最简单但非常愚蠢的方法是利用全局状态。 我们可以通过在filter()运算符之外可用的缓存中查找重复项来简单地过滤掉重复项 final CacheUUID, UUID seenUuids CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();es.observe().filter(e - seenUuids.getIfPresent(e.getUuid()) null).doOnNext(e - seenUuids.put(e.getUuid(), e.getUuid())).subscribe(clientProjection::consume,e - log.error(Fatal error, e)); 如果要监视此机制的使用只需添加指标 Meter duplicates metricRegistry.meter(duplicates);es.observe().filter(e - {if (seenUuids.getIfPresent(e.getUuid()) ! null) {duplicates.mark();return false;} else {return true;}}) 从操作员内部访问全局状态尤其是可变状态是非常危险的并且破坏了RxJava的唯一目的–简化并发。 显然我们使用了Guava的线程安全Cache 但是在许多情况下很容易错过从多个线程访问共享全局可变状态的地方。 如果您发现自己在运算符链之外对某些变量进行了变异请非常小心。 RxJava 1.x中的自定义 RxJava 1.x有一个distinct()运算符大概可以完成此工作 es.observe().distinct(Event::getUuid).groupBy(Event::getClientId) 不幸的是 distinct()在内部将所有密钥 UUID distinct()存储在不断增长的HashSet 。 但是我们只关心最近10秒钟内的重复 通过复制粘贴DistinctOperator的实现我创建了DistinctEvent运算符该运算符利用Guava的缓存仅存储了最后10秒钟的UUID值。 我故意在此运算符中对Event进行硬编码而不是使其通用性更强以使代码更易于理解 class DistinctEvent implements Observable.OperatorEvent, Event {private final Duration duration;DistinctEvent(Duration duration) {this.duration duration;}Overridepublic Subscriber? super Event call(Subscriber? super Event child) {return new SubscriberEvent(child) {final MapUUID, Boolean keyMemory CacheBuilder.newBuilder().expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS).UUID, Booleanbuild().asMap();Overridepublic void onNext(Event event) {if (keyMemory.put(event.getUuid(), true) null) {child.onNext(event);} else {request(1);}}Overridepublic void onError(Throwable e) {child.onError(e);}Overridepublic void onCompleted() {child.onCompleted();}};} } 用法非常简单整个实现加上自定义运算符如下 es.observe().lift(new DistinctEvent(Duration.ofSeconds(10))).groupBy(Event::getClientId).flatMap(byClient - byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c - log.info(Processed {} events/s, c),e - log.error(Fatal error, e)); 实际上如果您跳过每秒的日志记录它甚至可以更短 es.observe().lift(new DistinctEvent(Duration.ofSeconds(10))).groupBy(Event::getClientId).flatMap(byClient - byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).subscribe(e - {},e - log.error(Fatal error, e)); 该解决方案比以前的基于线程池和装饰器的解决方案要短得多。 唯一尴尬的部分是自定义运算符它在存储太多历史UUID时避免内存泄漏。 幸运的是RxJava 2得以解救 RxJava 2.x和更强大的内置 实际上我是从提交公关RxJava具有更强大的执行这种紧密distinct()操作。 但是在我检查2.x分支之前它是 distinct()允许提供自定义Collection 而不是硬编码的HashSet 。 信不信由你依赖倒置不仅涉及Spring框架或Java EE。 当库允许您提供其内部数据结构的自定义实现时这也是DI。 首先我创建一个辅助方法该方法可以构建由MapUUID, Boolean支持由CacheUUID, Boolean支持的SetUUID CacheUUID, Boolean 。 我们一定喜欢代表团 private SetUUID recentUuids() {return Collections.newSetFromMap(CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).UUID, Booleanbuild().asMap()); } 有了这种方法我们可以使用以下表达式实现整个任务 es.observe().distinct(Event::getUuid, this::recentUuids).groupBy(Event::getClientId).flatMap(byClient - byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).subscribe(e - {},e - log.error(Fatal error, e)); 优雅简洁清晰 它看起来像是一个问题 观察事件流 仅考虑不同的UUID 客户分组活动 为每个客户消耗顺序 希望您喜欢所有这些解决方案并发现它们对您的日常工作很有用。 也可以看看 小规模流处理kata。 第1部分线程池 小规模流处理kata。 第2部分RxJava 1.x / 2.x 翻译自: https://www.javacodegeeks.com/2016/10/small-scale-stream-processing-kata-part-2-rxjava-1-x2-x.htmlkata
http://www.huolong8.cn/news/324124/

相关文章:

  • 建设网站租服务器装修在线设计平台
  • 网站开发实例企业门户网站源码下载
  • 030159网站建设与维护昆仑万维做网站
  • 卡片式设计的网站最好网站建设
  • 网站页面设计需求网站备案需要哪些资料
  • 千博企业网站管理系统完整版 2014那做网站
  • 福建省建设厅网站人员服装厂家东莞网站建设
  • 网站的建设费用预算工信部网站icp备案查询
  • 贵州省网站备案企业网站做静态网站还是
  • 新网站必须做301定向吗高仿微博wordpress
  • 网盘视频直接做网站在东莞做网站
  • 重庆网站设计哪家好少儿培训
  • 企业网站建设有几种青岛手机建站多少钱
  • 自己做的网站能赚钱吗国内知名展示设计公司
  • 网站开发计入会计 什么科目营销型网站如何策划
  • 东营网站设计公司建筑门户网站
  • 网站的在线支付模块怎么做企业所得税优惠政策2020
  • 坪山网站建设行情做网站除了域名还要买什么
  • 免费资料网站网址下载视频网站开发php
  • 什么是网站建设技术react 网站开发
  • 校园淘宝店网站开发百度灰色关键词代发
  • 深圳外贸建站网络推广公司做爰全程的网站
  • 如何做网站连接手机app ui设计
  • 免费建立网站教程php作品源代码免费下载
  • 如何做网站的营销高端网页制作公司
  • 美仑-专门做服装的网站餐饮网站开发
  • 做网站的手机软件济南企业网站搭建
  • 专业做网站+上海建设制作网站
  • 做网站公司 备案在线做头像网站
  • 常德营销型网站建设小白如何建网站