JUC多线程系列-ArrayBlockingQueue源码分析

JUC多线程系列-ArrayBlockingQueue源码分析


背景


最近在项目中应用到后台异步任务并发应用,干脆系统的撸一遍JUC,然后应用到项目中实战.

使用阻塞队列实现生产者消费者模式


优点:阻塞队列实现生产者消费者模式超级简单,它提供开箱即用支持阻塞的方法put()和take(),开发者不需要写困惑的wait-nofity代码去实现通信。BlockingQueue 一个接口,Java5提供了不同的现实,如ArrayBlockingQueue和LinkedBlockingQueue,两者都是先进先出(FIFO)顺序。而ArrayLinkedQueue是自然有界的,LinkedBlockingQueue可选的边界。下面这是一个完整的生产者消费者代码例子,对比传统的wait、nofity代码,它更易于理解。


目录



  1. ArrayBlockingQueue介绍

  2. ArrayBlockingQueue原理和数据结构

  3. ArrayBlockingQueue函数列表

  4. ArrayBlockingQueue源码分析(JDK1.8版本)

  5. ArrayBlockingQueue示例



正文


1.ArrayBlockingQueue介绍


ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。
线程安全是指,ArrayBlockingQueue内部通过“互斥锁”保护竞争资源,实现了多线程对竞争资源的互斥访问。而有界,则是指ArrayBlockingQueue对应的数组是有界限的。 阻塞队列,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待;而且,ArrayBlockingQueue是按 FIFO(先进先出)原则对元素进行排序,元素都是从尾部插入到队列,从头部开始返回。



注意:ArrayBlockingQueue不同于ConcurrentLinkedQueue,ArrayBlockingQueue是数组实现的,并且是有界限的;而ConcurrentLinkedQueue是链表实现的,是无界限的。


2.ArrayBlockingQueue原理和数据结构



  1. ArrayBlockingQueue继承于AbstractQueue,并且它实现了BlockingQueue接口。

  2. ArrayBlockingQueue内部是通过Object[]数组保存数据的,也就是说ArrayBlockingQueue本质上是通过数组实现的。ArrayBlockingQueue的大小,即数组的容量是创建ArrayBlockingQueue时指定的。

  3. ArrayBlockingQueue与ReentrantLock是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象(lock)。ReentrantLock是可重入的互斥锁,ArrayBlockingQueue就是根据该互斥锁实现“多线程对竞争资源的互斥访问”。而且,ReentrantLock分为公平锁和非公平锁,关于具体使用公平锁还是非公平锁,在创建ArrayBlockingQueue时可以指定;而且,ArrayBlockingQueue默认会使用非公平锁。

  4. ArrayBlockingQueue与Condition是组合关系,ArrayBlockingQueue中包含两个Condition对象(notEmpty和notFull)。



3.ArrayBlockingQueue函数列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

private ArrayBlockingQueue<Integer> queue=new ArrayBlockingQueue<Integer>(size);
// 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出 IllegalStateException。
private boolean add(Integer num){
return queue.add(num);
}
// 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
private boolean offer(Integer num){
return queue.offer(num);
}
/ 创建一个带有给定的(固定)容量和默认访问策略的 ArrayBlockingQueue。
ArrayBlockingQueue(int capacity)
// 创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue。
ArrayBlockingQueue(int capacity, boolean fair)
// 创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue,它最初包含给定 collection 的元素,并以 collection 迭代器的遍历顺序添加元素。
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)

//TODO 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。
private void put(Integer num) throws InterruptedException {
queue.put(num);
}
// 获取但不移除此队列的头;如果此队列为空,则返回 null。
private Integer peek(){
return queue.peek();
}
// 获取并移除此队列的头,如果此队列为空,则返回 null。
private Integer poll(){
return queue.poll();
}
//TODO 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
private Integer take() throws InterruptedException{
return queue.take();
}
// 从此队列中移除指定元素的单个实例(如果存在)
private boolean remove(Integer num){
return queue.remove(num);
}
// 返回此队列中元素的数量。
private int getSize(){
return queue.size();
}

4.源码分析

创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];//存放数组
lock = new ReentrantLock(fair); //互斥锁
notEmpty = lock.newCondition();//竞争条件
notFull = lock.newCondition();//竞争条件
}

添加
offer() //队列满了返回false
add() //队列满了抛出异常
put() //队列慢了,阻塞线程
取出
poll()//获取并移除此队列的头,如果此队列为空,则返回 null。
remove()//获取并移除此队列的头,如果为空,返回false
take() //获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
遍历
iterator() //符合Collection 接口

总结


这些API都不需要记忆,因为编程的时候,你可以查看API的实现原理.重点是要花时间把他看懂,知道为什么实现的.

5.思考题ArrayBlockingQueue示例
一个线程打印1-52,一个线程打印A-Z
要求控制台输出A12B34C56….Z5152