seo优化网站的注意事项,网站常用模块,深圳网站设计专业乐云seo,网站推广方式推荐前言今天我们讲两个内容#xff0c;第一个是JMH#xff0c;第二个是Disruptor。这两个内容是给大家做更进一步的这种多线程和高并发的一些专业上的处理。生产环境之中我们很可能不自己定义消息队列#xff0c;而是使用Disruptor。我们生产环境做测试的时候也不是像我说的那样…前言今天我们讲两个内容第一个是JMH第二个是Disruptor。这两个内容是给大家做更进一步的这种多线程和高并发的一些专业上的处理。生产环境之中我们很可能不自己定义消息队列而是使用Disruptor。我们生产环境做测试的时候也不是像我说的那样写一个start写一个end就测试完了。在这里给大家先介绍专业的JMH测试工具在给大家介绍Disruptor号称最快的消息队列。JMH -java Microbenchmark Harness微基准测试它是测的某一个方法的性能到底是好或者不好换了方法的实现之后他的性能到底好还是不好。这个测试的框架是2013年发出来的由JLT的开发人员开发后来归到了OpenJDK下面。官网http://openjdk.java.net/projects/code-tools/jmh/下面我们来介绍什么是一个JMH他是用来干什么的我们来看到底怎么使用给大家一个简单的介绍肯定是了解不了jmh是个什么东西已经把这个步骤给大家总结一篇文档官网在哪里怎么样去创建一个JMH的测试创建一共大致有七个步骤还有他的一些基本概念什么叫预热什么叫Mesurement等等的还有进一步了解的官方地址。JMH Java准测试工具套件什么是JMH官网http://openjdk.java.net/projects/code-tools/jmh/创建JMH测试1. 创建Maven项目添加依赖我们需要添加两个依赖1.1jmh-core (jmh的核心)1.2jmh-generator-annprocess(注解处理包)?xml version1.0 encodingUTF-8?4.0.0UTF-8UTF-81.81.81.8mashibing.comHelloJMH21.0-SNAPSHOTorg.openjdk.jmhjmh-core1.21org.openjdk.jmhjmh-generator-annprocess1.21test2. idea安装JMH插件 JMH plugin v1.0.3JMH这个东西你要想真正的安安静静的去运行就不会去影响我们正常程序的执行最好的方式就是按照官网的说法是命令行的方式比方说你要测试某一个包里面的类的话你应该把这个类和其他的依赖类打成一个jar包然后单独的把这个jar包放到某一个机器上在这个机器上对这个jar包进行微基准的测试如果对它进行测试的比较好那说明最后的结果还可以如果说边开发边进行这种微基准的测试实际上他非常的不准因为你的开发环境会对结果产生影响。只不过我们自己开发人员来说平时你要想进行一些微基准的测试的话你要是每次打个包来进行正规一个从头到尾的测试 完了之后发现问题不对再去重新改效率太低了。所以在这里教大家的是怎么样在IDE里面来进行微基准的测试。idea安装JMH插件fifile-Settings-Plugins-JMH-plugin。它运行的时候需要这个plugin的支持如果你用命令行是不需要这些东西的。3. 由于用到了注解打开运行程序注解配置因为JMH在运行的时候他用到了注解注解这个东西你自己得写一个程序得解释他所以你要把这个给设置上允许JMH能够对注解进行处理compiler - Annotation Processors - Enable Annotation Processing4. 定义需要测试类PS (ParallelStream)看这里写了一个类并行处理流的一个程序定义了一个list集合然后往这个集合里扔了1000个数。写了一个方法来判断这个数到底是不是一个质数。写了两个方法第一个是用forEach来判断我们这1000个数里到底有谁是质数第二个是使用了并行处理流这个forEach的方法就只有单线程里面执行挨着牌从头拿到尾从0拿到1000但是并行处理的时候会有多个线程采用ForkJoin的方式来把里面的数分成好几份并行的进行处理。一种是串行处理一种是并行处理都可以对他们进行测试但需要注意这个基准测试并不是对比测试的你只是测试一下你这方法写出这样的情况下他的吞吐量到底是多少这是一个非常专业的测试的工具。严格的来讲这部分是测试开发专业的。package com.mashibing.jmh;import java.util.ArrayList;import java.util.List;import java.util.Random;public class PS {static List nums new ArrayList();static {Random r new Random();for (int i 0; i 10000; i) nums.add(1000000 r.nextInt(1000000));}static void foreach() {nums.forEach(v-isPrime(v));}static void parallel() {nums.parallelStream().forEach(PS::isPrime);}static boolean isPrime(int num) {for(int i2; inum/2; i) {if(num % i 0) return false;}return true;}}5. 写单元测试这个测试类一定要在test package下面我对这个方法进行测试testForEach很简单我就调用PS这个类的foreach就行了对它测试最关键的是我加了这个注解Benchmark这个是JMH的注解是要被JMH来解析处理的这也是我们为么要把那个Annotation Processing给设置上的原因非常简单你只要加上注解就可以对这个方法进行微基准测试了点击右键直接run。package com.mashibing.jmh;import org.openjdk.jmh.annotations.Benchmark;import static org.junit.jupiter.api.Assertions.*;public class PSTest {BenchmarkWarmup(iteration1, time3)//在专业测试里面首先要进行预热预热多少次预热多少时间Fork(5)//意思是用多少个线程去执行我们的程序BenchmarkMode(Mode.Throughput)//是对基准测试的一个模式这个模式用的最多的是Throughput吞吐量Measurement(iteration1, time3)//是整个测试要测试多少遍调用这个方法要调用多少次public void testForEach() {PS.foreach();}}6. 运行测试类如果遇到下面的错误ERROR: org.openjdk.jmh.runner.RunnerException: ERROR: Exception while tryingto acquire the JMH lock (C:WINDOWS/jmh.lock): C:WINDOWSjmh.lock (拒绝访问。), exiting. Use -Djmh.ignoreLocktrue to forcefully continue.at org.openjdk.jmh.runner.Runner.run(Runner.java:216)at org.openjdk.jmh.Main.main(Main.java:71)这个错误是因为JMH运行需要访问系统的TMP目录解决办法是打开RunConfifiguration - Environment Variables - include system environment viables7. 阅读测试报告JMH中的基本概念1. Warmup预热由于JVM中对于特定代码会存在优化(本地化)预热对于测试结果很重要2. Mesurement总共执行多少次测试3. Timeout4. Threads线程数由fork指定5. Benchmark mode基准测试的模式6. Benchmark测试哪一段代码next做个是JMH的一个入门严格来讲这个和我们的关系其实并不大这个是测试部门干的事儿但是你了解一下没有特别多的坏处你也知道你的方法最后效率高或者底可以通过一个简单的JMH插件来帮你完成你不要在手动的去写这件事儿了。如果说大家对JMH有兴趣你们在工作中可能会有用的上大家去读一下官方的例子官方大概有好几十个例子程序你可以自己一个一个的去研究。官方样例http://hg.openjdk.java.net/code-tools/jmh/fifile/tip/jmh-samples/src/main/java/org/openjdk/jmh/samples/Disruptor按照英文翻译的话Disruptor应该是分裂、瓦解。这个Disruptor是一个做金融的、做股票的这样一个公司交易所来开发的为自己来开发的这么一个底层的框架开发出来之后受到了很多的认可开源之后2011年获得Duke奖。如果你想把它用作MQ的话单机最快的MQ。性能非常的高主要是它里面用的全都是cas另外把各种各样的性能开发到了极致所以他单机支持很高的一个并发。Disruptor不是平时我们学的这个redis、不是平时我们所学的kafka他可以跟他们一样有类似的用途但他是单机redis、kafka也可以用于集群。redis他有这种序列化的机制就是你可以把它存储到硬盘上或数据库当中是可以的kafka当然也有Disruptor没有Disruptor就是在内存里Disruptor简单理解就是内存里用于存放元素的一个高效率的队列。介绍关于Disruptor的一些资料给大家列在这里。主页http://imax-exchange.github.io/disruptor/源码https://github.com/LMAX-Exchange/disruptorGettingStarted:https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Startedapi:http://imax-exchange.github.io/disruptor/docs/index.htmlmaven:https://mvnrepository.com/artifact/com.imax/disruptorDisruptor叫无锁、高并发、环形Buffffer直接覆盖(不用清除)旧的数据降低GC频率用于生产者消费者模式(如果说按照设计者角度来讲他就是观察者模式)。什么叫观察者模式想象一下我们在前面学各种各样的队列的时候队列就是个容器好多生产者往里头扔东西好多消费者从里头往外拿东西。所谓的生产者消费者就是这个意思为什么我们可以叫他观察者呢因为这些消费者正在观察着里面有没有新东西如果有的话我马上拿过来消费所以他也是一种观察者模式。Disruptor实现的就是这个容器Disruptor核心与特点Disruptor也是一个队列和其他队列不一样的是他是一个环形队列环形的Buffffer。一般情况下我们的容器是一个队列不管你是用链表实现还是用数组实现的它会是一个队列那么这个队列生产者这边使劲往里塞消费者这边使劲往外拿但Disruptor的核心是一个环形的buffffer。对比ConcurrentLinkedQueue链表实现这种环形的buffffer速度就是更快同学们可以去查一下JDK自带的容器你会发现效率比较高的有各种各样的队列如果不想阻塞就可以用Concurrent相关的ConcurrentLinkedQueue是并发的用链表实现的队列它里面大量的使用了cas因此它的效率相对比较高可是对于遍历来讲链表的效率一定会比数组低。JDK中没有ConcurrentArrayQueue因为数组的大小的固定的如果想扩展的话就要把原来的数组拷贝到新数组里每次加都要拷贝这个效率相当底所以他并没有给大家加这个叫ConcurrentArrayQueue但是Disruptor就非常牛X想到了这样一个办法就是把数组的头尾相连。Disruptor是用数组实现的这样的一个队列你可以认为Disruptor就是用数组实现的ConcurrentArrayQueue另外这个Queue是首尾相连的.那Disruptor用数组实现的环形的就比上面两个都牛吗牛在哪为啥呢如果我们用ConcurrentLinkedQueue这里面就是一个一个链表这个链表遍历起来肯定没有数组快这个是一点。还有第二点就是这个链表要维护一个头指针和一个尾指针我往头部加的时候要加锁往尾部拿的时候也要加锁。另外链表本身效率就偏低还要维护两个指针。关于环形的呢环形本身就维护一个位置这个位置称之为sequence序列这个序列代表的是我下一个有效的元素指在什么位置上就相当于他只有一个指针来回转。加在某个位置上怎么计算直接用那个数除以我们整个的容量求余就可以了。RingBuffffer是一个环形队列RingBuffffer的序号指向下一个可用的元素采用数组实现没有首尾指针对比ConcurrentLinkedQueue用数组实现的速度更快假如长度为8当添加到第12个元素的时候在哪个序号上呢用12%8决定当Buffffer被填满的时候到底是覆盖还是等待由Produce决定长度设为2的n次幂利于二进制计算例如12%812(8-1)如果大家对于位运算有疑问的在咱们网站上有一个菜鸟预习里面有一部分是二进制大家去翻看一下。由于它会采用覆盖的方式所以他没有必要记头指针没有必要记尾指针。我只要记一个指针放在这就可以了。在这点上依然要比ConcurrentLinkedQueue要快。那我生产者线程生产的特别多消费者没来得及消费那我在往后覆盖的话怎么办不会那么轻易的让你覆盖的我们是有策略的我生产者生产满了要在生产一个的话就马上覆盖这个位置上的数了。这时候是不能覆盖的指定了一个策略叫等待策略这里面有8种等待策略分情况自己去用。最常见的是BlockingWait满了我就在这等着什么时候你空了消费者来唤醒一下就继续。Disruptor开发步骤开发步骤是比较固定的一个开发步骤。1定义Event-队列中需要处理的元素。在Disruptor他是每一个消息都认为是一个事件在他这个概念里就是一个事件所以在这个环形队列里面存的是一个一个的Event。2定义Event工厂用于填充队列那这个Event怎么产生就需要指定Event的工厂。3定义EventHandler(消费者)处理容器中的元素那这个Event怎么消费呢就需要指定Event的消费者EventHandler。下面我们直接看程序先看来自官网的几个辅助程序LongEvent这个事件里面或者说消息里面装的什么值我只装了一个long值但这里面可以装任何值任何类型的都可以往里装这个long类型的值我们可以指定它set官网上没有toString方法我给大家加了一段主要是为了打印消息让大家看的更清楚。package com.mashibing.disruptor;public class LongEvent{private long value;public void set(long value){this.value value;}Overridepublic String toString(){return LongEvent{ value value };}}然后呢我需要一个EventFactory就是怎么产生这些个事件这个Factory非常简单LongEventFactory去实现EventFactiry的接口去重写它的newInstance方法直接new LongEvent。构建这个环的时候为什么要指定一个产生事件的工厂我直接new这个事件不可以吗但是有的事件里面的构造方法不让你new呢产生事件工厂的话你可以灵活的指定一些 这里面也是牵扯到效率的。底层比较深我给大家解释一下这里牵扯效率问题因为Disruptor初始化的时候会调用Event工厂对ringBuffffer进行内存的提前分配GC频率会降低。package com.mashibing.disruptor;import com.lmax.disruptor.EventFactory;public class LongEventFactory implements EventFactiry{Overridepublic LongEvent newInstance(){return new LongEvent();}}在看第三个叫LongEventHandlerHandler就是我拿到这个事件之后该怎么样进行处理所以这里是消息的消费者怎么处理呢很简单我处理完这个消息之后呢就记一个数总共记下来我一共处理了多少消息了处理消息的时候默认调用的是onEvent方法这个方法里面有三个参数第一个是你要处理的那个消息第二个是你处理的是哪个位置上的消息第三个是整体的消息结束没结束是不是处理完了。你可以判断他如果是true的话消费者就可以退出了如果是false的话说明后面还有继续消费。package com.mashibing.disruptor;import com.lmax.disruptor.EventHandler;public class LongEventHandler implements EventHandler{/****param event*param sequence RingBuffer的序号*param endOfBatch 是否为最后一个元素*throws Exception**/public static long count 0;Overridepublic void onEvent(LongEvent event,long sequence,boolean endOfBatch) throwsException{count;System.out.println([Thread.currentThread().getName()]event序号sequence);}}所以我们定义了这三个类关于这三个类在给大家解释一下我们现在有一个环然后这个环上每一个位置装LongEvent怎么产生这个LongEvent通过这个LongEventFactory的newInstance方法来产生当我拿到这个Event之后通过LongEventHandler进行处理。到现在我们把这三个辅助类都已经定义好了定义好的情况下我们怎么才能比较有机的结合在一起让他在Disruptor进行处理呢看第一个小例子程序首先把EvenFactory给他初始化了newLongEventFactory我们这个环应该是2的N次方1024然后new一个Disruptor出来需要指定这么几个参数factory产生消息的工厂bufffferSize是指定这个环大小到底是多少defaultThreadFactory线程工厂指的是当他要产生消费者的时候当要调用这个消费者的时候他是在一个特定的线程里执行的这个线程就是通过defaultThreadFactory来产生继续往下看当我们拿到这个消息之后怎么进行处理啊我们就用这个LongEventHandler来处理。然后start当start之后一个环起来了每个环上指向的这个LongEvent也得初始化好内存分配好了整个就安安静静的等待着生产者的到来。看生产者的代码long sequence ringBuffffer.next()通过next找到下一个可用的位置最开始这个环是空的下一个可用的位置是0这个位置拿到这个位置之后直接去ringBuffffer里面get(0)这个位置上的event。如果说你要是追求效率的极致你应该是一次性全部初始化好你get的时候就不用再去判断如果你想做一个延迟很不幸的是你每次都要做判断是不是初始化了。get的时候就是拿到一个event这个是我们new出来的默认的但是我们可以改里面的event.set( 值...)填好数据之后ringBuffffer.publish发布生产。package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main01{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize 1024;//Construct the DisruptorDisruptor disruptor new Disruptor(factory,bufferSize,Executors.defaultThreadFactory());//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer disruptor.getRingBuffer();//官方例程long sequence ringBuffer.next();//Grab the next sequencetry{LongEvent eventringBuffer.get(sequence);//Get the entry in theDisruptor//for the sequenceevent.set(8888L);//Fill with data}finally{ringBuffer.publish(sequence);}}}disruptor在后面提供了一些Lambda表达式的写法为了支持这种写法对整个消息的构建过程做了改进读下面02小程序使用translator就是怎么样构建这个消息原来我们都是用消息的factory但是下面这次我们用translator对他进行构建就是把某一些数据翻译成消息。前面产生event工厂还是一样然后bufffferSize后面再扔的是DaemonThreadFactory就是后台线程了new LongEventHandler然后start拿到他的ringBuffffer前面都一样。只有一个地方叫EventTranslator不一样我们在main01里面的代码是要写try catch然后把里面的值给设好相当于把这个值转换成event对象。相对简单的写法它会把某些值转成一个LongEvent通过EventTranslator。new出来后实现了translateTo方法EventTranslator它本身是一个接口所以你要new的时候你又要实现它里面没有实现的方法translateTo的意思是你给我一个Event我会把这个Event给你填好。ringBuffffer.publishEvent(translator1) 你只要把translator1交给ringBuffffer就可以了。这个translator就是为了迎合Lambda表达式的写法(为java8的写法做准备)另外translator有很多种用法EventTranslatorOneArg只有带一个参数的EventTranslator。我带有一个参数这个参数会通过我的translateTo方法转换成一个LongEvent既然有EventTranslatorOneArg就有EventTranslatorTwoArg、EventTranslatorThreeArg还有EventTranslatorVararg多了去了Vararg就是有好多个值我把里面的值全都给你加起来最后把结果set到event里面。package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main02{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize 1024;//Construct the DisruptorDisruptor disruptor new Disruptor(factory,bufferSize,DaemonThreadFactory.INSTANCE);//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer disruptor.getRingBuffer();//EventTranslator translator1 new EventTranslator(){Overridepublic void translateTo(LongEvent event,long sequence){event.set(8888L); }};ringBuffer.publishEvent(translator1);//EventTranslatorOneArg translator2 newEventTranslatorOneArg(){Overridepublic void translateTo(LongEvent event,long sequence,Long l){event.set(l); }};ringBuffer.publishEvent(translator2,7777L);//EventTranslatorTwoArg translator3 newEventTranslatorTwoArg(){Overridepublic void translateTo(LongEvent event,long sequence,Long l1,Long l2){ event.set(l); }};ringBuffer.publishEvent(translator3,10000L,10000L);//EventTranslatorThreeArg translator4 newEventTranslatorThreeArg(){Overridepublic void translateTo(LongEvent event,long sequence,Long l1,Longl2,Long l3){ event.set(l1 l2 l3); }};ringBuffer.publishEvent(translator4,10000L,10000L,10000L);//EventTranslatorVararg translator5 newEventTranslatorThreeArg(){Overridepublic void translateTo(LongEvent event,long sequence,Object...objects){long result 0;for(Object o : objects){long l (Long)o;result l;}}};ringBuffer.publishEvent(translator5,10000L,10000L,10000L,10000L);}}有了上面Translator之后呢下面看Lambda表达式怎么写这个是比较简洁的写法连factory都省了直接指定一个Lambda表达式LongEvent::new。继续handleEventsWith把三个参数传进来后面写好Lambda表达式直接打印然后start 接着RingBufffferpublishEvent原来我们还有写try...catch现在简单了直接ringBuffffer.publishEvent(第一个是lambda表达式表达式后是你指定的几个参数)所以现在的这种写法就不定义各种各样的EventTranslator了。package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main03{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize 1024;//Construct the DisruptorDisruptor disruptor new Disruptor(LongEvent::new,bufferSize,DaemonThreadFactory.INSTANCE);//Connect the handlerdisruptor.handleEventsWith((event,sequence,endOfBatch)-System.out.println(Event:event));//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer disruptor.getRingBuffer();ringBuffer.publishEvent((event, sequence)- event.set(10000L));System.in.read();}}下面我们叫一些细节这些个细节也不难讲给大家。第一个细节是我们生产者的时候默认会有好多种生产方式默认的是多线程生产者但是假如你确定你整个程序里头只有一个生产者的话那你还能提高效率就是在你指定Disruptor生产者的线程的方式是SINGLE生产者的类型ProducerType。ProducerType生产者线程模式ProducerType有两种模式ProducerMULTI和Producer.SINGLE默认是MULTI表示在多线程模式下产生sequence如果确认是单线程生产者那么可以指定SINGLE效率会提升如果是多个生产者(多线程)但模式指定为SINGLE会出什么问题假如你的程序里头只有一个生产者还用ProducerMULTI的话我们对序列来进行多线程访问的时候肯定是要加锁的所以MULTI里面默认是有锁定处理的但是假如你只有一个线程这个时候应该把生产者指定为SINGLE他的效率更高因为它里面不加锁。下面这个小程序我这里指定的是Producer.SINGLE但是我生产的时候用的是一堆线程当我制定了Producer.SINGLE之后相当于内部对于序列的访问就没有锁了它会把性能发挥到极致它不会报错它会把你的消息静悄悄的覆盖了因此你要小心一点。我这里这个写法是我有50 个线程然后每个线程生产100个数最后结果正常的话应该是有5000个消费产生。package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main04_ProducerType{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize 1024;//Construct the Disruptor//Disruptor disruptor new Disruptor(factory,bufferSize,Executors.defaultThreadFactory());Disruptor disruptor new Disruptor(factory,bufferSize,Executors.defaultThreadFactory(),ProducerType.SINGLE,newBlockingWaitStrategy());//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer disruptor.getRingBuffer();//final int threadCount 50;CycliBarrier barriernew CycliBarrier(threadCount);ExecutorService service Executors.newCachedThreadPool();for(long i0; i{System.out.printf(Thread %s ready to start!,threadNum);try{barrier.await();}catch(InterruptedException e){e.printStackTrace();}catch(BrokenBarrierException e){e.printStackTrace();}for(int j0; j100;j){ringBuffer.publishEvent((event,sequence)-{event.set(threadNum);System.out.println(生产了threadNum);});}});}service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);System.out.println(LongEventHandler.count);}}我们再来聊一下等待策略WaitStrategy有好多种方法看下面等待策略(常用)BlockingWaitStrategy:通过线程堵塞的方式等待生产者唤醒被唤醒后再循环检查依赖的sequence是否已经消费。BusySpinWaitStrategy线程一直自旋等待可能比较耗cpuLiteBlockingWaitStrategy线程阻塞等待生产者唤醒与BlockingWaitStrategy相比区别在signalNeeded.getAndSet如果两个线程同时访问一个访问waitfor一个访问signalAll时可以减少lock加锁次数LiteTimeoutBlockingWaitStrategy与LiteBlockingWaitStrategy相比设置了阻塞时间超过时间后抛出异常PhasedBackoffffWaitStrategy根据时间参数和传入的等待策略来决定使用那种等待策略TimeoutBlockingWaitStrategy相对于BlockingWaitStrategy来说设置了等待时间超过后抛出异常(常用)YieldingWaitStrategy尝试100次然后Thread.yield()让出cpu(常用)SleepingWaitStrategysleep我们常用的BlockingWaitStrategy满了就等着SleepingWaitStrategy满了就睡一觉睡醒了看看能不能继续执行了YieldingWaitStrategy让出cpu让你消费者赶紧消费消费完了之后我又回来看看我是不是又能生产了一般YieldingWaitStrategy效率是最高的但也要看实际情况适用不适用。package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main05_WaitStrategy{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize 1024;//Construct the DisruptorDisruptor disruptor new Disruptor(factory,bufferSize,Executors.defaultThreadFactory(),ProducerType.MULTI,new SleepingWaitStrategy());//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer disruptor.getRingBuffer();//final int threadCount 50;CycliBarrier barriernew CycliBarrier(threadCount);ExecutorService service Executors.newCachedThreadPool();for(long i0; i{System.out.printf(Thread %s ready to start!,threadNum);try{barrier.await();}catch(InterruptedException e){e.printStackTrace();}catch(BrokenBarrierException e){e.printStackTrace();}for(int j0; j100;j){ringBuffer.publishEvent((event,sequence)-{event.set(threadNum);System.out.println(生产了threadNum);});}});}service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);System.out.println(LongEventHandler.count);}}我们来看多个消费者怎么指定默认的情况下只有一个消费者你想要有多个消费者的时候也非常简单看下面代码我定义了两个消费者h1、h2disruptor.handleEventsWith(h1,h2)这里面是一个可变参数所以你要想有多个消费者的时候就往里装多个消费者是位于多个线程里面的。package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main06_MultiConsumer{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize 1024;//Construct the DisruptorDisruptor disruptor new Disruptor(factory,bufferSize,Executors.defaultThreadFactory(),ProducerType.MULTI,new SleepingWaitStrategy());//Connect the handlersLongEventHandler h1 new LongEventHandler();LongEventHandler h2 new LongEventHandler();disruptor.handleEventsWith(h1,h2);//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer disruptor.getRingBuffer();//final int threadCount 10;CycliBarrier barriernew CycliBarrier(threadCount);ExecutorService service Executors.newCachedThreadPool();for(long i0; i{System.out.printf(Thread %s ready to start!,threadNum);try{barrier.await();}catch(InterruptedException e){e.printStackTrace();}catch(BrokenBarrierException e){e.printStackTrace();}for(int j0; j10;j){ringBuffer.publishEvent((event,sequence)-{event.set(threadNum);System.out.println(生产了threadNum);});}});}service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);System.out.println(LongEventHandler.count);}}还有disruptor最后一个问题出了异常怎么处理消费者异常处理默认disruptor.setDefaultExceptionHandler()覆盖disruptor.handleExceptionFor().with()看下面代码这这里方法里写了一个EventHandler是我们的消费者在消费者里打印了event之后马上抛出了异常当我们消费者出现异常之后你不能让整个线程停下来有一个消费者出了异常那其他的消费者就不干活了肯定不行。handleExceptionsFor为消费者指定Exception处理器 (h1).with后面是我们的ExceptionHandler出了异常之后该怎么办进行处理重写三个方法第一个是当产生异常的时候在这很简单直接打印出来了第二个是handleOnStart如果启动的时候出异常第三个handleOnShutdown你该怎么处理。package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main07_ExceptionHandler{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize 1024;//Construct the DisruptorDisruptor disruptor new Disruptor(factory,bufferSize,Executors.defaultThreadFactory(),ProducerType.MULTI,new SleepingWaitStrategy());//Connect the handlersEventHandler h1 (event,sequence,end)-{System.out.println(消费者出异常)};disruptor.handleEventsWith(h1);disruptor.handleExceptionsFor(h1).with(newExceptionHandler(){Overridepublic void handleEventException(Throwable throwable,longl,LongEvent longEvent){throwable.printStackTrace();}Overridepublic void handleOnStartException(Throwable throwable){System.out.println(Exception Start to Handle!);}Overridepublic void handleOnShutdownException(Throwable throwable){System.out.println(Exception End to Handle!);}});//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer disruptor.getRingBuffer();//final int threadCount 1;CycliBarrier barriernew CycliBarrier(threadCount);ExecutorService service Executors.newCachedThreadPool();for(long i0; i{System.out.printf(Thread %s ready to start!,threadNum);try{barrier.await();}catch(InterruptedException e){e.printStackTrace();}catch(BrokenBarrierException e){e.printStackTrace();}for(int j0; j10;j){ringBuffer.publishEvent((event,sequence)-{event.set(threadNum);System.out.println(生产了threadNum);});}});}service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);System.out.println(LongEventHandler.count);}}disruptor是一个环然后这个环有多个生产者可以往里头生产由于它是环形的设计效率会非常的高我们写程序的时候是这样写的首先你自己定义好Event消息的格式然后定义消息工厂消息工厂是用来初始化整个环的时候相应的一些位置上各种各样不同的消息先把它new出来new出来之后先占好空间我们在生产的时候只需要把这个位置上这个默认的这块空间拿出来往里头填值填好值之后消费者就可以往里头消费了消费完了生产者就可以继续往里头生产了如果说你生产者消费的比较快消费者消费的比较慢满了怎么办就是用各种各样的等待策略消费者出了问题之后可以用ExceptionHandler来进行处理。觉得文章内容不错的话可以转发关注一下小编~ 之后持续更新干货好文~~