请打开123720的网站百度,WordPress评论制作,长沙智优营家,珠宝静态网站模板通过本章的学习可以学到#xff1a;掌握java.util.concurrent(JUC)开发框架的核心接口与使用特点,掌握TimeUnit类的作用#xff0c;并且可以使用此类实现日期时间数据转换#xff0c;掌握多线程原子操作类的实现以及与volatile关键字的应用#xff0c;理解ThreadFactory类的…通过本章的学习可以学到掌握java.util.concurrent(JUC)开发框架的·核心接口与使用特点,掌握TimeUnit类的作用并且可以使用此类实现日期时间数据转换掌握多线程原子操作类的实现以及与volatile关键字的应用理解ThreadFactory类的作用与使用掌握线程同步锁的作用理解互斥锁与读写锁的应用掌握线程同步工具类的使用掌握并发集合操作访问可以深刻理解普通集合在并发访问下的异常产生原理掌握阻塞队列与延迟队列的使用并深刻理解延迟队列针对数据缓存的实现原理掌握线程池的概念、分类以及线程池的拒绝策略。多线程是进行Java项目开发与设计中的重要组成部分也是Java语言区别于其他语言的最大特点但是传统的读线程实现机制在进行同步(包括等待与唤醒机制)处理时编写难度较高所以为了进一步简化多线程同步处理机制Java提供JUC并发编程开发包的支持。本章将完整讲解JUC的各个组成部分并且将采用大量案例进行详细讲解。
21.1 JUC简介
多线程可以有效地提升程序的执行性能在最初的Java编程模型中除了需要考虑程序性能外还需要考虑线程死锁、公平性、资源管理以及如何避免线程安全性方面带来的危害等诸多因素因而往往会采用一系列复杂的安全策略加大了程序的实现困难。 为了简化多线程的开发难题从JDK1.5开始提供了一个新的并发编程开发包java.util.concurrent(以下简称JUC)利用此包中提供的并发编程模型可以有效地减少竞争条件(race conditions)和死锁问题的出现。在java.util.concurrent包中支持的核心类如下
21.2 TimeUnit
TimeUnit(时间单元)是一个描述时间单元的枚举类在该枚举类中定义有以下几个时间单元实例:天(DAYS)、时(HOURS)、分(MINUTES)、秒(SECONDS)、毫秒(MILLSECONDS)、微秒(MICROSECONDS)、纳秒(NANOSECONDS)利用此类可以方便的实现各个时间单元数据的转换也可以更加方便的实现线程的休眠时间控制该类提供的主要方法是
范例时间单元转换(将小时变为秒)
package cn.mldn.demo; import java.util.concurrent.TimeUnit; public class JUCDemo { public static void main(String[]args) { long hour1; long secondTimeUnit.SECONDS.convert(hour,TimeUnit.HOURS);//由小时转化为秒单位 } }
本程序通过TimeUnit获取了秒级对象SECONDS随后将给定的小时转化为秒数据表示
范例获取18天后的日期
package cn.mldn.demo; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit; public class JUCDemo { public static void main(String[]args) { long currentSystem.currentTimeMills();//获取当前的时间 //利用当前的时间戳(毫秒)18天的毫秒数 longaftercurrentTimeUnit.MILLSECONDS.convert(18,TimeUnit.Days); //将long数据转化为Date并且利用SimpleDateFormat进行格式化显示 System.out.println(new SimpleDateFormat(yyyy-MM-dd).format(new Date(after))); } }
本程序利用TimeUnit类提供的时间格式转换为处理操作将指定的天数内容转为与之相匹配的毫秒数而后在与当前的时间搓进行累加后即可得到18天后的日期时间戳数据。 在Thread类中提供的休眠方法Thread.sleep()是通过毫秒来定义休眠时间的所以在进行休眠控制室往往都需要针对休眠时间对毫秒数据进行计算处理。而在TimeUnit类中也提供有sleep()休眠方法此方法最大的特点是可以结合TimeUnit类提供的一系列实例化对象轻松地指定休眠时间的单位。
范例使用TimeUnit休眠线程
package cn.mldn.demo; import java.util.concurrent.TimeUnit; public class JUCDemo { public static void main(String[]args) { new Thread(()- { for(int x0;x10;x) { TimeUnit.MINUTES.sleep(1); System.out.println(Thread.currentThread().getName()x); } }).start(); } }
本程序创建了一个线程对象并且在本线程执行时利用TimeUnit类提供的sleep()方法直接设置休眠时间为一分钟。
21.3 原子操作类
在多线程操作中经常会出现多个线程对一个共享变量的并发修改为了保证此操作的正确性最初的时候可以通过synchronized关键字来操作。而从JDK1.5之后提供了java.util.concurrent.atomic操作包该包中的原子操作类提供了一种用法简单、性能高效、线程安全的更新一个变量的方式。 在java.util.concurrent.atomic包中提供的原子操作类可以分为以下四类 基本类型:AtomicInterger、AtomicLong、AtomicBoolean 数组类型AtomicIntergerArray、AtomicLongArray、AtomicReferenceArray 引用类型:AtomicReference、AtomicStampedReference、AtomicMarkableReference 对象的属性修改类型:AtomicIntergerFieldUpdater、AtomicLongFieldUpdater
提示volatile关键字与原子操作类 原子操作类最大的特点是可以进行线程安全更新即帮助用户使用一种更为简单地共享数据的线程同步操作所以通过源代码可以发现在这些原子类数据保存属性上都使用volatile关键字进行声明这样就可以防止由于数据缓存所造成的数据更新不一致的问题。
21.3.1 基本类型原子操作类
基本类型原子操作类一共有三个AtomicInterger、AtomicLong、AtomicBoolean这三个类的原理和用啊类似为了说明问题将通过AtomicLong进行讲解其常用方法如下提示关于32位操作系统和64位操作系统在long操作上的区别 在32位操作系统上64位的long和double变量由于会被JVM当做两个个分离的32位来进行操作所以不具备原子性而使用AtomicLong能让long的操作保持原执行下面给出了AtomicLong类中关于数据存储的部分源代码 private volatile long value; public AtomicLong(long initialValue) { valueinitialValue; } 可以发现通过AtomicLong构造进行赋值时将数据内容通过value成员属性保存而value成员塑性上使用了volatile关键字进行直接数据操作。
范例使用AtomicLong进行原子性操作
package cn.mldn.demo; import java.util.concurrent.atomic.AtomicLong; public class JUCDemo { public static void main(String []args) { AtomicLong numnew AtomicLong(100L);//实例化原子操作类 num.addAndGet(200);//增加数据并取得 long currnum.getAndIncrement();//先获取后自增 System.out.println(curr);//自增签的内容 System.out.println(num.get()); } }
本程序啊将要操作的数据设置到了AtomicLong类示例中并且对AtomicLong类中提供的原子性操作方法进行保存内容的操作。 范例利用多线程操作数据
package cn.mldn.demo; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class JUCDemo { public static void main(String[]args)throws Exception { AtomicLong numnew AtomicLong(100);//实例化原子操作类 for(int x0;x10;x) { new Thread(()-{num.addAndGet(200);}).start(); } TimeUnit.SECONDS.sleep(2); System.out.println(num.get());自增后的内容 } }
本程序启动10个线程同时帮助AtomicLong对象中保存的数据增加操作通过最终的执行结果可以发现AtomicLong已经帮助开发者实现了多线程的同步操作通过最终的执行结果可以发现AtomicLong已经帮助开发者实现了多线程的同步操作得到了正确的计算结果。
范例判断并设置新内容
package cn.mldn.demo; import java.util.concurrent.atomic.AtomicLong; public class JUCDemo { public static void main(String []args)throws Exception { AtomicLong numnew AtomicLong(100L);//实例化原子操作类 System.out.println(num.compareAndSet(100L,300L));//内容相同返回true System.out.println(num.get()); } }
本程序利用AtomicLong类中的compareAndSet()方法对要进行操作的内容进行判断由于此时AtomicLong中保存的是100并且判断的内容也是100内容相同所以可以保存新的数据如果不相同则无法进行保存。
提示关于CAS问题
在JUC中有两大核心操作CAS、AQS其中CAS是java.util.concurrentatomic包的基础而AQS是同步缩的实现基础。 CAS(Compare And Swap)是一条CPU并发原语。它的功能是判断内存某个位置的值是否为预期值如果是则更改为新的值这个过程属于原子性操作。CAS并发原语体现在Java语言中就是sum.misc.Unsafe类中的各个方法。调用Unsafe类中的CAS方法JVM会帮助开发者实现出CAS汇编指令。这是一种完全以雷雨硬件的功能为了说明这个问题下面来观察一下AtomicLong类中的源代码
成员属性private static final jdk.internal.misc.Unsafe Ujdk.internal.misc.Unsafe.getUnsafe();成员属性private static finale long VALUE-U.objectFieldOffset(AtomicLong.class,value);compareAndSet()方法public finale boolean compareAndSet(long expectedValue,long new Value){return U.compareAndSetLong(this,VALUE,expectedValue,newValue);} 通过源代码的分析可以发现compareAndSet()方法时Unsafe()类负责执行CAS并发原语由JVM转化为汇编在代码中使用CAS自旋volatile变量的形式实现非阻塞并发这种方式时CAS的主要使用方式。 CAS是乐观锁是一种冲突重试机制在并发竞争不是很激烈的情况下其操作性能要好于悲观锁机制(synchronized同步处理)
21.3.2 数组原子操作类
数组原子操作类由3个AtomicIntergerArray、AtomicLongArray、AtomicReferenceArray(对象数组)其操作原理和形式类似。下面使用AtomicReferenceArray类进行说明
范例使用AtomicReferenceArray类操作
package cn.mldn.demo; import java.util.concurrent.atomic.AtomicReferenceArray; public class JUCDemo { public static void main(String[]args)throws Exception { String infos[]new String[]{A,B,C}; AtomicReferenceArrayStringarraynew AtomicReferenceArrayString(infos); //自使用compareAndSet()方法进行比较时是通过方式实现的比较操作 System.out.println(array.compareAndSet(0,AA,BB)); System.out.println(array.get(0)); } }
本程序实现了字符串对象数组的原子性保存同时利用compareAndSet()方法实现了数组内容的修改。
提问compareAndSet()修改时为什么传入匿名对象无法修改
对本程序而言如果修改数据时通过匿名的String类对象比较为什么无法成功设置 System.out.println(array.compareAndSet(0,new String(AA),BBB)); 此时代码执行后的结果为false内容也没有被交换最为关键的是保存自定义类对象时法向此类方法同样也无法进行交换为什么该方法不按照对象匹配的模式比较而只按照地址的方式比较呢
回答由底层C语言实现的
在Java层次上的数据比较有两类实现模式hasCode()和equals()、比较器但是在使用CAS方法时这两类操作都无法使用只是简单实现了地址的比较这实际上沿用了C语言的特点实现的。C语言是现代马参考如下 int compare_and_swap(int *reg,int oldval,int newval) { int old_reg_val*reg; if(old_reg_valoldval) *regnewval; return old_reg_val; } 可以发现此类操作是基于地址指针的形式实现的所以只能使用的方式进行地址判断
21.3.3 引用类型原子操作类
引用类型原子操作类一共有三种AtomicReference(引用类型原子型)、AtomicStampedReference(带有引用版本的原子类)、AtomicMarkableReference(标记节点原子类)。其中AtomicReference可以直接实现应用数据类型的原子操作常用方法如下
范例使用AtomicReference操作引用数据
package cn.mldn.demo; import java.util.concurrent.atomic.AtomicReference; class Memeber { private String name; private int age; public Member(String name,int age){...} }
public class JUCDemo { public static void main(String[]args)throws Exception { Member memAnew Member(AAA,12);//实例化 Member memBnew Member(ABB,12);//实例化 AtomicReferenceMemberrefnew AtomicReferenceMember(memA); ref.compareAndSet(memA,memB); System.out.println(ref); } }
本程序通过AtomicReference保存了一个引用对象由于对象存在引用关联这样就可以直接利用CAS正确判断并进行内容的替换。 AtomicStampedReference原子性应用类可以实现基于版本号的引用数据操作在操作时可以基于版本号实现数据操作常用方法范例使用AtomicStampedReference进行应用原子性操作 package cn.mldn.demo; import java.util.concurret.AtomicStampedReference; //Member类不再重复定义 public class JUCDemo { public static void main(String[]args)throws Exception { Member memAnew Member(AAA,12);//实例化menber对象 Member memBnew Member(AAA,12);//实例化menber对象 //由于AtomicStampedReference需要提供版本号所以在初始化时定义版本号为1 AtomicStampedReferenceMemberrefnew AtomicStampedReferenceMember(memA,1); //在进行CAS操作时除了要设置替换内容外也需要设置正确的版本号否则无法进行替换 ref.compareAndSet(memA,memA,1,2); System.out.println(ref.getReference());//输出当前数据内容 System.out.println(ref.getStamp()); } }
本程序在进行应用原子操作时除了设置应用数据外还设置有版本编号这样在使用CAS进行数据修改的时候就需要传入比较内容与版本编号. 除了使用版本号的处理行驶外也可以使用AtomicMarkableReferences类实现boolean标记,常用方法如下
范例使用AtomicMarkableReference进行标记原子性操作
public static void main(String[]args)throws Exception { Member memAnew Member(AAA,12);//实例化Member实例 Member memBnew Member(BBB,13); //由于AtomicMarkableReferenceMemberrefnew AtomicMarkableReferenceMember(memA,true); //在进行CAS操作的时候除了要设置替换内容外也需要设置标记号否则无法替换 ref.compareAndSet(memA,memB,true,false); System.out.println(ref.getReference()); 本程序利用AtomicMarkableReference类进行标记原子性处理操作这样在进行数据修改时就必须传入当前的标记状态(true或false)才可以实现内容更新. 提示关于ABA访问问题 对于JUC提供的AtomicStampedReference和AtomicMarkableReference两个类所需要解决的是多线程访问下的数据操作ABA不同步问题是指两个线程并发操作时由于更新不同步所造成的更新错误可以考虑以下流程对于ABA问题最简单的理解就是现在A和B两位开发工程师同时打开了一个相同的程序文件A在打开之后由于有其他的事情要忙所以暂时没有做任何的代码编写而B却一直在进行代码编写当B把代码写完之后并保存后观赏计算机离开了而A处理完其它事情后发现没有什么可写的于是直接保存退出了这样B的修改就消失不见了。 由于ABA问题的存在那么就有可能造成CAS的数据更新错误因为CAS是基于数据内容的判断来实现数据修改所以此时的操作就会产生错误。为了解决这个问题提出了版本号设计方案这也就是JUC提供AtomicStampedReference和AtomicMarkableReference两个类的原因所在。
21.3.4 对象属性修改原子操作类
为了保证在并发编程访问下的类属性修改的正确性JUC提供了3个塑性原子操作类:AtomicIntergerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater这3个类都可以安全的进行属性更新。由于这几个累的实现原理和操作模式相同本节将针对AtomicLongFieldUpdater类的使用来进行讲解:
范例实现属性操作
class Book { //必须使用volatile定义 private volatile long id; private String title; public Book(long id,String title){...} public void setId(long id) { AtomicLongFileUpdaterBookatoLongAtomicLongFieldUpdater.newUpdater(Book.class,id); atoLong.compareAndSet(this.this.id,id); } }
public class JUCDemo { public static void main(String[]args)throws Exception { Book booknew Book(10001,AAA); book.setId(2003); } }
本程序在定义Book类的时候通过AtomicLongFieldUpdater类进行了id属性的内容更新操作但是此类操作更新的属性必须用volatile声明。
21.3.5 并发计算
使用原子操作类可以保证多线程访问下的数据操作安全性而为了进一步加强多线程下的计算操作所以从JDK1.8之后开始提供累加器(DoubleAccumulator、LongAccumulator)和加法器(DoubleAddler、LongAdder)的支持。但是原子性的累加器只是适合于进行基础的数据统计并不适合于其他更加细粒度的操作。
范例使用累加器计算
public class JUCDemo { public static void main(String[]args)throws Exception { DoubleAccumulator danew DoubleAccumulator((x,y)-xy,1.1);//累加器 System.out.println(da.doubleValue());//原始内容 da.accumulate(20);//加法计算 System.out.println(da.get());//获取数据 } }
本程序创建了一个累加器并且设置了一个计算表达式(DoubleBinaryOperator接口实现)随后利用原始数据进行了指定数据内容的增加并获取结果。
范例使用加法器计算
public static void main(String[]args)throws Exception { DoubleAdder danew DoubleAddler(); da.add(10); da.add(20); System.out.println(da.sum()); }
本程序定义了一个加法器同时设置了3个要进行加法计算的数字最后通过sum()方法获取加法结果.
21.4 ThreadFactory
多线程的执行类需要实现Runnable或Callable接口标准所以为了进一步规划线程类的对象产生JUC提供了一个ThreadFactory接口利用此接口可以获取THread类的实例化对象该接口定义如下 public interface ThreadFactory { //传入Runnable接口实例创建Thread类实例 //param r Runnable线程核心操作实现 //return Thread线程类对象 public Thread newThread(Runnable r); }
在开发中ThreadFactory接口的使用结构
范例使用ThreadFactory创建线程
public class JUCDemo { public static void main(String[]args) { Thread threadDefaultThreadFactory.getInstance().newThread(()- { System.out.println(多线程执行Thread.currentThread().getName()); }); thread.start(); } }
class DefaultThreadFactory implements ThreadFactory { //定义线程工厂实现类
private static final ThreadFactory INSTANCEnew DefaultThreadFactory(); private static final String titleAAA;//定义线程标记名称 private static int count0;//线程个数统计 private DefaultThreadFactory(){} public static ThreadFactory getInstance(){return INSTANCE;} Override public Thread newThread(Runnable run){return new Thread(run,TITLEcount);}//获取线程实例 }
本程序定义了一个ThreadFactory实现类这样就可以依据此接口实现Thread类实例的统一管理。
21.5 线程锁
传统的线程锁机制需要依赖synchronized同步与Object类中wait()方法、notify()方法进行控制然而这样的控制并不容易所以JUC中提供有一个新的框架锁再次框架中提供两个核心接口 Lock接口支持各种不同定义(公平机制锁、非公平机制锁、可重入锁) ReadWriteLock接口:针对线程的读或写提供不同的锁处理机制在数据读取时采用共享锁数据修改时使用独占锁这样就可以保证数据访问的性能
在JUC提供的锁几只钟提供有大量不同类型的锁处理类包括ReentranLock、StampedLock、LockSupport、Semaphore等
注意AQS操作支持 在JUC中AQS有3个支持类AbstractOwnableSynchronizer、AbstractQueuedSynchronizer、AbstractQueuedLongSynchronizer这3个类的主要功能是实现锁以及阻塞线程执行的功能。JUC中的许多同步类都依赖于AQS支持其中AbstractQueuedLongSynchronizer类提供同步状态的64为操作支持
21.5.1 ReentrantLock ReentrantLock提供了一种互斥锁(或成为独占锁机制)这样在同一个时间点内只允许有一个线程持有该锁而其他线程将等待与重新获取操作。ReentrantLock最大的特点在于它也属于一个可重用锁这就意味着该锁可以被单个线程重复获取
范例使用互斥锁实现多线程并发售票操作
class Ticket { //售票类该类不是线程子类 private int count3;//售票总数 private ReentrantLock reentrantLocknew ReentrantLock();//互斥锁(独占锁) //售票操作方法该方法不再使用synchronized进行同步处理每次只允许一个线程操作 public void sal() { this.reentrantLock.lock();//锁定 if(this.count0) { //票数有空余 TimeUnit.SECONDS.sleep(1);//模拟网络延迟 System.out.println(Thread.currentThread().getName()卖票票数剩余this.count--); }else { System.err.println(Thread.currentThread().getName()没票了); }
this.reentrantLock.unlock(); } }
public class JUCDemo { public static void main(String[]args) { Ticket ticketnew Ticket();//实例化内对象 for(int x0;x20;x) { new Thread(()-{ticket.sal();},售票员-x).start() } } }
本程序针对卖票程序中的线程控制并没有使用传统的syncronized进行锁定并且Ticket也不是Runnable或Callable线程类。在sal()方法中直接依据ReentranLock(本次为非公平机制)实现线程锁定(lock()方法)这样就保证只允许有一个现场恒进行买票的数据处理操作而当此线程解锁后(unlock()方法)其他线程将根据优先级抢占独占锁并进行卖票操作。
提示关于ReentrantLock公平锁与非公平锁的处理实现 互斥锁中针对所得获取可以使用lock()方法释放锁可以使用unlock()方法。同时需要注意的是锁的获取有两种不同的实现机制公平机制和非公平机制实际上这两种不同机制在进行锁或趋势的操作流程也有所不同lock()方法代码实现如下 public void lock(){sync.acquire(1);} acquire()方法·在AbstractQueuedSynchronizer类中的实现代码如下 public final void acquire(int arg) { if(!tryAcquire(arg)//尝试获取失败进入等待序列 acquireQueued(//获取队列 addWaiter(Node.EXCLUSIVE),arg))//加入CLH队列 selfInterrupt();//等待中被中断则自己中断 }
CLH是一个肺阻塞的FIFO队列。也就是说往里面插入或移除一个节点的时候在并发条件下不会产生阻塞而是通过自旋锁和CAS保证节点插入与移除的原子性。 公平锁和非公平锁的区别是在获取锁的机制上的区别。
21.5.2 ReentrantReadWriteLock 使用独占锁最大的特点在于其只允许一个线程进行操作这样在进行数据更新的时候可以保证操作的完整性但是在进行数据读取时独占锁会造成严重的性能问题。为了解决高并发下的快速访问与安全修改JUC提供了ReentrantReadWriteLock读写锁即在读取的时候上读锁写的时候上写锁这两种锁时呼呲的有JVM进行控制。其继承结果如图
在ReentrantReadWriteLock中读锁属于共享锁而写锁只允许一个线程进行操作所以在使用时就需要通过不同的方式获取锁 例readLock()获取读锁,writeLock()获取写锁。
范例使用读、写锁实现银行账户的并发写入与并发读取
class Account { //银行账户 private String name;//账户名称 private double asset;//账户资产 private ReadWriteLock readWriteLocknew ReentrantReadWriteLock();//读/写锁 public Account(STring name,double asset)//设置账户信息 { this.namename; this.assetasset; } public void saveMoney(double money) { //资产追加 this.readWriteLock.writeLock().lock();//获取写锁(独占锁 this.assetmony; TimeUnit.SECONDS.sleep(2);//模拟延迟 System.out.println(Thread.currentThread().getName()修改金额money当前总资产this.asset); this.readWriteLock.writeLock().unlock(); } public String toString() { this.readWriteLock.readLock().lock();//获取读锁 TimeUnit.SECONDS.sleep(2);//模拟延迟 System.out.println(Thread.currentThread().getName()修改金额money当前总资产this.asset); this.readWriteLock.readLock().unlock(); }
}
public static void main(String[]args) { Accountnew Account(AA,0.0); double[]moneyDatanew double[]{120,300,500,700,5000}; //5个写入线程 for(int x0;x5;x) { new Thread(()-{for(int y0;ymoneyData.length;y) { account.saveMoney(moneyData[y]);//存放金额 }}).start(); } for(int x0;x5;x)//5个读取线程 { new Thread(()-{ while(true) { System.err.println(account.toString());//获取数据 } }).start(); } }
本程序定义了一个Account类描述银行账户,随后启动了5个写线程和5个读线程。在程序执行时可以发现写线程采用了独占锁的处理方式多个写线程依次执行而读线程是会有多个线程并行读取这样保证了数据操作的正确性也实现了数据的快速读取。
21.5.3 StampedLock
读、写锁可以保证并发访问下的数据写入和读取性能但是在读线程非常多的情况下有可能会造成写线程的长时间泽德从而减少线程的调度次数。为此JUC针对读、写锁提出了改进方案提供了无障碍锁(StampedLock),使用这种锁的特点在于若干个读线程彼此之间不会相互影响但是依然可以保证多个写线程的独占操作。
范例使用StampedLock实现银行账户并发操作
class Account { //银行账户 private String name;//账户名称 private double asset;//账户资产 private ReadWriteLock readWriteLocknew ReentrantReadWriteLock();//读/写锁 public Account(STring name,double asset)//设置账户信息 { this.namename; this.assetasset; } public void saveMoney(double money) { //资产追加 long tampthis.stampedLock.readLock();//获取读锁检查状态 boolean flagtrue; long writeStampthis.stampedLock.tryConvertToWriteLock(stamp);//转为写锁 while(flag) { if(writeStamp!0) {//当前为写锁 stampwriteStamp;//修改为写锁的标记 this.assetmoney;//进行资产修改 TimeUnit.SECONDS.sleep(1);//模拟延迟 flagfalse; }else { //没有获取到写锁 this.stampedLock.unlockRead(stamp);//释放读锁 writeStampthis.stampedLock.writeLock();//获取写锁 stampwriteStamp; } }
this.stampedLock.unlock(stamp);//解锁 } public String toString() { long stampthis.stampedLock.tryOptimisticRead();//获取乐观锁 double currentthis.asset;//获取当前的资产 TimeUnit.SECONDS.sleep(1);//模拟延迟 //validate()方法虽然可以检测但是依然有可能出现异常所以本处依据StampedLock类的源代码多追加了一个验证机制 if(!this.stampedLock.validate(stamp)||(stamp(long)(Math.pow(2,7)-1))0) { //验证记录点有效性 long readStampthis.stampedLock.readLock();//获取互斥锁 currentthis.asset;//修改当前内容 stampreadStamp;//修改原始记录点 }
} public static void main(String[]args) { Accountnew Account(AA,0.0); double[]moneyDatanew double[]{120,300,500,700,5000}; //5个写入线程 for(int x0;x5;x) { new Thread(()-{for(int y0;ymoneyData.length;y) { account.saveMoney(moneyData[y]);//存放金额 }}).start(); } for(int x0;x5;x)//5个读取线程 { new Thread(()-{ while(true) { System.err.println(account.toString());//获取数据 } }).start(); } }
本县城产生了30个读线程以及个写线程在Account类中利用StampedLock类获取了读锁和写锁为了放置过多读取所造成的写线程阻塞所以在进行写入前都会判断读锁的状态并利用转换方法实现了读锁和写锁的转换这样就可以解决读写锁的缺陷。 21.5.4 Condition
在JUC中允许用户自己进行锁对象的创建而这种锁对象可以通过接口进行描述Ccondition提供了与Object类中类似的线程控制方法同时有用户自己来决定使用的锁既可以创建更多的锁来进行控制。
范例Condition基本使用
public class JUCDemo { public static String msgnull;//信息保存 public static void main(String[]args)throws Exception { Lock locknew ReentrantLock();//获取Lock接口实例 Condition conditionlock.newCondition();//获得一个写入锁 lock.lock();//获取锁 new Thread(()- { //获取锁 lock.lock(); System.out.println(Thread.currentThread().getName()进行数据处理); msgAAA; condition.signal();//唤醒其他等待线程 lock.unlock(); },数据处理线程).start(); condition.await();//主线程等待 lock.unlock(); } }
本程序通过Lock接口与Condition接口实现了主线程等待子线程的程序结构在本程序中通过Condition提供的await()和signal()两个方法实现了线程的锁定与唤醒操作。但是需要注意的是在使用Condition接口操作时必须使用lock.lock()处理方法否则代码中执行中就会出现异常。 Condition除了支持上面的功能外它更强大的地方在于能够更加精细地控制多线程的休眠和唤醒对于同一个锁开发者可以创建多个Condition在不同的情况下使用不同的Condition。例如现在要实现一个多线程并发读、写的缓冲区操作当向缓冲区写入数据后可以唤醒读线程当数据从缓冲区读取出来后可以唤醒写线程当缓冲区已经写满数据后则可以将“写线程”设置为等待状态当缓冲区为空时可以将“读线程”设置为等待状态当缓冲区为空时可以将“读线程”设置为等待状态。此类操作如果使用Object类中提供的方法(wait()、notify()、notifyAll()操作)是无法明确地进行“读线程”或“写线程”的而使用Condition就可以明确的唤醒指定的线程。
范例使用Condition实现数据缓存操作
class DataBuffer { private static final int MAX_LENGTH5;//保存元素的最大个数 private Lock locknew ReentrantLock();//实例化Lock private final Condition writeConditionlock.newCondition();//设置写Condition private final Condition readConditionlock.newCondition();//设置读Condition private final Object[]datanew Object[MAX_LENGTH];//数据存储空间 private int writeIndex0;//数据写索引 private int readIndex0;数据读索引 private int count0;//保存的元素个数 public void put(Object obj)throws Exception { this.lock.lock();//获取锁 if(this.countMAX_LENGTH){this.writeCondition.await();}//已达到最大缓存个数保存线程等待 this.data[this.writeIndex]obj;//保存数据修改指针 if(this.writeIndexMAX_LENGTH){this.writeIndex0;}//已达保存上线重置保存索引脚标 this.count;//修改元素保存个数 this.readCondition.signal();//唤醒消费线程 this.lock.unlock(); } public Object get() throws Exception { this.lock.lock();//获取锁 if(this.count0){this.readCondition.await();}//没有数据消费线程等待 Object takeDatathis.data[this.readIndex];//取出索引数据 if(this.readIndexMAX_LENGTH){this.readIndex0;}//重置索引 this.count--;//消费后保存个数-1 this.writeCondition.signal();//消费后唤醒保存线程 return takeData; this.loc.unlock(); } }
public class JUCDemo { public static void main(String[]args)throws Exception { DataBuffer buffernew DataBuffer();//实例化数据缓冲区 for(int x0;x10;x)//循环创建线程 { final int tempDatax;//匿名内部类的使用 new Thread(()-{ Thread.sleep(1);//模拟延迟 buffer.put(tempData);//保存数据 },PUT线程-x).start(); new Thread(()- { Thread.sleep(100); },Get线程x).start(); } } }
本程序针对读和写分别创见了两个Condition实例化对象随后依据缓存数据的个数并结合相应的Condition就可以实现准确地读/写两个先后才能的等待与唤醒。
21.5.5 LockSupport
Thread类从JDK1.2版本开始为了方式可能出现的死锁问题所以废除了Thread类中的一些线程控制方法(如suspend()、resume()),但是有部分开发者认为使用这几个飞出的方法实际上操作会更加直观。
范例使用LockSupport阻塞线程
public class JUCDemo { public static String msgnull;//信息保存 public static void main(String[]args)throws Exception { //获取主线程对象 Thread mainThreadThread,currentThread(); new Thread(()-{ System.out.println(Thread.currentThread().getName()); msgAAA; LockSupport.unpark(mainThread);//唤醒主线程 },数据处理-Thread).start();//启动子线程 LockSupport.park(mainThread);//阻塞主线程 } }
本程序直接利用LockSupport提供的park方法与unpark()方法简单地实现了子线程与主线程的同步操作可以发现LockSupport可以直接针对线程实例进行挂起与恢复处理。
21.5.6 Semaphore
在大多数情况下服务器提供的资源不是无限的所以当并发访问线程量较大时就需要针对所有的可用资源进行线程调度这一点类似于生活中的银行业务办理。例如在银行里并不是所有的业务窗口都会开启往往只开几个窗口如果线程办理银行业务的人较多那么这些人将会通过依次叫号的功能获取业务办理资格这样就可以实现有限资源的分配和调度在JUC中的Semaphore类就可以实现此类调度处理。
范例模拟银行办公业务(2个业务窗口、10位待办理人)
public class JUCDemo { public static void main(String[]args)throws Exception { Semaphore semnew Semaphore(2);//2个可用的资源 for(int x0;x10;x) { //循环创建并启动线程 new Thread(()- { sem.acquire();//资源抢占若无资源则等待 if(sem.availablePermits()0)//有空闲资源 { System.out.println(Thread.currentThread().getName()抢占资源成功); }else { System.err.println(Thread.currentThread().getName()资源抢占失败); } System.err.println(Thread.currentThread().getName()开始进行业务办理); TimieUnit.SECONDS.slee(2);//业务办理延迟 System.err.println(Thread.currentThread().getName()业务办理成功); sem.release(); },业务办理人员-x).start(); } } }
本程序通过Semaphore实现了两个资源的线程抢占与释放处理所有的线程都会根据有限的资源进行等待与唤醒处理。
21.5.7 CountDownLatch
CountDownLatch可以保证一组子线程全部执行完毕后再进行主线程的执行操作。例如在服务器主线程启动前可能需要启动若干子线程这是就可以使用CountDownLatch来进行控制。 CountDownLatch是通过一个线程个数的计数器实现的同步处理操作在初始化时可以为CountDownLatch设置一个线程执行总数这样当一个子线程执行完毕后都执行一个减一的操作当所有的子线程都执行完毕后CountDownLatch中保存的计数内容为0则主线程恢复执行
范例使用CountDownLatch进行线程操作(模拟机场接人)
public static void main(String[]args)throws Exception { CountDownLatch latchnew CountDownLatch(2);//要接两位客人 //循环启动线程 for(int x0;x2;x) { new Thread(()-{ System.out.println(Thread.currentThread().getName()上车); latch.countDown();//等待数量-1 },客人-x).start(); } latch.await();//等待 }
本程序利用CountDownLatch定义了要等待的子线程数量这样再统计数量不为0的时候主线程暂时挂起知道所有的子线程执行完毕(latch.countDown()进行-1操作)后主线程恢复运行。
21.5.8 CylicBarrier
CylicBarrier可以保证多个线程达到某一个公共屏障点(Common Barrier Point)的时候彩之星如果没有达到此屏障点那么线程将持续等待由于访客较多可能会猜中分批的模式进入(要求每满10个人才可以入场) CylicBarrier的实现就好比栅栏一样这样可以保证若干个线程的并发执行同时还可以利用此方法更新屏障点的状态进行更加方便的控制
范例使用CylicBarrier设置栅栏
public static void main(String[]args)throws Exception { CyclicBarrier cyclicBarriernew CyclicBarrier(2,()-{...});//等待栅栏 //循环创建线程 for(int x1;x5;x) { final int tempx; if(x3){TimeUnit.SECONDS.sleep(2);} } new Thread(()- { System.out.println(Thread.currentThread().getName()); if(temp3){cyclicBarrier.reset();}else{cyclicBarrier.await();} },执行者-x).start(); }
本程序通过CyclicBarrier设置的屏障点数量为2这样只有达到两个线程的时候才会接触锁定状态继续执行同时会执行CyclicBarrier子线程进行其他业务处理在线程等待期间也可以使用reset()方法重置栅栏技术。 提示CyclicBarrier和CountDownLatch的区别 CountDownLatch采用单次计数的形式完成并且该技术操作只允许执行一次如果在执行中线程出现了错误那么技术将无法重新开始。 CyclicBarrier采用屏障点的设计模式可以进行循环技术处理如果子线程出现了错误则也可以使用reset()方法进行重置CyclicBarrier能处理更为复杂的业务场景。
21.5.9 Exchanger
生产者和消费者模型需要有一个公共区域进行数据的保存于获取在JUC中专门提供了一个交换区域的程序类java.util.concurrent.Exchanger类
范例使用Exchanger实现数据交换
public static void main(String[]args)throws Exception { ExchangerStringexchangernew ExchangerString();//定义交换空间 boolean isEndfalse;//结束标记 new Thread(()- { String datanull; for(int x0;x2;x)//生产数据 { datax; System.out.println(beforexdata); Thread.sleep(1000); exchanger.exchange(data); System.out.println(afterxdata); } },信息生产者).start(); new Thread(()- { String datanull; while(!isEnd) { System.out.println(beforedata); Thread.sleep(2000); dataexchanger.exchange(null); System.out.println(afterdata); } }) }
本程序定义了生产者和消费者线程两个线程利用Echanger作为信息交换空间生产者向xchanger设置数据在消费者没有取走时将等待消费者取走后在继续进行生产。
21.5.10 CompletableFuture
JDK1.5提供的Future可以实现异步计算操作虽然Future的相关方法提供了异步任务的执行能力但是对于线程执行的结果的获取只能能采用阻塞和轮训的方式进行处理。阻塞的方式与多线程异步处理的初衷产生了分歧轮训的方式又造成CPU资源的浪费同时也无法及时的得到结果。为了解决这些设计问题从JDK1.8开始提供了Future的扩展实现类CompleteFuture可以帮助开发者简化异步编程的复杂性同时又可以结合函数时模式利用回调的方式进行异步处理计算操作。
范例使用CompletableFuture模拟炮兵听从命令打炮场景
public static void main(String[]args)throws Exception { //线程回调 CompletableFutureStringfuturenew CompletableFutureString();/线程回调 for(int x0;x2;x) { new Thread(()- { System.out.println(Thread.currentThread().getName()炮兵就绪); System.out.println(Thread.currentThread().getName()解除阻塞future.get()); },炮兵-x).start(); } new Thread(()- { TimeUnit.SECONDS.sleep(2); future.complete(开炮);//命令发出 }) } 21.6 并发集合
集合是数据结构的系统实现传统的Java集合大多属于“非线程安全的”虽然Java追加了Collections工具类以实现集合的同步处理操作但是其并发效率并不高。所以为了更好地支持高并发任务处理在JUC中提供了支持高并发的处理类同时为了保证集合操作的一致性这些高并发的集合类依然实现了集合标准接口如List、Set、Map、Queue。
范例传统集合进行多线程并发访问
public static void main(String[]args)throws Exception { //List集合 ListStringallnew ArrayListString(); for(int num0;num10;x) { new Thread(()- { for(int x0;x10;x) { all.add(Thread.currentThread().getName()); System.out.println(all); } },集合操作线程num).start(); } }
本程序通过循环产生了10个线程并且这10个线程对同一个集合进行数据保存于获取操作而此程序一旦执行就会产生集合并发修改异常即传统的集合都是围绕着单线程涉及展开的只有JUC提供的集合类才支持多线程并发操作。
21.6.1 并发单值集合类
CopyOnWriteArrayList是基于数组实现的并发集合访问类在使用此类进行数据的添加/修改/删除操作时都会创建一个新的数组并将更新后的数据复制到新建的数组中。由于每次更新操作都回建立新的数组所以在进行数据修改时CopyOnWriteArrayList类的性能并不高但是在数据遍历查找时性能会较高。在使用Iterator进行迭代输出时不支持数据删除(remove()方法)操作 提示CopyOnWriteArrayList部分源代码分析 CopyOnWriteArrayList采用复制与写入数组的形式组成所以在源代码中定义有以下成员属性。 private transient volatile Object[]array; 在定义array数组时使用volatile定义了盖度向数组这样就可以保证直接对原始数据进行操作同时为了保证安全的读、写操作还提供了一个互斥锁的成员属性。 final transient Object locknew Object(); 在JDK新版本中并没有直接使用ReentrantLook而是定义了一个Object对象在操作时利用synchronized进行锁定处理。这样在对数据进行“添加、修改、删除”操作时会先获取“互斥锁”但数据修改完毕后先将数据更新到volatile数组中然后在释放“互斥锁”以此实现数据保护的目的。
范例使用CopyOnWriteArrayList实现多线程并发访问
public static void main(String[]args)throws Exception { ListStringallnew CopyOnWriteArrayListString();//List集合 //循环定义线程 for(int num0;num10;num) { new Thread(()- { for(int x0;x10;x) { all.add(Thread.currentThread().getName()); } },集合线程操作num).start(); } }
本程序利用CopyOnWriteArrayList类实现了多线程的并发访问操作所以此时的代码中不会再抛出并发异常。与之相似的集合还有CopyOnWriteArraySet它提供了一种无序的线程安全集合结构可以理解为线程安全的HashSet实现但是与HashSet的区别在于HashSet是基于散列方式存放的而其是基于数组实现的。
范例使用CopyOnWriteArraySet操作 public static void main(String[]args)throws Exception { SetStringallnew CopyOnWriteArraySetString();//set集合 for(int num0;num10;num) { //循环定义线程 new Thread(()- { for(int x0;x1-;x) { all.add(Thread.currentThread().getName()); } },集合操作线程-num).start(); } }
本程序利用CopyOnWriteArraySet类实现了同样的多线程操作需要注意的是CopyOnWriteArraySet类的数据保存时依赖于CopyOnWriteArrayList类实现的所以不支持remove方法。
21.6.2 ConcurrentHashMap
ConconcurrentHashMap是线程安全的哈希表实现类在实现结构中他将哈希表分成许多片段每一个片段中除了保存有哈希数据外还提供了一个可重用的互斥锁以片段的形式实现多线程的操作即在同一个片段内除了保存有哈希数据外还提供了一个可重用的互斥锁以片段的形式实现多线程的操作即在同一个片段内多个线程访问是互斥的而不同片段的访问采用的是异步处理方式。
范例多线程访问ConcurrentHashMap
public static void main(String[]args)throws Exception { MapString,Intergermapnew ConcurrentHashMapString,Interger();//Map集合 for(int num0;num10;num) { new Thread(()- { map.put(AAA,x); },集合操作线程-num).start(); } }
本程序创建了ConcurrentHshMap集合这样该集合在进行并发访问控制时会根据所属的分段进行同步处理而未分段的部分可以直接进行并行读取。
21.6.3 跳表集合
跳表是一种与平衡二叉树性能类似的数据结构其主要是在有序链表上使用。在JUC提供的集合中有两个支持跳表操作的集合类型ConcurrrentSkipListMap、ConcurrentSkipListSet
范例使用ConcurrentSkipListMap集合
public static void main(String[]args)throws Exception { CountDownLatch latchnew CountDownLatch(10);//同步处理 MapString,Intergermapnew ConcurrentSkipListMapString,Interger(); //多线程访问跳表 for(int num0;num100;num) { //多线程访问跳表 new Thread(()- { map.put(Thread.currentThread().getName()x,x); latch.countDown(); },集合操作线程-num).start(); } latch.await(); }
本程序利用100个线程实现了Map集合数据的并发修改
范例使用oncurrentSkipListSet集合
public static void main(String[]args)throws Exception { CountDownLatch latchnew CountDownLatch(10);//同步处理 MapString,Intergermapnew ConcurrentSkipListMapString,Interger(); //多线程访问跳表 for(int num0;num100;num) { //多线程访问跳表 new Thread(()- { set.add(Thread.currentThread().getName()x,x); latch.countDown(); },集合操作线程-num).start(); } latch.await(); }
21.7 阻塞队列
队列是一种FIFO模式处理的集合结构可以利用队列进行批量数据的保存例如在传统的生产者和消费者模型中如果此时生产者的效率较高而消费者效率较低就可以通过队列保存生产的内容。
21.7.1 BlockingQueue
BlockingQueue类属于单端阻塞队列所有的数据将按照FIFO算法进行保存与获取BlockingQueue类提供有以下几个子类ArrayBlockingQueue(数组结构)、LinkedBlockingQueue(链表单段·阻塞队列)、SynchronousQueue(同步队列) ArrayBlockingQueue是一个利用数组控制形式实现的队列操作需要在其实例化时直接提供数组的长度也可以设置阻塞线程的公平非公平抢占原则。
范例使用rrayBlockingQueue实现生产者和消费者模型
public static void main(String[]args)throws Exception { BlockingQueueStringqueuenew ArrayBlockingQueueString(5);//5个队列容量 //10个生产者 for(int x0;x10;x) { new Thread(()- { TimeUnit.SECONDS.sleep(2); String msgThread.currentThread().getName()y; queue.put(msg);//队列保存数据 System.out.println(生产数据msg); },生产者-x).start(); } for(int x0;x2;x)//2个消费者 { new Thread(()- { while(true) { TimeUnit.SEDONDS.sleep(1);//延迟操作 System.err.println(queue.take()); } }) } }
本程序定义了10个生产者线程和2个消费者线程通过程序的执行结果可以发现消费者线程在队列未保存数据时会进行等待而如果生产者生产的数据超过了队列设置的长度也会进行等待消费者取消数据后才允许继续生产。 21.7.2 BlockingDeque
BlockingDeque为双端阻塞队列可以实现FIFO和FILO操作BlocingDeque只有LinkedBlockingDequeue一个实现子类。
范例使用双端阻塞队列实现生产者和消费者模型 public static void main(String[]args)throws Exception { //双端阻塞队列 BlockingDequeStringqueuenew LinkedBlockingDequeString(5); for(int x0;x10;x) { if(x%20) { new Thread(()- { for(int y0;y100;y) { TimeUnit.SECONDS.sleep(2);//操作延迟 String msgThread.currentThread().getName()y; queue.putFirst(msg); System.out.println([first]生产数据msg); } } ,MLDN生产者-x).start(); }else { new Thread(()- { TimeUnit.SECONDS.sleep(2); String msgThread.currentThread().getName()y; queue.putLast(msg); System.out.println(last生产数据msg); },lastMLDN生产者-x).start(); } }
for(int x0;x2;x) { new Thread(()- { int count0; while(true) { TimeUnit.SECONDS.sleep(2);//延迟操作 if(count%20) { System.err.println(FIRST取出queue.takeFirst()); }else { System.err.println(last取出queue.takeLast()); } } },消费者线程-x).start(); } }
本程序产生了20个生产者线程和2个消费者线程20个生产者线程分两批各自队列首尾数据保存2个消费者线程也依次进行首尾队列数据的取出。
21.7.3 延迟队列
在JUC中提供自动弹出数据的延迟队列DelayQueue,该类属于BlockingQueue接口子类而对于延迟操作的计算则需要通过Delayed接口进行计算。
范例使用延迟队列(模拟讨论会依次离开的场景)
public static void main(String[]args)throws Exception { BlockingQueueStudentqueuenew DelayQueueStudent();//定义延迟队列 queue.put(new Student(小李,2,TimeUnit.SECONDS));保存延迟队列信息 queue.put(new Student(小王,2,TimeUnit.SECONDS)); while(!queue.isEmpy()) { Student stuqueue.take(); System.out.println(stu); TimeUnit.SECONS.sleep(1); } }
class Student implements Delayed { //定义延迟计算 private String name;//姓名 private long expire;//离开时间 private long delay;//停留时间 public Student(String name,long delay,TimeUnit unit) { this.namename; this.delayTimeUnit.MILLISECONDS.convert(delay,unit);//转换时间为毫秒 this.expireSystem.currentTimeMills()this.delay();//失效时间计算 } public String toString() { return this.name同学已经达到了预计的停留时间TimeUnit.SECONDS.convert(this.delay,TimeUnit.MILLISECONDS)秒已经离开了 } Override//队列弹出计算 public int compareTo(Delayed obj) { return (int )(this.delay-this.getDelay(TimeUnit.MILLSECONDS)); } Override public long getDelay(TimeUnit unit) { //延迟时间计算 return unit.convert(this.expire-System.currentTimeMills(),TimeUnit.MILLISECONDS); } }
本程序实现了延迟队列的操作逻辑在队列中多保存的每一个元素内容每当时间一到(compareTo()进行比较getDelay()获取延迟时间)都会自动进行队列数据的弹出操作。 使用延迟队列的主要原因是他可以实现队列内容的定时清理操作那么基于这样的自动清理机制就可以实现数据缓存的操作控制这样的操作可以极大地提升项目的并发性能。
提示关于数据缓存的作用 在实际开发中如果是基于数据库的查询操作那么在多线程并发量较高的情况下就有可能产生严重的性能问题数据库的执行性能较低例如一个人们欣慰可能会有成千上万的访问量这个时候采用直接数据的读取模式非常不理智。为了解决这个问题可以采用缓存的模式将一些重要的数据直接放到缓存里面
当不同的线程查询相同数据实现判断缓存中是否有指定内容如果存在则直接进行读取如果不存在则再进行数据库加载。对于缓存中的内容还需要考虑无效数据的清理问题而有了延迟队列这种自动弹出的极值存在。这一操作的实现就会变得非常容易。 本次实现一个新闻数据的缓存操作考虑到可能会保存多个数据所以将通过Map集合实现存储。同时考虑到缓存数据的修改安全性问题将使用ConcurrentHashMap子类另外对于数据的弹出操作将通过守护线程进行处理。
范例实现缓存操作
public static void main(String[]args)throws Exception { CacheLong,Newscachenew CacheLong,News();定义缓存类对象 cache.put(1L,new News(1L,AAA));//向缓存保存数据 cache.put(1L,new News(2L,AAA));//向缓存保存数据 cache.put(1L,new News(3L,AAA));//向缓存保存数据 System.out.println(cache.get(1L)); System.out.println(cache.get(2L));//通过缓存获取数据 TimeUnit.SECONDS.sleep(5);//延迟获取 System.out.println(cache.get(1L)); }
class CacheK,V { //定义一个缓存数据处理类 private static final TimeUnit TIMETimeUnit.SECONDS;//时间工具类 private static final long DELAY_SECONDS2;//缓存时间 private MapK,VcacheObjectsnew ConcurrentHashMapK,V();//设置缓存集合 private BlockingQueueDelayedItemPairqueuenew DelayQueueDelayedItemPair(); //启动守护线程 public Cache { Thread threadnew Thread(()- { while(true) { DelayedItemPairitemCache.this.queue.take();//数据消费 if(item!null) { //存在数据 Pair pairitem.getItem();//获取内容 Cache.this.cacheObjects.remove(pair.key,pair.value);//删除数据 } } }); thread.setDaemon(true);//设置后台线程 thread.start();//线程启动 } public void put(K key,V value)throws Exception { V oldValuethis.cacheObjects.put(key,value);//数据保存 if(oldValue!null) { this.queue.remove(oldValue);//删除已有的数据 } this.queue.put(new DelayedItemPair(new Pair(key,value),DELAY_SECONDS,TIME));//重新保存 } public V get(K key){return this.cacheObjects.get(key);}//Map查询 private class Pair { private K key; private V value;//数据value public Pair(K key,V value) { this.keykey; this.valuevalue; } } private class DelayedItemTimplements Delayed { //延迟数据保存项 private T item;//数据项 private long delay;//保存时间 private long expire;//失效时间 public DelayedItem(T item,long delay,TimeUnit unit) { this.itemitem; this.delayTimeUnit.MILLISECONDS.convert(delay,unit); this.expireSystem.currentTimeMills()this.delay; } Override public int compareTo(Delayed obj){return (int)(this.delay-this.getDelay(TimeUnit.MILLISECONDS));} Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire-System.getDelay(TimeUnit.MILLISECONDS)); } Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire-System.currentTimeMillis(),TimeUnit.MILLSECONDS); } public T getItem(){return this.item;} } }
class News { //新闻数据 private long nid; private String title; public News(Long nid,String title){this.nidnid;this.titletitle;} public String toString(){return this.nidthis.title;} }
本程序实现了一个数据缓存的处理操作在程序中考虑到缓存数据的到时自动清除问题所以使用了延迟队列保存所有的数据信息同时还有一份数据信息保存在Map集合中。为了保证延迟队列中的数据弹出后可以进行Map集合相应数据的删除所以定义了一个守护线程接收延迟队列弹出的内容由于本程序设定的缓存时间为两秒这样当两秒一国数据就会自动删除。
21.8 线程池
多线程技术的出现大大提升了程序的处理性能但是过多的线程一定会带来线程资源调度的损耗例如线程的创建与回收这样就会导致程序的响应速度变慢。为了实现合理的线程操作就需要提高线程的可管理型并且降低资源损耗所以在JUC中提供了线程池的概念。 Executors类能够创建的线程类一共有4中其主要作用如下 缓存线程池(CachedThreadPool):线程池中的每个子线程都可以重用白存了所有的用户线程并且随着处理量的增加可以持续进行用户线程的创建。 固定大小线程池(FixedThreadPool):保存所有的内核线程这些内核线程可以被不断重用并且不保存任何的用户线程。 单线程池(SingleThreadPool):只维护一个内核线程所有执行者依据顺醋排队获取线程资源。 定时调度池(ScheduledThreadPool):按照计划周期性地完成线程中的任务包含内核线程与用户线程可以提供许多的用户线程。
提示关于用户线程与内核线程的区别 多线程的实现过程本身依赖于操作系统但同时也依赖于所使用的平台。例如JDK最初发展的时候只有单核CPU所以当时一类与软件平台的实现。用户线程和内核线程的区别如下 用户线程可以在不支持多线程的系统中存在内核线程需要操作系统与硬件的支持。 在只有用户线程的系统中CPU调度依然以进程为单位处于运行进程中的多个线程是通过程序实现轮换执行而有内核支持的多线程系统中CPU调度以线程为单位并由操作系统调度。 用户线程通过进程划分一个进程系统只为其分配一个处理器所以用户线程无法调用系统的多核处理器而内核线程可以调度一个程序再多核处理器上执行提高处理性能。 系统只能将导致其所属进程被中断而内核线程执行系统指令调用时只导致该线程被中断。
21.8.1 创建线程池
public static void main(String[]args)throws Exception { //缓存线程池 ExecutorService serviceExecutors.newCachedThreadPool();//缓存线程池 //创建1000个线程 for(int x0;x1000;x) { //创建1000线程 for(int x0;x1000;x) { service.submit(()- { System.out.println(Thread.currentThread().getId()-Thread.currentThread().getName()); }); } service.shutdown();//关闭线程池 } } 本程序创建了一个缓存线程池由于没有设置其长度限制所以只要线程池中的线程不够用则会自动创建新的线程而线程的数量最多不超过Interger.MAX_VALUE个。
提示关于execute()与submit()方法 通过executors获取的线程池实际上都是Executor接口的实例而在Executor接口中提供有一个execute()方法可以进行多线程保存。在本程序中使用submit()方法submit()方法追加了执行任务是否为空的判断如果为空会排除异常)最终调用execute方法执行。 而在线程池中实际上有3个核心的概念需要注意 task:表示真正要执行的线程任务但是所有的线程任务在追加后并不会立刻执行。 worker:所有线程池中的任务都需要通过worker来执行所有的worker数量受到线程池容量的限制内核线程 reject:拒绝策略如果线程池中的线程已经满了就可以选择离开或等待。 所有都需要等待分配线程后才会被真正执行而当线程池容量已经达到上线后也会对新加入的线程采用核心的拒绝策略
范例创建固定长度线程池 ExecutorService serviceExecutors.newSingleThreadExecutor();//单线程池 范例设置单线程池 ecutorService serviceExecutors.newSingleThreadExecutor(); 范例设置线程调度池 public static void main(String[]args)throws Exception { //创建定时调度器并且设置允许的内核线程数量为1 ScheduledExecutorService executorServiceExecutors.newScheduledThreadPool(1); for(int x0;x10;x) { int indexx; //设置调度任务操作单位为秒3秒后开始执行每2秒执行一次 executorService.scheduleAtFixedRate(new Runnable() { Override public void run(){System.out.println(Thread.curretThread().getName()xindex);} },3,2,TimeUnit.SECONDS); } }
本程序定义了一个大小的线程调度池这样所有追加的线程每隔2秒就会执行一次调度按顺序执行。如果在线程池中传入了一个Callable接口实例那么也可以利用Future接口获取线程的返回结果在ExecutorService接口中提供invokeAny与invokeAll两个方法可以实现一组Callable实例的执行。
范例执行一组Callable实例
public static void main(String[]args)throws Exception { SetCallableStringallThreadsnew HashSetCallableString();//保存多个向成都向 //创建线程 for(int x0;x5;x) { final int tempx; allThreads.add(()- { return Thread.currentThread().getId()-Thread.currentThread().getName()数量temp; }); } ExecutorService serviceExecutors.newFixedThreadPool(3);//创建定长线程池 ListFutureStringresultsservice.invokeAll(allThreads);//执行线程对象 for(FutureStringfuture:results){System.out.println(future.get());} }
本程序通过Set集合保存了多个执行线程由于只开辟了一个定长为3的线程池这些集合中的线程将依次进行线程资源抢占并执行程序通过invokeAll()方法同时执行接收了集合中线程的返回结果。
21.8.2 CompletgionService
CompletionService是一个异步处理模式其主要的功能是可以异步获取线程池的返回结果。CompletionService将Executor(线程池)和BlocingQueue(阻塞队列)结合在一起同事主要使用allable定义线程任务。整个操作中就是生产者不断地将Callable线程任务保存进阻塞队列而后线程池作为消费者不断的把线程池中的任务取出并且返回结果。
范例使用CompletionService接口获取异步执行任务接口。 class ThreadItem implements CallableString { //线程体 Override public String call()throws Exception { long timeMillsSystem.currentTimeMillis();//当前时间戳 System.out.println(startThread.curretThread().getName()); Thread.sleep(1000); System.out.println(endThread.currentThread().getName()); return Thread.currentThread().getName()timeMillis(); } } public class JUCDemo { public static void main(String[]args)throws Exception { //创建线程池 ExecutorService serviceExecutors.newCachedThreadPool(); //创建一个异步处理任务并且该异步处理任务需要接收一个线程池实例 CompletionServiceStringcompletionnew ExecutorCompletionServiceString(service); //信息生产者 for(int i0;i10;i) { completion.submit(new ThreadItem());//提交线程 } for(int i0;i10;i){System.out.pritnln(获取数据completion.take().get());} service.shutdown(); } } 本程序通过CompletionService基于已有的线程池构建了一个异步任务由于其内部会自动弹出一个阻塞队列这样所有Callable任务执行完成后可以通过take()方法获取线程任务执行结果。
21.8.3 ThreadPoolExecutor
通过Executors类可以实现线程池的创建而通过Executors类创建的所有线程池都是基于ThreadPoolExecutor类实现的创建。在一些特殊环境下开发者也可以直接利用Threaxecutor类结合阻塞队列与拒绝策略创建属于自己的线程池。 提示Executors类创建线程池源代码分析 如果要想清楚Executors类与ThreadPoolExexutor类之间的关系最好的方法就是查看Executors类中线程池创建操作的源代码。
缓存线程池public static ExecutorService newCachedThreadPool(){return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueueRunnable());}
在线程池创建时会调用ThreadPoolExecutor类的构造方法该构造方法定义如下 public ThreadPoolExecutor ( int corePoolSize,//内核线程数量 int maxinumPoolSize,//线程池最大程度 long keepAliveTime,//每个线程的存活时间 TimeUnit unit,//存活时间单位 BlockingQueueRunnableworkQueue,//阻塞队列、work ThreadFactory threadFactory,//线程工厂 RejectedExecutionHandler handler//线程拒绝策略 )
由于线程池中的资源是有限的所以对于超过线程池容量的部分任务线程将拒绝执行为此定义了以下4种拒绝策略 ThreadPoolExecutor.AbortPolicy:当任务添加到线程池中被拒绝的时候会抛出异常。 ThreadPoolExecutor.CallerRunsPolicy:当任务被拒绝的时候会在线程池当前正在执行线程的worker里处理此线程即加塞现在已有一个线程在worker里正在执行于是将这个线程踢走还我执行 ThreadPoolExecutor.DiscardOldestPolycy:当被拒绝的时候线程池会放弃队列中等待最长的时间的任务并且将被拒绝的任务添加到队列中。 ThreadPoolExecutor.DiscardPolicy:当任务添加被拒绝的时候将直接丢弃该线程 范例使用ThreadPoolExecutor创建线程池
publc static void main(String[]args)throws Exception { BlockingQUEUERunnablequeuenew ArrayBlockingQueueRunnable(2);//阻塞队列 //通过ThreadPoolExecutor创建线程池该线程池有2个内核线程每个线程的存活时间为6秒 ThreadPoolExecutor executornew ThreadPoolExecutor(2,2,6L,TimeUnit.SECONDS,queue,Executor.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); for(int x0;x5;x) { executor.submit(()- { System.out.println(beforeThread.currentThread().getName()); TimeUnit.SECONDS.sleep(2); System.out.println(afterThread.currentThread().getName()); }); } }
本程序采用手动的方式设置了一个线程池同时设置了拒绝策略为AbortPolicy这样当线程池中的执行线程已经被占满时将抛出异常。
21.9 ForkJoinPool
在JDK1.7之后为了充分利用多核CPU的性能优势可以将一个复杂的业务进行拆分交由多台CPU进行计算这样就可以提高程序的执行性能。ForkJoinPool类可以看错一个特殊的Executor执行器这个框架包括以下两个操作 分解操作将一个大型业务拆分成若干个小人物在框架中执行。 合并操作主任务将等待多个子任务完毕后进行结果合并 在ForkJoinPool中需要通过ForkJoinTask定义执行任务
范例创建有返回值的分支任务 public static void main(String[]args)throws Exception { //创建分支任务 SumTasknew SumTask(0,100);//创建分支任务 FkJoinPool poolnew ForkJoinPool();//分支任务池 FutureIntergerfuturepool.submit(task);//提交分支任务 System.out.println(future.get()); } SuppressWarnings(serial) class SumTask extends RecursizeTaskInterger { //有返回结果 private int start; private int end; public SumTask(int start,int end) {this.startstart;this.endend;} Override protexted Interger compute() { int sum0; if(this.end-this.start100) { for(int xthis.start;xthis.end;x){sumx;} }else { int middle(startend)/2;//计算中间值 SumTask leftTasknew SumTask(this.start,middle); SumTask rightTasknew SumTask(middle1,this.end); leftTask.fork(); rightTask.fork(); sumleftTask.join()rightTask.join(); } return sum; } }
本程序通过分支合并任务的处理模式开启了两个分支这两个分支分别要进行各自的数学累加操作由于RecursiveTask采用返回值的方式返回了计算结果所以不同分支可以通过join()方法获取计算结果并完成最终的计算。