BlockQueue
四种形式
- 抛出异常
- 返回一个特殊值(可能是null或者是false,取决于具体的操作)
- 阻塞当前执行直到其可以继续
当线程被挂起后,等待最大的时间,如果一旦超时,即使该操作依旧无法继续执行,线程也不会再继续等待下去。
注意:
BlockingQueue中是不允许添加null的,该接受在声明的时候就要求所有的实现类在接收到一个null的时候,都应该抛出NullPointerException。(poll在没有数据的时候会返回null)。
123456789public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : dequeue();//当count==0时,返回null} finally {lock.unlock();}}BlockingQueue是线程安全的,因此它的所有和队列相关的方法都具有原子性。但是对于那么从Collection接口中继承而来的批量操作方法,比如addAll(Collection e)等方法,BlockingQueue的实现通常没有保证其具有原子性,因此我们在使用的BlockingQueue,应该尽可能地不去使用这些方法。
BlockingQueue主要应用于生产者与消费者的模型中,其元素的添加和获取都是极具规律性的。但是对于remove(Object o)这样的方法,虽然BlockingQueue可以保证元素正确的删除,但是这样的操作会非常响应性能,因此我们在没有特殊的情况下,也应该避免使用这类方法。
ArrayBlockingQueue
基于数组的并发阻塞队列
- 构造方法12345678public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}
由ReentrantLock实现,公正锁实现,为了保证List的先进先出。
除了在内部array未满的时候需要满足先进先出,还需要保证await的线程在signal的时候保证是顺序的
(todo condition 源码解析,大致理解是condition内部也是维护一个队列,在signal的时候唤醒的是firstWaiter,这样就能保证线程是顺序唤醒的)。
插入
移除
DelayQueue
DelayQueue主要用于放置实现了Delay接口的对象,其中的对象只能在其时刻到期时才能从队列中取走。这种队列是有序的,即队头的延迟到期时间最短。如果没有任何延迟到期,那么久不会有任何头元素。
重点
- 无界的BlockingQueue
- poll方法立即返回,如果没有头结点则返回null。take方法会阻塞直到有头结点才返回。
代码分析
- 成员变量1234private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();private Thread leader = null;private final Condition available = lock.newCondition();
内部由优先队列实现,available来控制他的阻塞和通知。
lock,还有用于优化阻塞通知的线程元素leader。
- offer(E e)1234567891011121314public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}
在头元素为空的情况下,插入头元素将会唤醒线程。
- take()123456789101112131415161718192021222324252627282930313233public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null)available.await();//队列中没有元素的时候,进入等待else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return q.poll();first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread; //用于判断是否有线程在获取try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}}
|
|
想想假设现在延迟队列里面有三个对象。
- 线程A进来获取first,然后进入 else 的else ,设置了leader为当前线程A
- 线程B进来获取first,进入else的阻塞操作,然后无限期等待
- 如果线程A阻塞完毕,获取对象成功,出队,这个对象理应被GC回收,但是他还被线程B持有着,GC链可达,所以不能回收这个first.
- 假设还有线程C 、D、E.. 持有对象1引用,那么无限期的不能回收该对象1引用了,那么就会造成内存泄露.
//todo 实现一个带超时的缓存,实现一下delayedQueue
LinkedBlockingQueue
适合实现一个消费者生产者队列
代码分析
|
|
读写分离,take 和 put 不互斥。
ArrayBlockingQueue 为什么只用了一把锁。
猜想:ArrayBlockingQueue 是一个环形数组,如果用两把锁,对capacity的判断可能是失效,会导致put的时候插入了存在的数据。
而LinkedBlockingQueue 是基于链表的,添加和删除的时候是对于head和last的操作,互不相关,所以可以用两把锁来实现。
因为使用了两把锁,所以count使用了AtomicInteger来实现,保证其并发。
offer(E e)
12345678910111213141516171819202122232425262728public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() == capacity)return false; //int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {if (count.get() < capacity) {enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0;}private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;last = last.next = node;//last = head = new Node<E>(null); 初始化的时候head=last,所以在第一个节点插入的时候,head会被赋值}poll
123456789101112131415161718192021222324252627282930313233public E poll() {final AtomicInteger count = this.count;if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {if (count.get() > 0) {x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}private E dequeue() {// assert takeLock.isHeldByCurrentThread();// assert head.item == null;Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}
LinkedTransferQueue
//todo
PriorityBlockingQueue
//todo
SynchronousQueue
//todo
ConcurrentQueue
//todo
ConcurrentLinkedQueue
//todo