做网站认证违法吗,wordpress 文章表,企业信息填报报送年度报告,网站域名怎么购买受Heinz Kabutz最近的时事通讯以及我在最近的书中研究的Scala的期货的启发#xff0c;我着手使用Java 8编写了一个示例#xff0c;该示例如何将工作提交给执行服务并异步地响应其结果#xff0c;并使用了回调。无需阻止任何线程等待执行服务的结果。 理论认为#xff0c;调… 受Heinz Kabutz最近的时事通讯以及我在最近的书中研究的Scala的期货的启发我着手使用Java 8编写了一个示例该示例如何将工作提交给执行服务并异步地响应其结果并使用了回调。无需阻止任何线程等待执行服务的结果。 理论认为调用拦截方法如get上java.util.concurrent.Future是坏的因为系统会需要超过线程的最佳数量如果它是不断做工作并在浪费时间与结果上下文切换 。 在Scala世界中像Akka这样的框架都使用编程模型这意味着这些框架永远不会阻塞-线程阻塞的唯一时间是用户对阻塞的对象进行编程时他们不愿意这样做。 通过永不阻塞该框架可以避免每个内核使用大约一个线程这比说说标准JBoss Java EE Application Server在启动后最多拥有400个线程要少得多。 很大程度上归功于Akka框架的工作Scala 2.10添加了Futures和Promises 但是Java中还不存在这些东西。 以下代码显示了我的预期目标。 它分为三个部分。 首先使用在类ch.maxant.async.Future找到的static future方法将新任务添加到执行服务中。 它返回一个Future 但不是从java.util.concurrent包中返回一个Future 而是从ch.maxant.async包中返回其子类。 其次 Future具有一种名为map的方法该方法遵循Scala或新的Java 8 Stream类的功能样式。 map方法允许您注册一个回调或更准确地说我们可以将第一个future包含的值映射转换为新值。 在第一个Future完成后映射将在将来的其他时间执行因此会产生新的Future 。 第三我们在Future类中使用另一个方法来注册一个回调一旦我们创建的所有期货都完成该回调将运行。 任何时候都不会使用Future API的任何阻塞方法 final Random random new Random();
int numTasks 10;
ListFutureInteger futures new ArrayList();for(int i 0; i numTasks; i){final int j i;log(adding future i);// PART 1//start some work async / in the futureFutureString f future(new TaskString( () - {sleep(random.nextInt(1000));if(j 5){log(working success);return 20;}else{log(working failure);throw new Exception();}}));// PART 2//register a callback, to be called when the work is donelog(adding mapping callback to future);final FutureInteger f2 f.map( (TryString stringNumber) - {return stringNumber.map( (String s) - {log(mapping s to int);return Integer.parseInt(s);}).recover( (Exception e) - {log(recovering);return -10;}).get(); //wont throw an exception, because we provided a recovery!});futures.add(f2);
}// PART 3
log(registering callback for final result);
Future.registerCallback(futures, (ListTryInteger results) - {Integer finalResult results.stream().map( (TryInteger t) - {log(mapping t);try {return t.get();} catch (Exception e) {return 0;}}).reduce(0, (Integer i1, Integer i2) - {log(reducing i1 and i2);return i1 i2;});log(final result is finalResult);Future.shutdown();if(finalResult ! 50){throw new RuntimeException(FAILED);}else{log(SUCESS);}
});System.out.println(Completed submitting all tasks on thread Thread.currentThread().getId());//this main thread will now die, but the Future executor is still up and running. the callback will shut it down and with it, the jvm. 第11行调用了future方法来注册一个新Task 该Task是使用Work实例构造的在这里是使用Java 8 lambda构造的。 工作会睡一会儿然后要么返回数字20作为字符串要么抛出异常以演示如何处理错误。 使用第11行从执行服务返回的Future 第25行将其值从字符串映射到整数从而生成FutureInteger而不是FutureString 。 该结果将添加到第35行的Future列表中第3部分在第40行中使用该列表registerCallback方法将确保在完成最后一个future之后调用给定的回调。 第25-33行的映射使用传递给Try对象的lambda完成。 Try有点像Java 8 Optional 它是Success和Failure类的抽象超类我是根据对Scala对应类的了解而实现的。 与必须显式检查错误相比它可使程序员更轻松地处理故障。 我对Try接口的实现如下 public interface TryT {/** returns the value, or throws an exception if its a failure. */T get() throws Exception;/** converts the value using the given function, resulting in a new Try */S TryS map(Function1T, S func);/** can be used to handle recovery by converting the exception into a {link Try} */TryT recover(RecoveryT r);} 发生的事情是Success和Failure的实现可以优雅地处理错误。 例如如果第一个清单的第11行上的Future完成但有例外则将第一个清单的第25行上的lambda传递给Failure对象并且在Failure上调用map方法绝对没有任何作用。 没有例外没有任何问题。 为了进行补偿您可以在第一个清单的第29行上调用recover方法该方法使您可以处理异常并返回程序可以继续使用的值例如默认值。 另一方面 Success类以不同的方式实现Try接口的map和recover方法这样调用map会导致给定的函数被调用但是调用recover绝对不会执行任何操作。 map和recover方法无需显式编码try / catch块而是提供了一种更好的语法该语法在读取或查看代码时更容易验证与编写相比这种情况在代码中更常见。 由于map和recover方法将函数的结果包装在Try s中因此可以将调用链接在一起例如第Try和32行。Scala的Try API具有比我在这里实现的三种方法更多的方法。 请注意我选择在Try API java.util.function.Function不使用java.util.function.Function 因为它的apply方法不会throw Exception 这意味着第一个清单中显示的代码不像现在那样好。 相反我写了 Function1接口。 难题的第3部分是如何使程序在所有Future完成之后做一些有用的事情而又不会像对Future#get()方法那样讨厌调用。 解决方案是注册一个回调如第40行所示。该回调与此处显示的所有其他回调一样都已提交给执行服务。 这意味着我们无法保证哪个线程可以运行它这会带来副作用即线程本地存储TLS不再起作用-某些框架的较旧版本Hibernate依赖TLS而它们只会胜任。在这里工作。 Scala有一个很好的方法可以使用implicit关键字来解决该问题而Java还没有但…因此需要使用其他机制。 我在提到它只是为了让您知道它。 因此当最后一个Future完成时将调用40-60行并传递包含Integer而不是Future的Try的List 。 registerCallback方法将期货转换为适当的Success或Failure 。 但是我们如何将它们转换为有用的东西呢 幸运的是Java 8现在有了一个简单的map / reduce就支持了Stream类该类通过调用stream()方法从第42行的Try集合中Try了。 首先我将Try映射转换为它们的值然后在第49行上将流减少为单个值。我本可以使用而不是传递自己的求和值的lambda实现。 Integer::sum 例如someStream.reduce(0, Integer::sum) 。 我上次运行该程序时它输出以下内容 Thread-1 says: adding future 0
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 1
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 2
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 3
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 4
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 5
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 6
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 7
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 8
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 9
Thread-1 says: adding mapping callback to future
Thread-1 says: registering callback for final result
Thread-10 says: working success
Completed submitting all tasks on thread 1
Thread-14 says: working success
Thread-10 says: working failure
Thread-14 says: working failure
Thread-12 says: working success
Thread-10 says: working failure
Thread-10 says: mapping 20 to int
Thread-10 says: mapping 20 to int
Thread-10 says: recovering
Thread-10 says: recovering
Thread-10 says: mapping 20 to int
Thread-10 says: recovering
Thread-11 says: working success
Thread-11 says: mapping 20 to int
Thread-13 says: working success
Thread-10 says: mapping 20 to int
Thread-12 says: working failure
Thread-12 says: recovering
Thread-14 says: working failure
Thread-14 says: recovering
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: final result is 50
Thread-14 says: SUCESS 如您所见主线程添加了所有任务并注册了所有映射功能第1-20行。 然后它注册回调输出的第21行与清单的第39行相对应最后从清单中的第63行输出文本此后它死了因为它无事可做。 然后输出的第22行和第24-42行显示了池中的各个线程包含5个线程这些线程处理工作以及从String到Integer的映射或从异常中恢复。 这是第一个清单的第1部分和第2部分中的代码。 您可以看到它是完全异步的在所有初始工作完成之前会发生一些映射/恢复将第38行或第40行分别映射到输出的第41行并将其映射到输出的第41行此行是最后的输出最初的工作。 第43-52行是map / reduce的输出它是主列表的第3部分。 请注意没有记录reduce因为我运行的代码位于Github上使用上面提到的Integer::sum快捷方式而不是上面显示的第一个清单的第50-51行。 尽管使用Java 6甚至5可以实现所有这些功能例如通过获取提交到池中的任务来自己提交回调但是一旦完成执行该操作所需的代码量就会更大并且代码本身将比此处显示的代码丑陋。 可以使用回调进行映射的Java 8 lambda Future以及具有简洁错误处理功能的Try API都可以使此处所示的解决方案更具可维护性。 上面显示的代码以及ch.maxant.async包中类的代码在Apache License Version 2.0下可用并且可以从我的Github帐户下载。 参考 Zoo博客The Kitchen中来自JCG合作伙伴 Ant Kutschera 的非阻塞式异步Java 8和Scala的Try / Success / Failure 。 翻译自: https://www.javacodegeeks.com/2013/10/non-blocking-asynchronous-java-8-and-scalas-trysuccessfailure.html