一、阻塞队列
阻塞队列,顾名思义,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:
当阻塞队列是空时,从队列中获取元素的操作将会被阻塞;
当阻塞队列是满时,向队列中添加元素的操作将会被阻塞;
试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程也同样会被阻塞,直到其他线程从队列中移除一个或者多个元素或者完全清空队列后时队列重新变得空闲起来。
二、为什么用阻塞队列?有什么好处?
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都帮我们做好了。在concurrent包发布以嵌,在多线程环境下,每个程序员都必须自己去控制这些实现细节,尤其还要兼顾效率和线程安全,而这会给程序带来不小的复杂度。
三、BlockingQueue的核心方法
也就是说 ,实现同样的功能,选用不同的方法,那么内部处理的方式不同。
四、架构梳理+种类分析
五、用在哪里?
1.生产者消费者模式
2.线程池
3.消息中间件
(1)传统版生产者消费者模式
class ShareData{ // 资源类 private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void addNumber(){ try { lock.lock(); // 1.判断,避免虚假唤醒 while (number != 0){ // 不能生产 condition.await(); } // 2.执行操作 number++; System.out.println(Thread.currentThread().getName() + "\t" + number); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrementNumber(){ try { lock.lock(); // 1.判断,避免虚假唤醒 while (number == 0){ // 不能消费 condition.await(); } // 2.执行操作 number--; System.out.println(Thread.currentThread().getName() + "\t" + number); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }}public class ProducerConsumerTraditional { public static void main(String[] args) { ShareData shareData = new ShareData(); new Thread(()->{ for (int i = 1; i <= 5; i++) { shareData.addNumber(); } }, "AAA").start(); new Thread(()->{ for (int i = 1; i <= 5; i++) { shareData.decrementNumber(); } }, "BBB").start(); }}
输出结果:
AAA 1BBB 0AAA 1BBB 0AAA 1BBB 0AAA 1BBB 0AAA 1BBB 0Process finished with exit code 0
(2)阻塞队列版本生产者消费者模式
class BlockingQueueDemo{ private AtomicInteger integer = new AtomicInteger(); private BlockingQueueblockingQueue = null; private volatile boolean FLAG = true; public BlockingQueueDemo(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } public void produce(){ String data = null; boolean retValue; while (FLAG){ data = String.valueOf(integer.incrementAndGet()) ; retValue = blockingQueue.offer(data); if (retValue){ System.out.println(Thread.currentThread().getName() + "\t插入队列" + data + "成功"); }else { System.out.println(Thread.currentThread().getName() + "\t插入队列" + data + "失败"); } } } public void consume(){ String data = null; boolean retValue; while (FLAG){ data = blockingQueue.poll(); if (data == null){ FLAG = false; return; }else { System.out.println(Thread.currentThread().getName() + "\t消费队列" + data + "成功"); } } } public void stop(){ this.FLAG = false; }}public class ProducerConsumerBlockingQueue { public static void main(String[] args) throws InterruptedException { BlockingQueue blockingQueue = new ArrayBlockingQueue (5); BlockingQueueDemo blockingQueueDemo = new BlockingQueueDemo(blockingQueue); new Thread(()->{ blockingQueueDemo.produce(); }, "Producer").start(); new Thread(()->{ blockingQueueDemo.consume(); }, "Consumer").start(); TimeUnit.SECONDS.sleep(5); blockingQueueDemo.stop(); }}