2021年6月26日星期六

并发王者课-铂金05:致胜良器-无处不在的“阻塞队列”究竟是何面目

欢迎来到《并发王者课》,本文是该系列文章中的第18篇。在线程的同步中,阻塞队列是一个绕不过去的话题,它是同步器底层的关键。所以,我们在本文中将为你介绍阻塞队列的基本原理,以了解它的工作机制和它在Java中的实现。本文稍微有点长,建议先了解大纲再细看章节。

欢迎来到《[并发王者课](https://juejin.cn/post/6967277362455150628)》,本文是该系列文章中的**第18篇**。

在线程的同步中,**阻塞队列**是一个绕不过去的话题,它是同步器底层的关键。所以,我们在本文中将为你介绍阻塞队列的基本原理,以了解它的工作机制和它在Java中的实现。本文稍微有点长,建议先了解大纲再细看章节。

## 一、阻塞队列介绍

在生活中,相信你一定见过下图的人山人海,也见过其中的秩序井然。**混乱,是失控的开始**。想想看,在没有秩序的情况下,拥挤的人流蜂拥而上十分危险,轻则挤出一身臭汗,重则造成踩踏事故。**而秩序,则让情况免于混乱,排好队大家都舒服**。

![](https://writting.oss-cn-beijing.aliyuncs.com/2021/06/24/16245404441166.jpg)

**面对人流,我们通过排队解决混乱。而面对多线程,我们也通过队列让线程间免于混乱,这就是阻塞队列为何而存在。**


所谓阻塞队列,你可以理解它是这样的一种队列:
* **当线程试着往队列里放数据时,如果它已经满了,那么线程将进入等待**;
* **而当线程试着从队列里取数据时,如果它已经空了,那么线程将进入等待**。

下面这张图展示了多线程是如何通过阻塞队列进行协作的:
![](https://writting.oss-cn-beijing.aliyuncs.com/2021/06/24/16245386781811.jpg)

从图中可以看到,对于阻塞队列数据的读写并不局限于单个线程,往往存在多个线程的竞争。

## 二、实现简单的阻塞队列

接下来我们先抛开JUC中复杂的阻塞队列,来设计一个简单的阻塞队列,以了解它的核心思想。

在下面的阻塞队列中,我们设计一个队列`queue`,并通过`limit`字段限定它的容量。`enqueue()`方法用于向队列中放入数据,如果队列已满则等待;而`dequeue()`方法则用于从数据中取出数据,如果队列为空则等待。

```java
public class BlockingQueue {
private final List<Object> queue = new LinkedList<>();
private final int limit;

public BlockingQueue(int limit) {
this.limit = limit;
}

public synchronized void enqueue(Object item) throws InterruptedException {
while (this.queue.size() == this.limit) {
print("队列已满,等待中...");
wait();
}
this.queue.add(item);
if (this.queue.size() == 1) {
notifyAll();
}
print(item, "已经放入!");
}


public synchronized Object dequeue() throws InterruptedException {
while (this.queue.size() == 0) {
print("队列空的,等待中...");
wait();
}
if (this.queue.size() == this.limit) {
notifyAll();
}
Object item = this.queue.get(0);
print(item, "已经拿到!");
return this.queue.remove(0);
}

public static void print(Object... args) {
StringBuilder message = new StringBuilder(getThreadName() + ":");
for (Object arg : args) {
message.append(arg);
}
System.out.println(message);
}

public static String getThreadName() {
return Thread.currentThread().getName();
}
}
```

定义`lanLingWang`线程向队列中放入数据,`niumo`线程从队列中取出数据。

```java
public static void main(String[] args) {
BlockingQueue blockingQueue = new BlockingQueue(1);
Thread lanLingWang = new Thread(() -> {
try {
String[] items = { "A", "B", "C", "D", "E" };
for (String item: items) {
Thread.sleep(500);
blockingQueue.enqueue(item);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
lanLingWang.setName("兰陵王");
Thread niumo = new Thread(() -> {
try {
while (true) {
blockingQueue.dequeue();
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
lanLingWang.setName("兰陵王");
niumo.setName("牛魔王");

lanLingWang.start();
niumo.start();
}
```

运行结果如下:

```shell
牛魔王:队列空的,等待中...
兰陵王:A已经放入!
牛魔王:A已经拿到!
兰陵王:B已经放入!
牛魔王:B已经拿到!
兰陵王:C已经放入!
兰陵王:队列已满,等待中...
牛魔王:C已经拿到!
兰陵王:D已经放入!
兰陵王:队列已满,等待中...
牛魔王:D已经拿到!
兰陵王:E已经放入!
牛魔王:E已经拿到!
牛魔王:队列空的,等待中...

```

从结果中可以看到,设计的阻塞队列已经可以有效工作,你可以仔细地品一品输出的结果。当然,这个阻塞是极其简单的,在下面一节中,我们将介绍Java中的阻塞队列设计。

## 三、Java中的BlockingQueue

Java中的阻塞队列有两个核心接口:**BlockingQueue**和**BlockingDeque**,相关的接口实现设继承关系如下图所示。相比于上一节中我们自定义的阻塞队列,Java中的实现要复杂很多。不过,你不必为此担心,**理解阻塞队列最重要的是理解它的思想和实现的思路,况且Java中的实现其实很有意思,读起来也比较轻松**。

从图中可以看出,BlockingQueue接口继承了**Queue**接口和**Collection**接口,并有LinkedBlockingQueue和ArrayBlockingQueue两种实现。这里有个有意思的地方,**继承**Queue**接口很容易理解,可以为什么要继承**Collection**接口?先卖个关子,你可以思考一会,稍后会给出答案**。

![](https://writting.oss-cn-beijing.aliyuncs.com/2021/06/24/16245372433056.jpg)

### 1. 核心方法

BlockingQueue中义了关于阻塞队列所需要的一系列方法,它们彼此之间看起来很像,从表面上看不出明显的差别。对于这些方法,你不必死记硬背,下图的表格中将这些方法分为了**A、B、C、D**这四种类型,分类之后再去理解它们会容易很多:


|类型|A 抛出异常|B 返回特定值| C 阻塞|D 超时限定|
|---|---|---|---|---|
| Insert |` add(e)` | `offer(e)` | `put(e)` | `offer(e, time, unit)` |
| Remove | `remove()` | `poll()` | `take(`) | `poll(time, unit)` |
| Examine | `Element()` | `peek()` | -- | -- |

其中部分关键方法的解释如下:

* `add(E e)`:在不违反容量限制的前提下,向队列中插入数据。**如果成功,返回true,否则抛出异常**;
* `offer(E e)`:在不违反容量限制的前提下,向队列中插入数据。**如果成功,返回`true`,否则返回`false`**;
* `offer(E e, long timeout, TimeUnit unit)`:如果队列中没有足够的空间,将等待一段时间;
* `put(E e)`:在不违反容量限制的前提下,向队列中插入数据。**如果没有足够的空间,将进入等待**;
* `poll(long timeout, TimeUnit unit)`:从队列的头部获取数据,并移除数据。如果没有数据的话,将会等待指定的时间;
* `take()`:从队列的头部获取数据并移除。如果没有可用数据,将进入等待

将这些方法填入前面的那张图,它应该长这样:
![](https://writting.oss-cn-beijing.aliyuncs.com/2021/06/24/16245387167728.jpg)

### 2. LinkedBlockingQueue

LinkedBlockingQueue实现了BlockingQueue接口,遵从先进先出(FIFO)的原则,提供了可选的有界阻塞队列( Optionally Bounded )的能力,并且是线程安全的。

* **核心数据结构**
* `int capacity`: 设定队列容量;
* `Node<E> head`: 队列的头部元素;
* `Node<E> last`: 队列的尾部元素;
* `AtomicInteger count`: 队列中元素的总数统计。

LinkedBlockingQueue的数据结构并不复杂,不过需要注意的是,数据结构中并不包含List,仅有`head`和`last`两个Node,设计上比较巧妙。

* **核心构造**
* `LinkedBlockingQueue()`: 空构造;
* `LinkedBlockingQueue(int capacity)`: 指定容量构造。
* **线程安全性**
* `ReentrantLock takeLock`: 获取元素时的锁;
* `ReentrantLock putLock`: 写入元素时的锁。

**注意,LinkedBlockingQueue有两把锁,读取和写入的锁是分离的**!这和下面的ArrayBlockingQueue并不相同。

下面截取了LinkedBlockingQueue中读写的部分代码,值得你仔细品一品。**品的时候,要重点关注两把锁的使用和读写时数据结构是如何变化的**。

* **队列插入示例代码分析**

```java
public boolean add(E e) {
addLast(e);
return true;
}

public void addLast(E e) {
if (!offerLast(e))
throw new IllegalStateException("Deque full");
}

public boolean offerFirst(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkFirst(node);
} finally {
lock.unlock();
}
}
```
* **队列读取示例代码分析**

```java
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return pollFirst(timeout, unit);
}
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkFirst()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
```

最后说下LinkedBlockingQueue为什么要继承Collection接口。我们知道,Collection接口有`remove()`这样的移除方法,而这些方法在队列中也是有使用场景的。比如,你把一个数据错误地放入了队列,或者你需要移除已经失效的数据,那么Collection的一些方法就派上了用场。

### 3. ArrayBlockingQueue

ArrayBlockingQueue是BlockingQueue接口的另外一种实现,**它与LinkedBlockingQueue在设计目标上的的关键不同,在于它是有界的**。

* **核心数据结构**

* `Object[] items`: 队列元素集合;
* `int takeIndex`: 下次获取数据时的索引位置;
* `int putIndex`: 下次写入数据时的索引位置;
* `int count`: 队列总量计数。

从数据结构中可以看出,ArrayBlockingQueue使用的是数组,而数组是有界的。

* **核心构造**
* `ArrayBlockingQueue(int capacity)`: 限定容量的构造;
* `ArrayBlockingQueue(int capacity, boolean fair)`: 限定容量和公平性,默认是不公平的;
* `ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)`:带有初始化队列元素的构造。


* **线程安全性**

* `ReentrantLock lock`:队列读取和写入的锁。

在读写锁方面,前面已经说过,LinkedBlockingQueue和ArrayBlockingQueue是不同的,ArrayBlockingQueue只有一把锁,读写用的都是它。

* **队列写入示例代码分析**

```java
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
```

下面截取了ArrayBlockingQueue中读写的部分代码,值得你仔细品一品。**品的时候,要重点关注读写锁的使用和读写时数据结构是如何变化的**。

* **队列读取示例代码分析**

```java
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}

private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == ite......

原文转载:http://www.shaoqun.com/a/831875.html

跨境电商:https://www.ikjzd.com/

赛兔:https://www.ikjzd.com/w/2375

五洲会:https://www.ikjzd.com/w/1068

吉祥邮:https://www.ikjzd.com/w/1565


欢迎来到《并发王者课》,本文是该系列文章中的第18篇。在线程的同步中,阻塞队列是一个绕不过去的话题,它是同步器底层的关键。所以,我们在本文中将为你介绍阻塞队列的基本原理,以了解它的工作机制和它在Java中的实现。本文稍微有点长,建议先了解大纲再细看章节。欢迎来到《[并发王者课](https://juejin.cn/post/6967277362455150628)》,本文是该系列文章中的**第18
barclays:https://www.ikjzd.com/w/2775
蜜芽宝贝:https://www.ikjzd.com/w/1320
全球速卖通:https://www.ikjzd.com/w/81
2019年亚马逊运营思路:多SKU,自发货,零库存:https://www.ikjzd.com/articles/14588
亚马逊已经开始下架未经授权的苹果产品!:https://www.ikjzd.com/articles/14590
亚马逊未经授权卖家来势汹汹,品牌和制造商如何克敌制胜?:https://www.ikjzd.com/articles/14591
用亚马逊卖家的底子,快速玩转爆赚Shopee!:https://www.ikjzd.com/articles/14592
口述:与准嫂子销魂一夜她怀上我的孩子:http://lady.shaoqun.com/a/38671.html
2021暑假杭州哪里好玩不热 杭州暑期好玩凉快的地方 :http://www.30bags.com/a/458161.html
提高转化的亚马逊主图设置技巧!必收藏!:https://www.ikjzd.com/articles/146092
超级实用!怎么认识一个有钱有上进心的高素质男人?:http://lady.shaoqun.com/a/389458.html
正常的30岁情侣一周做爱几次合适?:http://lady.shaoqun.com/a/389459.html

没有评论:

发表评论

跨境电商资讯:外贸宣传平台有哪些(出口的

现在很多做外贸的人都非常关注 外贸企业怎么推广 ,而现在推广的途径和平台有很多,企业如果都做,成本和时间精力是一个问题,而且并不是所有的推广渠道都是有用的。今天云程网络就来为大家盘点几个有效的外贸推广渠道。 一、海外社交媒体营销 Facebook,领英等海外社交媒体营销在近几年得...