如何做网站毕业设计,机电建设工程施工网站,安徽省建筑人员信息网,网站布局技术在Java中#xff0c;经典的生产者#xff0d;消费者模式相对简单#xff0c;因为我们有java.util.concurrent.BlockingQueue 。 为了避免繁忙的等待和容易出错的手动锁定#xff0c;我们只需利用put()和take() 。 如果队列已满或为空#xff0c;它们都将阻塞。 我们需要的… 在Java中经典的生产者消费者模式相对简单因为我们有java.util.concurrent.BlockingQueue 。 为了避免繁忙的等待和容易出错的手动锁定我们只需利用put()和take() 。 如果队列已满或为空它们都将阻塞。 我们需要的是一堆线程共享对同一队列的引用一些正在生产而其他正在消耗。 当然队列必须具有有限的容量否则如果生产者的表现优于消费者我们很快就会用光内存。 格雷格·扬Greg Young在波兰Devoxx期间对这条规则的强调不够 永远不要创建无限队列 使用 这是最简单的例子。 首先我们需要一个将对象放在共享队列中的生产者 import lombok.Value;
import lombok.extern.slf4j.Slf4j;Slf4j
Value
class Producer implements Runnable {private final BlockingQueueUser queue;Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {final User user new User(User System.currentTimeMillis());log.info(Producing {}, user);queue.put(user);TimeUnit.SECONDS.sleep(1);}} catch (Exception e) {log.error(Interrupted, e);}}
} 生产者只需每秒将User类的实例无论它是什么发布到给定队列。 显然在现实生活中将User在队列中是系统中某些操作例如用户登录的结果。 同样消费者从队列中获取新项目并进行处理 Slf4j
Value
class Consumer implements Runnable {private final BlockingQueueUser queue;Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {final User user queue.take();log.info(Consuming: {}, user);}} catch (Exception e) {log.error(Interrupted, e);}}
} 再次在现实生活中处理将意味着存储在数据库中或对用户运行某些欺诈检测。 我们使用队列将处理线程与消耗线程解耦例如减少延迟。 为了运行一个简单的测试让我们启动几个生产者和消费者线程 BlockingQueueUser queue new ArrayBlockingQueue(1_000);
final ListRunnable runnables Arrays.asList(new Producer(queue),new Producer(queue),new Consumer(queue),new Consumer(queue),new Consumer(queue)
);final ListThread threads runnables.stream().map(runnable - new Thread(runnable, threadName(runnable))).peek(Thread::start).collect(toList());TimeUnit.SECONDS.sleep(5);
threads.forEach(Thread::interrupt);//...private static String threadName(Runnable runnable) {return runnable.getClass().getSimpleName() - System.identityHashCode(runnable);
} 我们有2个生产者和3个消费者似乎一切正常。 在现实生活中您可能会有一些隐式生产者线程例如HTTP请求处理线程。 在使用者方面您很可能会使用线程池。 这种模式效果很好但是特别是在消费方面是很底层的。 介绍 本文的目的是介绍一种抽象其行为类似于生产者方的队列但表现为来自消费者方的RxJava的Observable 。 换句话说我们可以将添加到队列中的对象视为可以在客户端映射过滤撰写等的流。 有趣的是这不再是排在后面的队列。 ObservableQueueT仅将所有新对象直接转发给订阅的使用者并且在没有人监听“可观察到的” 热 的情况下不缓冲事件。 ObservableQueueT本身并不是队列它只是一个API与另一个API之间的桥梁。 它类似于java.util.concurrent.SynchronousQueue 但是如果没有人对使用感兴趣则将对象简单地丢弃。 这是第一个实验性实现。 这只是一个玩具代码不要认为它已准备就绪。 另外我们稍后将对其进行简化 public class ObservableQueueT implements BlockingQueueT, Closeable {private final SetSubscriber? super T subscribers Collections.newSetFromMap(new ConcurrentHashMap());private final ObservableT observable Observable.create(subscriber - {subscriber.add(new Subscription() {Overridepublic void unsubscribe() {subscribers.remove(subscriber);}Overridepublic boolean isUnsubscribed() {return false;}});subscribers.add(subscriber);});public ObservableT observe() {return observable;}Overridepublic boolean add(T t) {return offer(t);}Overridepublic boolean offer(T t) {subscribers.forEach(subscriber - subscriber.onNext(t));return true;}Overridepublic T remove() {return noSuchElement();}Overridepublic T poll() {return null;}Overridepublic T element() {return noSuchElement();}private T noSuchElement() {throw new NoSuchElementException();}Overridepublic T peek() {return null;}Overridepublic void put(T t) throws InterruptedException {offer(t);}Overridepublic boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {return offer(t);}Overridepublic T take() throws InterruptedException {throw new UnsupportedOperationException(Use observe() instead);}Overridepublic T poll(long timeout, TimeUnit unit) throws InterruptedException {return null;}Overridepublic int remainingCapacity() {return 0;}Overridepublic boolean remove(Object o) {return false;}Overridepublic boolean containsAll(Collection? c) {return false;}Overridepublic boolean addAll(Collection? extends T c) {c.forEach(this::offer);return true;}Overridepublic boolean removeAll(Collection? c) {return false;}Overridepublic boolean retainAll(Collection? c) {return false;}Overridepublic void clear() {}Overridepublic int size() {return 0;}Overridepublic boolean isEmpty() {return true;}Overridepublic boolean contains(Object o) {return false;}Overridepublic IteratorT iterator() {return Collections.emptyIterator();}Overridepublic Object[] toArray() {return new Object[0];}Overridepublic T T[] toArray(T[] a) {return a;}Overridepublic int drainTo(Collection? super T c) {return 0;}Overridepublic int drainTo(Collection? super T c, int maxElements) {return 0;}Overridepublic void close() throws IOException {subscribers.forEach(rx.Observer::onCompleted);}
} 关于它有两个有趣的事实 我们必须跟踪所有订户即愿意接收新商品的消费者。 如果其中一个订阅者不再感兴趣我们必须删除该订阅者否则会发生内存泄漏请继续阅读 此队列的行为就好像它始终为空。 它永远不会保存任何项目–当您将某些内容放入此队列时它会自动传递给订阅者并被遗忘 从技术上讲此队列是无界的这意味着您可以根据需要放置任意数量的项目。 但是由于将项目传递给所有订户如果有并立即丢弃因此此队列实际上始终为空请参见上文 生产者可能仍会生成太多事件而消费者可能无法跟上这一步– RxJava现在具有背压支持本文未介绍。 假设我正确实现了队列协定生产者可以像使用其他BlockingQueueT一样使用ObservableQueueT 。 但是消费者看起来更轻巧更聪明 final ObservableQueueUser users new ObservableQueue();
final ObservableUser observable users.observe();users.offer(new User(A));
observable.subscribe(user - log.info(User logged in: {}, user));
users.offer(new User(B));
users.offer(new User(C)); 上面的代码仅打印B和C 。 由于ObservableQueue会在没有人监听的情况下丢弃项目因此设计会丢失A 。 显然 Producer类现在使用users队列。 一切正常您可以随时调用users.observe()并应用数十个Observable运算符之一。 但是有一个警告默认情况下RxJava不执行任何线程处理因此消耗与产生线程在同一线程中发生 我们失去了生产者-消费者模式的最重要特征即线程去耦。 幸运的是RxJava中的所有内容都是声明性的线程调度也是如此 users.observe().observeOn(Schedulers.computation()).forEach(user -log.info(User logged in: {}, user)); 现在让我们看一下RxJava的真正功能。 假设您要计算每秒登录的用户数其中每个登录都作为事件放入队列中 users.observe().map(User::getName).filter(name - !name.isEmpty()).window(1, TimeUnit.SECONDS).flatMap(Observable::count).doOnCompleted(() - log.info(System shuts down)).forEach(c - log.info(Logins in last second: {}, c)); 性能也是可以接受的这样的队列每秒可以在我的一个订户的笔记本电脑上接受约300万个对象。 将此类视为使用队列到现代反应世界的旧系统的适配器。 可是等等 使用ObservableQueueT很容易但是使用subscribers同步集的实现似乎太底层了。 幸运的是有SubjectT, T 。 Subject是Observable “另一面” –您可以将事件推送到Subject但是它仍然实现Observable 因此您可以轻松地创建任意Observable 。 使用Subject实现之一 ObservableQueue外观如何 public class ObservableQueueT implements BlockingQueueT, Closeable {private final SubjectT, T subject PublishSubject.create();public ObservableT observe() {return subject;}Overridepublic boolean add(T t) {return offer(t);}Overridepublic boolean offer(T t) {subject.onNext(t);return true;}Overridepublic void close() throws IOException {subject.onCompleted();}Overridepublic T remove() {return noSuchElement();}Overridepublic T poll() {return null;}Overridepublic T element() {return noSuchElement();}private T noSuchElement() {throw new NoSuchElementException();}Overridepublic T peek() {return null;}Overridepublic void put(T t) throws InterruptedException {offer(t);}Overridepublic boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {return offer(t);}Overridepublic T take() throws InterruptedException {throw new UnsupportedOperationException(Use observe() instead);}Overridepublic T poll(long timeout, TimeUnit unit) throws InterruptedException {return null;}Overridepublic int remainingCapacity() {return 0;}Overridepublic boolean remove(Object o) {return false;}Overridepublic boolean containsAll(Collection? c) {return false;}Overridepublic boolean addAll(Collection? extends T c) {c.forEach(this::offer);return true;}Overridepublic boolean removeAll(Collection? c) {return false;}Overridepublic boolean retainAll(Collection? c) {return false;}Overridepublic void clear() {}Overridepublic int size() {return 0;}Overridepublic boolean isEmpty() {return true;}Overridepublic boolean contains(Object o) {return false;}Overridepublic IteratorT iterator() {return Collections.emptyIterator();}Overridepublic Object[] toArray() {return new Object[0];}Overridepublic T T[] toArray(T[] a) {return a;}Overridepublic int drainTo(Collection? super T c) {return 0;}Overridepublic int drainTo(Collection? super T c, int maxElements) {return 0;}} 上面的实现更加简洁我们完全不必担心线程同步。 翻译自: https://www.javacodegeeks.com/2015/07/consuming-java-util-concurrent-blockingqueue-as-rx-observable.html