做韩国网站有哪些,北京官网开发,seo的范畴是什么,wordpress首页在哪里修改借助RxJava 1.1.1中引入的Completable抽象#xff0c;如何并行执行阻止“仅副作用”#xff08;也称为void#xff09;任务的并行执行变得更加容易。 “ 正如您可能已经注意到#xff0c;阅读我的博客时#xff0c;我主要专注于软件Craft.io和自动代码测试。 但是#x… 借助RxJava 1.1.1中引入的Completable抽象如何并行执行阻止“仅副作用”也称为void任务的并行执行变得更加容易。 “ 正如您可能已经注意到阅读我的博客时我主要专注于软件Craft.io和自动代码测试。 但是此外我还是持续交付和广义并发的狂热者。 最后一点从C中的纯线程和信号量到更高级别的解决方案例如ReactiveX和actor模型不等。 这次是全新RxJava 1.1.1 – rx.Completable引入的非常方便在特定情况下功能的用例。 与我的许多博客条目类似这也反映了我在处理实际任务和用例时遇到的实际事件。 要做的任务 想象一下一个系统对来自不同来源的异步事件进行了非常复杂的处理。 过滤合并转换分组丰富等等。 RxJava非常适合这里特别是如果我们想要反应式的话。 假设我们已经实现了它外观和效果很好只剩下一件事了。 在开始处理之前需要告知3个外部系统我们准备好接收消息。 对旧系统的3个同步调用通过RMIJMX或SOAP。 它们每个都可以持续几秒钟我们需要在开始之前等待所有它们。 幸运的是它们已经实现我们将它们视为可能成功或失败的例外的黑匣子。 我们只需要调用它们最好同时调用并等待完成即可。 rx.Observable –方法1 触手可及的RxJava似乎是显而易见的方法。 首先可以通过Observable来包装作业执行 private ObservableVoid rxJobExecute(Job job) {return Observable.fromCallable(() - { job.execute();return null;});
} 不幸的是在我们的例子中 Observable期望返回一些元素。 我们需要使用Void并且尴尬地return null 而不是仅仅引用方法job::execute 。 接下来我们可以使用subscribeOn()方法来使用另一个线程来执行我们的工作而不是阻塞主/当前线程–我们不想顺序地执行我们的工作。 Schedulers.io()为调度Schedulers.io()提供了一组用于IO绑定工作的线程。 ObservableVoid run1 rxJobExecute(job1).subscribeOn(Schedulers.io());
ObservableVoid run2 rxJobExecute(job2).subscribeOn(Schedulers.io()); 最后我们需要等待所有它们完成所有Obvervable s完成。 为此可以调整zip功能。 它结合了Obserbable拉链发射的物品的序列号。 在我们的例子中我们只对每个Observable到的作业中的第一个伪项目感兴趣我们仅发出null以满足API并以阻塞的方式等待它们。 zip运算符中的zip函数需要返回某些内容因此我们需要重复null的变通方法。 Observable.zip(run1, run2, (r1, r2) - return null).toBlocking().single(); 显而易见 Observable设计为Observable使用值流并且需要进行一些额外的工作才能将其调整为仅产生副作用不返回任何值操作。 当我们需要将仅具有副作用的操作与其他返回一些值的值合并例如合并时情况变得更加糟糕–需要更丑陋的转换。 请参阅RxNetty API的实际用例 。 public void execute() {ObservableVoid run1 rxJobExecute(job1).subscribeOn(Schedulers.io());ObservableVoid run2 rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.zip(run1, run2, (r1, r2) - null).toBlocking().single();
}private ObservableVoid rxJobExecute(Job job) {return Observable.fromCallable(() - { job.execute();return null;});
}rx.Observable –方法2 可能会使用另一种方法。 代替生成人工项目可以将我们的任务中的空Observable作为onComplete操作执行。 这迫使我们从zip操作切换到merge 。 结果我们需要提供一个onNext动作对于空的Observable永远不会执行这肯定了我们试图破解该系统的信念。 public void execute() {ObservableObject run1 rxJobExecute(job1).subscribeOn(Schedulers.io());ObservableObject run2 rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.merge(run1, run2).toBlocking().subscribe(next - {});
}private ObservableObject rxJobExecute(Job job) {return Observable.empty().doOnCompleted(job::execute);
}可完成 RxJava 1.1.1解决了对不返回任何值的Observable的更好支持。 Completable可以视为Observable的简化版本可以成功完成发出onCompleted事件或失败 onError 。 创建Completable实例的最简单方法是使用fromAction方法该方法采用不返回任何值的Action0 例如Runnable 。 Completable completable1 Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());
Completable completable2 Completable.fromAction(job2::execute).subscribeOn(Schedulers.io()); 接下来我们可以使用merge()方法该方法返回一个Completable实例该实例立即订阅所有下游Completable 并在所有下游Completable完成或其中一个失败时完成。 当我们使用带外部调度程序的subscribeOn方法时所有作业将并行执行在不同线程中。 Completable.merge(completable1, completable2).await(); await()方法将阻塞直到所有作业完成如果发生错误将重新抛出异常。 纯粹而简单。 public void execute() {Completable completable1 Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());Completable completable2 Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());Completable.merge(completable1, completable2).await();
}java.util.concurrent.CompletableFuture 有人可能会问为什么不只使用CompletableFuture 这将是一个很好的问题。 Java 5中引入的纯Future可能需要我们做更多的工作而ListenableFuture 来自Guava和CompletableFuture 来自Java 8使其变得微不足道。 首先我们需要运行/安排作业执行。 接下来使用CompletableFuture.allOf()方法我们可以创建一个新的CompletableFuture 该工作在所有作业完成CompletableFuture完成了我们之前没有看到过这个概念吗。 get()方法只是阻止等待。 public void execute() {try {CompletableFutureVoid run1 CompletableFuture.runAsync(job1::execute);CompletableFutureVoid run2 CompletableFuture.runAsync(job2::execute);CompletableFuture.allOf(run1, run2).get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(Jobs execution failed, e);}
} 我们需要对受检查的异常进行处理很多时候我们不想用它们来污染我们的API但是总的来说这看起来很明智。 但是值得记住的是当需要更复杂的链处理时 CompletableFuture不足。 此外在我们的项目中已经使用RxJava时使用相同或相似的API而不是引入全新的东西通常会很有用。 摘要 多亏了rx.Completable 使用RxJava仅完成副作用不返回任何内容任务的执行要舒适得多。 在已经使用RxJava的代码库中即使在简单情况下它也可能比CompletableFuture更为可取。 但是 Completable提供了许多先进的操作员和技术此外还可以轻松地将它与Observable混合使用这使其功能更加强大。 要了解有关Completable更多信息您可能需要查看发行说明 。 对于那些想更深入地了解主题的人Advanced RxJava博客 第1部分和第2 部分 上有关于Completable API的非常详细的介绍。 可以从GitHub获得代码示例的源代码。 顺便说一句如果您总体上对RxJava感兴趣我可以凭良心向您推荐一本书该书目前由Tomasz Nurkiewicz和Ben Christensen – RxJava的反应式编程编写 。 翻译自: https://www.javacodegeeks.com/2016/03/parallel-execution-blocking-tasks-rxjava-completable.html