Java中怎么使用BlockingQueue实现并发,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
成都创新互联公司从2013年创立,先为山东等服务建站,山东等地企业,进行企业商务咨询服务。为山东企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。
阻塞队列(BlockingQueue)是一个支持两种附加操作的队列。支持附加阻塞的插入和移除操作。
支持阻塞的插入:当队列满时,插入操作会被阻塞,直到队列不满。
支持阻塞的移除:当队列空时,移除操作会被阻塞,直到队列不空。
阻塞队列不可用时,操作处理方式
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除方法 | remove() | poll() | take() | poll(time, unit) |
检查方法 | element() | peek() | 无 | 无 |
抛出异常:队列满时,若继续插入元素会抛出IllegalStateException
;当队列为空时,若获取元素则会抛出NoSuchElementException
异常。
返回特殊值:向队列插入元素时,会返回是否插入成功true/false;获取元素时,成功则返回元素,失败则返回null。
一直阻塞:当阻塞队列满时,若继续使用put新增元素时会被阻塞,直到队列不为空或者响应中断退出;当阻塞队列为空时,继续使用take获取元素时会被阻塞,直到队列不为空。
超时退出:当阻塞队列满时,使用offer(e, time, unit)新增元素会被阻塞至超时退出;当队列为空时,使用poll(time, unit)获取元素时会被阻塞至超时退出。
注意:
阻塞队列中不允许插入null
,会抛出NPE异常。
可以访问阻塞队列中的任意元素,调用remove(Object o)
可以将队列之中的特定对象移除,但会遍历全部元素,并不高效。
由数组构成的有界阻塞队列,内部由数组final Object[] items
实现。默认情况下不保证线程公平的访问队列,所谓公平访问队列指阻塞的线程,可以按照阻塞的先后顺序访问队列。
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); // 使用公平锁/非公平锁 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
队列大小初始化后不可修改。参数fair
控制内部ReentrantLock
是否采用公平锁。
链表实现的有界阻塞队列。内部结构是单链表。默认大小为Integer.MAX_VALUE
,可以指定大小。
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); // 指定队列大小 this.capacity = capacity; last = head = new Node(null); } // 单链表节点Node static class Node { E item; Node next; Node(E x) { item = x; } }
支持优先级的无界阻塞队列。默认情况下采取自然顺序升序排列。也可以自定义compareTo()
方法来指定元素的排列顺序,或者初始化队列时,指定构造参数Comparator
来对元素进行排序。同优先级顺序无法保证。
public PriorityBlockingQueue(int initialCapacity, Comparator super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); // 非公平锁 this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } // offer方法部分代码 Comparator super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp);
由offer代码可以看出,Comparator
的优先级是大于Comparable.compareTo
方法的。
注意:PriorityBlockingQueue
不会阻塞数据生产者(队列无界),只会在没有数据时阻塞消费者。生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则将有可能耗尽堆空间。
支持延时获取元素的无界队列。队列使用PriorityQueue
实现。队列中的元素必须实现java.util.concurrent.Delayed
接口,在创建元素时指定多久才能才能从队列中取到元素。
DelayQueue非常有用,可以将DelayQueu应用在以下应用场景。
缓存系统的设计:用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能获取到元素时,表示缓存有限期到了。
定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行。比如TimerQueue
就是使用DelayQueue实现的。
不存储元素的阻塞队列。每个put
操作都必须等待一个take
操作,反之亦然。
// fair为true,等待线程将以FIFO的顺序进行访问 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack (); }
将生产者线程处理的数据直接传递给消费者线程。队列本身不存储任何元素,非常适合传递性场景。SynchronousQueue
的吞吐量高于ArrayBlockingQueue
和LinkedBlockingQueue
。
利用Lock
锁的多条件(Condition)阻塞控制。下面简单分析下ArrayBlockingQueue
部分代码。
/** The queued items */ // 数据元素数组 final Object[] items; /** items index for next take, poll, peek or remove */ // 下一个待获取元素索引 int takeIndex; /** items index for next put, offer, or add */ // 下一个待插入元素索引 int putIndex; /** Number of elements in the queue */ // 队列中元素个数 int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ // 所有访问的主锁 final ReentrantLock lock; /** Condition for waiting takes */ // 消费者监视器 private final Condition notEmpty; /** Condition for waiting puts */ // 生产者监视器 private final Condition notFull; // public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
// 在队列尾部插入元素,若队列已满则等待队列非满。 public void put(E e) throws InterruptedException { // 校验插入元素,为空则抛出NPE checkNotNull(e); final ReentrantLock lock = this.lock; // 1. 尝试获取锁(响应中断) lock.lockInterruptibly(); try { // 2. 当队列满时 while (count == items.length) // 2.1 若队列满,则阻塞当前线程。等待`notFull.signal()`唤醒。 notFull.await(); // 3. 非满则执行入队操作 enqueue(e); } finally { lock.unlock(); } } // 在`putIndex`处放置当前元素,只有获取lock锁后才会调用 private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // 在`putIndex`处放置元素 items[putIndex] = x; // putIndex等于数组长度时,重置为0索引。 if (++putIndex == items.length) putIndex = 0; // 数量加1 count++; // 4. 唤醒一个等待线程(等待取元素的线程) notEmpty.signal(); }
put总体流程:
获取lock锁,拿到锁后继续执行,否则自旋竞争锁。
判断阻塞队列是否满。满了了则调用await
阻塞当前线程。同时释放lock锁。
如果没满,则调用enqueue
方法将元素put进阻塞队列。此时还有一种可能是:第2步中被阻塞的线程被唤醒且又拿到了lock锁。
唤醒一个标记为notEmpty(消费者)
的线程。
// 从头部获取元素,若队列为空则等待队列非空。 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 1. 获取锁 lock.lockInterruptibly(); try { // 2. 当队列为空时 while (count == 0) // 2.1 当队列为空时,阻塞当前线程。等待`notEmpty.signal()`唤醒。 notEmpty.await(); // 3. 非空则进行入队操作 return dequeue(); } finally { lock.unlock(); } } // 从`takeIndex`位置获取当前元素,只有获取到lock锁后才会调用 private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 从`takeIndex`位置获取元素,然后清除该位置元素 E x = (E) items[takeIndex]; items[takeIndex] = null; // if (++takeIndex == items.length) takeIndex = 0; // 队列元素减1 count--; if (itrs != null) itrs.elementDequeued(); // 4. 唤醒一个标记为notFull(生产者)的线程 notFull.signal(); return x; }
take的整体流程:
获取lock锁,拿到锁则执行下一步流程;未拿到则自旋竞争锁。
当前队列是否为空,若为空则调用notEmpty.await
阻塞当前线程,同时释放锁,等待被唤醒。
若非空,则调用dequeue
进行出队操作。此时还有一种可能:第2步中的阻塞的线程被唤醒并且又拿到了lock锁。
唤醒一个被标记为notFull(生产者)的线程。
put
和take
操作都需要先获得锁,没有获得锁的线程无法进行操作。
拿到锁后,并不一定能顺利执行put
/take
操作,还需要判断队列是否可用(是否满/空),不可用则会被阻塞,并释放锁。
在2中被阻塞的线程会被唤醒,但唤醒之后依然需要拿到锁之后才能继续向下执行。否则,自旋拿锁,拿到锁后再while判断队列是否可用。
看完上述内容,你们掌握Java中怎么使用BlockingQueue实现并发的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!