生产者生产速度和消费者消费速度不平衡,利用一个阻塞队列实现缓冲区,比如,如果生产者速度较快,当队列已满,就不再生产。同理,队列空的时候,消费者不再消费进入睡眠等到生产者有生产的时候才开始消费。所以
1. 这个模式实现了平衡两者间处理能力
2. 实现生产者和消费者逻辑间解耦
import java.util.Stack;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 1个生产者 3个消费者 生产、消费10次
*
* @作者 pt
*
*/
public class ProducerConsumer {
Stack<Integer> items = new Stack<Integer>();
final static int NO_ITEMS = 10;
public static void main(String args[]) {
ProducerConsumer pc = new ProducerConsumer();
Thread t1 = new Thread(pc.new Producer());
Consumer consumer = pc.new Consumer();
Thread t2 = new Thread(consumer);
Thread t3 = new Thread(consumer);
Thread t4 = new Thread(consumer);
t1.start();
try {
Thread.sleep(100);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
t2.start();
t3.start();
t4.start();
try {
t2.join();
t3.join();
t4.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
class Producer implements Runnable {
public void produce(int i) {
System.out.println("Producing " + i);
items.push(new Integer(i));
}
@Override
public void run() {
int i = 0;
// 生产10次
while (i++ < NO_ITEMS) {
synchronized (items) {
produce(i);
items.notifyAll();
}
try {
// 休眠一段时间
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
class Consumer implements Runnable {
// consumed计数器允许线程停止
AtomicInteger consumed = new AtomicInteger();
public void consume() {
if (!items.isEmpty()) {
System.out.println(Thread.currentThread().getId()+" Consuming " + items.pop());
consumed.incrementAndGet();
}
}
private boolean theEnd() {
return consumed.get() >= NO_ITEMS;
}
@Override
public void run() {
while (!theEnd()) {
synchronized (items) {
while (items.isEmpty() && (!theEnd())) {
try {
//wait same time as Producer, 主要目的是sleep,减少CPU资源消耗
items.wait(1000);
} catch (InterruptedException e) {
Thread.interrupted();
}
}
consume();
}
}
}
}
}
ref: https://zh.wikipedia.org/wiki/%E7%94%9F%E4%BA%A7%E8%80%85%E6%B6%88%E8%B4%B9%E8%80%85%E9%97%AE%E9%A2%98
http://www.infoq.com/cn/articles/producers-and-consumers-mode
No comments:
Post a Comment