网站设计论文范文大全集,天天联盟没网站怎么做,0基础建站网站搭建教程,php网站开发电子书【README】
并发后半部分#xff08;并发2#xff0c;从21.4.3中断开始#xff09;参见#xff1a; https://blog.csdn.net/PacosonSWJTU/article/details/106878087 ;
本文章包括了并发1和并发2 #xff0c;期初新建并发2是为了方便编写文档#xff0c;因为并发内容实…【README】
并发后半部分并发2从21.4.3中断开始参见 https://blog.csdn.net/PacosonSWJTU/article/details/106878087 ;
本文章包括了并发1和并发2 期初新建并发2是为了方便编写文档因为并发内容实属太多所以分了2次post
【21.2】基本线程机制
并发编码使我们可以将程序划分为多个分离的独立运行的任务
cpu将轮流给每个任务分配其占用时间。 【21.2.1】定义任务
线程可以驱动任务一种描述任务的方式是使用 Runnable 接口
方式1直接调用 Runnalbe接口的run 方法创建线程驱动任务
/*** 用 Runnable 接口定义任务*/
public class LiftOff implements Runnable {protected int countDown 10;private static int taskCount 0;private final int id taskCount;public LiftOff(){}public LiftOff(int countDown) {this.countDown countDown;}public String status() {return # id ( (countDown 0? countDown: liftoff) ), ; }Override public void run() {while(countDown-- 0 ) {System.out.println(status());Thread.yield(); // 当前线程转入可运行状态把cpu时间片让步给其他线程 }}public static void main(String[] args) {LiftOff obj new LiftOff();obj.run(); // 这里直接调用 Runnalbe接口的run 方法创建线程驱动任务}
}
运行结果
#0(9),
#0(8),
#0(7),
#0(6),
#0(5),
#0(4),
#0(3),
#0(2),
#0(1),
#0(liftoff), 【21.2.2】Thread类
开启线程的第2种方式使用Thread来驱动任务调用thread 的start方法start方法会调用 runnable 接口实现类的run 方法
/*** 开启线程的第2种方式使用Thread来驱动任务 */
public class BasicThreads {public static void main(String[] args) {Thread t new Thread(new LiftOff());t.start();System.out.println(waiting for liftOff); }
}
main 方法通过主线程来驱动而 LiftOff中run方法的逻辑通过main方法分发的子线程来驱动
运行结果
waiting for liftOff
#0(9),
#0(8),
#0(7),
#0(6),
#0(5),
#0(4),
#0(3),
#0(2),
#0(1),
#0(liftoff), 启动多个线程运行多个任务可以看到线程切换的动作
/*** 启动多个线程运行多个任务可以看到线程切换的动作 */
public class MoreBasicThreads {public static void main(String[] args) {for (int i0; i 5; i) {new Thread(new LiftOff()).start();}System.out.println(waiting for lift off);}
}
/*waiting for lift off
#3(9),
#3(8),
#4(9),
#4(8),
#4(7),
#0(9),
#0(8),
#0(7),
#0(6),
#0(5),
#0(4),
#0(3),
#1(9),
#1(8),
#1(7),
#2(9),
#2(8),
#1(6),
#1(5),
#1(4),
#1(3),
#1(2),
#0(2),
#0(1),
#0(liftoff),
#4(6),
#3(7),
#4(5),
#4(4),
#4(3),
#1(1),
#1(liftoff),
#2(7),
#4(2),
#4(1),
#3(6),
#4(liftoff),
#2(6),
#3(5),
#2(5),
#2(4),
#2(3),
#3(4),
#3(3),
#3(2),
#3(1),
#3(liftoff),
#2(2),
#2(1),
#2(liftoff),
*/
通过主线程显式创建多个子线程的问题
主线程创建多个子线程每个子线程Thread 都注册了他自己内存存在对他的引用所以在子线程退出其 run 方法之前垃圾回收器无法清除它 这不便于内存回收与分配 【21.2.3】使用 Executor 执行器CachedThreadPool、FixedThreadPool、SingleThreadExecutor
1、使用 Executor 执行器管理Thread线程对象 可以简化并发编程且处理线程占用的内存回收事宜
Executor允许你管理异步任务的执行 而无需显式地管理线程的生命周期 。
Executor执行器在 java5或6中是启动任务的优选方法
荔枝1、基于 newCachedThreadPool实现线程池
/*** page657/线程池* shutdown 方法调用可以防止新任务被提交给 Executor 当前线程* 将继续运行在 shutdown被调用之前提交的所有任务 * author */
public class CachedThreadPool {public static void main(String[] args) {ExecutorService exec Executors.newCachedThreadPool();for (int i0; i5; i) {exec.execute(new LiftOff());}/*shutdown 方法调用可以防止新任务被提交给 Executor 当前线程* 将继续运行在 shutdown被调用之前提交的所有任务 */exec.shutdown();}
}
/*#1(9),
#2(9),
#2(8),
#2(7),
#3(9),
#4(9),
#4(8),
#4(7),
#4(6),
#0(9),
#4(5),
#4(4),
#3(8),
#2(6),
#1(8),
#1(7),
#1(6),
#1(5),
#2(5),
#3(7),
#3(6),
#4(3),
#0(8),
#0(7),
#4(2),
#4(1),
#3(5),
#3(4),
#3(3),
#3(2),
#3(1),
#3(liftoff),
#2(4),
#1(4),
#1(3),
#1(2),
#2(3),
#4(liftoff),
#0(6),
#2(2),
#1(1),
#1(liftoff),
#2(1),
#0(5),
#2(liftoff),
#0(4),
#0(3),
#0(2),
#0(1),
#0(liftoff),
*/
常见的是 单个Executor 执行器被用来创建和管理系统中的所有任务
荔枝2、基于 newFixedThreadPool实现线程池
newFixedThreadPool 可以一次性预先执行代价高昂的线程分配因此可以限制线程数量
/*** page 657 * newFixedThreadPool线程池 */
public class FixedThreadPool {public static void main(String[] args) {// newFixedThreadPool 可以一次性预先执行代价高昂的线程分配因此可以限制线程数量 ExecutorService exec Executors.newFixedThreadPool(5);for (int i0; i5; i) {exec.execute(new LiftOff());}/*shutdown 方法调用可以防止新任务被提交给 Executor 当前线程* 将继续运行在 shutdown被调用之前提交的所有任务 */exec.shutdown();}
}
/*#0(9),
#2(9),
#2(8),
#3(9),
#3(8),
#4(9),
#1(9),
#1(8),
#4(8),
#4(7),
#3(7),
#3(6),
#3(5),
#3(4),
#3(3),
#3(2),
#3(1),
#3(liftoff),
#2(7),
#0(8),
#2(6),
#4(6),
#1(7),
#4(5),
#4(4),
#4(3),
#2(5),
#0(7),
#0(6),
#0(5),
#0(4),
#0(3),
#2(4),
#4(2),
#4(1),
#4(liftoff),
#1(6),
#2(3),
#0(2),
#0(1),
#0(liftoff),
#2(2),
#1(5),
#1(4),
#2(1),
#1(3),
#2(liftoff),
#1(2),
#1(1),
#1(liftoff),
*/
注意CachedThreadPool 与 FixedThreadPool 线程池的区别 CacheThreadPool 在程序执行过程中通常会创建于所需数量相同的线程然后在他回收旧线程时停止创建新线程 CachedThreadPool 是合理的的 Executor执行器的首选。
而FixedThreadPool 是可以限制线程数量的线程池只有当 CachedThreadPool 出现问题时才需要切换到 FixedThreadPool 荔枝3 SingleThreadPool 是 线程数量为1的 FixedThreadPool。
如果向 SingleThreadPool 提交多个任务 这些任务将排队每个任务都会在下一个任务开始之前结束所有任务使用相同的线程。因为 SingleThreadExecutor 会序列化所有提交给他的任务并会维护他自己隐藏的悬挂任务队列。
/*** page 657 /* newSingleThreadExecutor 类似于线程数量为1的 FixedThreadPool */
public class SingleThreadExecutor {public static void main(String[] args) {ExecutorService exec Executors.newSingleThreadExecutor();for (int i0; i5; i) {exec.execute(new LiftOff());}exec.shutdown(); }
}
/*#0(9),
#0(8),
#0(7),
#0(6),
#0(5),
#0(4),
#0(3),
#0(2),
#0(1),
#0(liftoff),
#1(9),
#1(8),
#1(7),
#1(6),
#1(5),
#1(4),
#1(3),
#1(2),
#1(1),
#1(liftoff),
#2(9),
#2(8),
#2(7),
#2(6),
#2(5),
#2(4),
#2(3),
#2(2),
#2(1),
#2(liftoff),
#3(9),
#3(8),
#3(7),
#3(6),
#3(5),
#3(4),
#3(3),
#3(2),
#3(1),
#3(liftoff),
#4(9),
#4(8),
#4(7),
#4(6),
#4(5),
#4(4),
#4(3),
#4(2),
#4(1),
#4(liftoff),
*/
【21.2.4】 从任务中产生返回值
若任务在执行完成时需要返回值则使用 Callable 而不是 Runnable 来描述任务
/*** page 658 * newCachedThreadPool任务执行完成后可以返回执行结果 */
public class CallableDemo {public static void main(String[] args) {ExecutorService exec Executors.newCachedThreadPool();ArrayListFutureString results new ArrayList();for (int i0; i10; i) {// submit 方法会产生 Future 对象 他用 Callable返回结果的特定类型进行了参数化 results.add(exec.submit(new TaskWithResult(i))); // 驱动或运行任务使用 ExecutorService.submit() 方法 }for (FutureString fu : results) {try {System.out.println(fu.get()); // 结果值 } catch (Exception e) {e.printStackTrace();} finally {exec.shutdown(); }}System.out.println(我是main线程);}
}
/*result of task with result 0
result of task with result 1
result of task with result 2
result of task with result 3
result of task with result 4
result of task with result 5
result of task with result 6
result of task with result 7
result of task with result 8
result of task with result 9
我是main线程
*/
class TaskWithResult implements CallableString {private int id; public TaskWithResult(int id) {this.id id ;}Overridepublic String call() throws Exception {return result of task with result id;}
}
显然 在子线程全部返回前主线程是阻塞的因为 主线程打印的消息在所有子线程返回结果之后
【21.2.5】休眠
方法1 sleep方法 让线程休眠给定时间然后又重新回到可运行状态
方法2yield方法表示当前线程的重要任务已经运行完毕了 让出占用的cpu时间片给其他线程 /*** page659 * Thread.sleep 线程休眠 */
public class SleepingTask extends LiftOff {public void run() {try {while (countDown-- 0) {System.out.println(status());Thread.sleep(1000); // 当前线程休眠1秒钟 老式方法TimeUnit.SECONDS.sleep(1);// 休眠1秒钟 推荐方法}} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {ExecutorService exec Executors.newCachedThreadPool();for (int i0; i3; i) {exec.execute(new SleepingTask());}exec.shutdown();}
}
/*#0(9),
#2(9),
#1(9),
#2(8),
#0(8),
#1(8),
#1(7),
#2(7),
#0(7),
#2(6),
#0(6),
#1(6),
#2(5),
#1(5),
#0(5),
#0(4),
#1(4),
#2(4),
#2(3),
#0(3),
#1(3),
#1(2),
#2(2),
#0(2),
#2(1),
#0(1),
#1(1),
#1(liftoff),
#2(liftoff),
#0(liftoff),
*/
Thread.sleep(1000); // 当前线程休眠1秒钟 老方法TimeUnit.SECONDS.sleep(1);// 休眠1秒钟 新方法java5或6推荐
【21.2.6】优先级
线程优先级将线程重要性传递给了调度器优先级低的线程仅仅是执行频率较低。
试图通过控制线程优先级是一种错误。因为cpu的时间片划分是未知的可能碰到中断如io所以不建议使用优先级这里仅仅看下代码
/*** page660 * 线程优先级*/
public class SimplePriority implements Runnable {private int countDown 5; private volatile double d; // volative 确保变量不被任何编译器优化指令优化 private int priority;public SimplePriority(int priority) {this.priority priority;}public String toString() {return Thread.currentThread() : countDown;}Override public void run() {Thread.currentThread().setPriority(priority); // 设置线程优先级while(true) {for (int i1; i100000; i) {d (Math.PI Math.E) / (double) i;if (i % 1000 0) {Thread.yield(); // 当前线程释放cpu时间片给其他线程}System.out.println(this);if (--countDown 0) return ; }}}public static void main(String[] args) {ExecutorService exec Executors.newCachedThreadPool();for (int i0; i 5; i) {exec.execute(new SimplePriority(Thread.MIN_PRIORITY)); // 最小优先级}exec.execute(new SimplePriority(Thread.MAX_PRIORITY)); // 最高优先级 exec.shutdown(); }
}
/*Thread[pool-1-thread-3,1,main]:5
Thread[pool-1-thread-3,1,main]:4
Thread[pool-1-thread-6,10,main]:5
Thread[pool-1-thread-6,10,main]:4
Thread[pool-1-thread-5,1,main]:5
Thread[pool-1-thread-1,1,main]:5
Thread[pool-1-thread-4,1,main]:5
Thread[pool-1-thread-2,1,main]:5
Thread[pool-1-thread-2,1,main]:4
Thread[pool-1-thread-2,1,main]:3
Thread[pool-1-thread-2,1,main]:2
Thread[pool-1-thread-2,1,main]:1
Thread[pool-1-thread-4,1,main]:4
Thread[pool-1-thread-1,1,main]:4
Thread[pool-1-thread-1,1,main]:3
Thread[pool-1-thread-1,1,main]:2
Thread[pool-1-thread-1,1,main]:1
Thread[pool-1-thread-5,1,main]:4
Thread[pool-1-thread-6,10,main]:3
Thread[pool-1-thread-3,1,main]:3
Thread[pool-1-thread-3,1,main]:2
Thread[pool-1-thread-6,10,main]:2
Thread[pool-1-thread-6,10,main]:1
Thread[pool-1-thread-5,1,main]:3
Thread[pool-1-thread-5,1,main]:2
Thread[pool-1-thread-4,1,main]:3
Thread[pool-1-thread-5,1,main]:1
Thread[pool-1-thread-3,1,main]:1
Thread[pool-1-thread-4,1,main]:2
Thread[pool-1-thread-4,1,main]:1
*/
【21.2.7】让步
1、当前线程调用 Thread.yield方法将给线程调度器一个暗示
我的工作已经完成了 可以让别的线程使用cpu时间片了但这里仅仅是一个暗示没有任何机制保证它将被采纳
注意 yield方法经常被误用。
【21.2.8】 后台线程
1、后台线程指在程序运行的时候在后台提供一种通用服务的线程并且这种线程并不属于程序中不可或缺的部分。
当所有非后台线程结束时 程序也就终止了同时会杀死进程中的所有后台线程。
2、后天线程 daemon 荔枝
/*** page662 * 后台线程 daemon.setDaemon(true); 后台线程不影响非后台线程的结束如main主线程就是非后台线程 * 当所有非后台线程结束时程序终止了同事会杀死进程中的所有后台线程 */
public class SimpleDaemons implements Runnable {Override public void run() {try {while(true) {Thread.sleep(1000);System.out.println(Thread.currentThread() this);}} catch (Exception e){e.printStackTrace();}}public static void main(String[] args) {for (int i0; i10; i) {Thread daemon new Thread(new SimpleDaemons());daemon.setDaemon(true); // 在 start 方法前调用设置为后台线程 daemon.start(); }System.out.println(all daemons started);try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}
}3、通过 ThreadFactory 线程工厂创建后台线程
/*** page662 * 编写定制的 ThreadFactory 可以定制由 Executor 创建的线程属性*/
public class DaemonThreadFactory implements ThreadFactory {Overridepublic Thread newThread(Runnable r) {Thread t new Thread(r);t.setDaemon(true);// 设置为后台线程return t; }
}
/*** page 663 * 基于 ThreadFactory 创建后台线程 */
public class DaemonFromFactory implements Runnable {Overridepublic void run() {try {while(true) {
// Thread.sleep(1000);TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread() this);}} catch (Exception e) {e.printStackTrace();} }public static void main(String[] args) { /*DaemonThreadFactory创建的全是后台线程*/ExecutorService exec Executors.newCachedThreadPool(new DaemonThreadFactory()); for (int i0; i10; i) {exec.execute(new DaemonFromFactory());}System.out.println(all daemons started);try {
// Thread.sleep(3000);TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace();} }
}
/*all daemons started
Thread[Thread-8,5,main] diy.chapter21.DaemonFromFactory12b68fd4
Thread[Thread-7,5,main] diy.chapter21.DaemonFromFactory1ce6edff
Thread[Thread-9,5,main] diy.chapter21.DaemonFromFactory50f7270b
Thread[Thread-0,5,main] diy.chapter21.DaemonFromFactory4c19ebce
Thread[Thread-5,5,main] diy.chapter21.DaemonFromFactory70c680f2
Thread[Thread-2,5,main] diy.chapter21.DaemonFromFactory3812167c
Thread[Thread-3,5,main] diy.chapter21.DaemonFromFactory2b0a5613
Thread[Thread-6,5,main] diy.chapter21.DaemonFromFactory15547dba
Thread[Thread-1,5,main] diy.chapter21.DaemonFromFactory16c1c280
Thread[Thread-4,5,main] diy.chapter21.DaemonFromFactory58bbe46a
Thread[Thread-8,5,main] diy.chapter21.DaemonFromFactory12b68fd4
Thread[Thread-7,5,main] diy.chapter21.DaemonFromFactory1ce6edff
Thread[Thread-2,5,main] diy.chapter21.DaemonFromFactory3812167c
Thread[Thread-1,5,main] diy.chapter21.DaemonFromFactory16c1c280
Thread[Thread-0,5,main] diy.chapter21.DaemonFromFactory4c19ebce
Thread[Thread-6,5,main] diy.chapter21.DaemonFromFactory15547dba
Thread[Thread-9,5,main] diy.chapter21.DaemonFromFactory50f7270b
Thread[Thread-3,5,main] diy.chapter21.DaemonFromFactory2b0a5613
Thread[Thread-5,5,main] diy.chapter21.DaemonFromFactory70c680f2
Thread[Thread-4,5,main] diy.chapter21.DaemonFromFactory58bbe46a
Thread[Thread-8,5,main] diy.chapter21.DaemonFromFactory12b68fd4
Thread[Thread-7,5,main] diy.chapter21.DaemonFromFactory1ce6edff
*/
4、 自定义线程执行器
/*** 自定义线程执行器-DaemonThreadPoolExecutor * ThreadPoolExecutor extends AbstractExecutorService* AbstractExecutorService implements ExecutorService* interface ExecutorService extends Executor*/
public class DaemonThreadPoolExecutor extends ThreadPoolExecutor {public DaemonThreadPoolExecutor() {super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueueRunnable(), new DaemonThreadFactory());}
}
/*** Creates a new {code ThreadPoolExecutor} with the given initial* parameters and default rejected execution handler.** param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {code allowCoreThreadTimeOut} is set* param maximumPoolSize the maximum number of threads to allow in the* pool* param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* param unit the time unit for the {code keepAliveTime} argument* param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {code Runnable}* tasks submitted by the {code execute} method.* param threadFactory the factory to use when the executor* creates a new thread* throws IllegalArgumentException if one of the following holds:br* {code corePoolSize 0}br* {code keepAliveTime 0}br* {code maximumPoolSize 0}br* {code maximumPoolSize corePoolSize}* throws NullPointerException if {code workQueue}* or {code threadFactory} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueRunnable workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}
5、后台线程创建的任何线程都将被自动设置为后台线程
/*** page663* 后台线程创建的任何线程都将被自动设置为后台线程 */
public class Daemons {public static void main(String[] args) {Thread d new Thread(new Daemon());d.setDaemon(true); // 主线程是后台线程 其创建的10个线程也是后台线程 d.start();System.out.println(d.isDaemon() d.isDaemon() , ); try {TimeUnit.SECONDS.sleep(1); // 睡眠1秒 } catch (InterruptedException e) {e.printStackTrace(); } }
}
// 后台线程
class Daemon implements Runnable {private Thread[] t new Thread[10];Override public void run() { for (int i0; it.length; i) {t[i] new Thread(new DaemonSpawn());t[i].start();System.out.println(daemon spawn i started);}while(true) {Thread.yield(); // 一直处于可运行状态但无法获取cpu时间片运行 }}
}
// 任务
class DaemonSpawn implements Runnable {public void run() {while(true) {Thread.yield(); // 把cpu时间片让给其他线程 }}
}
6、后台线程在不执行 finally 子句的情况下就会终止其 run 方法
/*** page664 * 后台线程在不执行 finally 子句的情况下就会终止其 run 方法* 即终止run方法时不会执行finally子句中的代码* 但把 t.setDaemon(true) 给删除掉则会执行 finally 子句中的代码 */
public class ADaemon implements Runnable {Overridepublic void run() {try {System.out.println(starting daemon);TimeUnit.SECONDS.sleep(1);} catch (Exception e) {e.printStackTrace();} finally {System.out.println(this should always run? ); // 没有执行 }}/*** 一旦main 方法退出 jvm就会立即关闭所有的后台进程* 因为不能以优雅的方式来关闭后台线程故非后台线程的 Executors 是一种更好的方式 * 因为 Executors 控制的所有任务可以同时被关闭* param args*/public static void main(String[] args) {Thread t new Thread(new ADaemon());t.setDaemon(true);t.start(); }
}
/*starting daemon
*/
非后台的Executor 通常是一种更好的方式因为 Executor控制的所有任务可以同时被关闭 【21.2.9】编码的变体
1、上面描述任务的方式都是通过实现 Runnable接口 还有一种方式是继承 Thread 类
/*** page665 * 创建线程的第2种方式 继承了Thread就不能继承其他类了 */
public class SimpleThread extends Thread {private int countDown 5; private static int threadCount 0;public SimpleThread() {super(Integer.toString(threadCount));start();}public String toString() {return # getName() ( countDown ), ;}Override public void run() {while(true) {System.out.println(this);if (--countDown 0) {return ;}}}public static void main(String[] args) {for (int i0; i 5; i) {new SimpleThread(); }}
}
/*#2(5),
#5(5),
#5(4),
#5(3),
#5(2),
#5(1),
#4(5),
#1(5),
#3(5),
#1(4),
#1(3),
#1(2),
#4(4),
#2(4),
#2(3),
#2(2),
#4(3),
#1(1),
#3(4),
#4(2),
#2(1),
#4(1),
#3(3),
#3(2),
#3(1),
*/
2、第2种方式比较常见是 实现 Runnable接口
/*** page666 * 与继承Thread不同的是这里是实现 Runnable接口 * 在构造器中启动线程可能有问题 因为另一个任务可能会在构造器结束之前开始执行 * 这意味着该任务能够访问处于不稳定状态的对象 * 这是优选Executor 而不是显式创建Thread对象的另一个原因*/
public class SelfManaged implements Runnable {private int countDown 5; private Thread t new Thread(this);public SelfManaged() {t.start();}public String toString() {return Thread.currentThread().getName() ( countDown );}Overridepublic void run() {while(true) {System.out.println(this);if (--countDown 0) {return ;}}} public static void main(String[] args) {for(int i0; i5; i) {new SelfManaged();}}
}小结 在构造器中启动线程可能有问题 因为另一个任务可能会在构造器结束之前开始执行 这意味着该任务能够访问处于不稳定状态的对象。 这是优选Executor 而不是显式创建Thread对象的另一个原因。
3、通过内部类隐藏线程代码
/*** 使用内部类隐藏线程代码 */
public class ThreadVariations {public static void main(String[] args) {new InnerThread1(InnerThread1);new InnerThread2(InnerThread2);new InnerRunnable1(InnerRunnable1);new InnerRunnable2(InnerRunnable2);new ThreadMethod(ThreadMethod).runTaks();}
}
// 一个单独的方法开启线程运行任务
class ThreadMethod {private int countDown 5;private Thread t;private String name;public ThreadMethod(String name) {this.name name;} public void runTaks() {if (t null) {t new Thread(name) {public void run() {try {while (true) {System.out.println(this);if (--countDown 0)return;sleep(1000);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return getName() : countDown;}};t.start();}}
}// 匿名内部类继承 Thread
class InnerThread2 {private int countDown 5;private Thread t;public InnerThread2(String name) {t new Thread(name) {public void run() {try {while (true) {System.out.println(this);if (--countDown 0)return;sleep(1000);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return getName() : countDown;}};t.start();}
}/*** 匿名内部类实现 Runnable接口*/
class InnerRunnable2 {private int countDown 5;private Thread t;public InnerRunnable2(String name) {t new Thread(new Runnable() { // 匿名内部类实现 Runnable接口Overridepublic void run() {try {while (true) {System.out.println(this);if (--countDown 0)return;TimeUnit.MILLISECONDS.sleep(1000);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return Thread.currentThread().getName() : countDown;}}, name);t.start();}}/*** 内部类继承线程Thread*/
class InnerThread1 {private int countDown 5;private Inner inner;public InnerThread1(String name) {inner new Inner(name);}// 内部类private class Inner extends Thread {Inner(String name) {super(name);start();}Overridepublic void run() {try {while (true) {System.out.println(this);if (--countDown 0)return;sleep(1000);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return getName() : countDown;}}
}/*** 内部类实现 Runnable接口*/
class InnerRunnable1 {private int countDown 5;private Inner inner;public InnerRunnable1(String name) {inner new Inner(name);}private class Inner implements Runnable {Thread t;Inner(String name) {t new Thread(this, name);t.start();}public void run() {try {while (true) {System.out.println(this);if (--countDown 0)return;TimeUnit.MILLISECONDS.sleep(10);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return t.getName() : countDown;}}
}
/*InnerThread1:5
InnerThread2:5
InnerRunnable1:5
InnerRunnable2:5
ThreadMethod: 5
InnerRunnable1:4
InnerRunnable1:3
InnerRunnable1:2
InnerRunnable1:1
InnerThread2:4
InnerThread1:4
InnerRunnable2:4
ThreadMethod: 4
InnerThread2:3
InnerThread1:3
InnerRunnable2:3
ThreadMethod: 3
InnerThread2:2
InnerThread1:2
InnerRunnable2:2
ThreadMethod: 2
InnerThread2:1
InnerThread1:1
InnerRunnable2:1
ThreadMethod: 1
*/【21.2.11】加入一个线程
Thread.join() 方法 若线程A 在另一个线程B上调用 B.join()则线程A将被挂起直到目标线程B结束才恢复
也可以在join()方法上加个超时参数如果目标线程在超时时间内没有结束的话join还是返回
/*** page670 * join() 方法 在 线程a上调用 线程b的join方法则线程a阻塞* 直到线程b运行结束线程a才继续运行 * 注意java.util.concurrent类库包含 CyclicBarrier工具类比join更加适合使线程让步*/
public class Joining {public static void main(String[] args) {Sleeper s1 new Sleeper(s1, 1500);Sleeper s2 new Sleeper(s2, 1500);Joiner j1 new Joiner(j1, s1);Joiner j2 new Joiner(j2, s2);// s2 被强制中断s2.interrupt(); }
}
/*** 睡眠线程*/
class Sleeper extends Thread {private int duration ;public Sleeper(String name, int sleepTime) {super(name);this.duration sleepTime;start();}Overridepublic void run() {try {sleep(duration);} catch (Exception e) {System.out.println(getName() was interrupted, isInterrupted() isInterrupted());return ;}System.out.println(线程 getName() 已经被唤醒);}
}class Joiner extends Thread {private Sleeper sleeper;public Joiner(String name, Sleeper sleeper) {super(name); this.sleeper sleeper;start();}public void run() {try {sleeper.join(); // 主线程 Joiner 调用 其他线程 sleeper的join 方法 sleeper没有执行完 主线程一直阻塞 } catch (Exception e) {System.out.println(interrupted);}System.out.println(线程 getName() join 完成); }
}
/*s2 was interrupted, isInterrupted() false
线程 j2 join 完成
线程 s1 已经被唤醒
线程 j1 join 完成
*/
【21.2.12】创建有响应的用户界面
/*** page 671 * 有响应的用户界面* 要想程序有响应就需要把计算程序放在 run方法里这样他才能让出cpu时间片给其他线程*/
public class ResponsiveUI extends Thread {private static volatile double d 1; public ResponsiveUI() {setDaemon(true); // 把当前线程设置为后台线程 start();}Overridepublic void run() {while(true) {d d (Math.PI Math.E) / d;}}public static void main(String[] args) throws Exception {// 创建无响应的ui
// new UnresponsiveUI();// 创建响应式 ui new ResponsiveUI();System.in.read(); System.out.println(d); }
}
/*** 无响应的ui */
class UnresponsiveUI {private volatile double d 1; public UnresponsiveUI() throws Exception { while(d0) {d d (Math.PI Math.E) / d;}System.in.read(); // 永远不会执行到这里 此谓无响应}
}
【21.2.13】线程组
【21.2.14】捕获子线程Thread Runnable子类或线程池子线程异常
看个荔枝下面的程序总会抛出异常 /*** page 672 * 线程异常捕获线程异常*/
public class ExceptionThread implements Runnable {Overridepublic void run() {throw new RuntimeException();}public static void main(String[] args) {ExecutorService exec Executors.newCachedThreadPool();try {// 把子线程抛出异常的代码放到 主线程的try-catch 块里主线程的try-catch块无法捕获的 exec.execute(new ExceptionThread()); } catch (Exception e) {System.out.println(抛出了异常); }}
}
/*
Exception in thread pool-1-thread-1 java.lang.RuntimeExceptionat diy.chapter21.ExceptionThread.run(ExceptionThread.java:13)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)*/
可见把main方法放在 try-catch 中没有作用
荔枝2
/*** page 672 * 捕获异常 - 还是没有捕获异常 */
public class NavieExceptionHandler {public static void main(String[] args) {try {ExecutorService exec Executors.newCachedThreadPool();exec.execute(new ExceptionThread());} catch(RuntimeException e) {System.out.println(Exception has been handled);}}
}
/*
Exception in thread pool-1-thread-1 java.lang.RuntimeExceptionat diy.chapter21.ExceptionThread.run(ExceptionThread.java:13)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)*/
为了解决无法捕获线程异常的问题需要修改Executor产生线程的方式。设置未捕获异常处理器去捕获子线程抛出的异常。干货——非常重要如何捕获子线程异常
/*** 设置未捕获异常处理器去捕获子线程抛出的异常 */
public class CaptureUncaughtException {public static void main(String[] args) {// 传入带有未捕获异常处理器的线程工厂到 线程池以改变线程池创建线程的方式 ExecutorService executorService Executors.newCachedThreadPool(new MyHandlerThreadFactory());// 运行任务 executorService.execute(new MyExceptionThread()); }
}
// 线程类
class MyExceptionThread implements Runnable {Override public void run() {Thread t Thread.currentThread(); System.out.println(run() by t);// getUncaughtExceptionHandler 表示获取未捕获异常处理器 System.out.println(异常处理器 t.getUncaughtExceptionHandler());// 抛出运行时异常 throw new RuntimeException(); }
}
// 类-未捕获异常处理器
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {// uncaughtException方法会在线程因未捕获的异常而临近死亡时被调用 Override public void uncaughtException(Thread t, Throwable e) {System.out.println(我是未捕获异常处理器-MyUncaughtExceptionHandler我捕获到的异常信息为 e); }
}
// 线程处理器工厂
class MyHandlerThreadFactory implements ThreadFactory {// 定义创建线程的方式 Overridepublic Thread newThread(Runnable r) {System.out.println(this 创建新线程);Thread t new Thread(r, System.currentTimeMillis());System.out.println(新线程信息 t);// 为线程设置未捕获异常处理器 t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());System.out.println(异常处理器 t.getUncaughtExceptionHandler());return t; }
}
/*diy.chapter21.MyHandlerThreadFactory4e25154f 创建新线程
新线程信息Thread[Thread-0,5,main]
异常处理器 diy.chapter21.MyUncaughtExceptionHandler70dea4e
run() by Thread[Thread-0,5,main]
异常处理器 diy.chapter21.MyUncaughtExceptionHandler70dea4e
diy.chapter21.MyHandlerThreadFactory4e25154f 创建新线程
新线程信息Thread[Thread-1,5,main]
异常处理器 diy.chapter21.MyUncaughtExceptionHandler15547dba
caught java.lang.RuntimeException
*/
设置默认的未捕获异常处理器
/*** page 674 * 设置默认的未捕获异常处理器* 这个处理器-MyUncaughtExceptionHandler* 只有在不存在线程专有的未捕获异常处理器的情况下才会被调用 */
public class SettingDefaultHandler {public static void main(String[] args) {Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());ExecutorService executorService Executors.newCachedThreadPool();executorService.execute(new ExceptionThread()); }
}
/*
* 我是未捕获异常处理器-MyUncaughtExceptionHandler我捕获到的异常信息为java.lang.RuntimeException
*/
【21.3】共享受限资源临界资源
【21.3.1】不正确地访问资源
多个线程在没有并发控制前提下同时访问和修改共享资源导致资源状态异常 不满足业务场景
【21.3.2】解决共享资源竞争
1、java提供了关键字 synchronized 关键字为防止资源冲突提供了内置支持。当任务要执行被 synchronized 关键字保护起来的代码片段的时候他将检查锁是否可用然后获取锁执行代码释放锁
共享资源一般指的是对象但也可以是 文件输入输出端口或者打印机
对于同一个对象来说其所拥有的的 synchronized 方法共享同一个锁这可以被用来防止多个任务同时访问同一个对象内存
注意 在使用并发时将域设置为 private 非常重要 否则 synchronized 关键字就无法防止其他任务直接访问域了
2、对于静态 static数据synchronized static 方法可以在类的方位内防止对 static数据的并发访问
/** 使用Synchronized 同步 * page 678* */
public class SynchronizedEvenGenerator extends IntGenerator {private int currentEventValue 0;// synchronized 关键字修饰方法以防止多个线程同时访问该方法Overridepublic synchronized int next() { currentEventValue;Thread.yield();currentEventValue;return currentEventValue;}public static void main(String[] args) {EvenChecker.test(new MutextEvenGenerator());}
}
3、什么时候需要同步呢
如果你正在写一个变量 他可能接下来将被另一个线程读取或者正在读取一个上一次已经被另一个线程写过的变量那么你必须使用同步 并且读写线程都必须用相同的监视器锁同步
注意每个访问临界资源的方法都必须被同步否则他们就不会正确的工作 4、使用显式的Lock 对象
lock() 与 unlock() 方法之间的代码就是临界资源
/** 使用显式的 Lock对象 * page 678 * */
public class MutextEvenGenerator extends IntGenerator{private int currentEventValue 0;private Lock lock new ReentrantLock();Overridepublic int next() {lock.lock(); // 加锁 try {currentEventValue;Thread.yield();currentEventValue;return currentEventValue; } finally {lock.unlock(); // 解锁 }}public static void main(String[] args) {EvenChecker.test(new MutextEvenGenerator());}
}
干货如何编写基于Lock的并发控制代码当你在使用 Lock对象时将这里所示的惯用法内部化是很重要的。紧接着的对 lock() 方法的调用你必须吧临界资源代码放置在 finally 子句中带有 unlock() 的 try-finally 语句中。 很在意 return 语句必须在 try 子句中出现以确保 unlock()方法不会过早发生从而将数据暴露给第2个任务
5、synchronzied 与 Lock 对别
如果使用 synchronized 关键字时某些事物失败了那么就会抛出一个异常 而你没有机会去做任何清理工作以维护系统使其处于良好状态
但显式的Lock对象 你就可以使用 finally 子句将系统维护在正确的状态了
小结 通常情况下推荐使用 synchronized 关键字因为写的代码量更少并且用户出现错误的可能性也会降低
只有在特殊情况下才会使用 显式的 Lock对象
/*** page 679* 尝试获取锁 */
public class AttemptLocking {/* 可重入锁 */private ReentrantLock lock new ReentrantLock();// 获得锁未设置超时时间 public void untimed() {boolean captured lock.tryLock(); // 获得锁 try {System.out.println(trylock() : captured);} finally {if (captured) {lock.unlock(); // 在 finally 子句中解锁 }}}// 获得锁设置了超时时间 public void timed() {boolean captured false; try {captured lock.tryLock(2, TimeUnit.SECONDS); // 基于超时控制尝试获取锁超时时间为3秒 } catch (Exception e) {throw new RuntimeException(); }try {System.out.println( tryLock(2, TimeUnit.SECONDS): captured);} finally {if (captured) { // 是否获得锁若获得锁则解锁lock.unlock(); // 解锁 }}}public static void main(String[] args) {final AttemptLocking al new AttemptLocking();al.untimed();al.timed();new Thread() { // 子线程 {setDaemon(true);} // 设置其为后台线程 public void run() {al.lock.lock();System.out.println(acquired:);}}.start();Thread.yield(); // 当前线程让出 cpu时间片 try { TimeUnit.SECONDS.sleep(1); // 主线程睡眠1秒让 子线程先获得锁 } catch (InterruptedException e) {e.printStackTrace();}al.untimed(); // 没有设置超时时间的获得锁al.timed(); // 设置了超时时间获得锁 }
}
/*
trylock() : truetryLock(2, TimeUnit.SECONDS): true
acquired:
trylock() : falsetryLock(2, TimeUnit.SECONDS): false
*/
Lock小结 显式的Lock 对象在加锁和释放锁方面相对于内建的 synchronized 锁来说还赋予了你更细粒度的控制力 【21.3.3】原子性与易变性
1、原子操作不需要进行同步控制 原子操作是不能被线程调度机制中断的 操作
2、但是有个问题
原子性可以应用于 除了 long 和 double 之外的所有基本类型的操作。 因为jvm 可以将64位long和double遍历的读取与写入当做两个分离的32位操作来执行这就产生了在一个读取和写入操作中间发送上下文切换从而导致不同 的任务可以看到不正确结果的可能。干货——这也叫做字撕裂
但是 当你定义long 或double变量时如果使用 volatile关键字就会获得原子性干货——原子性volatile 可以获得原子性
3、可见性或可视性问题 在多处理器系统上 相对单处理器系统而言可视性问题远比原子性问题多得多。即一个任务作出的修改即使在不中断的意义上讲是原子性的但对其他任务也可能是不可见的 因此不同的任务对应用的状态是不同的视图
4、volatile关键字 确保了应用中的可视性或可见性只要对 volatile域产生了写操作那么所有的读操作都可以看这个这个修改。即便使用了本地缓存 情况也是如此。因为volatile域会立即被写入主存中而读取操作就发生在主存中干货——因为volatile域会立即被写入主存中而读取操作就发生在主存中故 volatile可以保证可见性
补充1 理解原子性与易变性是两个不同的概念很重要。 在非 volatile域上的原子操作不必刷新到主存去因此其他读取该域的任务也没有必要当然也不会看到这个新值如果多个任务同时访问某个域那么这个域就应该是 volatile的否则这个域就应该同步访问
补充2当一个域的值依赖于他之前的值时如递增一个计数器则 volatile无法工作如果某个域的值收到其他域的值的限制那么volatile也无法工作如 Range类的 lower 和 upper 边界就必须遵循lower upper的限制
小结使用volatile而不是 synchronized的唯一安全的情况是类中只有一个可变的域。再次提醒你的第一选择应该是 synchronized 这是最安全的方式而其他方式都是有风险的干货
/*** page 682 * 原子性测试 */
public class AtomicityTest implements Runnable {private int i 0; public int getValue() { // 这里的 getValue没有 synchronized 来修饰有并发问题 return i ;}// 偶数增量 private synchronized void evenIncrement() {i; i; // 当执行第1次自加时 线程切换 导致 i为奇数 }Overridepublic void run() {while(true) {evenIncrement();}}public static void main(String[] args) {ExecutorService exec Executors.newCachedThreadPool();AtomicityTest at new AtomicityTest();exec.execute(at);while(true) {int val at.getValue();if (val % 2 ! 0) {System.out.println(val); // 1 或 3 或 7 System.exit(0); }}}
}
//1 或 3 或 7
上面的程序 evenIncrement 和 getValue方法都应该是 synchronized 关键字来修饰否则会有并发问题
/*** page 683* 序列号生成器 */
public class SerialNumberGenerator {private static volatile int serialNumber 0;public static int nextSerialNumber() { // 非线程安全因为方法没有被设置为同步 // 先返回然后再自增 return serialNumber; }
}*** page 683 * 数字检测器 */
public class SerialNumberChecker {private static final int SIZE 10;private static CircularSet serials new CircularSet(1000);private static ExecutorService exec Executors.newCachedThreadPool();// 序列数检测器保证序列数是唯一的 static class SerialChecker implements Runnable {Overridepublic void run() {while(true) {int serial SerialNumberGenerator.nextSerialNumber();if (serials.contains(serial)) { // 序列数字集合是否存在该数字存在结束System.out.println(duplicate: serial);System.exit(0); // 终止当前虚拟机 }serials.add(serial); // 否则 把该数字添加到集合 }}}public static void main(String[] args) {for (int i0; iSIZE; i) {exec.execute(new SerialChecker());}if (args.length 0) {try {TimeUnit.SECONDS.sleep(new Integer(args[0])); // 睡眠 System.out.println(no duplicates detected.);System.exit(0); } catch (Exception e) {e.printStackTrace();} }}/*duplicate:39705duplicate:39704duplicate:38934duplicate:39706duplicate:39707
*/
}
/*** 循环集合 */
class CircularSet {private int[] array ;private int len;private int index 0;// 构造方法 public CircularSet(int size) {array new int[size];len size; for (int i0; isize; i) {array[i] -1;}}// 同步加法 public synchronized void add(int i) {array[index] i ;index index % len; }// 同步是否包含 public synchronized boolean contains(int val) {for (int i0; ilen ; i) {if (array[i] val) return true; }return false; }
}
//duplicate:1680
//duplicate:1681
//duplicate:1619
//duplicate:1682
这里存在线程安全问题 因为 SerialNumberGenerator.nextSerialNumber 方法 不是synchronized 修饰 所以才会有重复的数字所以才会终止如果把该方法修改为 synchronized 则程序不会终止
【21.3.4】原子类 AtomicInteger AutomicLong AtomicReference
提供了原子性条件更新操作 compareAndSet(expectValue, updateValue);
荔枝 AtomicInteger
/*** page 684 * 原子类-AtomicInteger-提供原子性条件更新操作 compareAndSet 方法 */
public class AtomicIntegerTest implements Runnable {private AtomicInteger i new AtomicInteger(0);public int getValue() {return i.get(); }// 原子性加法用 AtomicInteger 替换了 synchronized 方法 private void evenIncrement() { i.addAndGet(2); // 原子操作 线程安全 }Overridepublic void run() {while(true) {evenIncrement(); }}public static void main(String[] args) {Timer timer new Timer(); // 定时器 timer.schedule(new TimerTask() {Overridepublic void run() {System.err.println(aborting);System.exit(0); // 终止当前虚拟机 }}, 5000); // 5秒后运行该子线程 ExecutorService exec Executors.newCachedThreadPool(); // 线程池 AtomicIntegerTest ait new AtomicIntegerTest();exec.execute(ait);while(true) {int val ait.getValue();if (val % 2 ! 0) {System.out.println(val); // 永远不会执行到这里因为 val 一定为偶数System.exit(0); // 终止当前虚拟机 }}}
}
/*aborting*/
强调Atomic类被设计用来构建 java.util.concurrent中的类因此只有在特殊情况下才在自己的代码中使用他们 即便使用了也需要确保不存在其他可能出现单的问题。通常依赖于锁更安全一些要么是 synchronized关键字 要么是显式的Lock对象
【21.3.5】临界区
1、有时临界区是方法内部的部分代码而不是整个方法synchronized 被用来指定某个对象此对象的锁被用来对花括号内部代码进行同步控制 同步控制块如下
synchronized(synchObject) { 临界区代码
}
2、使用 synchronized来创建临界区
public class CriticalSection {static void testApproaches(PairManager manager1, PairManager manager2) {ExecutorService exec Executors.newCachedThreadPool();PairManipulator pm1 new PairManipulator(manager1);PairManipulator pm2 new PairManipulator(manager2);PairChecker checker1 new PairChecker(manager1);PairChecker checker2 new PairChecker(manager2);exec.execute(pm1);exec.execute(pm2);exec.execute(checker1);exec.execute(checker2);try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e ) {System.out.println(sleep interrupted);}System.out.println(pm1: pm1 , pm2 pm2);System.exit(0);}public static void main(String[] args) {PairManager manager1 new PairManager1();PairManager manager2 new PairManager2();testApproaches(manager1, manager2); }/*pm1:pair:x 3, y 3, checkCounter 1, pm2 pair:x 7, y 7, checkCounter 3*/
}
// 命名内部类
class Pair {private int x, y ;public Pair(int x, int y) {this.x x; this.y y; }public Pair () {this(0, 0);}public int getX() {return x; }public int getY() {return y;}// x 自增1 public void incrementX() {x;}// y 自增1 public void incrementY() {y; }public String toString() {return x x , y y; }// 自定义异常类 public class PairValuesNotEqualException extends RuntimeException {public PairValuesNotEqualException() {super(pair values not equal: Pair.this);}}// 检测x与y是否相等 public void checkState() {if (x ! y) {throw new PairValuesNotEqualException(); }}
}
// 对子管理器
abstract class PairManager {AtomicInteger checkCounter new AtomicInteger(0);protected Pair p new Pair();private ListPair storage Collections.synchronizedList(new ArrayListPair()); // 加锁的list// 同步获取 pair 对象 public synchronized Pair getPair() {return new Pair(p.getX(), p.getY()); }protected void store(Pair p) {storage.add(p);try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {}}public abstract void increment();
}
class PairManager1 extends PairManager {public synchronized void increment() {p.incrementX(); // 让 x 自增 p.incrementY(); // y 自增 this.store(p);}
}
class PairManager2 extends PairManager {public synchronized void increment() {Pair temp; synchronized(this) { // 同步控制块 p.incrementX(); // 让 x 自增 p.incrementY(); // y 自增 temp this.getPair(); }this.store(temp); }
}
/*** 对子操纵器 */
class PairManipulator implements Runnable {private PairManager pm ;public PairManipulator(PairManager pm) {this.pm pm;}Overridepublic void run() {while(true) {pm.increment();}}public String toString() {return pair: pm.getPair() , checkCounter pm.checkCounter.get(); }
}
// 对子检测器
class PairChecker implements Runnable {private PairManager pm ; public PairChecker(PairManager pm) {this.pm pm; }Overridepublic void run() {while(true) {pm.checkCounter.incrementAndGet();pm.getPair().checkState();}}
}
3、使用 Lock来创建临界区
// 使用Lock对象创建临界区
class ExplicitPairManager1 extends PairManager {private Lock lock new ReentrantLock();public synchronized void increment() {lock.lock(); // 加锁 try {p.incrementX();p.incrementY();store(getPair());} finally {lock.unlock(); // 解锁 }}
}
【21.3.6】在其他对象上同步
1、synchronized 块必须给定一个在其上进行同步的对象并且最合理的方式 是 使用其方法正在被调用的当前对象如 synchronize(this)
class DualSynch {private Object syncObject new Object();
// public synchronized void f() {public synchronized void f() {synchronized(syncObject) { // 对 syncObject 进行同步控制获取的是 syncObject 对象的锁 for (int i 0; i 5; i) {print(f());Thread.yield();}}}public void g() {synchronized (syncObject) { // 对 syncObject 进行同步控制获取的是 syncObject 对象的锁 for (int i 0; i 5; i) {print(g());Thread.yield();}}}
}
public class SyncObject {public static void main(String[] args) {final DualSynch ds new DualSynch();new Thread() {public void run() {ds.f();}}.start();ds.g();}
}
/*
g()
g()
g()
g()
g()
f()
f()
f()
f()
f()
*/
【21.3.7】线程本地存储 ThreadLocal类实现
1、可以为使用相同变量的每个不同线程都创建不同的存储有5个线程都是用变量x多表示的对象name线程本地存储就会生成5块内存
class Accessor implements Runnable {private final int id;public Accessor(int idn) {id idn;}public void run() {while (!Thread.currentThread().isInterrupted()) {ThreadLocalVariableHolder.increment();System.out.println(this);Thread.yield();}}public String toString() {return Thread.currentThread().getName() , # id : ThreadLocalVariableHolder.get();}
}public class ThreadLocalVariableHolder {/** 使用 ThreadLocal 根除不同线程对变量的共享 */private static ThreadLocalInteger value new ThreadLocalInteger() {private Random rand new Random(47);protected synchronized Integer initialValue() {return rand.nextInt(10000);}};public static void increment() {value.set(value.get() 1);}public static int get() {return value.get();}public static void main(String[] args) throws Exception {ExecutorService exec Executors.newCachedThreadPool();for (int i 0; i 5; i)exec.execute(new Accessor(i));TimeUnit.SECONDS.sleep(3); // Run for a whileexec.shutdownNow(); // All Accessors will quit}
}
/*pool-1-thread-3, #2: 25714
pool-1-thread-3, #2: 25715
pool-1-thread-3, #2: 25716
pool-1-thread-3, #2: 25717*/【21.4】终结任务
/*** 终结任务-装饰性花园 获取多个大门进入公园的总人数每个大门有计数器 * 本测试案例的作用在于 并发控制使得每个线程的计数总和等于 主线程的总计数值*/
public class OrnamentalGarden {public static void main(String[] args) throws Exception {ExecutorService exec Executors.newCachedThreadPool(); // 线程池 for(int i0; i5; i) {exec.execute(new Entrance(i)); // 运行任务 }TimeUnit.SECONDS.sleep(3); // 睡眠3秒 Entrance.cancel(); // 所有线程停止运行即所有大门关闭 exec.shutdown(); // 关闭线程池 // ExecutorService.awaitTermination 方法等待每个任务结束// 如果所有任务在超时时间达到之前全部结束则返回true否则返回false if (!exec.awaitTermination(1000, TimeUnit.MICROSECONDS)) {System.out.println(some tasks were not terminated);}System.out.println(total : Entrance.getTotalCount()); // 获取count总计数值 System.out.println(sum of entrances: Entrance.sumEntrances()); // 所有大门人数相加}
}class Count {private int count 0;private Random random new Random(47);public synchronized int increment() {// 自增同步方法 int temp count;if (random.nextBoolean()) {// 获取随机布尔值50%机会为true Thread.yield(); // 当前线程让出cpu时间片 }return (count temp); // count 自增}public synchronized int value() { // 同步方法-获取count return count; }
}
/*** 入口大门 */
class Entrance implements Runnable {private static Count count new Count();// 总计数器所有大门人数计数器 private static ListEntrance entrances new ArrayList(); // 入口列表 private int number 0; // 每个大门的人数 private int id 0;public static volatile boolean canceled false;// 停止线程运行默认为false public static void cancel() {canceled true; }public Entrance(int id) {this.id id;entrances.add(this);}Overridepublic void run() {while(!canceled) {synchronized(this) {// 同步块-获取当前对象的锁 number; // number 自增 每个大门人数自增 }System.out.println(this , total: count.increment()); // 总计数器自增即所有大门人数计数器自增 try {TimeUnit.MICROSECONDS.sleep(1000);// 睡眠1秒 } catch (InterruptedException e) {System.out.println(sleep interrupted);}}System.out.println(stopping this); }public synchronized int getValue() { // 同步方法-获取本大门的人数 return number; }public String toString() {return entrance id : getValue();}public static int getTotalCount() { // 获取总计数值 return count.value();}public static int sumEntrances() { // 获取总和int sum 0;for (Entrance e : entrances) {sum e.getValue(); // 所有大门人数求和 }return sum; }
}
/*stopping entrance 1: 1693
stopping entrance 2: 1695
stopping entrance 4: 1695
total :8473
sum of entrances:8473
*/
【21.4.2】在阻塞时终结
1、线程状态 新建当线程被创建时他会短暂处于这个状态 此时线程已经分配了资源并完成了初始化 就绪 只要调度器把cpu时间片分配该这个线程他就可以运行 阻塞线程能够运行 但有个条件阻止他运行。线程可以由阻塞状态重新进入就绪状态 死亡处于死亡或终止状态的线程不在是可调度的并且也不会得到 cpu时间片他的任务已经结束
2、进入阻塞状态进入阻塞状态的原因 2.1、调用 sleep 方法使线程进入休眠状态 2.2、调用wait()方法使线程挂起 直到线程得到了 notify或notifyAll方法调用或 SE5中的java.util.concurrent类库中的 signal或signalAll() 方法调用 线程才会进入就绪状态 2.3、任务在等待某个输入输出完成 2.4、任务试图在某个对象上调用其同步控制方法 但是对象锁不可用因为另一个任务已经获取了这个锁
【21.4.3】中断
1、Thread类包含 interrupt方法可以终止被阻塞的任务。这个方法将设置线程的中断状态。 如果一个线程被阻塞或者视图执行一个阻塞操作那么设置这个线程的中断状态将抛出 InterruptedException异常。当抛出该异常或该任务调用了 Thread.interrupted() 方法时 中断状态将被复位设置为true
2、如果调用 Executor上调用 shutdownNow方法那么它将发送一个 interrupte方法调用给他启动的所有线程。
通过调用submit()而不是executor() 来启动任务就可以持有该任务的上下文。
【21.4.3】中断
1、Thread类包含 interrupt方法可以终止被阻塞的任务。这个方法将设置线程的中断状态。 如果一个线程被阻塞或者视图执行一个阻塞操作那么设置这个线程的中断状态将抛出 InterruptedException异常。当抛出该异常或该任务调用了 Thread.interrupted() 方法时 中断状态将被复位设置为true
2、如果调用 Executor上调用 shutdownNow方法那么它将发送一个 interrupte方法调用给他启动的所有线程。
通过调用submit()而不是executor() 来启动任务就可以持有该任务的上下文。submit()方法将返回一个泛型Future? 持有这个Future的关键在于你可以调用该对象的cancle() 方法 并因此可以使用他来中断某个特定任务。如果把true传给 cancel方法他就会拥有在该线程上调用 interrupt方法以停止这个线程的权限。
/*** 中断由线程池管理的某个线程 */
public class Interrupting {private static ExecutorService exec Executors.newCachedThreadPool();// 线程池 static void test(Runnable r) throws InterruptedException {// 使用 ExecutorService.submit() 而不是 ExecutorService.execute()方法来启动任务就可以持有该任务的上下文 submit() 方法返回 Future 对象 // exec.execute(r); 不用 execute() 方法 Future? f exec.submit(r);TimeUnit.MILLISECONDS.sleep(1000); // 睡眠1秒 System.out.println(interrupting r.getClass().getName()); // 正在中断某个线程 // 调用Future.cancel() 方法来中断某个特定任务 // 把true传给cancel() 方法该方法就拥有在该线程上调用interrupt() 方法以停止这个线程的权限 // cancel 是一种中断由 Executor启动的单个线程的方式 f.cancel(true);System.out.println(interrupt sent to r.getClass().getName()); // 中断信号发送给线程 System.out.println( seperate line );}public static void main(String[] args) throws Exception {test(new SleepBlocked());test(new IOBlocked(System.in));test(new SynchronizedBlocked()); TimeUnit.SECONDS.sleep(3);System.out.println(aborting with System.exit(0));System.exit(0);// 终止当前虚拟机进程所以有部分打印信息无法没有正常输出}
}
// 睡眠式阻塞线程 可中断的阻塞
class SleepBlocked implements Runnable {Overridepublic void run() {try {TimeUnit.SECONDS.sleep(3);// 睡眠3秒} catch (InterruptedException e ) { // 捕获中断异常System.out.println(interrupted exception in SleepBlocked );}System.out.println(exiting SleepBlocked.run());}
}
// IO式阻塞线程 不可中断的阻塞
class IOBlocked implements Runnable {private InputStream in;public IOBlocked(InputStream is) {in is; }Overridepublic void run() {try {System.out.println(waiting for read(););in.read(); // 等待输入流输入数据 } catch (IOException e) { // IO 异常 但执行结果没有报 IO 异常 if (Thread.currentThread().isInterrupted()) {System.out.println(interrupted from blocked IO);} else {throw new RuntimeException();}}System.out.println(exiting IOBlocked.run());}
}
// 线程同步式阻塞不可中断的阻塞
class SynchronizedBlocked implements Runnable {public synchronized void f() {while(true) {Thread.yield(); // 让出cpu时间片 }}public SynchronizedBlocked() { // 构造器开启一个线程 new Thread() { // 匿名线程调用f() 方法获取 SynchronizedBlocked 对象锁且不释放其他线程只能阻塞 public void run() {f();// f() 为同步方法 }}.start(); }Override public void run() {System.out.println(trying to call 同步f());f(); // 调用f() 同步方法 让出cpu时间片 System.out.println(exiting SynchronizedBlocked.run()); // 这里永远不会执行 }
}
/*interrupting diy.chapter21.SleepBlocked
interrupt sent to diy.chapter21.SleepBlockedseperate line
interrupted exception in SleepBlocked
exiting SleepBlocked.run()
waiting for read();
interrupting diy.chapter21.IOBlocked
interrupt sent to diy.chapter21.IOBlockedseperate line
trying to call 同步f()
interrupting diy.chapter21.SynchronizedBlocked
interrupt sent to diy.chapter21.SynchronizedBlockedseperate line
aborting with System.exit(0)
*/小结
序号阻塞方式是否可以中断1sleep是2IO否3synchronized获取锁否
所以对于IO操作线程或synchronized操作的线程其具有锁住多线程程序的潜在危险。
如何解决呢 关闭任务在其上发生阻塞的底层资源
/*** 无法中断线程但可以关闭任务阻塞所依赖的资源。* 这里只能够中断 基于socket输入流的io线程因为socket输入流可以关闭* 但无法中断基于系统输入流的io线程因为系统输入流无法关闭*/
public class CloseResource {public static void main(String[] args) throws Exception {ExecutorService exec Executors.newCachedThreadPool(); // 线程池 ServerSocket server new ServerSocket(8080); // 服务端套接字 InputStream socketInput new Socket(localhost, 8080).getInputStream();/* 启动线程 */exec.execute(new IOBlocked(socketInput));exec.execute(new IOBlocked(System.in));TimeUnit.MILLISECONDS.sleep(1000); // 睡眠1秒 System.out.println(shutting down all threads);exec.shutdownNow(); // 发送一个interrupte() 信号给exec启动的所有线程 TimeUnit.SECONDS.sleep(1); // 睡眠1秒 System.out.println(closing socketInput.getClass().getName());socketInput.close(); // 关闭io线程依赖的资源 TimeUnit.SECONDS.sleep(1);System.out.println(closing System.in.getClass().getName());System.in.close(); // 关闭io线程依赖的资源 }
}
/**
waiting for read();
waiting for read();
shutting down all threads
closing java.net.SocketInputStream
interrupted from blocked IO
exiting IOBlocked.run()
closing java.io.BufferedInputStream
*/
3、nio类提供了更人性化的IO中断被阻塞的nio通道会自动响应中断
/*** page 698* nio中断 */
public class NIOInterruption {public static void main(String[] args) throws Exception {ExecutorService exec Executors.newCachedThreadPool(); // 线程池 ServerSocket ss new ServerSocket(8080); // 服务器套接字 // InetAddress:类的主要作用是封装IP及DNS, // InetSocketAddress类主要作用是封装端口 他是在在InetAddress基础上加端口但它是有构造器的。InetSocketAddress isa new InetSocketAddress(localhost, 8080);SocketChannel sc1 SocketChannel.open(isa); // 套接字通道 SocketChannel sc2 SocketChannel.open(isa); // 套接字通道 // 使用 ExecutorService.submit() 而不是 ExecutorService.execute()方法来启动任务就可以持有该任务的上下文 submit() 方法返回 Future 对象 Future? f exec.submit(new NIOBlocked(sc1));// 以submit方式启动线程 exec.execute(new NIOBlocked(sc2)); // 以 execute方式启动线程 exec.shutdown(); // 关闭所有线程 TimeUnit.SECONDS.sleep(1); // 睡眠1秒// 调用Future.cancel() 方法来中断某个特定任务 // 把true传给cancel() 方法该方法就拥有在该线程上调用interrupt() 方法以停止这个线程的权限 // cancel 是一种中断由 Executor启动的单个线程的方式f.cancel(true); // sc2.close(); // }
}
// NIO 新io式阻塞
class NIOBlocked implements Runnable {private final SocketChannel sc;public NIOBlocked(SocketChannel sc) {this.sc sc; }Overridepublic void run() {try {System.out.println(waiting for read() in this);sc.read(ByteBuffer.allocate(1));} catch (ClosedByInterruptException e1) {System.out.println(ClosedByInterruptException, this this);} catch (AsynchronousCloseException e2) {System.out.println(AsynchronousCloseException, this this);} catch (IOException e3) {throw new RuntimeException(e3); }System.out.println(exiting NIOBlocked.run() this);}
}
/**
waiting for read() in diy.chapter21.NIOBlocked3856c761
waiting for read() in diy.chapter21.NIOBlocked55de2e48
ClosedByInterruptException, this diy.chapter21.NIOBlocked55de2e48
exiting NIOBlocked.run() diy.chapter21.NIOBlocked55de2e48
AsynchronousCloseException, this diy.chapter21.NIOBlocked3856c761
exiting NIOBlocked.run() diy.chapter21.NIOBlocked3856c761
*/
4、被互斥所阻塞 一个任务能够调用在同一个对象中的其他的 synchronized 方法而这个任务已经持有锁了
/*** 被互斥所阻塞* 同步方法f1 和 f2 相互调用直到 count为0 * 一个任务应该能够调用在同一个对象中的其他 synchronized 方法因为这个任务已经获取这个对象的锁* 2020/04/16 */
public class MultiLock {public synchronized void f1(int count) { // 同步方法 f1 if(count-- 0) {System.out.println(f1() calling f2() with count count);f2(count); // 调用 f2 }}public synchronized void f2(int count) { // 同步方法f2 if(count-- 0) {System.out.println(f2() calling f1() with count count);f1(count); // 调用f1 }}public static void main(String[] args) {final MultiLock multiLock new MultiLock();new Thread() {public void run() {multiLock.f1(5);}}.start(); }
}
/**
f1() calling f2() with count 4
f2() calling f1() with count 3
f1() calling f2() with count 2
f2() calling f1() with count 1
f1() calling f2() with count 0
*/
5、java se5 并发类库中添加了一个特性在 ReentrantLock 可重入锁上阻塞的任务具备可以被中断的能力
/*** 可重入锁的可中断式加锁 * page 700 */
public class Interrupting2 {public static void main(String[] args) throws Exception {Thread t new Thread(new Blocked2());t.start();TimeUnit.SECONDS.sleep(1);System.out.println(issuing t.interrupt()); // 2 t.interrupt(); // 中断线程 }
}
/*** 阻塞互斥量 */
class BlockedMutex {private Lock lock new ReentrantLock(); // 可重入锁 public BlockedMutex() {lock.lock(); // 构造器即加锁且从不会释放锁 }public void f() {try {lock.lockInterruptibly(); // 可中断式加锁 System.out.println(lock acquired in f());} catch(InterruptedException e) {System.out.println(interrupted from lock acquisition in f()); // 3 可中断阻塞捕获中断异常 }}
}
class Blocked2 implements Runnable {BlockedMutex blocked new BlockedMutex(); Overridepublic void run() {System.out.println(waiting for f() in Blocked Mutex); // 1 blocked.f();System.out.println(broken out of blocked call); // 4 }
}
/*** waiting for f() in Blocked Mutex
issuing t.interrupt()
interrupted from lock acquisition in f()
broken out of blocked call*/
【21.4.4】检查中断
1、在线程上调用 interrupt方法去中断线程执行时能够中断线程的前提是 任务要进入到阻塞操作中已经在阻塞操作内部否则调用 interrupt方法是无法中断线程的需要通过其他方式
其他方式是 由中断状态来表示 其状态可以通过调用 interrupt 来设置。通过 Thread.interrupted() 来检查中断 。
/*** 通过 Thread.interrupted() 来检查中断 * page 701*/
public class InterruptingIdiom {public static void main(String[] args) throws Exception {if(args.length ! 1) {System.out.println(InterruptingIdiom-傻瓜式中断);}Thread t new Thread(new Blocked3()); t.start();TimeUnit.SECONDS.sleep(3); // 睡眠 t.interrupt(); // 中断 }
}class NeedsCleanup {private final int id;public NeedsCleanup(int id) {this.id id;System.out.println(NeedsCleanup id);}public void cleanup() {System.out.println(clean up id);}
}
/*** 在run()方法中创建的 NeedsCleanup 资源都必须在其后面紧跟 try-finally 子句 * 以确保 清理资源方法被调用 */
class Blocked3 implements Runnable {private volatile double d 0.0; Overridepublic void run() {try {int index 1;// interrupted方法来检查中断状态 while(!Thread.interrupted()) { // 只要当前线程没有中断 System.out.println( 第 index 次循环 ); NeedsCleanup n1 new NeedsCleanup(1);try {System.out.println(sleeping-睡眠一秒);TimeUnit.SECONDS.sleep(1);NeedsCleanup n2 new NeedsCleanup(2);try {System.out.println(calculating-高强度计算);for (int i1; i250000; i) {d d (Math.PI Math.E) / d;}System.out.println(finished time-consuming operation 完成耗时操作.); } finally {n2.cleanup(); // 清理 }} finally{n1.cleanup(); // 清理 }}System.out.println(exiting via while() test-从while循环退出 ); // 从while循环退出 } catch (InterruptedException e) {System.out.println(exiting via InterruptedException-从中断InterruptedException退出 ); // 从中断退出 }}}
/***
InterruptingIdiom-傻瓜式中断第 1 次循环
NeedsCleanup 1
sleeping-睡眠一秒
NeedsCleanup 2
calculating-高强度计算
finished time-consuming operation 完成耗时操作.
clean up 2
clean up 1第 2 次循环
NeedsCleanup 1
sleeping-睡眠一秒
NeedsCleanup 2
calculating-高强度计算
finished time-consuming operation 完成耗时操作.
clean up 2
clean up 1第 3 次循环
NeedsCleanup 1
sleeping-睡眠一秒
clean up 1
exiting via InterruptedException-从中断InterruptedException退出 */ 【21.5】线程间的协作
1、当任务协作时关键问题是任务间的握手。握手可以通过 Object.wait() Object.notify() 方法来安全实现。当然了 java se5 的并发类库还提供了具有 await() 和 signal() 方法的Condition对象
【21.5.1】wait()方法与notifyAll() 方法
1、wait() 方法会在等待外部世界产生变化的时候将任务挂起并且只有在 nofity() 或notifyall() 发生时即表示发生了某些感兴趣的事务这个任务才会被唤醒去检查锁产生的变化。wait()方法提供了一种在任务之间对活动同步的方式。
还有调用wait() 方法将释放锁意味着另一个任务可以获得锁所以该对象上的其他synchronized方法可以在线程A wait期间被其他线程调用
2、有两种形式的 wait() 调用
形式1 wait方法接收毫秒数作为参数在wait()期间对象锁是释放的通过 notify() notifyAll() 方法或者时间到期后从 wait() 恢复执行
形式2wait方法不接受任何参数这种wait将无线等待下去直到线程接收到 notify或 notifyAll方法
补充1wait方法notify方法 notifyAll方法都是基类Object的一部分因为这些方法操作的锁也是对象的一部分而所有对象都是OBject的子类
补充2实际上只能在同步控制方法或同步控制块里调用 wait notify notifyAll方法因为不操作锁所有sleep方法可以在非同步控制方法里调用。如果在非同步方法中调用 wait notify notifyAll方法 编译可以通过但运行就报 IllegalMonitorStateException 异常异常意思是 在调用wait notify notifyAll方法前必须获取对象的锁
干货——只能在同步控制方法或同步控制块里调用 wait notify notifyAll方法 【荔枝】涂蜡与抛光 抛光任务在涂蜡完成之前是不能执行其工作的而涂蜡任务在涂另一层蜡之前必须等待抛光任务完成 抛光 WaxOn WaxOff 使用了wait和notifyAll方法来挂起和重启这些任务
/*** 汽车上蜡与抛光* 抛光任务在涂蜡完成之前是不能执行其工作的而涂蜡任务在涂另一层蜡之前必须等待抛光任务完成 * page 705 */
public class WaxOMatic {public static void main(String[] args) throws Exception {Car car new Car();ExecutorService exec Executors.newCachedThreadPool();exec.execute(new WaxOff(car)); // 先上蜡 exec.execute(new WaxOn(car)); // 后抛光 TimeUnit.SECONDS.sleep(1); // 睡眠5秒 exec.shutdown(); // 线程池关闭 }
}
// 汽车
class Car {private boolean waxOn false; // 是否上蜡// 已上蜡 /*** notifyAll() 和 wait() 方法只能在 synchronized方法或synchronized块中执行因为获取或释放锁 */public synchronized void waxed() { // 上蜡 waxOn true; notifyAll(); // 唤醒所有调用 wait() 方法锁阻塞的线程 // 为了使该任务从 wait() 中唤醒线程必须重新获得之前进入wait()时释放的锁。// 在这个锁变得可用之前这个任务是不会被唤醒的。}public synchronized void buffed() { // 抛光 waxOn false; notifyAll();}public synchronized void waitForWaxing() throws InterruptedException { // 等待上蜡 while (waxOn false) { // 若没有上蜡则等待 wait(); // 线程被挂起 当前线程持有的car对象锁被释放 }}public synchronized void waitForBuffing() throws InterruptedException { // 等待抛光 while(waxOn true) { // 若已上蜡则等待抛光wait(); // 线程被挂起 当前线程持有的car对象锁被释放}}
}
class WaxOn implements Runnable { // 上蜡线程本线程先执行第1次上蜡等待抛光抛光线程第1次执行抛光后本线程执行第2次上蜡...... private Car car; public WaxOn(Car c) {this.car c; }Overridepublic void run() {try {while(!Thread.interrupted()) {System.out.println(wax on !);TimeUnit.MILLISECONDS.sleep(200);car.waxed(); // 先上蜡完成 把waxOn设置为true唤醒等待上蜡的线程 car.waitForBuffing(); // 再等待抛光当waxOn为ture则抛光线程一直等待 }} catch (InterruptedException e ) {System.out.println(exiting via interrupt);}System.out.println(ending wax on task); }
}
class WaxOff implements Runnable { // 抛光线程本线程先等待上蜡上蜡线程第1次执行后本线程立即执行第1次抛光接着本线程等待第2次上蜡...... private Car car; public WaxOff(Car c) {this.car c; }Overridepublic void run() {try {while(!Thread.interrupted()) {car.waitForWaxing(); // 先等待上蜡 , 当waxOn为false则上蜡线程一直等待 System.out.println(wax off !); // TimeUnit.MILLISECONDS.sleep(200);car.buffed(); // 抛光完成后把waxOn设置为false唤醒等待抛光的线程 }} catch (InterruptedException e ) {System.out.println(exiting via interrupt);}System.out.println(ending wax off task); }
}
/*** wax on !
wax off !
wax on !
wax off !
...... */
补充前面的实例强调必须用一个检查感兴趣的条件的while循环包围wait方法。这很重要因为为啥要用while包裹wait呢
前面的示例强调必须用一个检查感兴趣的条件的while循环包围wait()。这很重要原因如下 原因1可能有多个任务出于相同的原因在等待同一个锁而第一个唤醒任务可能会改变这种状况如果属于这种情况那么任务应该被再次挂起直到其感兴趣的条件发生变化 原因2在本任务从其 wait()中被唤醒的时刻有可能会有某个其他任务已经做出了改变从而使得本任务在此时不能执行或者执行其操作已显得无关紧要此时应该再次执行wait()将其重新挂起个人理解——比如有2个任务AB都在等待资源R可用而阻塞当R可用时任务A和B均被唤醒但任务A被唤醒后立即拿到了临界资源或获取了锁则任务B仍然需要再次阻塞这就是while的作用 原因3有可能某些任务出于不同的原因在等待你的对象上的锁必须使用notifyAll唤醒在这种情况下需要检查是否已经由正确的原因唤醒如果不是则再次调用wait方法
用while 包围wait方法的本质检查所有感兴趣的条件并在条件不满足的情况下再次调用wait方法让任务再次阻塞
3、错失的信号当两个线程使用 notify/wait() 或 notifyAll()/ wait() 方法进行协作时有可能会错过某个信号即 notify或 notifyAll发出的信号带有wait的线程无法感知到。
荔枝
// T1:
synchronized(sharedMonitor) {setup condition for T2sharedMonitor.notify() // 唤醒所有等待线程
}
// T2:
while(someCondition) {// point 1 synchronized(sharedMonitor) {sharedMonitor.wait(); // 当前线程阻塞 }
}
当T2 还没有调用 wait方法时T1就发送了notify信号 这个时候T2线程肯定接收不到这个信号T1发送信号notify后T2才调用wait方法这时T2将永久阻塞下去因为他错过了T1的notify信号
T2正确的写法如下
// T2正确的写法如下
synchronized(sharedMonitor) {while(someCondition) {sharedMonitor.wait(); // 当前线程阻塞 }
}
如果T1先执行后释放锁此时T2获取锁且检测到 someCondition已经发生了变化T2不会调用wait() 方法
如果T2先执行且调用了wait方法 释放了锁 这时T1后执行然后调用notify()唤醒阻塞线程 这时T2可以收到T1的 notify信号从而被唤醒 由T1修改了 someCondition的条件 所以T2 不会进入while循环 【21.5.2】notify与notifyAll方法 1、notify()方法在使用 notify方法时在众多等待同一个锁的任务中只有一个会被唤醒如果你希望使用notify就必须保证被唤醒的是恰当的任务。 2、notifyAll将唤醒所有正在等待的任务。这是否意味着在任何地方任何处于wait状态中的任务都将被任何对notifyAll的调用唤醒呢。事实上当notifyAll因某个特定锁而被调用时只有等待这个锁的任务才会被唤醒
/*** page 707* notify 与 notifyAll的对比 * notify 唤醒单个阻塞线程而notifyAll唤醒所有阻塞线程*/
public class NotifyVsNotifyAll {public static void main(String[] args) throws Exception {ExecutorService exec Executors.newCachedThreadPool();// 线程池 for (int i 0; i5; i) {exec.execute(new Task()); // 运行5个任务 只要task 任务一运行就会阻塞除非被唤醒 }exec.execute(new Task2()); // 运行第6个任务 只要task2 任务一运行就会阻塞除非被唤醒 Timer timer new Timer(); // 定时器 // 定时调度 延迟400毫秒开始执行两次运行的时间间隔为500毫秒 timer.scheduleAtFixedRate(new TimerTask() {boolean prod true; Overridepublic void run() {if (prod) {System.out.println(\n notify() );Task.blocker.prod(); // 唤醒单个阻塞线程 prod false ;} else {System.out.println(\n notifyAll());Task.blocker.prodAll(); // 唤醒所有阻塞线程 prod true ;}}}, 400, 500);TimeUnit.SECONDS.sleep(5);timer.cancel(); // 关闭定时器关闭所有线程正在运行的任务除外 System.out.println(timer canceled);TimeUnit.MILLISECONDS.sleep(500); // 睡眠500毫秒 System.out.println(task2.blocker.prodAll());Task2.blocker.prodAll(); // task2 唤醒所有阻塞线程 TimeUnit.MILLISECONDS.sleep(500); // 睡眠500毫秒System.out.println(\n shutting down);exec.shutdownNow(); // 关闭线程池 }
}
// 阻塞器
class Blocker {synchronized void waitingCall() {try {while(!Thread.interrupted()) {wait(); // 期初所有线程均阻塞等待 notify 或 notifyAll 来唤醒 System.out.println(Thread.currentThread() );}} catch (InterruptedException e ) {}}synchronized void prod() {notify();// 唤醒单个阻塞线程 }synchronized void prodAll() {notifyAll(); // 唤醒所有阻塞线程 }
}
// 任务
class Task implements Runnable {static Blocker blocker new Blocker();// 阻塞器 Overridepublic void run() {blocker.waitingCall();// wait() 方法阻塞 }
}
// 任务2
class Task2 implements Runnable {static Blocker blocker new Blocker(); // 阻塞器 Overridepublic void run() {blocker.waitingCall(); // wait() 方法阻塞 }
}
/*notify()
Thread[pool-1-thread-1,5,main] notifyAll()
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-5,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-2,5,main] notify()
Thread[pool-1-thread-1,5,main] notifyAll()
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-5,5,main] notify()
Thread[pool-1-thread-1,5,main] notifyAll()
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-5,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-2,5,main] notify()
Thread[pool-1-thread-1,5,main] notifyAll()
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-5,5,main] notify()
Thread[pool-1-thread-1,5,main] notifyAll()
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-5,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-2,5,main]
timer canceled
task2.blocker.prodAll()
Thread[pool-1-thread-6,5,main] shutting down */
补充
// 阻塞器
class Blocker {synchronized void waitingCall() {try {while(!Thread.interrupted()) {wait(); // 期初所有线程均阻塞等待 notify 或 notifyAll 来唤醒 System.out.println(Thread.currentThread() );}} catch (InterruptedException e ) {}}synchronized void prod() {notify();// 唤醒单个阻塞线程 }synchronized void prodAll() {notifyAll(); // 唤醒所有阻塞线程 }
}
Blocker.waitingCall 方法中的while循环 有两种方式可以离开这个循环 方式1发生异常而离开 方式2通过检查 interrupted标志离开
【21.5.3】生产者与消费者 1、对于一个饭店有一个厨师和服务员。服务员必须等待厨师准备好膳食。当厨师准备好时他会通知服务员之后服务员上菜然后返回继续等待下一次上菜。 这是一个任务协作的荔枝厨师代表生产者服务员代表消费者。两个任务必须在膳食被生产和消费时进行握手而系统必须以有序的方式关闭。
/*** page 709* 生产者厨师chef-与消费者服务员WaitPerson */
public class Restaurant { // 餐馆 Meal meal ; ExecutorService exec Executors.newCachedThreadPool(); // 线程池 WaitPerson waitPerson new WaitPerson(this); // 服务员 Chef chef new Chef(this); // 厨师 // 构造器中通过线程池 运行厨师和服务员的任务 public Restaurant() {exec.execute(chef);exec.execute(waitPerson);}public static void main(String[] args) {new Restaurant(); }
}
class Meal { // 膳食 private final int orderNum; // 订单号 public Meal(int orderNum) {this.orderNum orderNum; }Override public String toString() {return meal orderNum; }
}
class WaitPerson implements Runnable { // 服务员消费者private Restaurant restaurant; // 餐馆 public WaitPerson(Restaurant restaurant) {this.restaurant restaurant;} Overridepublic void run() {try {while(!Thread.interrupted()) {synchronized (this) {while(restaurant.meal null) { // 没有菜可以上服务员等待 wait(); // 阻塞直到 notify 或 notifyAll 唤醒 }}System.out.println(服务器取到餐WaitPerson got restaurant.meal);synchronized (restaurant.chef) { // 厨师 restaurant.meal null; restaurant.chef.notifyAll(); // 唤醒所有阻塞在 chef对象上的线程 }}} catch (InterruptedException e) {System.out.println(WaitPerson interrupted服务员线程中断);}}
}
class Chef implements Runnable {// 厨师生产者 private Restaurant restaurant; private int count 0;public Chef(Restaurant r) {restaurant r; }Overridepublic void run() {try {while(!Thread.interrupted()) {synchronized (this) {while(restaurant.meal ! null) { // 菜没有被端走厨师等待 wait(); // 阻塞直到 notify 或 notifyAll 唤醒}}if (count 10) { // 厨师只做10个菜 System.out.println(out of food, closing。厨师只做10个菜关闭线程池);restaurant.exec.shutdownNow(); // 关闭餐馆的线程池该池运行着厨师和服务员任务 }System.out.println(厨师说上菜了order up!);synchronized (restaurant.waitPerson) {restaurant.meal new Meal(count); // 厨师生产一个菜 restaurant.waitPerson.notifyAll(); // 唤醒服务员端菜 }TimeUnit.MILLISECONDS.sleep(100);}} catch (InterruptedException e) {System.out.println(chef interrupted);}}
}
/*
厨师说上菜了order up!
服务器取到餐WaitPerson got meal 1
厨师说上菜了order up!
服务器取到餐WaitPerson got meal 2
厨师说上菜了order up!
服务器取到餐WaitPerson got meal 3
厨师说上菜了order up!
服务器取到餐WaitPerson got meal 4
厨师说上菜了order up!
服务器取到餐WaitPerson got meal 5
厨师说上菜了order up!
服务器取到餐WaitPerson got meal 6
厨师说上菜了order up!
服务器取到餐WaitPerson got meal 7
厨师说上菜了order up!
服务器取到餐WaitPerson got meal 8
厨师说上菜了order up!
服务器取到餐WaitPerson got meal 9
out of food, closing。厨师只做10个菜关闭线程池
厨师说上菜了order up!
WaitPerson interrupted服务员线程中断
chef interrupted
*/
2、代码解说 Restraurant是 WaitPerson和Chef的焦点作为连接两者的桥梁。他们知道在为哪个Restraurant工作因为他们必须和这家饭店打交道以便放置或拿取膳食。2.1、干货再次提问如果在等待一个订单一旦你被唤醒这个订单就必定是可以获得的吗 答案不是的。因为在并发应用中某个其他的任务可能会在WaitPerson被唤醒时会突然插足并拿走订单唯一安全的方式是使用下面这种惯用的wait() 方法来保证在退出等待循环前条件将得到满足。如果条件不满足还可以确保你可以重返等待状态。 while(conditionIsNotMet) { wait(); } 2.2、shutdownNow()将向所有由 ExecutorService启动的任务发送 interrupt信号。但是在Chef中任务并没有在获得该interrupt信号后立即关闭因为当任务试图进入一个可中断阻塞操作时 这个中断只能抛出 InterruptException。然后当 Chef 试图调用sleep()时抛出了 InterruptedException。如果移除对sleep()的调用那么这个任务将回到run()循环的顶部并由于Thread.interrupted() 测试而退出同时并不抛出异常。
3、使用显式的Lock和 Condition 对象 使用互斥并允许任务挂起的基本类是 Condition调用Condition的await() 可以挂起一个任务调用signal() 可以唤醒一个任务调用signalAll() 可以唤醒所有在这个Condition上被其自身挂起的任务。 干货——与notifyAll()相比signalAll()方法是更安全的方式
/*** page 711 * 使用显式的Lock 和 Condition对象 */
public class WaxOMatic2 {public static void main(String[] args) throws InterruptedException {Car2 car new Car2(); ExecutorService executorService Executors.newCachedThreadPool();executorService.execute(new WaxOff2(car)); // 抛光executorService.execute(new WaxOn2(car)); // 打蜡 TimeUnit.SECONDS.sleep(1); // 睡眠5秒 executorService.shutdownNow(); }
}
class Car2 {private Lock lock new ReentrantLock(); // 可重入锁 private Condition condition lock.newCondition(); // 获取锁的条件 private boolean waxOn false; // 期初时没有上蜡public void waxed() { // 上蜡 lock.lock(); // 加锁 try { waxOn true; // 上蜡完成 condition.signalAll(); // 唤醒所有等待线程 } finally {lock.unlock(); // 解锁 }}public void buffed() { // 抛光 lock.lock(); // 加锁 try {waxOn false; // 抛光完成待上蜡 condition.signalAll();} finally {lock.unlock(); // 解锁 }}public void waitForWaxing() throws InterruptedException { // 等待上蜡 lock.lock();try {while(waxOn false) { // 还未上蜡等待上蜡condition.await(); // 挂起 }} finally {lock.unlock();}}public void waitForBuffing() throws InterruptedException { // 等待抛光 lock.lock();try {while(waxOn true) { // 上蜡完成等待抛光 condition.await(); // 挂起 }} finally {lock.unlock(); }}
}
class WaxOn2 implements Runnable { // 打蜡任务 private Car2 car ; public WaxOn2(Car2 c) {this.car c; }Overridepublic void run() {try {while(!Thread.interrupted()) {System.out.println(wax on );TimeUnit.MILLISECONDS.sleep(200);car.waxed(); // 打蜡完成 car.waitForBuffing(); // 等待抛光 }} catch(InterruptedException e ) {System.out.println(WaxOn2 exiting via interrupt);}System.out.println(WaxOn2 ending wax on task);}
}
class WaxOff2 implements Runnable {// 打蜡结束开始抛光任务 private Car2 car; public WaxOff2(Car2 c) {this.car c; }Overridepublic void run() {try {while(!Thread.interrupted()) {car.waitForWaxing(); // 等待打蜡 System.out.println(wax off);TimeUnit.MILLISECONDS.sleep(200);car.buffed(); // 抛光完成 }} catch(InterruptedException e ) {System.out.println(WaxOff2 exiting via interrupt);}System.out.println(WaxOff2 ending wax off task);}
}
/*
wax on
wax off
wax on
wax off
wax on
WaxOff2 exiting via interrupt
WaxOff2 ending wax off task
WaxOn2 exiting via interrupt
WaxOn2 ending wax on task
*/
代码解说每个对lock()的调用都必须紧跟一个try-finally子句用来保证在所有情况下都可以释放锁。在使用内建版本时任务在可以调用 await() signal(), signalAll() 方法前必须拥有这个锁。干货——不推荐使用Lock和Condition对象来控制并发使用Lock和Condition对象来控制并发比较复杂只有在更加困难的多线程问题中才使用他们
【21.5.4】生产者与消费者队列 1、wait()和notifyAll() 是一种低级的方式来解决任务协作问题也可以使用同步队列这种高级方式来解决同步队列在任何时刻都只允许一个任务插入或移除元素。 2、同步队列 BlockingQueue两个实现LinkedBlockingQueue无界队列 ArrayBlockingQueue-固定尺寸放置有限数量的元素 3、若消费者任务试图从队列中获取元素而该队列为空时队列可以挂起消费者任务让其阻塞并且当有更多元素可用时队列可以唤醒消费者任务。 阻塞队列可以解决非常多的问题且比 wait()与notifyAll()简单得多。 【看个荔枝】
/*** 阻塞队列 * page 714 */
public class TestBlockingQueues {static void getKey() {try {// 从控制台读入用户输入new BufferedReader(new InputStreamReader(System.in)).readLine(); } catch (IOException e) {throw new RuntimeException(e); }}static void getKey(String msg) {System.out.println(msg);getKey(); }static void test(String msg, BlockingQueueLiftOff queue) {System.out.println(msg);LiftOffRunner runner new LiftOffRunner(queue);Thread t new Thread(runner);t.start();for (int i0; i3; i) {runner.add(new LiftOff(3)); // 添加5个发射任务到阻塞队列 } getKey(press enter msg);t.interrupt(); // 线程中断 System.out.println(finished msg test);}public static void main(String[] args) {test(LinkedBlockingQueue, new LinkedBlockingQueueLiftOff()); // 链表阻塞队列无界 test(ArrayBlockingQueue, new ArrayBlockingQueueLiftOff(3)); // 数组阻塞队列固定长度 test(SynchronousQueue, new SynchronousQueueLiftOff()); // 同步队列 }
}
// lift off 发射起飞
class LiftOffRunner implements Runnable {private BlockingQueueLiftOff rockets; // 阻塞队列火箭队列public LiftOffRunner(BlockingQueueLiftOff queue) {this.rockets queue; }public void add(LiftOff lo) { // LiftOff 发射起飞任务 try {rockets.put(lo); // 往队列里面放入 发射起飞任务 } catch (InterruptedException e) {System.out.println(interupted during put());}}Overridepublic void run() {try {while(!Thread.interrupted()) {LiftOff rocket rockets.take(); // 从队列中取出任务运行没有任务则阻塞 rocket.run(); }} catch (InterruptedException e) {System.out.println(waking from task());}System.out.println(exiting LiftOffRunner); }
}
/*
LinkedBlockingQueue
press enter LinkedBlockingQueue
#0(2),
#0(1),
#0(liftoff),
#1(2),
#1(1),
#1(liftoff),
#2(2),
#2(1),
#2(liftoff), finished LinkedBlockingQueue test
waking from task()
exiting LiftOffRunner
ArrayBlockingQueue
press enter ArrayBlockingQueue
#3(2),
#3(1),
#3(liftoff),
#4(2),
#4(1),
#4(liftoff),
#5(2),
#5(1),
#5(liftoff), finished ArrayBlockingQueue test
waking from task()
exiting LiftOffRunner
SynchronousQueue
#6(2),
#6(1),
#6(liftoff),
#7(2),
#7(1),
#7(liftoff),
#8(2),
press enter SynchronousQueue
#8(1),
#8(liftoff), finished SynchronousQueue test
waking from task()
exiting LiftOffRunner*/
【吐司BlockingQueue】 1、一台机器有3个任务 一个制作吐司一个给吐司抹黄油另一个在抹过黄油的吐司上涂果酱
/*** 吐司制作程序-* 一台机器有3个任务第1制作吐司 第2抹黄油第3涂果酱阻塞队列-LinkedBlockingQueue * page 715 */
public class ToastOMatic {public static void main(String[] args) throws Exception {ToastQueue dryQueue new ToastQueue(); // 烘干的吐司队列 ToastQueue butterQueue new ToastQueue(); // 涂黄油的吐司队列 ToastQueue finishQueue new ToastQueue(); // 制作完成的吐司队列 /* 线程池 */ExecutorService exec Executors.newCachedThreadPool();exec.execute(new Toaster(dryQueue)); // 吐司exec.execute(new Butterer(dryQueue, butterQueue)); // 黄油 exec.execute(new Jammer(butterQueue, finishQueue)); // 果酱 exec.execute(new Eater(finishQueue)); // 吃 TimeUnit.SECONDS.sleep(1);exec.shutdownNow(); // 发出中断信号 线程报 InterruptedException 中断异常 , 所有线程均结束exec.shutdown(); }
}
class Toast { // 吐司类 public enum Status{DRY, BUTTERED, JAMMED}; // 枚举类 dry-烘干 butter-黄油jam-果酱 private Status status Status.DRY; private final int id ;public Toast(int id) { // 编号 this.id id; }public void butter() { // 抹黄油结束 status Status.BUTTERED;}public void jam() { // 涂果酱结束 status Status.JAMMED;}public Status getStatus() {return status; }public int getId() {return id; }public String toString() {return toast id : status; }
}
class ToastQueue extends LinkedBlockingQueueToast {} // 吐司队列 class Toaster implements Runnable { // 第1个工序 做吐司 private ToastQueue toastQueue; // 吐司队列 private int count 0; // 计数器 private Random rand new Random(47);public Toaster(ToastQueue toastQueue) {this.toastQueue toastQueue; } Override public void run() {try {while(!Thread.interrupted()) { // 只要任务不中断 TimeUnit.MILLISECONDS.sleep(100 rand.nextInt(500));Toast t new Toast(count) ; System.out.println(t);toastQueue.put(t); // 往队列添加吐司 }} catch (InterruptedException e) {System.out.println(toaster interrupted);}System.out.println(toaster off); // 吐司制作完成 }
}
class Butterer implements Runnable { // 第2个工序黄油 private ToastQueue dryQueue, butterQueue;public Butterer(ToastQueue dryQueue, ToastQueue butterQueue) { // 已烘干吐司队列 已抹黄油的吐司队列 this.dryQueue dryQueue;this.butterQueue butterQueue; }Overridepublic void run() {try {while(!Thread.interrupted()) {Toast t dryQueue.take(); // 获取烘干的吐司 t.butter(); // 抹黄油 System.out.println(t);butterQueue.put(t); // 往黄油队列添加 }} catch (InterruptedException e) {System.out.println(butterer interrupted );}System.out.println(butterer off);}
}
class Jammer implements Runnable { // 第3个工序涂果酱 private ToastQueue butterQueue, finishQueue; public Jammer(ToastQueue butterQueue, ToastQueue finishQueue) {this.butterQueue butterQueue;this.finishQueue finishQueue;}Overridepublic void run() {try {while(!Thread.interrupted()) {Toast t butterQueue.take(); // 从抹黄油队列中获取吐司 t.jam(); // 涂果酱 System.out.println(t);finishQueue.put(t); // 添加到完成队列 }} catch (InterruptedException e ) {System.out.println(jammer interrupted);}System.out.println(jammer off); // 涂果酱完成 }
}
class Eater implements Runnable { // 消费者吃吐司 private ToastQueue finishQueue; private int counter 0;public Eater(ToastQueue finishQueue) {this.finishQueue finishQueue;}Overridepublic void run() {try {while(!Thread.interrupted()) {Toast t finishQueue.take(); // 从吐司制作完成队列中获取吐司 if (t.getId() ! counter || t.getStatus() ! Toast.Status.JAMMED) {System.out.println( Error: t);System.exit(1);} else {System.out.println(chomp ! t); // chomp-大声咀嚼吃吐司 }}} catch (InterruptedException e) {System.out.println(eater interrupted);}System.out.println(eat off); // 吃饱回家 }
}
/*toast 0 : DRY
toast 0 : BUTTERED
toast 0 : JAMMED
chomp !toast 0 : JAMMED
toast 1 : DRY
toast 1 : BUTTERED
toast 1 : JAMMED
chomp !toast 1 : JAMMED
toast 2 : DRY
toast 2 : BUTTERED
toast 2 : JAMMED
chomp !toast 2 : JAMMED
eater interrupted
eat off
toaster interrupted
toaster off
butterer interrupted
butterer off
jammer interrupted
jammer off* */
【21.5.5】任务间使用管道进行输入输出 1、通过输入输出在线程间通信很常用。提供线程功能的类库以管道的形式对线程间的输入输出提供了支持分别是PipedWriter和PipedReader类分别允许任务向管道写和允许不同任务从同一个管道读取。 这种模式可以看做是 生产者-消费者问题的变体管道就是一个封装好了的解决方案。管道可以看做是一个阻塞队列存在于多个引入 BlockingQueue之间的java版本中。
/*** 任务间使用管道进行输入输出 * page 718 */
public class PipedIO {public static void main(String[] args) throws Exception {Sender sender new Sender(); // 发送器Receiver receiver new Receiver(sender); // 接收器 ExecutorService exec Executors.newCachedThreadPool(); // 线程池 exec.execute(sender); // 发送 exec.execute(receiver); // 接收 TimeUnit.SECONDS.sleep(3);exec.shutdown(); // 关闭线程池 }
}
//发送者任务
class Sender implements Runnable { private Random rand new Random(47); // 随机数 private PipedWriter out new PipedWriter(); // 管道输出对象 public PipedWriter getPipedWriter() {return out; }Overridepublic void run() {try {while(true) {for (char c A; c z; c) {out.write(c); // 把字符输出到管道 TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));}}} catch (IOException e) {System.out.println(\n e sender write exception );} catch (InterruptedException e2) {System.out.println(\n e2 sender sleep interrupted. );}}
}
// 接收者任务
class Receiver implements Runnable {private PipedReader in ; public Receiver(Sender sender) throws IOException {in new PipedReader(sender.getPipedWriter()); // 管道输入对象 }Overridepublic void run() {try {while(true ) {System.out.print(read: (char)in.read() , );// 从管道读取数据 }} catch (IOException e ) {System.out.println(\n e receiver read exception.);}}
}
代码解说1 当Receiver调用read() 方法时如果没有更多的数据管道将自动阻塞 补充1注意sender和receiver是在main()中启动的即对象构造彻底完成以后。如果你启动一个没有构造完成的对象在不同的平台上管道可能会产生不一致的行为。BlockingQueue使用起来更加健壮且容易干货 补充2在shudownNow() 被调用时PipedReader与普通IO之间的区别是PipiedReader是可以中断的。 如果将 in.read() 修改为System.in.read() 那么interrupt调用将不能打断read()调用。 干货
【21.6】死锁
1、作为哲学家他们很穷只能买5根筷子通俗讲筷子数量与哲学家数量相同他们坐在桌子周围每人之间放一根筷子当一个哲学家要就餐的时候这个哲学家必须同时拥有左边和右边的筷子。 如果一个哲学家左边或右表已经有人在使用筷子了那么这个哲学家就必须等待直至可以得到必须的筷子。 【代码-Chopstick】
/*** 死锁-筷子* page719 */
public class Chopstick {private boolean taken false;// 拿起筷子 public synchronized void take() throws InterruptedException {while (taken) {wait();}taken true; }// 放下筷子 public synchronized void drop() {taken false; notifyAll(); }
}
【代码-Philosopher】
/*** 哲学家* page 718 */
public class Philosopher implements Runnable {private Chopstick left; // 左筷private Chopstick right; // 右筷 private int id 1; // 编号 private int ponderFactor 0; // 思考因素 private Random rand new Random(47); // 随机数发生器 private void pause() throws InterruptedException { // 暂停 if (ponderFactor 0) return ; TimeUnit.MILLISECONDS.sleep(rand.nextInt(ponderFactor * 250)); // 睡眠 }public Philosopher(Chopstick left, Chopstick right, int ident, int ponder) {this.left left; this.right right; this.id ident; this.ponderFactor ponder; } Overridepublic void run() {try {while (!Thread.interrupted()) {System.out.println(this thinking); // 思考 pause(); // 暂停 System.out.println(this grabbing right); // 拿走右边筷子 right.take(); System.out.println(this grabbing left); // 拿走左边筷子 left.take();System.out.println(this eating); // 吃饭 pause(); // 暂停 right.drop(); //放下右边筷子left.drop(); //放下左边筷子 }} catch (InterruptedException e) {System.out.println(this exiting via interrupt. ); }}Overridepublic String toString() {return Philosopher-哲学家 id; }
}
【代码-DeadLockDiningPhilosophers】
/*** 发生死锁的哲学家晚餐* page720 */
public class DeadLockDiningPhilosophers {public static void main(String[] args) throws Exception {int ponder 0; // 把 ponder-思考时间 调整为0发生死锁 int size 5; ExecutorService exec Executors.newCachedThreadPool(); // 线程池 Chopstick[] sticks new Chopstick[size]; // 筷子数组 for (int i 0; i sticks.length; i) {sticks[i] new Chopstick(); // 初始化数组 }for (int i 0; i sticks.length; i) {exec.execute(new Philosopher(sticks[i], sticks[(i1)%size], i1, ponder)); // 执行哲学家任务 }System.out.println(press enter to quit);System.in.read(); exec.shutdownNow(); }
}
/** 死锁发生了
Philosopher-哲学家2 thinking
Philosopher-哲学家4 thinking
press enter to quit
Philosopher-哲学家1 thinking
Philosopher-哲学家3 thinking
Philosopher-哲学家3 grabbing right
Philosopher-哲学家1 grabbing right
Philosopher-哲学家5 thinking
Philosopher-哲学家4 grabbing right
Philosopher-哲学家4 grabbing left
Philosopher-哲学家2 grabbing right
Philosopher-哲学家5 grabbing right
Philosopher-哲学家1 grabbing left
Philosopher-哲学家3 grabbing left
Philosopher-哲学家5 grabbing left
Philosopher-哲学家2 grabbing left
*/ 代码解说如果philosopher花费更多的时间思考而不是进餐ponder值越大思考时间越长那么他们请求共享资源的可能性就会小许多这样你就会确信该程序不会死锁尽管他们并非如此。
2、死锁发生条件 当以下4个条件同事满足时死锁发生干货——死锁发生的4个条件同时满足 条件1互斥条件。 任务使用的资源至少有一个是不能共享的这里一根筷子一次就只能被一个哲学家使用 条件2有任务请求被其他任务占用的共享资源。至少有一个任务它必须持有一个资源且正在等待获取一个当前被别的任务持有的资源即要发生死锁哲学家必须拿着一根筷子且等待另一根 条件3资源不能被任务抢占。任务必须把资源释放当做普通事件即哲学家不会从其他哲学家那里抢筷子 条件4必须有循环等待。一个任务等待其他任务所持有的资源后者又在等待另一个任务所持有的资源这样一直下去直到有一个任务在等待第一个任务所持有的资源使得大家都被锁住。 在 DeadLockDiningPhilosophers 程序中每个哲学家都试图先得到右边的筷子然后得到左边的筷子所以发生了循环等待
3、要防止死锁只需要破坏其中一个条件即可。最容易的方法是破坏第4个条件。 因为每个哲学家都先拿右边筷子后拿左边筷子。 如果最后一个哲学家先拿左边筷子后拿右边筷子那么这个哲学家将永远不会阻止其右边的哲学家拿起筷子。即破坏了第4个条件。
【21.7】新类库中的构件 【21.7.1】 CountDownLatch 1、作用被用来同步一个或多个任务强制他们等待由其他任务执行的一组操作完成 2、向CountDownLatch 对象设置一个初始值任何在这个对象上调用wait() 方法都将阻塞直到这个计数值到达0调用 countDown()来减小这个计数值 3、CountDownLatch 只能被初始化一次或触发一次计数值不能被重置。如果需要重置使用 CyclicBarrier 4、典型用法将一个程序分为n个相互独立的可解决任务并创建值为0的 CountDownLatch当每个任务完成时都会在这个锁上调用 await方法将自己拦住直到锁存器计数为0结束 【代码-CountDownLatchDemo】
/*** count down-倒计时* latch-锁存器 * page723 */
public class CountDownLatchDemo {static final int size 5;public static void main(String[] args) {ExecutorService exec Executors.newCachedThreadPool();CountDownLatch latch new CountDownLatch(size); // 倒计时锁存器 for (int i 0; i size; i) {exec.execute(new WaitingTask(latch)); // 运行10个等待任务直到锁存器计数值为0 }for (int i 0; i size; i) {exec.execute(new TaskPortion(latch)); // 运行任务使得锁存器计数值递减 }System.out.println(launched all tasks); // 启动所有任务 exec.shutdown(); // 待所有线程执行完成则线程池自动关闭 }
}
class TaskPortion implements Runnable { // 任务部分 private static int counter 0; // 计数器private final int id counter;private static Random rand new Random(47);private final CountDownLatch latch; // 递减锁存器 public TaskPortion(CountDownLatch latch) {this.latch latch ; }Overridepublic void run() {try {this.doWork(); // 做任务-睡眠 latch.countDown(); // 减少锁存器计数 } catch (InterruptedException e ) {System.out.println(this interrupted); } }public void doWork() throws InterruptedException {TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));System.out.println(this completed );}public String toString() { return String.format(%1$-3d, id);}
}
class WaitingTask implements Runnable { // 等待任务 private static int counter 0;private final int id counter;private final CountDownLatch latch; // 递减式锁存器public WaitingTask(CountDownLatch latch) {this.latch latch;}Overridepublic void run() {try {latch.await(); // 使得当前线程等待或把自己拦住直到锁存器latch计数减至0 或者本线程中断 System.out.println(通过锁存器障碍-latch barrier passed for this);} catch (InterruptedException e ) {System.out.println(this interrupted);}}Override public String toString() {return String.format(waiting task %1$-3d, id); }
}
/*launched all tasks
1 completed
2 completed
4 completed
0 completed
3 completed
通过锁存器障碍-latch barrier passed for waiting task 2
通过锁存器障碍-latch barrier passed for waiting task 4
通过锁存器障碍-latch barrier passed for waiting task 3
通过锁存器障碍-latch barrier passed for waiting task 1
通过锁存器障碍-latch barrier passed for waiting task 0
*/
【21.7.2】CyclicBarrier 1、定义可以将锁存器计数重置 2、应用场景创建一组任务并行地执行工作然后再进行下一个步骤之前等待直到所有任务都完成。它使得所有的并行任务都讲在栅栏处列队等待可以一致向前移动 3、区别 CountDownLatch 只能使用一次而 CyclicBarrier 可以循环重复使用
/*** 循环障碍-赛马比赛 * page724*/
public class HorseRace {static final int FINISH_LINE 10; private ListHorse horses new ArrayList(); private ExecutorService exec Executors.newCachedThreadPool(); // 线程池 private CyclicBarrier barrier; public HorseRace(int number, final int pause) {// 当有number个线程等待时barrier就会断开。当其断开时给定的任务就会执行// 由最后一个进入 barrier的线程提供资源执行。 barrier new CyclicBarrier(number, new Runnable() { Override public void run() { // 这个任务会多次执行因为 当有number个线程等待时会发生多次 StringBuilder builder new StringBuilder(); for (int i 0; i FINISH_LINE; i) {builder.append(); // 追加字符串 }System.out.println(builder);for (Horse horse : horses) {System.out.println(horse.tracks()) ; // 调用跟踪方法 } for (Horse horse2 : horses) { // 遍历任务 if (horse2.getStrides() FINISH_LINE) { System.out.println(horse2 own !); exec.shutdownNow(); // 这里会终止掉所有线程的执行 return ;}}try {TimeUnit.MILLISECONDS.sleep(pause);} catch (InterruptedException e) {System.out.println(barrier-action sleep interrupted);}} }); for (int i 0; i number; i) {Horse horse new Horse(barrier);horses.add(horse);exec.execute(horse); // 执行赛马任务 }}public static void main(String[] args) {int number 5; // 5个马 int pause 100;// 暂停时间 new HorseRace(number, pause); }
}
class Horse implements Runnable { // 赛马任务 private static int counter 0; // 计数器 private final int id counter; // 编号 private int strides 0; // 步数 private static Random rand new Random(47);private static CyclicBarrier barrier; // 栅栏 public Horse(CyclicBarrier cyclicBarrier) {this.barrier cyclicBarrier;}public synchronized int getStrides () {return strides; }Overridepublic void run() {try {while(!Thread.interrupted()) {synchronized(this) {strides rand.nextInt(3); // 步数自加 } barrier.await(); // 当前线程等待直到所有线程在该 barrier等待为止 } } catch (InterruptedException e) {throw new RuntimeException(e);} catch (BrokenBarrierException e2) {throw new RuntimeException(e2);} }Override public String toString() {return horse id ; }public String tracks() { // 跟踪方法 StringBuilder builder new StringBuilder();for (int i 0; i getStrides(); i) {builder.append(*); }builder.append(id); return builder.toString(); }
}
/*builder
**0
**1
*2
**3
*4
builder
***0
****1
***2
**3
**4
builder
***0
*****1
***2
****3
**4
builder
*****0
******1
****2
*****3
***4
builder
*******0
*******1
****2
*****3
****4
builder
*******0
*********1
******2
******3
*****4
builder
********0
**********1
******2
******3
******4
horse 1 own !
*/
【21.7.3】 DelayQueue 1、定义无界的阻塞队列 BlockingQueue用于放置实现了 Delayed接口的对象其中的对象只能在其到期时才能从队列中取走 2、这种队列是有序的即队头对象的延迟到期的时间最长。如果没有任何延迟到期那么就不会有任何头元素并且poll() 方法将返回null所以不能将null放置到该队列中 【代码-DelayQueueDemo】
/*** 阻塞队列演示* page 726 */
public class DelayQueueDemo {public static void main(String[] args) {Random rand new Random(47);ExecutorService exec Executors.newCachedThreadPool(); // 线程池 DelayQueueDelayedTask queue new DelayQueue(); // 延迟队列 for (int i 0; i 5; i) {queue.put(new DelayedTask(rand.nextInt(5000))); // 往队列添加延迟任务延迟时间为5000内的随机数小于5000毫秒 }queue.add(new DelayedTask.EndSentinel(5000, exec)); // 再添加一个延迟任务该任务负责关闭线程池延迟时间为5000毫秒exec.execute(new DelayedTaskConsumer(queue)); // 执行任务 }
}
class DelayedTask implements Runnable, Delayed {private static int counter 0; // 计数器 private final int id counter;private final int delta; // 三角洲 private final long trigger; // 触发器 protected static ListDelayedTask sequence new ArrayList(); // 列表 public DelayedTask(int delayInMilliseconds) {delta delayInMilliseconds; // 延迟毫秒数 trigger System.nanoTime() TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS); // 时间单位转换有精度丢失 sequence.add(this);}Override public long getDelay(TimeUnit unit) { // 获取延迟时间 return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);}Overridepublic int compareTo(Delayed o) { // 时间比较 DelayedTask that (DelayedTask) o; if (trigger that.trigger) return -1; if (trigger that.trigger) return 1; return 0;}Overridepublic void run() {System.out.println(this );}Overridepublic String toString() {return String.format([%1$-4d], delta) task id ; }public String summary() { return ( id : delta );}public static class EndSentinel extends DelayedTask { // 哨兵 private ExecutorService exec; public EndSentinel(int delay, ExecutorService exec) {super(delay);this.exec exec; }Overridepublic void run() { for (DelayedTask task : sequence) { // 获取每个延迟任务 System.out.println(task.summary() task.summary());}System.out.println(this calling shutdownNow() 立即关闭线程池-关闭所有线程);exec.shutdownNow(); // 关闭线程池 }}
}
class DelayedTaskConsumer implements Runnable { // 延迟任务消费者 private DelayQueueDelayedTask queue; public DelayedTaskConsumer(DelayQueueDelayedTask queue) {this.queue queue; }Overridepublic void run() {try {while(!Thread.interrupted()) {queue.take().run(); // 获取队列任务并运行 }} catch (InterruptedException e) {System.out.println(DelayedTaskConsumer interrupted);}System.out.println(finish DelayedTaskConsumer);}
}
/*[555 ] task 1
[961 ] task 4
[1693] task 2
[1861] task 3
[4258] task 0
task.summary() (0: 4258)
task.summary() (1: 555)
task.summary() (2: 1693)
task.summary() (3: 1861)
task.summary() (4: 961)
task.summary() (5: 5000)
[5000] task 5 calling shutdownNow() 立即关闭线程池-关闭所有线程
finish DelayedTaskConsumer
*/
代码解说 上述控制台输出信息为
[555 ] task 1 [961 ] task 4 [1693] task 2 [1861] task 3 [4258] task 0 task.summary() (0: 4258) task.summary() (1: 555) task.summary() (2: 1693) task.summary() (3: 1861) task.summary() (4: 961) task.summary() (5: 5000) 小结1其中 555 是最小的延迟时间即 DelayedTaskConsumer 将最紧急的任务从队列中取出然后运行它 小结2在 DelayedQueue中 任务创建顺序与执行没有关系任务是按照所期望的延迟顺序来执行的 如上 task1 最先执行但其是第2个创建的任务task.summary 是从 sequence取值的sequence记录了创建顺序
【21.7.4】 PriorityBlockingQueue 优先级阻塞队列 1、定义其执行顺序是按照优先级顺序来执行的 【代码-PriorityBlockQueueDemo】
/*** 优先级阻塞队列演示* page728 */
public class PriorityBlockQueueDemo {public static void main(String[] args) {ExecutorService exec Executors.newCachedThreadPool(); // 线程吃 PriorityBlockingQueueRunnable queue new PriorityBlockingQueue(); // 优先级阻塞队列 exec.execute(new PrioritizedTaskProducer(queue, exec)); // 任务生产者 -创建41个任务exec.execute(new PrioritizedTaskConsumer(queue));}
}
// 优先级任务
class PrioritizedTask implements Runnable, ComparablePrioritizedTask {private Random rand new Random(47);private static int counter 0;private final int id counter; // 计数器 private int priority 0; // 优先级 protected static ListPrioritizedTask sequence new ArrayList(); public PrioritizedTask(int priority) {this.priority priority; sequence.add(this); }Overridepublic int compareTo(PrioritizedTask o) { // 比较 // 值越大优先级越高越先执行 return priority o.priority ? 1 : (priority o.priority ? -1 : 0) ;}Override public void run() { try {TimeUnit.MILLISECONDS.sleep(rand.nextInt(250)); // 做任务就是睡眠 } catch (InterruptedException e) {System.out.println(PrioritizedTask Interrupted); }System.out.println(this); }Overridepublic String toString() {return String.format(toString() 线程优先级[%1$-3d], priority) task-线程编号- id; }public String summary() {return (summary() 线程编号 id : 线程优先级 priority ); }public static class EndSentinel extends PrioritizedTask { // 哨兵任务 private ExecutorService exec; public EndSentinel(ExecutorService exec) {super(-1); // 优先级为-1 值越小越后执行 this.exec exec; }Overridepublic void run() {int count 0;for (PrioritizedTask task : sequence) { // 遍历每个任务打印任务详情 System.out.println(task.summary());if (count % 5 0) {System.out.println( --------------------我是换行符-------------------- );}} System.out.println();System.out.println(this calling shutdownNow() 关闭线程池 );this.exec.shutdownNow(); // 关闭所有任务 }}
}
// 任务生产者 -创建16个任务
class PrioritizedTaskProducer implements Runnable {private Random rand new Random(47);private QueueRunnable queue; // 任务队列 优先级队列 private ExecutorService exec; // 线程池 public PrioritizedTaskProducer(QueueRunnable queue, ExecutorService exec) {this.queue queue; this.exec exec; }Overridepublic void run() { // 任务生产者 for (int i 0; i 10; i) { queue.add(new PrioritizedTask(rand.nextInt(10))); // 往队列添加任务优先级小于10比10先执行 Thread.yield(); // 当前线程让出cpu 时间片 }try {for (int i 0; i 3; i) {TimeUnit.MILLISECONDS.sleep(250); // 当前线程睡眠 queue.add(new PrioritizedTask(10)); // 再往队列添加3个任务 其优先级为10已添加13个}for (int i 0; i 2; i) {queue.add(new PrioritizedTask(i)); // 再添加2个任务其优先级为i已添15个}// 添加任务EndSentinel该任务会遍历每个任务打印任务详情并会关闭线程池queue.add(new PrioritizedTask.EndSentinel(exec));} catch (InterruptedException e) {System.out.println(PrioritizedTaskProducer interrupted);}System.out.println(finish PrioritizedTaskProducer);}
}
// 任务消费者
class PrioritizedTaskConsumer implements Runnable {private PriorityBlockingQueueRunnable queue; // PriorityBlockingQueue-优先级阻塞队列 public PrioritizedTaskConsumer(PriorityBlockingQueueRunnable queue) {this.queue queue; }Override public void run() {try {while(!Thread.interrupted()) {// 从优先级队列中取出任务并执行 PrioritizedTask优先级任务 的优先级值越大优先级越高越先执行 queue.take().run(); }} catch (InterruptedException e) {System.out.println(PrioritizedTaskConsumer interrupted);}System.out.println(finish PrioritizedTaskConsumer);}
}
/*
toString() 线程优先级[9 ] task-线程编号- 5
toString() 线程优先级[8 ] task-线程编号- 0
toString() 线程优先级[8 ] task-线程编号- 6
toString() 线程优先级[7 ] task-线程编号- 9
toString() 线程优先级[5 ] task-线程编号- 1
toString() 线程优先级[3 ] task-线程编号- 2
toString() 线程优先级[2 ] task-线程编号- 8
toString() 线程优先级[1 ] task-线程编号- 4
toString() 线程优先级[1 ] task-线程编号- 3
toString() 线程优先级[0 ] task-线程编号- 7
toString() 线程优先级[10 ] task-线程编号- 10
toString() 线程优先级[10 ] task-线程编号- 11
finish PrioritizedTaskProducer
toString() 线程优先级[10 ] task-线程编号- 12
toString() 线程优先级[1 ] task-线程编号- 14
toString() 线程优先级[0 ] task-线程编号- 13
(summary() 线程编号0: 线程优先级8)
(summary() 线程编号1: 线程优先级5)
(summary() 线程编号2: 线程优先级3)
(summary() 线程编号3: 线程优先级1)
(summary() 线程编号4: 线程优先级1)--------------------我是换行符--------------------
(summary() 线程编号5: 线程优先级9)
(summary() 线程编号6: 线程优先级8)
(summary() 线程编号7: 线程优先级0)
(summary() 线程编号8: 线程优先级2)
(summary() 线程编号9: 线程优先级7)--------------------我是换行符--------------------
(summary() 线程编号10: 线程优先级10)
(summary() 线程编号11: 线程优先级10)
(summary() 线程编号12: 线程优先级10)
(summary() 线程编号13: 线程优先级0)
(summary() 线程编号14: 线程优先级1)--------------------我是换行符--------------------
(summary() 线程编号15: 线程优先级-1)toString() 线程优先级[-1 ] task-线程编号- 15 calling shutdownNow() 关闭线程池
finish PrioritizedTaskConsumer
*/
代码解说 toString() 线程优先级[9 ] task-线程编号- 5 toString() 线程优先级[8 ] task-线程编号- 0 toString() 线程优先级[8 ] task-线程编号- 6 toString() 线程优先级[7 ] task-线程编号- 9 toString() 线程优先级[5 ] task-线程编号- 1 toString() 线程优先级[3 ] task-线程编号- 2 toString() 线程优先级[2 ] task-线程编号- 8 toString() 线程优先级[1 ] task-线程编号- 4 toString() 线程优先级[1 ] task-线程编号- 3 toString() 线程优先级[0 ] task-线程编号- 7 根据输出信息可以看出优先级高的线程先执行其执行顺序与线程创建顺序无关
【21.7.5】使用 ScheduledExecutor的温室控制器 干货——ScheduledExecutor计划调度器可用于系统后台的周期性或定时跑批如每日凌晨跑批采用cron 表达式 1、背景每个期望的温室事件都是一个在预定时间运行的任务。ScheduledThreadPoolExecutor 提供了解决该问题的服务。 2、如何解决通过使用schedule() 方法运行一次任务或者 scheduleAtFixedRate()每隔规则的时间重复执行任务你可以将Runnable对象设置为在将来的某个时刻执行。 【代码-GreenHouseScheduler】
/*** 温室调度器定时调度非常重要 * page 730 */
public class GreenHouseScheduler {private volatile boolean light false; private volatile boolean water false;private String thermostat day; // 调温器-day-白天 ScheduledThreadPoolExecutor scheduler new ScheduledThreadPoolExecutor(10); // 调度线程池/*** 创建并执行一次操作该操作在给定的延迟后变为启用状态。*/public void schedcule(Runnable event, long delay) { // 调度方法 scheduler.schedule(event, delay, TimeUnit.MILLISECONDS); }public void repeat(Runnable event, long initialDelay, long period) { // 重复执行任务 /*创建并执行一个周期性操作该操作将在给定的初始延迟后首先启用然后在给定的周期内启用 即执行将在initialDelayinitialDelay periodinitialDelay 2 *期间等之后开始如果任务的任何执行遇到异常则后续执行将被抑制否则该任务将仅通过执行者的取消或终止而终止 。如果此任务的任何执行花费的时间超过其周期则后续执行可能会延迟开始但不会同时执行。*/scheduler.scheduleAtFixedRate(event, initialDelay, period, TimeUnit.MILLISECONDS); } class LightOn implements Runnable { // 开灯任务 Overridepublic void run() {light true; System.out.println(turning on lights);}}class LightOff implements Runnable {// 关灯灯任务Overridepublic void run() {light false; System.out.println(turning off lights);}}class WaterOn implements Runnable { // 浇水任务 Overridepublic void run() {water true; System.out.println(turning on water);}}class WaterOff implements Runnable { // 停止浇水任务 Overridepublic void run() {water false; System.out.println(turning off water);}}class ThermostatNight implements Runnable { // 把温度调整为夜晚模式 Overridepublic void run() {System.out.println(Thermostat to night setting );setThermostat(night);}}class ThermostatDay implements Runnable { // 把温度调整为夜晚白昼模式 Overridepublic void run() {System.out.println(Thermostat to day setting );setThermostat(day);}}class Bell implements Runnable { // 钟声响起任务 Overridepublic void run() {System.out.println(bing); }}class Terminate implements Runnable { // 终止任务关闭线程池 Overridepublic void run() {System.out.println(terminating);scheduler.shutdownNow(); // 关闭线程池 // 声明 同步列表 data // ListDataPoint data Collections.synchronizedList(new ArrayListDataPoint()); new Thread() {public void run() {for (DataPoint d : data) { // 遍历并打印数据点 System.out.println(d);}}}.start(); }}static class DataPoint { // 数据点 Calendar time; // 日期float temperature; // 温度float humidity; // 湿度 public DataPoint(Calendar time, float temperature, float humidity) {this.time time;this.temperature temperature;this.humidity humidity;}Overridepublic String toString() {return time.getTime() String.format(, temperature: %1$.1f humidity: %2$.2f, temperature, humidity); }}private Calendar lastTime Calendar.getInstance();{lastTime.set(Calendar.MINUTE, 30);lastTime.set(Calendar.SECOND, 30); }private float lastTemp 65.0f; // 最新温度private int tempDirection 1; // 温度方向 正private float lastHumidity 50.0f; // 最新湿度 private int humidityDirection 1; // 湿度方向正 private Random rand new Random(47);/*Collections.synchronizedList - 返回由指定列表支持的同步线程安全列表。 为了保证串行访问至关重要的是对后备列表的所有访问都必须通过返回的列表来完成。*/ListDataPoint data Collections.synchronizedList(new ArrayListDataPoint());// 收集数据任务 class CollectData implements Runnable {Override public void run() {System.out.println(collecting data);synchronized (GreenHouseScheduler.this) {lastTime.set(Calendar.MINUTE, lastTime.get(Calendar.MINUTE) 30); // 设置最新时间 if (rand.nextInt(5) 4) {tempDirection * -1; // 改变温度方向 正数加 负数减 }lastTemp tempDirection * (1.0f rand.nextFloat()); // 最新温度 if (rand.nextInt(5) 4) {humidityDirection * -1; // 改变湿度方向 正数加 负数减}lastHumidity humidityDirection * rand.nextFloat(); // 计算最新湿度data.add(new DataPoint((Calendar)lastTime.clone(), lastTemp, lastHumidity)); // 添加数据信息 }}}// main public static void main(String[] args) {GreenHouseScheduler scheduler new GreenHouseScheduler(); // 温室调度器 scheduler.schedcule(scheduler.new Terminate(), 2000);// 3000毫秒后执行Terminate任务关闭线程池 scheduler.repeat(scheduler.new Bell(), 0, 1000); // 0毫秒01000毫秒010001000毫秒.... 后执行Bell任务 scheduler.repeat(scheduler.new ThermostatNight(), 0, 1000); // 0毫秒01000毫秒010001000毫秒.... 后执行ThermostatNight任务scheduler.repeat(scheduler.new LightOn(), 0, 200); // 0毫秒0200毫秒0200200.... 后执行LightOn任务scheduler.repeat(scheduler.new LightOff(), 0, 200); // 0毫秒02毫秒0200200毫秒.... 后执行LightOff任务scheduler.repeat(scheduler.new WaterOn(), 0, 200); // 0毫秒0200毫秒0200200毫秒.... 后执行WaterOn任务scheduler.repeat(scheduler.new WaterOff(), 0, 200); // 0毫秒0200毫秒0200200毫秒.... 后执行WaterOff任务scheduler.repeat(scheduler.new ThermostatDay(), 0, 200); // 0毫秒0200毫秒0200200毫秒.... 后执行ThermostatDay 任务scheduler.repeat(scheduler.new CollectData(), 500, 500); // 0毫秒0500毫秒0500500毫秒.... 后执行CollectData任务}public String getThermostat() { return thermostat;}public void setThermostat(String thermostat) {this.thermostat thermostat;}
}
/*
bing
Thermostat to night setting
turning on lights
turning off lights
turning on water
turning off water
Thermostat to day setting
turning on lights
turning off lights
turning on water
Thermostat to day setting
turning off water
turning on lights
turning on water
turning off lights
Thermostat to day setting
turning off water
collecting data
turning on lights
turning off lights
turning off water
turning on water
Thermostat to day setting
turning on lights
turning off lights
turning on water
Thermostat to day setting
turning off water
bing
Thermostat to night setting
turning on lights
turning off lights
turning off water
collecting data
turning on water
Thermostat to day setting
turning on lights
Thermostat to day setting
turning off water
turning on water
turning off lights
turning on lights
turning off water
Thermostat to day setting
turning off lights
turning on water
collecting data
turning on lights
turning off lights
turning on water
turning off water
Thermostat to day setting
turning on lights
turning on water
turning off lights
turning off water
Thermostat to day setting
bing
turning on lights
Thermostat to day setting
Thermostat to night setting
terminating
collecting data
turning off water
turning on water
turning off lights
Sun Jul 05 17:00:30 CST 2020, temperature: 66.4 humidity: 50.05
Sun Jul 05 17:30:30 CST 2020, temperature: 68.0 humidity: 50.47
Sun Jul 05 18:00:30 CST 2020, temperature: 69.7 humidity: 51.42
Sun Jul 05 18:30:30 CST 2020, temperature: 70.8 humidity: 50.87
*/
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService
代码解说volatile和 synchronized 都得到了应用以防止任务之间的相互干涉。在持有 DataPoint的List中的所有方法都是 synchronized这是因为在List被创建时使用了 Collections工具 synchronizedList();
【21.7.6】Semaphore 1、正常的锁current.locks或 synchronized锁在任何时刻都只允许一个任务访问一项资源而计数信号量允许n个任务同时访问这个资源你还可以将信号量看做是在向外分发使用资源的许可证尽管实际上没有使用任何许可证对象。 2、看个荔枝线程池。 管理着数量有限的对象当要使用对象时可以签出他们而在用户使用完成时可以将它们签回 【代码-Pool】
/*** Pool对象池-通过信号量Semaphore来管理 Semaphore-信号量(他管理着数量有限的对象当要使用对象时可以签出他们* 在用户使用完成时将他们签回) page 733*/
public class PoolT {private int size;private ListT items new ArrayList();private volatile boolean[] checkOut;private Semaphore available; // 计数信号量public Pool(ClassT classObj, int size) {this.size size;checkOut new boolean[size];available new Semaphore(size, true);for (int i 0; i size; i) {try {items.add(classObj.newInstance());} catch (Exception e) {throw new RuntimeException(e);}}}// 如果没有任何信号量许可证可用 available将阻塞调用。public T checkOut() throws InterruptedException {// 从此信号量获取许可直到一个可用或线程中断为止将一直阻塞。available.acquire(); // 获取return getItem();}// 如果被签入的对象有效则向信号量返回一个许可证public void checkIn(T x) { // 嵌入对象if (releaseItem(x)) { // 释放对象成功// 释放许可证将其返回到信号灯。available.release(); //}}private synchronized T getItem() { // 签出对象for (int i 0; i size; i) {if (!checkOut[i]) { // 签出状态为 false则返回该对象checkOut[i] true;return items.get(i);}}return null;}private synchronized boolean releaseItem(T item) { // 释放对象int index items.indexOf(item);if (index -1)return false;if (checkOut[index]) {checkOut[index] false;return true;}return false;}
}
/*** 创建代价高昂的对象类型构造器运行起来很耗时* page 734 */
public class Fat {private volatile double d; private static int counter 0;private final int id counter;public Fat() {for (int i 0; i 10000; i) {d (Math.PI Math.E) / (double) i ;}}public void operation() {System.out.println(this);}Override public String toString() {return fat id: id; }
} /*** 信号量演示非常重要的荔枝 * page 734 * (* 一旦池中的所有对象被签出semaphore 将不允许执行任何签出操作 * blocked的run()方法因此会被阻塞 2秒钟后cancel()方法会被调用 以此来挣脱Future的束缚* )*/
public class SemaphoreDemo {final static int size 5; public static void main(String[] args) throws InterruptedException {final PoolFat pool new Pool(Fat.class, size); // 对象池通过信号量来管理 ExecutorService exec Executors.newCachedThreadPool(); // 线程池 for (int i 0; i size; i) {exec.execute(new CheckOutTaskFat(pool)); // 运行签出任务 }System.out.println(all checkout tasks created);ListFat list new ArrayList();for (int i 0; i size; i) {Fat f pool.checkOut(); // 签出对象 System.out.println(i : main() thread check out);f.operation(); // fat id: id list.add(f);}Future? blocked exec.submit(new Runnable() {Overridepublic void run() {try {pool.checkOut();// 开启单个线程签出对象 } catch (InterruptedException e) {System.out.println(checkout() interrupted. ); }}});TimeUnit.SECONDS.sleep(2); //睡眠2秒 blocked.cancel(true); // 尝试取消执行此任务 for (Fat f : list) {pool.checkIn(f);}for (Fat f : list) { // 冗余的签入将被pool 忽略 pool.checkIn(f);} exec.shutdown(); // 关闭线程池 }
}
// 创建一个任务先签出Fat对象持有一段时间后再签入以此来测试Pool这个类
class CheckOutTaskT implements Runnable { // 签出任务 private static int counter 0;private final int id counter;private PoolT pool;public CheckOutTask(PoolT pool) {this.pool pool; }Overridepublic void run() {try {T item pool.checkOut(); // 签出对象获取信号量许可证System.out.println(this checked out item);System.out.println(this checking in item);pool.checkIn(item); // 签入对象释放许可证归还给信号量 } catch (InterruptedException e) {System.out.println(CheckOutTask interrupted);}}Overridepublic String toString() {return checkout task id ; }
}
/*checkout task 1 checked out fat id: 1
checkout task 4 checked out fat id: 4
checkout task 4 checking in fat id: 4
all checkout tasks created
checkout task 3 checked out fat id: 3
checkout task 3 checking in fat id: 3
checkout task 0 checked out fat id: 0
checkout task 0 checking in fat id: 0
checkout task 2 checked out fat id: 2
checkout task 2 checking in fat id: 2
0 : main() thread check out
fat id: 4
1 : main() thread check out
checkout task 1 checking in fat id: 1
fat id: 0
2 : main() thread check out
fat id: 1
3 : main() thread check out
fat id: 2
4 : main() thread check out
fat id: 3
checkout() interrupted. */
【21.7.7】 Exchanger 交换器 1、定义Exchanger 是在两个任务之间交换对象的栅栏。当这些任务进入栅栏时各自拥有一个对象当它们离开时它们都拥有之前由对象持有的对象 2、应用场景一个任务在创建对象这些对象的生产代价很高昂而另一个任务在消费这些对象。通过这种方式可以有更多的对象在被创建的同时被消费
/*** 任务交换演示 * page 736*/
public class ExchangerDemo {static int size 5; static int delay 5; // secondspublic static void main(String[] args) throws Exception {ExecutorService exec Executors.newCachedThreadPool(); // 线程池 ExchangerListFat xc new Exchanger(); // 交换器 ListFat producerList new CopyOnWriteArrayList(); // 生产者列表 ListFat consumerList new CopyOnWriteArrayList(); // 消费者列表 exec.execute(new ExchangerProducerFat(xc, BasicGenerator.create(Fat.class), producerList));// 运行交换任务生产者 exec.execute(new ExchangerConsumerFat(xc, consumerList)); // 运行交换任务消费者 TimeUnit.MILLISECONDS.sleep(5);exec.shutdownNow(); }
}
// 任务交换生产者
class ExchangerProducerT implements Runnable {private GeneratorT generator;private ExchangerListT exchanger;private ListT holder; ExchangerProducer(ExchangerListT exchanger, GeneratorT gen, ListT holder) {this.exchanger exchanger; this.generator gen; this.holder holder; }Overridepublic void run() {try {while (!Thread.interrupted()) {for (int i 0; i ExchangerDemo.size; i) {holder.add(generator.next()); } System.out.println(producer, before exchange, holder holder);// exchange方法-等待另一个线程到达此交换点除非当前线程被中断然后将给定对象传送给它并在返回时接收其对象。holder exchanger.exchange(holder);System.out.println(producer, after exchange, holder holder);}} catch (InterruptedException e) {System.out.println(ExchangerProducer interrupted);}}
}
// 任务交换消费者
class ExchangerConsumerT implements Runnable {private ExchangerListT exchanger;private ListT holder; private volatile T value ;ExchangerConsumer(ExchangerListT ex, ListT holder) {this.exchanger ex; this.holder holder; }Overridepublic void run() {try {while(!Thread.interrupted()) {System.out.println(consumer, before exchange, holder holder);// exchange方法-等待另一个线程到达此交换点除非当前线程被中断然后将给定对象传送给它并在返回时接收其对象。holder exchanger.exchange(holder);System.out.println(consumer, after exchange, holder holder);for (T x : holder) {value x; holder.remove(x); }}} catch (InterruptedException e) {System.out.println( ExchangerConsumer interrupted);}System.out.println(final value: value);}
}
/*
consumer, before exchange, holder []
producer, before exchange, holder [fat id: 0, fat id: 1, fat id: 2, fat id: 3, fat id: 4]
producer, after exchange, holder []
consumer, after exchange, holder [fat id: 0, fat id: 1, fat id: 2, fat id: 3, fat id: 4]
consumer, before exchange, holder []
producer, before exchange, holder [fat id: 5, fat id: 6, fat id: 7, fat id: 8, fat id: 9]
producer, after exchange, holder []
consumer, after exchange, holder [fat id: 5, fat id: 6, fat id: 7, fat id: 8, fat id: 9]
consumer, before exchange, holder []
producer, before exchange, holder [fat id: 10, fat id: 11, fat id: 12, fat id: 13, fat id: 14]
producer, after exchange, holder []
consumer, after exchange, holder [fat id: 10, fat id: 11, fat id: 12, fat id: 13, fat id: 14]
consumer, before exchange, holder []
producer, before exchange, holder [fat id: 15, fat id: 16, fat id: 17, fat id: 18, fat id: 19]
producer, after exchange, holder []
consumer, after exchange, holder [fat id: 15, fat id: 16, fat id: 17, fat id: 18, fat id: 19]
consumer, before exchange, holder []ExchangerConsumer interrupted
final value: fat id: 19
producer, before exchange, holder [fat id: 20, fat id: 21, fat id: 22, fat id: 23, fat id: 24]
ExchangerProducer interrupted */
代码解说 在main方法中创建了用于两个任务的单一的Exchanger 交换器以及两个用于互换的 CopyOnWriteArrayList。这个特定的list变体允许在列表被遍历时调用remove()方法而不抛出异常 ModificationExcetpion。ExchangeProduer 填充这个list ExchangerConsumer消费这个list然后将这个满列表交换为 ExchangerConsumer传递给它的空列表。因为有了 Exchanger填充一个列表和消费另一个列表可以同时发生了。
干货——引入了CopyOnWriteArrayList允许在列表被遍历时调用remove()方法而不抛出异常 ModificationExcetpion
【21.8】仿真
【21.9】性能调优
1、比较 synchronize 与 Lock的性能
/*** page 748* 启动单个任务比较 synchronized关键字和 Lock和Atomic类的区别 */
public class SimpleMicroBenchmark {static long test(Incrementable incr) {long start System.nanoTime();for (long i0; i 100000000L; i) {incr.increment();}return System.nanoTime() - start; }public static void main(String[] args) {long synchTime test(new SynchronizingTest());long lockTime test(new LockingTest());System.out.printf(synchronized 花费多少纳秒: %1$10d \n, synchTime); // 203974196 System.out.printf(lock 花费多少纳秒: %1$10d \n, lockTime); // 164559713 System.out.printf(lock/synchronized: %1$.3f\n, lockTime/(double)synchTime); // 0.807 }
}
abstract class Incrementable {protected long counter 0;public abstract void increment();
}
// synchronized 关键字性能测试
class SynchronizingTest extends Incrementable {public synchronized void increment() {counter;}
}
// lock锁性能测试
class LockingTest extends Incrementable {private Lock lock new ReentrantLock();public void increment() {lock.lock();try {counter;} finally {lock.unlock();}}
}
/*
synchronized 花费多少纳秒: 2166063742
lock 花费多少纳秒: 1725775548
lock/synchronized: 0.797
*/
以上代码是单线程不具体代表性我们需要构建复杂程序或多个任务来测试
【代码-SynchronizationComparison】
/*** page 749 * 启动多个任务测试 synchronize lock Atomic类性能 经过验证Atomic原子类同步性能最佳 */
public class SynchronizationComparison{static BaseLine baseLine new BaseLine(); // 基线 static SynchronizedTest synch new SynchronizedTest(); // synchronize测试 static LockTest lock new LockTest(); // ReentrantLock 可重入锁测试 static AtomicTest atomic new AtomicTest(); // 原子类测试 static void test() { System.out.println(); System.out.printf(%-12s : %13d\n, Cycles, Accumulator.cycles);// 运行任务测试总时长baseLine.timedTest(); // 基本测试无任何同步方法 synch.timedTest(); // synchronize同步 lock.timedTest(); // lock 同步 atomic.timedTest(); // Atomic 类同步 // 比较两种模拟器性能 Accumulator.report(synch, baseLine); // 1.65 Accumulator.report(lock, baseLine); // 2.31 Accumulator.report(atomic, baseLine); // 0.91 Accumulator.report(synch, lock); // 0.71 Accumulator.report(synch, atomic); // 1.82 Accumulator.report(lock, atomic); // 2.54 }public static void main(String[] args) {int iteration 5; System.out.println(warm up);baseLine.timedTest();for (int i 0; i iteration; i) {test();Accumulator.cycles * 2; }Accumulator.exec.shutdown(); }
}
abstract class Accumulator { // 模拟器 public static long cycles 50000L; // 循环次数 private static final int N 4; public static ExecutorService exec Executors.newFixedThreadPool(N*2); // 线程池
// CyclicBarrier也叫同步屏障在JDK1.5被引入可以让一组线程达到一个屏障时被阻塞
// 直到最后一个线程达到屏障时所以被阻塞的线程才能继续执行。 private static CyclicBarrier barrier new CyclicBarrier(N*21); // 同步屏障 protected volatile int index 0;protected volatile long value 0;protected long duration 0; // 持续时间 protected String id error;protected final static int SIZE 100000 ;protected static int[] preLoaded new int[SIZE]; static {Random rand new Random(47); // 随机数发生器 for (int i0; i SIZE; i) {preLoaded[i] rand.nextInt(); // 预加载}}public abstract void accumulate(); // 模拟方法抽象由子类实现 public abstract long read(); // 读取方法 private class Modifier implements Runnable { // 修改器 Overridepublic void run() { // 模板方法模式由子类提供实现 for(long i0; icycles; i) {accumulate(); // 调用模拟方法 }try {barrier.await(); // 屏障阻塞直到给定数量的线程都等待为止 } catch (Exception e) {throw new RuntimeException(e);}}}private class Reader implements Runnable { // 读取器 private volatile long value; Overridepublic void run() {for(long i0; icycles; i) {value read(); }try {barrier.await(); // 屏障阻塞直到给定数量的线程都等待为止 } catch (Exception e) {throw new RuntimeException(e);}}}public void timedTest() { // 时间测试 long start System.nanoTime(); for (int i0; iN; i) {exec.execute(new Modifier()); // 执行修改器 exec.execute(new Reader()); // 执行读取器 }try {// 程序中必须有一个 CyclicBarrier 因为需要确保所有任务在声明每个测试完成之前都已经完成 barrier.await(); // 屏障阻塞直到给定数量的线程都等待为止 } catch (Exception e) {throw new RuntimeException(e);}duration System.nanoTime() - start; // 总时长 System.out.printf(%-13s:%13d\n, id, duration);}public static void report(Accumulator acc1, Accumulator acc2) { // 报告 System.out.printf(%-22s: %.2f\n, acc1.id / acc2.id, acc1.duration/(double)acc2.duration); }
}
class BaseLine extends Accumulator { // 基本测试无任何同步方法 {id baseline; }Override public void accumulate() {if (index SIZE-1) index 0;value preLoaded[(index)%SIZE];}Overridepublic long read() {return value;}
}
class SynchronizedTest extends Accumulator { // synchronize同步 测试 {id Synchronized; }Overridepublic synchronized void accumulate() {if (index SIZE-1) index 0;value preLoaded[index];}Overridepublic long read() {return value;}
}
class LockTest extends Accumulator { // ReentrantLock可重入锁同步 测试 {id lock;}private Lock lock new ReentrantLock(); Overridepublic void accumulate() {lock.lock();try {if (index SIZE-1) index 0;value preLoaded[index];} finally {lock.unlock();}}Overridepublic long read() {lock.lock();try {return value; } finally {lock.unlock(); }}
}
class AtomicTest extends Accumulator { // Atomic原子类同步 测试 {id atomic;}private AtomicInteger index new AtomicInteger(0);private AtomicLong value new AtomicLong(0);Overridepublic void accumulate() {int i index.getAndIncrement();value.getAndAdd(preLoaded[i%SIZE]);if (i SIZE-1) {index.set(0);}}Overridepublic long read() {return value.get();}
}
/*
warm up
baseline : 12811667Cycles : 50000
baseline : 10401913
Synchronized : 18486698
lock : 26550332
atomic : 8931189
Synchronized/baseline : 1.78
lock/baseline : 2.55
atomic/baseline : 0.86
Synchronized/lock : 0.70
Synchronized/atomic : 2.07
lock/atomic : 2.97Cycles : 100000
baseline : 18458982
Synchronized : 28172394
lock : 36321361
atomic : 14323233
Synchronized/baseline : 1.53
lock/baseline : 1.97
atomic/baseline : 0.78
Synchronized/lock : 0.78
Synchronized/atomic : 1.97
lock/atomic : 2.54Cycles : 200000
baseline : 36408153
Synchronized : 50424697
lock : 71790482
atomic : 28702992
Synchronized/baseline : 1.38
lock/baseline : 1.97
atomic/baseline : 0.79
Synchronized/lock : 0.70
Synchronized/atomic : 1.76
lock/atomic : 2.50Cycles : 400000
baseline : 68541253
Synchronized : 103632938
lock : 144097706
atomic : 53405164
Synchronized/baseline : 1.51
lock/baseline : 2.10
atomic/baseline : 0.78
Synchronized/lock : 0.72
Synchronized/atomic : 1.94
lock/atomic : 2.70Cycles : 800000
baseline : 137235667
Synchronized : 180808536
lock : 283742763
atomic : 108986327
Synchronized/baseline : 1.32
lock/baseline : 2.07
atomic/baseline : 0.79
Synchronized/lock : 0.64
Synchronized/atomic : 1.66
lock/atomic : 2.60
*/代码解说程序中有一个CyclicBarrier 循环屏障因为我们希望确保所有的任务在声明每个测试完成之前都已经完成了
【互斥技术总结】 1、Atomic如果涉及多个Atomic对象你就有可能会被强制要求放弃这种用法因为Atomic对象只有在非常简单的情况下才有用这些情况通常包括你只有一个要被修改的Atomic对象并且这个对象独立于其他所有的对象。更安全的做法只有在性能方面的需求能够明确指示时再替换为 Atomic否则还是推荐使用 synchronized; 干货——Atomic类的使用场景 2、推荐使用 synchronize进行并发控制因为 synchronize关键字所产生的代码与Lock所需的 加锁-try-finally-解锁惯用方法锁产生的代码相比可读性提高了很多所以推荐使用 synchronize。就如我在本书其他地方提到的代码被阅读次数远多于被编写的次数。在编程时与其他人交流相对于与计算机交流而言要重要得多因此代码的可读性至关重要。因此从 synchronized 入手只有在性能调优时才替换为 Lock对象这种做法具有实际意义的。干货——推荐使用 synchronize进行并发控制
【21.9.2】免锁容器 1、CopyOnWriteArrayList写入将导致创建整个底层数组的副本而源数组将保留在原地使得复制的数组在被修改时读取操作可以安全执行 CopyOnWriteArrayList好处是当多个迭代器同时遍历和修改这个列表时不会抛出 ConcurrentModificationExceptionCopyOnWriteArraySet 使用了CopyOnWriteArrayList 来实现其免锁行为 2、ConcurrentHashMap 与ConcurrentLinkedQueue 使用了类似的技术允许并发的读取和写入但是容器中只有部分内容而不是整个容器可以被复制和修改。然后任何修改在完成之前读取者仍旧不能看到他们。ConcurrentHashMap 不会抛出 ConcurrentModificationException异常。 3、乐观锁 只要你主要是从免锁容器中读取那么它就会比 synchronized 快很多因为获取和释放锁的开销省掉了
4、比较并发控制的list容器干货——测试并发编程下的list性能 CopyOnWriteArrayList性能 优于 SynchronizedList 【代码——Tester】
/*** 性能测试器* page 756* param C*/
public abstract class TesterC {static int testReps 10;static int testCycles 10;static int containerSize 10; // 容器大小/** 抽象方法-初始化容器 */abstract C containerInitializer();/** 抽象方法-开启读取和写入任务 */abstract void startReadersAndWriters();C testContainer;String testId;/** 读取线程个数 */int nReaders;/** 写入线程个数 */ int nWriters;volatile long readResult 0;volatile long readTime 0;volatile long writeTime 0;/*CountDownLatch的作用也是如此在构造CountDownLatch的时候需要传入一个整数n* 在这个整数“倒数”到0之前主线程需要等待在门口而这个“倒数”过程则是由各个执行线程驱动的* 每个线程执行完一个任务“倒数”一次。总结来说CountDownLatch的作用就是等待其他的线程都执行完任务* 必要时可以对各个任务的执行结果进行汇总然后主线程才继续往下执行。*/CountDownLatch endLatch; // latch-门栓 static ExecutorService exec Executors.newCachedThreadPool(); // 线程池 Integer[] writeData;/** 构造器 */ Tester(String testId, int nReaders, int nWriters) {this.testId testId , nReaders reader thread, nWriters writer thread; this.nReaders nReaders;this.nWriters nWriters; writeData Generated.array(Integer.class, new RandomGenerator.Integer(), containerSize);for (int i0; i testReps; i) {runTest();readTime 0 ;writeTime 0;}}void runTest() {endLatch new CountDownLatch(nReaders nWriters);testContainer containerInitializer();startReadersAndWriters();try {endLatch.await(); // 门栓等待直到所有线程都执行完成 } catch (InterruptedException ex) {System.out.println(endLatch interrupted);}System.out.printf(%-100s %14d %14d\n, testId, readTime, writeTime);if (readTime !0 writeTime ! 0) {System.out.printf(%-100s %14d\n, readTime writeTime , readTime writeTime);}}abstract class TestTask implements Runnable {/** 开启线程运行test方法 */abstract void test();/** 存放结果在synchronzid 静态块里执行 */abstract void putResults();long duration; public void run() {long startTime System.nanoTime();test();duration System.nanoTime() - startTime;synchronized (Tester.this) {putResults();}endLatch.countDown(); // 门栓减1直到减为0则门栓不等待 }}public static void initMain(String[] args) {testReps new Integer(3);testCycles new Integer(3);containerSize new Integer(3);System.out.printf(%-100s %14s %14s\n, type, readTime, write Time); }
}
【代码——ListComparisons——比较列表】
/*** 测试并发编程下的list性能 CopyOnWriteArrayList性能 优于 SynchronizedList* page 758 */
public class ListComparisons {public static void main(String[] args) {Tester.initMain(null);new SynchronizedArrayListTest(10, 0);new SynchronizedArrayListTest(9, 1);new SynchronizedArrayListTest(5, 5);new CopyOnWriteArrayListTest(10, 0);new CopyOnWriteArrayListTest(9, 1);new CopyOnWriteArrayListTest(5, 5);Tester.exec.shutdown();}
}
/** List测试类 */
abstract class ListTest extends TesterListInteger {ListTest(String testId, int nReaders, int nWriters) {super(testId, nReaders, nWriters);}class Reader extends TestTask { // 读取任务 long result 0;void test() {for (int i 0; i testCycles; i) {for (int j 0; j containerSize; j) {result testContainer.get(j);}}}void putResults() {readResult result;readTime duration; }}class Writer extends TestTask { // 写入任务 Overridevoid test() {for (int i 0; i testCycles; i) {for (int j 0; j containerSize; j) {testContainer.set(i, writeData[j]);}}}Overridevoid putResults() {writeTime duration; }}/** 运行读取任务和写入任务 */void startReadersAndWriters() {for (int i 0; i nReaders; i) {exec.execute(new Reader());}for (int i 0; i nWriters; i) {exec.execute(new Writer());}}
}
/** 同步list-SynchronizedList*/
class SynchronizedArrayListTest extends ListTest {ListInteger containerInitializer() {return Collections.synchronizedList(new ArrayListInteger(new CountingIntegerList(containerSize)));}SynchronizedArrayListTest(int nreaders, int nwriters) {super(synched arraylist, nreaders, nwriters);}
}
/** 同步list-CopyOnWriteArrayList*/
class CopyOnWriteArrayListTest extends ListTest {OverrideListInteger containerInitializer() {/*CopyOnWriteArrayList好处是当多个迭代器同时遍历和修改这个列表时* 不会抛出 ConcurrentModificationException*/ return new CopyOnWriteArrayListInteger(new CountingIntegerList(containerSize));} CopyOnWriteArrayListTest(int nreaders, int nwriters) {super(CopyOnWriteArrayListTest, nreaders, nwriters);}
}
/*
type readTime write Time
t null
t null
t null
synched arraylist , 10 reader thread, 0 writer thread 86062 0
synched arraylist , 10 reader thread, 0 writer thread 140764 0
synched arraylist , 10 reader thread, 0 writer thread 535339 0
t null
t null
t null
synched arraylist , 9 reader thread, 1 writer thread 238497 20422
readTime writeTime 258919
synched arraylist , 9 reader thread, 1 writer thread 188900 4376
readTime writeTime 193276
synched arraylist , 9 reader thread, 1 writer thread 192182 3647
readTime writeTime 195829
t null
t null
t null
synched arraylist , 5 reader thread, 5 writer thread 86791 74393
readTime writeTime 161184
synched arraylist , 5 reader thread, 5 writer thread 605721 540446
readTime writeTime 1146167
synched arraylist , 5 reader thread, 5 writer thread 39385 76216
readTime writeTime 115601
t null
t null
t null
CopyOnWriteArrayListTest , 10 reader thread, 0 writer thread 63456 0
CopyOnWriteArrayListTest , 10 reader thread, 0 writer thread 48866 0
CopyOnWriteArrayListTest , 10 reader thread, 0 writer thread 44126 0
t null
t null
t null
CopyOnWriteArrayListTest , 9 reader thread, 1 writer thread 30997 35738
readTime writeTime 66735
CopyOnWriteArrayListTest , 9 reader thread, 1 writer thread 71475 21516
readTime writeTime 92991
CopyOnWriteArrayListTest , 9 reader thread, 1 writer thread 24434 21151
readTime writeTime 45585
t null
t null
t null
CopyOnWriteArrayListTest , 5 reader thread, 5 writer thread 19692 680843
readTime writeTime 700535
CopyOnWriteArrayListTest , 5 reader thread, 5 writer thread 48503 622496
readTime writeTime 670999
CopyOnWriteArrayListTest , 5 reader thread, 5 writer thread 14587 756695
readTime writeTime 771282*/
代码解说 synchronized ArrayList 无论读者和写入者的数量是多少都具有大致相同的性能——读取者与其他读取者竞争锁的方式与写入者相同。但 CopyOnWriteArrayList 在没有写入者时速度会快很多。通过测试CopyOnWriteArrayList 性能优于 synchronized list对列表写入的影响并没有超过短期同步整个列表的影响。
5、比较并发控制的map容器性能干货——测试并发编程下的Map性能 CurrentHashMap性能优于 synchronizedHashMap
/*** 测试并发编程下的Map性能 CurrentHashMap性能优于synchronizedHashMap * page 758*/
public class MapComparisons {public static void main(String[] args) {Tester.initMain(null); new SynchronizedHashMapTest(10, 0);new SynchronizedHashMapTest(9, 1);new SynchronizedHashMapTest(5, 5);new ConcurrentHashMapTest(10, 0);new ConcurrentHashMapTest(9, 1);new ConcurrentHashMapTest(5, 5);Tester.exec.shutdown();}
}
/** Map测试 */
abstract class MapTest extends TesterMapInteger, Integer {MapTest(String testId, int nReaders, int nWriters) {super(testId, nReaders, nWriters); }/** 读取器 */class Reader extends TestTask {long result 0;void test() {for (int i 0; i testCycles; i) {for (int j 0; j containerSize; j) {result testContainer.get(j);}}}Overridevoid putResults() {readResult result; readTime duration; }}/** 写入器 */ class Writer extends TestTask {long result 0;void test() {for (int i 0; i testCycles; i) {for (int j 0; j containerSize; j) {testContainer.put(j, writeData[j]);}}}Overridevoid putResults() {writeTime duration; }}/** 运行读取与写入任务 */ void startReadersAndWriters() {for (int i 0; i nReaders; i) {exec.execute(new Reader());}for (int i 0; i nWriters; i) {exec.execute(new Writer());}}
}
/** 同步块SynchronizedHashMap*/
class SynchronizedHashMapTest extends MapTest {MapInteger, Integer containerInitializer() {return Collections.synchronizedMap(new HashMapInteger, Integer(MapData.map(new CountingGenerator.Integer(), new CountingGenerator.Integer(), containerSize)));}SynchronizedHashMapTest(int nreaders, int nwriters) {super(SynchronizedHashMapTest, nreaders, nwriters);}
}
/** 同步HashMap */
class ConcurrentHashMapTest extends MapTest {MapInteger, Integer containerInitializer() {return new ConcurrentHashMapInteger, Integer( MapData.map(new CountingGenerator.Integer(), new CountingGenerator.Integer(), containerSize));}ConcurrentHashMapTest(int nreaders, int nwriters) {super(ConcurrentHashMapTest, nreaders, nwriters);}
}
/*
type readTime write Time
t null
t null
t null
SynchronizedHashMapTest , 10 reader thread, 0 writer thread 207497 0
SynchronizedHashMapTest , 10 reader thread, 0 writer thread 487569 0
SynchronizedHashMapTest , 10 reader thread, 0 writer thread 253084 0
t null
t null
t null
SynchronizedHashMapTest , 9 reader thread, 1 writer thread 126177 31361
readTime writeTime 157538
SynchronizedHashMapTest , 9 reader thread, 1 writer thread 131281 25892
readTime writeTime 157173
SynchronizedHashMapTest , 9 reader thread, 1 writer thread 98095 7658
readTime writeTime 105753
t null
t null
t null
SynchronizedHashMapTest , 5 reader thread, 5 writer thread 37559 56889
readTime writeTime 94448
SynchronizedHashMapTest , 5 reader thread, 5 writer thread 49961 74393
readTime writeTime 124354
SynchronizedHashMapTest , 5 reader thread, 5 writer thread 64910 122531
readTime writeTime 187441
t null
t null
t null
ConcurrentHashMapTest , 10 reader thread, 0 writer thread 46679 0
ConcurrentHashMapTest , 10 reader thread, 0 writer thread 48867 0
ConcurrentHashMapTest , 10 reader thread, 0 writer thread 41572 0
t null
t null
t null
ConcurrentHashMapTest , 9 reader thread, 1 writer thread 48134 8023
readTime writeTime 56157
ConcurrentHashMapTest , 9 reader thread, 1 writer thread 55795 8023
readTime writeTime 63818
ConcurrentHashMapTest , 9 reader thread, 1 writer thread 28080 7294
readTime writeTime 35374
t null
t null
t null
ConcurrentHashMapTest , 5 reader thread, 5 writer thread 17506 60170
readTime writeTime 77676
ConcurrentHashMapTest , 5 reader thread, 5 writer thread 17506 41207
readTime writeTime 58713
ConcurrentHashMapTest , 5 reader thread, 5 writer thread 14221 58348
readTime writeTime 72569*/
【21.9.3】乐观加锁 1、原理在执行某项计算时实际上没有使用互斥但在计算完成时准备更新时需要使用 compareAndSet的方法。你把旧值和新值一起提交给这个方法如果旧值与他在Atomic对象中发现的值不一致那么这个操作就会失败这意味着某个其他任务已经于此操作前修改了这个对象。 2、通常情况下我们使用 synchronized 或 lock来防止多个任务同时修改同一个对象但这里我们是乐观的因为我们保持数据为为锁定状态并希望没有任何其他任务插入修改他。使用 Atomic替代 synchronized或 Lock可以获得性能上的好处 3、注意compareAndSet() 方法操作失败会发生什么 建议程序做好补偿机制 【代码——FastSimulation】
/*** page 760 * 乐观加锁测试 */
public class FastSimulation {/** 元素个数 */static final int N_ELEMENTS 100;/** 每个元素的基因个数 */static final int N_GENES 30;/** 进化者 */static final int N_EVOLVERS 50;// 进化者 /** 网格 */ static final AtomicInteger[][] GRID new AtomicInteger[N_ELEMENTS][N_GENES]; static Random rand new Random(47);static class Evolver implements Runnable { // 进化者任务 Overridepublic void run() {while(!Thread.interrupted()) { // 如果线程不中断重复执行 int element rand.nextInt(N_ELEMENTS); // 获取元素 for (int i 0; i N_GENES; i) {/** 前一个元素 */int previous element - 1; if (previous 0) previous N_ELEMENTS - 1;/** 下一个元素 */ int next element 1; if (next N_ELEMENTS) next 0;/** 旧值 */int oldValue GRID[element][i].get();/** 新值 */ int newValue oldValue GRID[previous][i].get() GRID[next][i].get();newValue / 3; // Atomic.compareAndSet 方法比较重要// 比较当前值与传入的 old 值是否相同相同则更新为 新值 if (!GRID[element][i].compareAndSet(oldValue, newValue)) {System.out.printf(oldvalue, changed from %d to %d \n, oldValue, newValue); }}}}}public static void main(String[] args) throws Exception {ExecutorService exec Executors.newCachedThreadPool();for (int i0; iN_ELEMENTS; i) {for (int j 0; j N_GENES; j) {GRID[i][j] new AtomicInteger(rand.nextInt(1000));}}for (int i 0; i N_EVOLVERS; i) {exec.execute(new Evolver());}exec.shutdown(); }
}
/*
oldvalue, changed from 670 to 676
oldvalue, changed from 352 to 351
oldvalue, changed from 455 to 454
oldvalue, changed from 424 to 423
............
*/
【21.9.4】ReadWriteLock 读写锁 1、定义 ReadWriteLock对向数据结构相对不频繁写入但是有多个任务要经常读取这个数据结构的这类情况进行了优化。 ReadWriteLock 使得你可以同时有多个读取这只要他们都不试图写入。如果写锁已经被其他任务持有那么任何读取者都不能访问直到这个写锁被释放为止。 2、ReadWriteLock是否能够提高程序性能是不确定的取决于数据被读取与修改的频率读取和写入操作的时间有多少线程竞争以及是否在多处理器上运行等因素。 最好的方法就是用实验来证明是否性能提升。 【代码——ReaderWriterList】
/*** 读写任务列表-可重入锁测试 page 763*/
public class ReaderWriterListT {private ArrayListT lockedList;/** 可重入读写锁 */private ReentrantReadWriteLock lock new ReentrantReadWriteLock(true);public ReaderWriterList(int size, T initialValue) {/** Collections.nCopies-返回由指定对象这里是initialValue初始值的n(size)个副本组成的不可变列表。* 新分配的数据对象很小它包含对数据对象的单个引用。 该方法与List.addAll方法结合使用以增长列表很有用。* 返回的列表是可序列化的。*/lockedList new ArrayListT(Collections.nCopies(size, initialValue));}/** 写入 */public T set(int index, T element) {Lock wLock lock.writeLock(); // 获取写锁wLock.lock(); // 加锁try {System.out.println(写锁数量 lock.getWriteHoldCount());return lockedList.set(index, element);} finally {wLock.unlock(); // 解锁}}public T get(int index) {Lock rLock lock.readLock(); // 获取读锁rLock.lock();// 加锁try {if (lock.getReadLockCount() 1) {System.out.println(读锁数量 lock.getReadLockCount());}return lockedList.get(index);} finally {rLock.unlock(); // 解锁}}public static void main(String[] args) {new ReaderWriterListTest(5, 1);}
}
/*** 读写任务列表测试*/
class ReaderWriterListTest {ExecutorService exec Executors.newCachedThreadPool();private final static int SIZE 10;private static Random rand new Random(47);private ReaderWriterListInteger list new ReaderWriterListInteger(SIZE, 0);/** 构造器启动读取和写入任务 */public ReaderWriterListTest(int readers, int writers) {for (int i 0; i readers; i) {exec.execute(new Reader()); // 读取任务}for (int i 0; i writers; i) {exec.execute(new Writer()); // 写入任务}}/** 写入任务 */private class Writer implements Runnable {Overridepublic void run() {try {for (int i 0; i SIZE; i) {list.set(i, rand.nextInt()); // 获取写锁TimeUnit.MICROSECONDS.sleep(100);}} catch (InterruptedException e) {e.printStackTrace();}System.out.println(writer finished, shutting down);exec.shutdown();}}/** 读取任务 */private class Reader implements Runnable {Overridepublic void run() {try {for (int i 0; i SIZE; i) {list.get(i); // 获取读锁TimeUnit.MICROSECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();}System.out.println(reader finished, shutting down);}}
}
/** 读锁数量 3 读锁数量 3 读锁数量 4 读锁数量 3 写锁数量 1 写锁数量 1 写锁数量 1 写锁数量 1 写锁数量* 1 读锁数量 2 读锁数量 2 写锁数量 1 写锁数量 1 读锁数量 3 读锁数量 4 读锁数量 4 读锁数量 4* 写锁数量 1 写锁数量 1 写锁数量 1 reader finished, shutting down reader finished,* shutting down writer finished, shutting down reader finished, shutting down* reader finished, shutting down reader finished, shutting down*/
【21.10】活动对象 1、有一种可替换的方式被称为活动对象或行动者。之所以称这些对象是活动的是因为每个对象都维护着它自己的工作器线程和消息队列并且所有对这种对象的请求都将进入队列排队任何时刻都只能运行其中一个。然而有个活动对象可以串行化消息而不是方法这意味着不需要防备一个任务在其循环的中间被中断这种问题。 2、当你向一个活动对象发送消息时这条消息会转变为一个任务该任务会被插入到这个对象的队列中等待在以后的某个时刻运行。Future在实现这种模式时将派上用场。
【看个荔枝】有两个方法可以将方法调用排进队列
/*** 活动对象演示* page 764 */
public class ActiveObjectDemo {/** 线程池 */ private ExecutorService exec Executors.newSingleThreadExecutor();private Random rand new Random(47);/** 暂停方法睡眠 */private void pause(int factor) {try {TimeUnit.MILLISECONDS.sleep(100 rand.nextInt(factor));} catch (InterruptedException e) {System.out.println(sleep interrupt);} } /** 调用int方法 */public FutureInteger calculateInt(final int x, final int y) {return exec.submit(new CallableInteger() {public Integer call() {System.out.println(starting x x , y y);pause(500);return x y ;}});}public FutureFloat calculateFloat(final float x, final float y) {return exec.submit(new CallableFloat() {public Float call() {System.out.println(starting x x , y y);pause(2000);return x y; }});}public void shutdown() {exec.shutdown(); }public static void main(String[] args) {ActiveObjectDemo d1 new ActiveObjectDemo(); /*在计算机中就是当你想要对一块内存进行修改时我们不在原有内存块中进行写操作* 而是将内存拷贝一份在新的内存中进行写操作* 写完之后呢就将指向原来内存指针指向新的内存原来的内存就可以被回收掉嘛*/ListFuture? results new CopyOnWriteArrayListFuture?();for (float f 0.0f; f 1.0f; f 0.2f) {results.add(d1.calculateFloat(f, f));}for (int i 0; i 5; i) {results.add(d1.calculateInt(i, i));}System.out.println( all asynch calls made );int index 0;while(results.size() 0) { // while 循环再放一层for循环因为 可能f.isDone() 为false for (Future? f: results) {if (f.isDone()) {try {System.out.println(f.get( index ) f.get()); } catch (Exception e) {throw new RuntimeException(e);}results.remove(f);}} d1.shutdown(); }}
}
/*all asynch calls made
starting x 0.0, y 0.0
f.get(1) 0.0
starting x 0.2, y 0.2
f.get(2) 0.4
starting x 0.4, y 0.4
f.get(3) 0.8
starting x 0.6, y 0.6
f.get(4) 1.2
starting x 0.8, y 0.8
f.get(5) 1.6
starting x 0, y 0
starting x 1, y 1
f.get(6) 0
f.get(7) 2
starting x 2, y 2
f.get(8) 4
starting x 3, y 3
f.get(9) 6
starting x 4, y 4
f.get(10) 8
*/
代码解说使用 CopyOnWriteArrayList 可以移除为了防止 ConcurrentModificationException而复制List的这种需求小结 小结1为了能够在不经意间就可以防止线程之间的耦合任何传递给活动对象方法调用的参数都必须是只读的其他活动对象或者是不连续对象。即没有连接任何其他任务的对象。小结2有个活动对象 你可以干一下事情 事情1、每个对象都可以拥有自己的工作器线程 事情2、每个对象都将维护对他自己的域的全部控制权 这比普通类要严格一些普通类只是拥有防护它们的域的选择权 事情3、所有在活动对象之间的通信都将以在这些对象之间的消息形式发生 事情4、活动对象之间的所有消息都要排队 【21.11】总结【21.11.1】java线程进行并发编码的基础知识 1、可以运行多个独立任务 2、必须考虑当这些任务关闭时可能出现的所有问题 3、任务可能会在共享资源上彼此干涉。互斥或锁是用来防止这种冲突的基本工具 4、如果任务设计得不够合理就有可能会死锁【21.11.2】什么时候使用并发什么时候应该避免并发非常关键使用并发的原因如下 1、要处理很多任务它们交织在一起应用并发能够更有效地使用计算机 2、要能够更好地组织代码 3、要便于用户使用
【21.11.3】 线程的好处 1、轻量级的上下文切换轻量级的线程上下万切换只需要100条指令重量级的进程上下文切换需要上千条指令 2、因为一个给定进程内的所有线程共享相同的内存空间轻量级的上下文切换只是改变了程序的执行序列和局部变量而进程切换必须改变所有内存空间干货——线程的上下文切换是轻量级的进程的上下文切换是重量级的
【21.11.4】多线程的缺陷 1、等待共享资源的时候性能降低 2、需要处理线程的额外cpu花费 3、糟糕的程序设计导致不必要的复杂度 4、有可能产生一些病态行为如饿死竞争死锁活锁多个运行各自任务的线程使得整体无法完成 5、不同平台导致的不一致性