当前位置: 首页 > news >正文

如何提升网站的排名北京网站开发建设

如何提升网站的排名,北京网站开发建设,成免费的crm,爱山东app二维码文章目录 一、Disruptor介绍1、为什么要有Disruptor2、Disruptor介绍3、Disruptor的高性能设计4、RingBuffer数据结构5、等待策略6、Disruptor在日志框架中的应用7、术语 二、Disruptor实战1、引入依赖2、Disruptor构造器3、入门实例#xff08;1#xff09;Hello World… 文章目录 一、Disruptor介绍1、为什么要有Disruptor2、Disruptor介绍3、Disruptor的高性能设计4、RingBuffer数据结构5、等待策略6、Disruptor在日志框架中的应用7、术语 二、Disruptor实战1、引入依赖2、Disruptor构造器3、入门实例1Hello World2单生产者单消费者模式3单生产者多消费者模式4多生产者多消费者模式 4、场景应用1使用EventProcessor消息处理器2使用WorkerPool消息处理器 5、复杂场景下使用 一、Disruptor介绍 1、为什么要有Disruptor juc包下阻塞队列的缺陷 1 juc下的队列大部分采用加ReentrantLock锁方式保证线程安全。在稳定性要求特别高的系统中为了防止生产者速度过快导致内存溢出只能选择有界队列。 2加锁的方式通常会严重影响性能。线程会因为竞争不到锁而被挂起等待其他线程释放锁而唤醒这个过程存在很大的开销而且存在死锁的隐患。 3 有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)。 2、Disruptor介绍 Disruptor是英国外汇交易公司LMAX开发的一个高性能队列研发的初衷是解决内存队列的延迟问题在性能测试中发现竟然与I/O操作处于同样的数量级。基于Disruptor开发的系统单线程能支撑每秒600万订单2010年在QCon演讲后获得了业界关注。2011年企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。 目前包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。 Githubhttps://github.com/LMAX-Exchange/disruptor 官方学习网站http://ifeve.com/disruptor-getting-started/ Disruptor实现了队列的功能并且是一个有界队列可以用于生产者-消费者模型。 Disruptor是一个高性能的异步处理框架或者可以认为是最快的消息框架轻量的JMS也可以认为是一个观察者模式的实现或者事件监听模式的实现。 3、Disruptor的高性能设计 Disruptor通过以下设计来解决队列速度慢的问题 环形数组结构 为了避免垃圾回收采用数组而非链表。同时数组对处理器的缓存机制更加友好空间局部性原理。元素位置定位 数组长度2^n通过位运算加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型即使100万QPS的处理速度也需要30万年才能用完。无锁设计 每个生产者或者消费者线程会通过先申请可以操作的元素在数组中的位置申请到之后直接在该位置写入或者读取数据。整个过程通过原子变量CAS保证操作的线程安全。利用缓存行填充解决了伪共享的问题实现了基于事件驱动的生产者消费者模型观察者模式 消费者时刻关注着队列里有没有消息一旦有新消息产生消费者线程就会立刻把它消费 4、RingBuffer数据结构 使用RingBuffer来作为队列的数据结构RingBuffer就是一个可自定义大小的环形数组。除数组外还有一个序列号(sequence)用以指向下一个可用的元素供生产者与消费者使用。原理图如下所示 Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下存与取数组上的元素时间复杂度只有O(1)而这个index我们可以通过序列号与数组的长度取模来计算得出indexsequence % entries.length。也可以用位运算来计算效率更高此时array.length必须是2的幂次方indexsequece(entries.length-1)当所有位置都放满了再放下一个时就会把0号位置覆盖掉 5、等待策略 名称措施适用场景BlockingWaitStrategy加锁CPU资源紧缺吞吐量和延迟并不重要的场景BusySpinWaitStrategy自旋通过不断重试减少切换线程导致的系统调用而降低延迟。推荐在线程绑定到固定的CPU的场景下使用PhasedBackoffWaitStrategy自旋 yield 自定义策略CPU资源紧缺吞吐量和延迟并不重要的场景SleepingWaitStrategy自旋 yield sleep性能和CPU资源之间有很好的折中。延迟不均匀TimeoutBlockingWaitStrategy加锁有超时限制CPU资源紧缺吞吐量和延迟并不重要的场景YieldingWaitStrategy自旋 yield 自旋性能和CPU资源之间有很好的折中。延迟比较均匀 6、Disruptor在日志框架中的应用 Log4j 2相对于Log4j 1最大的优势在于多线程并发场景下性能更优。该特性源自于Log4j 2的异步模式采用了Disruptor来处理。 在Log4j 2的配置文件中可以配置WaitStrategy默认是Timeout策略。 loggers all async采用的是Disruptor而Async Appender采用的是ArrayBlockingQueue队列。 由图可见单线程情况下loggers all async与Async Appender吞吐量相差不大但是在64个线程的时候loggers all async的吞吐量比Async Appender增加了12倍是Sync模式的68倍。 7、术语 RingBuffer: 被看作Disruptor最主要的组件然而从3.0开始RingBuffer仅仅负责存储和更新在Disruptor中流通的数据。对一些特殊的使用场景能够被用户(使用其他数据结构)完全替代。 Sequence: Disruptor使用Sequence来表示一个特殊组件处理的序号。和Disruptor一样每个消费者(EventProcessor)都维持着一个Sequence。大部分的并发代码依赖这些Sequence值的运转因此Sequence支持多种当前为AtomicLong类的特性。 Sequencer: 这是Disruptor真正的核心。实现了这个接口的两种生产者单生产者和多生产者均实现了所有的并发算法为了在生产者和消费者之间进行准确快速的数据传递。 SequenceBarrier: 由Sequencer生成并且包含了已经发布的Sequence的引用这些的Sequence源于Sequencer和一些独立的消费者的Sequence。它包含了决定是否有供消费者来消费的Event的逻辑。 WaitStrategy决定一个消费者将如何等待生产者将Event置入Disruptor。Event从生产者到消费者过程中所处理的数据单元。Disruptor中没有代码表示Event因为它完全是由用户定义的。 EventProcessor主要事件循环处理Disruptor中的Event并且拥有消费者的Sequence。它有一个实现类是BatchEventProcessor包含了event loop有效的实现并且将回调到一个EventHandler接口的实现对象。 EventHandler由用户实现并且代表了Disruptor中的一个消费者的接口。 Producer由用户实现它调用RingBuffer来插入事件(Event)在Disruptor中没有相应的实现代码由用户实现。 WorkProcessor确保每个sequence只被一个processor消费在同一个WorkPool中的处理多个WorkProcessor不会消费同样的sequence。 WorkerPool一个WorkProcessor池其中WorkProcessor将消费Sequence所以任务可以在实现WorkHandler接口的worker之间移交 。 LifecycleAware当BatchEventProcessor启动和停止时于实现这个接口用于接收通知。 二、Disruptor实战 1、引入依赖 !-- disruptor -- dependencygroupIdcom.lmax/groupIdartifactIddisruptor/artifactIdversion3.3.4/version /dependency2、Disruptor构造器 public Disruptor(final EventFactoryT eventFactory,final int ringBufferSize,final ThreadFactory threadFactory,final ProducerType producerType,final WaitStrategy waitStrategy)eventFactory -在环缓冲区中创建事件的工厂。 ringBufferSize -环形缓冲区的大小必须是2的幂。 threadFactory——一个为处理器创建线程的threadFactory。 producerType——用于环形缓冲区的声明策略。 waitStrategy -用于环缓冲区的等待策略。 3、入门实例 1Hello World 在Disruptor中我们想实现hello world 需要如下几步骤 第一建立一个Event类 第二建立一个工厂Event类用于创建Event类实例对象 第三需要有一个监听事件类用于处理数据Event类 第四我们需要进行测试代码编写。实例化Disruptor实例配置一系列参数。然后我们对Disruptor实例绑定监听事件类接受并处理数据。 第五在Disruptor中真正存储数据的核心叫做RingBuffer我们通过Disruptor实例拿到它然后把数据生产出来把数据加入到RingBuffer的实例对象中即可。 //1、真正要生产的对象 public class LongEvent {private long value;public long getValue() {return value;}public void setValue(long value) {this.value value;} }import com.lmax.disruptor.EventFactory;// 2、需要让disruptor为我们创建事件我们同时还声明了一个EventFactory来实例化Event对象。 public class LongEventFactory implements EventFactory {Overridepublic Object newInstance() {return new LongEvent();} }import com.lmax.disruptor.EventHandler;//3、我们还需要一个事件消费者也就是一个事件处理器。这个事件处理器简单地把事件中存储的数据打印到终端 public class LongEventHandler implements EventHandlerLongEvent {Overridepublic void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {//消费逻辑System.out.println(consumer longEvent.getValue());} }import com.lmax.disruptor.RingBuffer;import java.nio.ByteBuffer;/*** 很明显的是当用一个简单队列来发布事件的时候会牵涉更多的细节这是因为事件对象还需要预先创建。* 发布事件最少需要两步获取下一个事件槽并发布事件发布事件的时候要使用try/finnally保证事件一定会被发布。* 如果我们使用RingBuffer.next()获取一个事件槽那么一定要发布对应的事件。* 如果不能发布事件那么就会引起Disruptor状态的混乱。* 尤其是在多个事件生产者的情况下会导致事件消费者失速从而不得不重启应用才能会恢复。* B系统名称/BBR* B模块名称/BBR* B中文类名/BBR* B概要说明/BBR*/ //4、这是一个生产者 public class LongEventProducer {private final RingBufferLongEvent ringBuffer;public LongEventProducer(RingBufferLongEvent ringBuffer) {this.ringBuffer ringBuffer;}/*** onData用来发布事件每调用一次就发布一次事件* 它的参数会用过事件传递给消费者*/public void onData(ByteBuffer bb) {//1.可以把ringBuffer看做一个事件队列那么next就是得到下面一个事件槽long sequence ringBuffer.next();try {//2.用上面的索引取出一个空的事件用于填充获取该序号对应的事件对象LongEvent event ringBuffer.get(sequence);//3.获取要通过事件传递的业务数据event.setValue(bb.getLong(0));} finally {//4.发布事件发布后才能消费//注意最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用如果某个请求的 sequence 未被提交将会堵塞后续的发布操作或者其它的 producer。ringBuffer.publish(sequence);}} }import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType;import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;// 5、编码 public class LongEventMain {public static void main(String[] args) throws Exception {//创建缓冲池ExecutorService executor Executors.newCachedThreadPool();//创建工厂LongEventFactory factory new LongEventFactory();//创建bufferSize ,也就是RingBuffer大小必须是2的N次方int ringBufferSize 1024 * 1024; ////创建disruptor//1.第一个参数为工厂类对象用于创建一个个的LongEventLongEvent是实际的消费数据//2.第二个参数为缓冲区//3.第三个参数为线程池//4.第四个参数为ProducerType.SINGLE表示生产者只有一个和ProducerType.MULTY表示有多个生产者//5.第五个参数是一种策略/**//BlockingWaitStrategy 是最低效的策略但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现WaitStrategy BLOCKING_WAIT new BlockingWaitStrategy();//SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差不多对CPU的消耗也类似但其对生产者线程的影响最小适合用于异步日志类似的场景WaitStrategy SLEEPING_WAIT new SleepingWaitStrategy();//YieldingWaitStrategy 的性能是最好的适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中推荐使用此策略例如CPU开启超线程的特性WaitStrategy YIELDING_WAIT new YieldingWaitStrategy();*/DisruptorLongEvent disruptor new DisruptorLongEvent(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());// 连接消费事件方法disruptor.handleEventsWith(new LongEventHandler());// 启动disruptor.start();//Disruptor 的事件发布过程是一个两阶段提交的过程//发布事件//使用该方法获得具体存放数据的容器ringbuffer环形结构RingBufferLongEvent ringBuffer disruptor.getRingBuffer();//把容器传入生产者LongEventProducer producer new LongEventProducer(ringBuffer);//LongEventProducerWithTranslator producer new LongEventProducerWithTranslator(ringBuffer);ByteBuffer byteBuffer ByteBuffer.allocate(8);//传值用的不用太在意for(long l 0; l100; l){byteBuffer.putLong(0, l);producer.onData(byteBuffer);Thread.sleep(1000);}disruptor.shutdown();//关闭 disruptor方法会堵塞直至所有的事件都得到处理executor.shutdown();//关闭 disruptor 使用的线程池如果需要的话必须手动关闭 disruptor 在 shutdown 时不会自动关闭 } }import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer;import java.nio.ByteBuffer;/*** Disruptor 3.0提供了lambda式的API。这样可以把一些复杂的操作放在Ring Buffer* 所以在Disruptor3.0以后的版本最好使用Event Publisher或者Event Translator来发布事件* B系统名称/BBR* B模块名称/BBR* B中文类名/BBR* B概要说明/BBR*/ // 其他生产者也可以这样写 public class LongEventProducerWithTranslator {//一个translator可以看做一个事件初始化器publicEvent方法会调用它//填充Eventprivate static final EventTranslatorOneArgLongEvent, ByteBuffer TRANSLATOR new EventTranslatorOneArgLongEvent, ByteBuffer() {Overridepublic void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) {event.setValue(buffer.getLong(0));}};private final RingBufferLongEvent ringBuffer;public LongEventProducerWithTranslator(RingBufferLongEvent ringBuffer) {this.ringBuffer ringBuffer;}public void onData(ByteBuffer buffer) {ringBuffer.publishEvent(TRANSLATOR, buffer);} }2单生产者单消费者模式 /*** 1、消息载体(事件)*/ public class OrderEvent {private long value;private String name;public String getName() {return name;}public void setName(String name) {this.name name;}public long getValue() {return value;}public void setValue(long value) {this.value value;} } import com.lmax.disruptor.EventFactory;/*** 2、事件工厂*/ public class OrderEventFactory implements EventFactoryOrderEvent {Overridepublic OrderEvent newInstance() {return new OrderEvent();} } import com.lmax.disruptor.RingBuffer; import com.disruptor.event.OrderEvent;/*** 3、消息事件生产者*/ public class OrderEventProducer {//事件队列private final RingBufferOrderEvent ringBuffer;public OrderEventProducer(RingBufferOrderEvent ringBuffer) {this.ringBuffer ringBuffer;}public void onData(long value,String name) {// 获取事件队列 的下一个槽long sequence ringBuffer.next();try {//获取消息载体事件OrderEvent orderEvent ringBuffer.get(sequence);// 写入消息数据orderEvent.setValue(value);orderEvent.setName(name);} catch (Exception e) {// TODO 异常处理e.printStackTrace();} finally {System.out.println(生产者 Thread.currentThread().getName()发送数据value:value,name:name);//发布事件ringBuffer.publish(sequence);}} } import com.disruptor.event.OrderEvent; import com.lmax.disruptor.EventHandler;/*** 4、消费者*/ public class OrderEventHandler implements EventHandlerOrderEvent {Overridepublic void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {// TODO 消费逻辑System.out.println(EventHandler 消费者 Thread.currentThread().getName()获取数据value: event.getValue(),name:event.getName());}} import com.disruptor.consumer.OrderEventHandler; import com.disruptor.event.OrderEvent; import com.disruptor.producer.OrderEventProducer; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType;import java.util.concurrent.Executors;/*** 5、编码*/ public class DisruptorDemo {public static void main(String[] args) throws Exception {//创建disruptorDisruptorOrderEvent disruptor new Disruptor(OrderEvent::new,1024 * 1024,Executors.defaultThreadFactory(),ProducerType.SINGLE, //单生产者new YieldingWaitStrategy() //等待策略);//设置消费者用于处理RingBuffer的事件disruptor.handleEventsWith(new OrderEventHandler());//设置多消费者,消息会被重复消费//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口一条消息只会被一个消费者消费//disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());//启动disruptordisruptor.start();//创建ringbuffer容器RingBufferOrderEvent ringBuffer disruptor.getRingBuffer();//创建生产者OrderEventProducer eventProducer new OrderEventProducer(ringBuffer);// 发送消息for(int i0;i100;i){eventProducer.onData(i,Foxi);}disruptor.shutdown();} } 3单生产者多消费者模式 如果消费者是多个只需要在调用 handleEventsWith 方法时将多个消费者传递进去。 //设置多消费者,消息会被重复消费 disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());上面传入的两个消费者会重复消费每一条消息如果想实现一条消息在有多个消费者的情况下只会被一个消费者消费那么需要调用 handleEventsWithWorkerPool 方法。 //设置多消费者,消费者要实现WorkHandler接口一条消息只会被一个消费者消费 disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());注意消费者要实现WorkHandler接口 public class OrderEventHandler implements EventHandlerOrderEvent, WorkHandlerOrderEvent {Overridepublic void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {// TODO 消费逻辑System.out.println(消费者 Thread.currentThread().getName()获取数据value: event.getValue(),name:event.getName());}Overridepublic void onEvent(OrderEvent event) throws Exception {// TODO 消费逻辑System.out.println(消费者 Thread.currentThread().getName()获取数据value: event.getValue(),name:event.getName());} }4多生产者多消费者模式 在实际开发中多个生产者发送消息多个消费者处理消息才是常态。 import com.disruptor.consumer.OrderEventHandler; import com.disruptor.event.OrderEvent; import com.disruptor.event.OrderEventFactory; import com.disruptor.producer.OrderEventProducer; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType;import java.util.concurrent.Executors;/*** 5、编码*/ public class DisruptorDemo2 {public static void main(String[] args) throws Exception {//创建disruptorDisruptorOrderEvent disruptor new Disruptor(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.MULTI, //多生产者new YieldingWaitStrategy() //等待策略);//设置消费者用于处理RingBuffer的事件//disruptor.handleEventsWith(new OrderEventHandler());//设置多消费者,消息会被重复消费//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口一条消息只会被一个消费者消费disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());//启动disruptordisruptor.start();//创建ringbuffer容器RingBufferOrderEvent ringBuffer disruptor.getRingBuffer();new Thread(()-{//创建生产者OrderEventProducer eventProducer new OrderEventProducer(ringBuffer);// 发送消息for(int i0;i100;i){eventProducer.onData(i,Foxi);}},producer1).start();new Thread(()-{//创建生产者OrderEventProducer eventProducer new OrderEventProducer(ringBuffer);// 发送消息for(int i0;i100;i){eventProducer.onData(i,monkeyi);}},producer2).start();//disruptor.shutdown();} }4、场景应用 1使用EventProcessor消息处理器 import java.util.concurrent.atomic.AtomicInteger;// 1、消息 public class Trade {private String id;//IDprivate String name;private double price;//金额private AtomicInteger count new AtomicInteger(0);public String getId() {return id;}public void setId(String id) {this.id id;}public String getName() {return name;}public void setName(String name) {this.name name;}public double getPrice() {return price;}public void setPrice(double price) {this.price price;}public AtomicInteger getCount() {return count;}public void setCount(AtomicInteger count) {this.count count;} } import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler;import java.util.UUID;//消费者 public class TradeHandler implements EventHandlerTrade, WorkHandlerTrade { //根据具体需要实现之一即可Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } Override public void onEvent(Trade event) throws Exception { //这里做具体的消费逻辑 event.setId(UUID.randomUUID().toString());//简单生成下IDSystem.out.println(event.getId()); } } import com.lmax.disruptor.*;import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;public class Main1 {public static void main(String[] args) throws Exception { int BUFFER_SIZE1024; int THREAD_NUMBERS4; /** createSingleProducer创建一个单生产者的RingBuffer* 第一个参数叫EventFactory从名字上理解就是事件工厂其实它的职责就是产生数据填充RingBuffer的区块。* 第二个参数是RingBuffer的大小它必须是2的指数倍 目的是为了将求模运算转为运算提高效率* 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者或者说是事件处理器 太慢了)的等待策略*/ final RingBufferTrade ringBuffer RingBuffer.createSingleProducer(new EventFactoryTrade() {Override public Trade newInstance() { return new Trade(); } }, BUFFER_SIZE, new YieldingWaitStrategy());//创建线程池 ExecutorService executors Executors.newFixedThreadPool(THREAD_NUMBERS);//创建SequenceBarrier SequenceBarrier sequenceBarrier ringBuffer.newBarrier();//创建消息处理器 BatchEventProcessorTrade transProcessor new BatchEventProcessorTrade(ringBuffer, sequenceBarrier, new TradeHandler()); //这一步的目的就是把消费者的位置信息引用注入到生产者 如果只有一个消费者的情况可以省略ringBuffer.addGatingSequences(transProcessor.getSequence()); //把消息处理器提交到线程池 executors.submit(transProcessor); //如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类 Future? future executors.submit(new CallableVoid() {Override public Void call() throws Exception { long seq; for(int i0;i10;i){ seq ringBuffer.next();//占个坑 --ringBuffer一个可用区块 ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入 数据ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见 } return null; } });future.get();//等待生产者结束 Thread.sleep(1000);//等上1秒等消费都处理完成 transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了并不是马上结束!!! executors.shutdown();//终止线程 } } 2使用WorkerPool消息处理器 import com.lmax.disruptor.*;import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;public class Main2 { public static void main(String[] args) throws InterruptedException { int BUFFER_SIZE1024; int THREAD_NUMBERS4; EventFactoryTrade eventFactory new EventFactoryTrade() { public Trade newInstance() { return new Trade(); } }; RingBufferTrade ringBuffer RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE); SequenceBarrier sequenceBarrier ringBuffer.newBarrier(); ExecutorService executor Executors.newFixedThreadPool(THREAD_NUMBERS); WorkHandlerTrade handler new TradeHandler(); WorkerPoolTrade workerPool new WorkerPoolTrade(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler); workerPool.start(executor); //下面这个生产8个数据 这里其实应该换成生产者for(int i0;i8;i){ long seqringBuffer.next(); ringBuffer.get(seq).setPrice(Math.random()*9999); ringBuffer.publish(seq); } Thread.sleep(1000); workerPool.halt(); executor.shutdown(); } } 5、复杂场景下使用 在复杂场景下使用RingBuffer希望P1生产的数据给C1、C2并行执行最后C1、C2执行结束后C3执行 这种场景必须使用disruptor了。 import java.util.concurrent.atomic.AtomicInteger;// 1、数据 public class Trade { private String id;//ID private String name;private double price;//金额 private AtomicInteger count new AtomicInteger(0);public String getId() {return id;}public void setId(String id) {this.id id;}public String getName() {return name;}public void setName(String name) {this.name name;}public double getPrice() {return price;}public void setPrice(double price) {this.price price;}public AtomicInteger getCount() {return count;}public void setCount(AtomicInteger count) {this.count count;} } import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.dsl.Disruptor;import java.util.Random; import java.util.concurrent.CountDownLatch; // 2、生产者 public class TradePublisher implements Runnable { DisruptorTrade disruptor; private CountDownLatch latch; private static int LOOP10;//模拟百万次交易的发生 public TradePublisher(CountDownLatch latch,DisruptorTrade disruptor) { this.disruptordisruptor; this.latchlatch; } Override public void run() { TradeEventTranslator tradeTransloator new TradeEventTranslator(); for(int i0;iLOOP;i){ disruptor.publishEvent(tradeTransloator); } latch.countDown(); } }class TradeEventTranslator implements EventTranslatorTrade{ private Random randomnew Random(); Override public void translateTo(Trade event, long sequence) { this.generateTrade(event); } private Trade generateTrade(Trade trade){ trade.setPrice(random.nextDouble()*9999); return trade; } } // 3、以下是消费者 import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler;public class Handler1 implements EventHandlerTrade,WorkHandlerTrade { Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } Override public void onEvent(Trade event) throws Exception { System.out.println(handler1: set name);event.setName(h1);Thread.sleep(1000);} } import com.lmax.disruptor.EventHandler;public class Handler2 implements EventHandlerTrade { Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println(handler2: set price);event.setPrice(17.0);Thread.sleep(1000);} } import com.lmax.disruptor.EventHandler;public class Handler3 implements EventHandlerTrade {Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println(handler3: name: event.getName() , price: event.getPrice() ; instance: event.toString());} }import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler;public class Handler4 implements EventHandlerTrade,WorkHandlerTrade { Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } Override public void onEvent(Trade event) throws Exception { System.out.println(handler4: get name : event.getName());event.setName(event.getName() h4);} } import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler;public class Handler5 implements EventHandlerTrade,WorkHandlerTrade { Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } Override public void onEvent(Trade event) throws Exception { System.out.println(handler5: get price : event.getPrice());event.setPrice(event.getPrice() 3.0);} } import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType;import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;// 编码 public class Main { public static void main(String[] args) throws InterruptedException { long beginTimeSystem.currentTimeMillis(); int bufferSize1024; ExecutorService executorExecutors.newFixedThreadPool(8); DisruptorTrade disruptor new DisruptorTrade(new EventFactoryTrade() { Override public Trade newInstance() { return new Trade(); } }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); //菱形操作/**//使用disruptor创建消费者组C1,C2 EventHandlerGroupTrade handlerGroup disruptor.handleEventsWith(new Handler1(), new Handler2());//声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3 handlerGroup.then(new Handler3());*///顺序操作/**disruptor.handleEventsWith(new Handler1()).handleEventsWith(new Handler2()).handleEventsWith(new Handler3());*///六边形操作. /**Handler1 h1 new Handler1();Handler2 h2 new Handler2();Handler3 h3 new Handler3();Handler4 h4 new Handler4();Handler5 h5 new Handler5();disruptor.handleEventsWith(h1, h2);disruptor.after(h1).handleEventsWith(h4);disruptor.after(h2).handleEventsWith(h5);disruptor.after(h4, h5).handleEventsWith(h3);*/disruptor.start();//启动 CountDownLatch latchnew CountDownLatch(1); //生产者准备 executor.submit(new TradePublisher(latch, disruptor));latch.await();//等待生产者完事. disruptor.shutdown(); executor.shutdown(); System.out.println(总耗时:(System.currentTimeMillis()-beginTime)); } }
http://www.huolong8.cn/news/393998/

相关文章:

  • 网站设计规划书例子信息流广告剪辑
  • 品牌网站官网知乎关键词优化软件
  • 小企业网站价格沈阳网站建设哪家做得好啊
  • 男男做h的视频网站wordpress主题评论
  • 国际网站开发客户的技巧c2c电子商务网站的功能
  • 东莞石龙网站建设莞网站制作二手交易平台网站的建设
  • 数字域名做网站域名及密码登录域名管理网站
  • 大学网站 作风建设专题比特币wordpress插件
  • 互联网做网站大庆做流产油城女子网站
  • 外贸网站建设哪家公司比较好锦绣江南网站建设
  • 杭州网站建设公司哪家好黄石企业网站设计
  • 潍坊网站优化公司哪家好平台软件开发
  • 做乐高肖像的网站南昌网站设计公司
  • 网站静态页模板wordpress 浮动留言框
  • 云南网站建设天软科技网站建设柒首先金手指2
  • 专业站店铺推广文案简短
  • wordpress个人网站后台登陆做美工用的网站
  • 贵阳白云区城乡建设局网站在线建网站
  • 山东鲁桥建设有限公司网站中山快速做网站价格
  • 濮阳住房建设厅网站网络营销推广的方式方法有哪些
  • 网站专栏的作用产品设计工具
  • 想做电商网站运营要怎么做怎么建设销售网站
  • 微信建设银行官方网站泉州人才网
  • 山东城市建设厅网站公司网站能自己做二维码
  • 昆山住房和城乡建设局网站首页网站的页脚什么做
  • 建网站啦Wordpress删除主题的
  • 溧阳网站建设中心备案网站分布地点
  • 做网站前期需要准备什么网站logo更换
  • 有做lol直播网站2024最火的十大新闻
  • 品牌建设网站例子做推广的免费的济宁网站有哪些