天门市网站建设,企业网站管理系统站长之家,wex5 wordpress,不会代码建设网站本专栏学习内容又是来自尚硅谷周阳老师的视频 有兴趣的小伙伴可以点击视频地址观看 在学习CompletableFuture之前#xff0c;必须要先了解一下Future
Future
概念
Future接口#xff08;FutureTask实现类#xff09;定义了操作异步任务执行的一些方法#xff0c;如获取异… 本专栏学习内容又是来自尚硅谷周阳老师的视频 有兴趣的小伙伴可以点击视频地址观看 在学习CompletableFuture之前必须要先了解一下Future
Future
概念
Future接口FutureTask实现类定义了操作异步任务执行的一些方法如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
比如主线程在执行多个任务时其中有一个任务耗时特别久可以交给子线程执行主线程继续做自己的事情过一会再去获取子任务的执行结果。
FutureTask
FutureTask是Future的一个实现类
引出FutureTask
Future提供了一种异步并行计算的功能目的是异步多线程任务执行且返回有结果三个特点
多线程有返回异步任务 说句题外话小黄一直不了解多线程和异步有什么区别刚好问了ChatGpt 多线程和异步是两个相关但又不完全相同的概念。 多线程指的是在一个程序中同时运行多个线程每个线程都是独立执行的。多线程可以使程序具有并发性提高系统的吞吐量和响应速度。通过多线程可以将耗时的操作和任务分配给不同的线程来执行从而提高整体的工作效率。 异步指的是一种编程模型其中任务的执行不需要等待前一个任务的完成。在异步编程中可以执行一个任务而不需要阻塞等待其完成而是继续执行后续的任务。异步操作通常适用于涉及网络请求、文件读写、数据库查询等可能造成阻塞的操作。通过异步操作可以充分利用等待时间来执行其他任务提高程序的效率。 多线程和异步之间存在一定的关系。在实现异步操作时通常会使用多线程来实现异步任务的执行。通过创建新的线程可以在后台执行耗时的操作而不会阻塞主线程的执行。 例如对于一个网络请求使用多线程可以在主线程发送请求后继续执行其他任务而不需要等待网络请求返回的数据。当网络请求完成后在新的线程中处理响应数据从而实现异步操作。 总结起来多线程是一种实现并发性的机制而异步是一种编程模型在实现异步操作时通常会使用多线程来达到异步执行的效果。多线程可以提供资源的并行使用而异步可以提高程序的运行效率和响应性。 来看一下创建线程的方式
实现Runnable接口可以开启多线程
实现Callable接口可以开启多线程并且有返回值
class MyThread1 implements Runnable {Overridepublic void run() {}
}class MyThread2 implements CallableString {Overridepublic String call() throws Exception {return null;}
}但是Thread的构造方法中只有参数为Runnable的方法无法满足我们的需求 继续往下看发现RunnableFuture接口继承了Runnable以及Future那也就是说他具有多线程、异步两个特点 而FutureTask实现了RunnableFuture接口并且他的构造方法中可以传入Callable那么他就同时具有了多线程、异步、有返回三大特点。 优点
Future配合线程池异步多线程能显著提高程序的效率
需求
主线程需要执行三个任务且三个任务耗时分别是500毫秒、300毫秒、300毫秒如果主线程自己执行的话那程序至少需要花费11秒的时间现在使用Future 线程池来优化
实现
public static void main(String[] args) throws InterruptedException {//创建线程池ExecutorService threadPool Executors.newFixedThreadPool(3);long startTime new Date().getTime();//创建任务FutureTaskString task1 new FutureTask(() - {Thread.sleep(500);return task1 over;});threadPool.submit(task1,t1);FutureTaskString task2 new FutureTask(() - {Thread.sleep(300);return task2 over;});threadPool.submit(task2,t2);Thread.sleep(300);System.out.println(task3 over);long endTime new Date().getTime();System.out.println(花费 (endTime - startTime) 毫秒); //花费338毫秒
}缺点
get方法会阻塞
调用task1.get()会使主线程阻塞因为get()他会一直等待子线程返回结果在继续运行。
public static void main(String[] args) throws ExecutionException, InterruptedException {//创建任务FutureTaskString task1 new FutureTask(() - {Thread.sleep(5000);return task1 over;});new Thread(task1,t1).start();System.out.println(t1线程结果 task1.get());System.out.println(主线程执行完毕);
}isDone方法轮询
对于上述代码没有一个友好的提示导致我们不知道程序为何阻塞FutureTask提供了isDone()调用该方法结果为true表示线程执行完毕。
但是这种方法的结果就是需要不停的轮询大量的消耗了CPU
public static void main(String[] args) throws ExecutionException, InterruptedException {//创建任务FutureTaskString task1 new FutureTask(() - {Thread.sleep(5000);return task1 over;});new Thread(task1,t1).start();while (true) {if (task1.isDone()) {System.out.println(t1线程结果 task1.get());break;}else {Thread.sleep(500);System.out.println(请等待);}}System.out.println(主线程执行完毕);
}更复杂的任务
对于简单的任务使用Future完全可以解决下面有几个更为复杂的需求Future不好解决了 多个任务前后可以组合处理 例如子线程A计算返回的结果在子线程B中需要用到 对计算速度选最快 例如联机游戏谁先到终点谁就赢了那么当A到达终点时B的线程也需要中断
对此就引出了CompletableFuture这就有点像一个名场面东厂管不了的我西厂来管东厂管得了的我西厂更要管也就是说Future有的功能CompletableFuture都有Future没有的功能CompletableFuture也有有点像plus版本。
CompletableFuture
之前介绍了Future发现他只能解决一些简单的逻辑并且阻塞的方式和异步编程的设计理念相违背而轮询的方式会消耗无谓的CPU资源所有CompletableFuture应运而生。
概念
CompletableFuture提供了一种类似于观察者模式的机制可以让任务执行完后通知监听的一方。
他是JDK1.8新增的类实现了Future、CompletionStage接口
Future不用在介绍了CompletionStage提供了一种处理异步操作结果的机制可以与回调函数一起使用来处理 CompletableFuture 的计算结果。 创建方式
我们学习一个新的类的方式第一步就是看他的构造函数创建这个类CompletableFuture虽然有一个空参构造函数但是官方并不推荐我们使用一般我们通过4个静态方法来创建。
调用静态方法创建返回值CompletableFuture runAsync(Runnable runnable)无CompletableFuture runAsync(Runnable runnable,Executor executor)无CompletableFuture supplyAsync(Supplier supplier)有supplyAsync(Supplier supplier, Executor executor)有
代码实现
runAsync()
不带有线程池默认使用ForkJoinPool的线程
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFutureVoid completableFuture CompletableFuture.runAsync(() - {System.out.println(进入子线程: Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}});System.out.println(completableFuture.get()); //nullSystem.out.println(主线程结束);
}supplyAsync
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFutureInteger completableFuture CompletableFuture.supplyAsync(() - {System.out.println(进入子线程: Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}int result ThreadLocalRandom.current().nextInt(10);System.out.println(子线程执行结果: result);return result;});System.out.println(completableFuture.get());System.out.println(主线程结束);
}带有线程池的创建就不举例了
通用异步编程
上面还是在演示Future原有的功能接下来学一下新的功能
通过whenComplete来监听子进程执行完毕来做一系列操作
通过exceptionally来解决子进程出现异常的情况
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFutureInteger completableFuture CompletableFuture.supplyAsync(() - {System.out.println(进入子线程: Thread.currentThread().getName());System.out.println(Thread.currentThread().isDaemon()); //守护线程try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}int result ThreadLocalRandom.current().nextInt(10);System.out.println(子线程执行结果: result);return result;}).whenComplete((v,e) - {if (e null) {System.out.println(计算结果为 v);}}).exceptionally(e - {//处理异常e.printStackTrace();System.out.println(计算过程出现异常 e.getCause() \t e.getMessage());return null;});System.out.println(主线程结束);
}//输出
进入子线程:ForkJoinPool.commonPool-worker-9
主线程结束但是发现控制台输出没有等待结果主线程就直接结束了这是因为默认情况下ForkJoinPool里面是守护线程解决方法有两种
在主线程结束前等待使用自定义的线程池
修改代码
public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool Executors.newFixedThreadPool(10);CompletableFutureInteger completableFuture CompletableFuture.supplyAsync(() - {System.out.println(进入子线程: Thread.currentThread().getName());System.out.println(Thread.currentThread().isDaemon()); //守护线程try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}int result ThreadLocalRandom.current().nextInt(10);System.out.println(子线程执行结果: result);return result;},threadPool).whenComplete((v,e) - {if (e null) {System.out.println(计算结果为 v);}}).exceptionally(e - {//处理异常e.printStackTrace();System.out.println(计算过程出现异常 e.getCause() \t e.getMessage());return null;});System.out.println(主线程结束);threadPool.shutdown();
}//输出
进入子线程:pool-1-thread-1
false
主线程结束
子线程执行结果:7
计算结果为7通过控制台输出可以看到自定义线程池创建的是用户线程所以即使是主线程执行完毕程序还是要等待所有用户线程执行完毕才会结束。
链式语法
接下来会用到很多链式语法这个在Java8很常见其实就是在写法上更加简洁了
public class CompletableFutureDemo5 {public static void main(String[] args) throws ExecutionException, InterruptedException {Student student new Student();/*以前的写法student.setId(1);student.setName(张三));*/student.setId(1).setName(张三);System.out.println(student);}
}Data
AllArgsConstructor
NoArgsConstructor
Accessors(chain true)
class Student{private Integer id;private String name;
}join方法
在介绍join()之前必须先介绍一下get()
get()是获取异步计算结果但是在编译期会需要抛出异常 join()也是获取异步计算结果但是不需要抛出异常 电商比价案例 需求 需要查询《深入理解JVM虚拟机》这本书在各大电商平台销售的价格显示结果如下 《深入理解JVM虚拟机》in jd price is 100 普通解决方案
使用同步方式一步一步来查一个保存一个价格
此方法的优点是简洁粗暴缺点是非常的耗时
public class CompletableFutureDemo7 {static ListNetMall malls Arrays.asList(new NetMall(jd),new NetMall(tb),new NetMall(pdd));public static ListString step(ListNetMall list ,String productName){return list.stream().map(netMall - String.format(productName in %s price is %.2f,netMall.getNetMallName(),netMall.calcPrice(productName))).collect(Collectors.toList());}public static void main(String[] args) {long startTime new Date().getTime();ListString list step(malls, MySQL);long endTime new Date().getTime();System.out.println(耗时 (endTime - startTime));for (String item : list) {System.out.println(item);}}
}Data
AllArgsConstructor
NoArgsConstructor
class NetMall{private String netMallName;public double calcPrice(String productName) {//模拟请求过程耗时try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}return ThreadLocalRandom.current().nextDouble() * 2 productName.charAt(0);}
}//结果
耗时3067
MySQL in jd price is 78.86
MySQL in tb price is 78.95
MySQL in pdd price is 78.98异步解决方案
核心计算方法使用异步的方式进行这样大大的节约了时间
public static ListString byCompletableFuture(ListNetMall list ,String productName){return list.stream().map(netMall - CompletableFuture.supplyAsync(() - String.format(productName in %s price is %.2f,netMall.getNetMallName(),netMall.calcPrice(productName)))).collect(Collectors.toList()).stream().map(s - s.join()).collect(Collectors.toList());
}//结果
耗时1050
MySQL in jd price is 77.77
MySQL in tb price is 77.18
MySQL in pdd price is 77.32计算方法
CompletableFuture提供了非常多的计算方法
获取结果和触发计算
方法名作用public T get()获取结果会造成当前线程阻塞public T get(long timeout, TimeUnit unit)获取结果在指定的时间内获取不到抛出异常public T join()获取结果跟get()用法一致区别是编译器不需要抛异常public T getNow(T valueIfAbsent)立刻获取结果如果结果没出来使用指定值代替结果public boolean complete(T value)中断计算计算过程被中断返回true并且用指定值代替计算结果
public static void main(String[] args) throws InterruptedException {CompletableFutureString completableFuture CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}return 123;});TimeUnit.SECONDS.sleep(2);System.out.println(completableFuture.complete(completeValue) \t completableFuture.join());//true completeValue
}对计算结果进行处理
方法名作用public CompletableFuture thenApply()获取计算结果对其进行处理public CompletableFuture handle()作用同thenApply区别在于遇到异常不会组织下一步运行
thenApply()
public class CompletableFutureDemo9 {public static void main(String[] args) throws InterruptedException {ExecutorService threadPool Executors.newFixedThreadPool(10);CompletableFutureInteger completableFuture CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(111);return 1;},threadPool).thenApply(f - {System.out.println(222);return f 1;}).thenApply(f - {System.out.println(333);return f 2;}).whenComplete((v,e) - {if (e null) {System.out.println(计算结果 v);}}).exceptionally(e - {e.printStackTrace();return null;});System.out.println(主线程去忙其他的了 Thread.currentThread().getName());threadPool.shutdown();}
}可以看到程序在抛异常时就停止了不会继续往下执行 hanlde()
public static void main(String[] args) throws InterruptedException {ExecutorService threadPool Executors.newFixedThreadPool(10);CompletableFutureInteger completableFuture CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(111);return 1;}, threadPool).handle((f, e) - {int i 1 / 0;System.out.println(222);return f 1;}).handle((f, e) - {System.out.println(333);return f 2;}).whenComplete((v, e) - {if (e null) {System.out.println(计算结果 v);}}).exceptionally(e - {e.printStackTrace();return null;});System.out.println(主线程去忙其他的了 Thread.currentThread().getName());threadPool.shutdown();
}通过输出可以看出在第一个handle中出现异常不继续往下执行该handle的方法但是不影响后续的hanlde方法 对计算结果进行消费
消费顾名思义就是把这条消息消费掉后面的人就获取不到这条消息了。
方法作用public CompletableFuture thenRun(Runnable action)任务A执行完执行B并且B不需要A的结果public CompletableFuture thenAccept(Consumer? super T action)任务A执行完执行BB需要A的结果但是任务B没有返回值public CompletableFuture thenApply任务A执行完执行BB需要A的结果同事任务B有返回值
如下所示thenAccept()方法在获取结果时为null
public static void main(String[] args) throws InterruptedException {System.out.println(CompletableFuture.supplyAsync(() - resultA).thenRun(() - {}).join()); //nullSystem.out.println(CompletableFuture.supplyAsync(() - resultB).thenAccept(y - {System.out.println(y);}).join()); //resultB nullSystem.out.println(CompletableFuture.supplyAsync(() - resultC).thenApply(y - y resultD).join()); //resultCresultD
}对运行线程池进行选择
不使用线程池默认走的是ForkJoinPool 使用线程池走的全都是自定义线程池 使用线程池中间调用了thenRunAsync方法那么之后的方法都会使用ForkJoinPool 源码
thenRun()和thenRunAsync()区别在于一个传参使用了默认的线程池 对计算速度进行选用
调用applyToEither()方法他会将两个异步任务先完成的值返回
public class CompletableFutureDemo12 {public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {CompletableFutureString future1 CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}return future1;});CompletableFutureString future2 CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(4);} catch (InterruptedException e) {throw new RuntimeException(e);}return future2;});CompletableFutureString result future1.applyToEither(future2, s - s is win);System.out.println(result.join()); //future1 is win}
}对计算结果进行合并
thenCombine()可以将两个计算结果合并
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {CompletableFutureInteger future1 CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}return 10;});CompletableFutureInteger future2 CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(4);} catch (InterruptedException e) {throw new RuntimeException(e);}return 20;});CompletableFutureInteger result future1.thenCombine(future2, (x,y) - {System.out.println(计算结果合并);return x y;});System.out.println(result.join()); //30
}