兴隆大院网站哪个公司做的,网站建设公司运营模式,柬埔寨网站建设运营维护,微信代运营收费标准简介#xff1a; JUC 工具包是 JAVA 并发编程的利器。本文讲述在没有 JUC 工具包帮助下#xff0c;借助原生的 JAVA 同步原语, 如何实现一个公平有界的阻塞队列。希望你也能在文后体会到并发编程的复杂之处#xff0c;以及 JUC 工具包的强。 作者 | 李新然 来源 | 阿里技术公…简介 JUC 工具包是 JAVA 并发编程的利器。本文讲述在没有 JUC 工具包帮助下借助原生的 JAVA 同步原语, 如何实现一个公平有界的阻塞队列。希望你也能在文后体会到并发编程的复杂之处以及 JUC 工具包的强。 作者 | 李新然 来源 | 阿里技术公众号
一 背景
JUC 工具包是 JAVA 并发编程的利器。
本文讲述在没有 JUC 工具包帮助下借助原生的 JAVA 同步原语, 如何实现一个公平有界的阻塞队列。
希望你也能在文后体会到并发编程的复杂之处以及 JUC 工具包的强大。
二 方法
本文使用到的基本工具
同步监听器 synchronized 方法基本和代码块级别Object 基础类的 wait, notify, notifyAll
基于以上基础工具实现公平有界的阻塞队列此处
将公平的定义限定为 FIFO 也就是先阻塞等待的请求先解除等待并不保证解除等待后执行 Action 的先后顺序确保队列的大小始终不超过设定的容量但阻塞等待的请求数不做限制
三 实现
1 基础版本
首先考虑在非并发场景下借助 ADT 实现一个基础版本
interface Queue {boolean offer(Object obj);Object poll();}
class FairnessBoundedBlockingQueue implements Queue {// 当前大小protected int size;// 容量protected final int capacity;// 头指针empty: head.next tail nullprotected Node head;// 尾指针protected Node tail;public FairnessBoundedBlockingQueue(int capacity) {this.capacity capacity;this.head new Node(null);this.tail head;this.size 0;}// 如果队列已满通过返回值标识public boolean offer(Object obj) {if (size capacity) {Node node new Node(obj);tail.next node;tail node;size;return true;}return false;}// 如果队列为空head.next null返回空元素public Object poll() {if (head.next ! null) {Object result head.next.value;head.next.value null;head head.next; // 丢弃头结点--size;return result;}return null;}class Node {Object value;Node next;Node(Object obj) {this.value obj;next null;}}
}
以上
定义支持队列的两个基础接口 poll 和 offer队列的实现采用经典实现考虑在队列空的情况下 poll 返回为空非阻塞队列在满的情况下 offer 返回 false 入队不成功无异常
需要注意的一点在出队时本文通过迁移头结点的方式实现避免修改尾结点。 在下文实现并发版本时会看到此处的用意。
2 并发版本
如果在并发场景下上述的实现面临一些问题同时未实现给定的一些需求。
通过添加 synchronized 保证并发条件下的线程安全问题。
注意此处做同步的原因是为了保证类的不变式。
并发问题
在并发场景下基础版本的实现面临的问题包括原子性可见性和指令重排的问题。
参考 JMM 的相关描述。
并发问题最简单的解决方法是通过 synchronized 加锁一次性解决问题。
// 省略接口定义
class BoundedBlockingQueue implements Queue {// 当前大小protected int size;// 容量protected final int capacity;// 头指针empty: head.next tail nullprotected Node head;// 尾指针protected Node tail;public BoundedBlockingQueue(int capacity) {this.capacity capacity;this.head new Node(null);this.tail head;this.size 0;}// 如果队列已满通过返回值标识public synchronized boolean offer(Object obj) {if (size capacity) {Node node new Node(obj);tail.next node;tail node;size;return true;}return false;}// 如果队列为空head.next null返回空元素public synchronized Object poll() {if (head.next ! null) {Object result head.next.value;head.next.value null;head head.next; // 丢弃头结点--size;return result;}return null;}// 省略 Node 的定义
}
以上简单粗暴的加 synchronized 可以解决问题但会引入新的问题系统活性问题此问题下文会解决。
同时简单加 synchronized 同步是无法实现阻塞等待即
如果队列为空那么出队的动作还是会立即返回返回为空如果队列已满那么入队动作还是会立即返回返回操作不成功
实现阻塞等待需要借助 JAVA 中的 PV 原语wait, notify, notifyAll 。
参考JDK 中对 wait, notify, notifyAll 的相关描述。
卫式方法
阻塞等待可以通过简单的卫式方法来实现此问题本质上可以抽象为
任何一个方法都需要在满足一定条件下才可以执行执行方法前需要首先校验不变式然后执行变更在执行完成后校验是否满足后验不变式
WHEN(condition) Object action(Object arg) {checkPreCondition();doAction(arg);checkPostCondition();
}
此种抽象 Ada 在语言层面上实现。在 JAVA 中借助 wait, notify, notifyAll 可以翻译为
// 当前线程
synchronized Object action(Object arg) {while(!condition) {wait();}// 前置条件不变式checkPreCondition();doAction();// 后置条件不变式checkPostCondition();
}// 其他线程
synchronized Object notifyAction(Object arg) {notifyAll();
}
需要注意
通常会采用 notifyAll 发送通知而非 notify 因为如果当前线程收到 notify 通知后被中断那么系统将一直等待下去。如果使用了 notifyAll 那么卫式语句必须放在 while 循环中因为线程唤醒后执行条件已经不满足虽然当前线程持有互斥锁。卫式条件的所有变量有任何变更都需要发送 notifyAll 不然面临系统活性问题
据此不难实现简单的阻塞版本的有界队列如下
interface Queue {boolean offer(Object obj) throws InterruptedException;Object poll() throws InterruptedException;}
class FairnessBoundedBlockingQueue implements Queue {// 当前大小protected int size;// 容量protected final int capacity;// 头指针empty: head.next tail nullprotected Node head;// 尾指针protected Node tail;public FairnessBoundedBlockingQueue(int capacity) {this.capacity capacity;this.head new Node(null);this.tail head;this.size 0;}// 如果队列已满通过返回值标识public synchronized boolean offer(Object obj) throws InterruptedException {while (size capacity) {wait();}Node node new Node(obj);tail.next node;tail node;size;notifyAll(); // 可以出队return true;}// 如果队列为空阻塞等待public synchronized Object poll() throws InterruptedException {while (head.next null) {wait();}Object result head.next.value;head.next.value null;head head.next; // 丢弃头结点--size;notifyAll(); // 可以入队return result;}// 省略 Node 的定义
}
以上实现了阻塞等待但也引入了更大的性能问题
入队和出队动作阻塞等待同一把锁恶性竞争当队列变更时所有阻塞线程被唤醒大量的线程上下文切换竞争同步锁最终可能只有一个线程能执行
需要注意的点
阻塞等待 wait 会抛出中断异常。关于异常的问题下文会处理接口需要支持抛出中断异常队里变更需要 notifyAll 避免线程中断或异常丢失消息
3 锁拆分优化
以上第一个问题可以通过锁拆分来解决即定义两把锁读锁和写锁读写分离。
// 省略接口定义
class FairnessBoundedBlockingQueue implements Queue {// 容量protected final int capacity;// 头指针empty: head.next tail nullprotected Node head;// 尾指针protected Node tail;// guard: canPollCount, headprotected final Object pollLock new Object();protected int canPollCount;// guard: canOfferCount, tailprotected final Object offerLock new Object();protected int canOfferCount;public FairnessBoundedBlockingQueue(int capacity) {this.capacity capacity;this.canPollCount 0;this.canOfferCount capacity;this.head new Node(null);this.tail head;}// 如果队列已满通过返回值标识public boolean offer(Object obj) throws InterruptedException {synchronized(offerLock) {while(canOfferCount 0) {offerLock.wait();}Node node new Node(obj);tail.next node;tail node;canOfferCount--;}synchronized(pollLock) {canPollCount;pollLock.notifyAll();}return true;}// 如果队列为空阻塞等待public Object poll() throws InterruptedException {Object result null;synchronized(pollLock) {while(canPollCount 0) {pollLock.wait();}result head.next.value;head.next.value null;head head.next;canPollCount--;}synchronized(offerLock) {canOfferCount;offerLock.notifyAll();}return result;}// 省略 Node 定义
}
以上
定义了两把锁 pollLock 和 offerLock 拆分出队和入队竞争入队锁同步的变量为callOfferCount 和 tail出队锁同步的变量为canPollCount 和 head出队的动作首先拿到 pollLock 卫式等待后完成出队动作然后拿到 offerLock 发送通知解除入队的等待线程。入队的动作首先拿到 offerLock 卫式等待后完成入队的动作然后拿到 pollLock 发送通知解除出队的等待线程。
以上实现
确保通过入队锁和出队锁分别保证入队和出队的原子性出队动作通过特别的实现确保出队只会变更 head 避免获取 offerLock通过 offerLock.notifyAll 和 pollLock.notifyAll 解决读写竞争的问题
但上述实现还有未解决的问题
当有多个入队线程等待时一次出队的动作会触发所有入队线程竞争大量的线程上下文切换最终只有一个线程能执行。
即还有 读与读 和 写与写 之间的竞争问题。
4 状态追踪解除竞争
此处可以通过状态追踪解除读与读之间和写与写之间的竞争问题
class FairnessBoundedBlockingQueue implements Queue {// 容量protected final int capacity;// 头指针empty: head.next tail nullprotected Node head;// 尾指针protected Node tail;// guard: canPollCount, headprotected final Object pollLock new Object();protected int canPollCount;protected int waitPollCount;// guard: canOfferCount, tailprotected final Object offerLock new Object();protected int canOfferCount;protected int waitOfferCount;public FairnessBoundedBlockingQueue(int capacity) {this.capacity capacity;this.canPollCount 0;this.canOfferCount capacity;this.waitPollCount 0;this.waitOfferCount 0;this.head new Node(null);this.tail head;}// 如果队列已满通过返回值标识public boolean offer(Object obj) throws InterruptedException {synchronized(offerLock) {while(canOfferCount 0) {waitOfferCount;offerLock.wait();waitOfferCount--;}Node node new Node(obj);tail.next node;tail node;canOfferCount--;}synchronized(pollLock) {canPollCount;if (waitPollCount 0) {pollLock.notify();}}return true;}// 如果队列为空阻塞等待public Object poll() throws InterruptedException {Object result;synchronized(pollLock) {while(canPollCount 0) {waitPollCount;pollLock.wait();waitPollCount--;}result head.next.value;head.next.value null;head head.next;canPollCount--;}synchronized(offerLock) {canOfferCount;if (waitOfferCount 0) {offerLock.notify();}}return result;}// 省略 Node 的定义
}
以上
通过 waitOfferCount 和 waitPollCount 的状态追踪解决 读写内部的竞争问题当队列变更时根据追踪的状态决定是否派发消息触发线程阻塞状态解除
但上述的实现在某些场景下会运行失败面临活性问题考虑
情况一
初始状态队列为空 线程 A 执行出队动作被阻塞在 pollLock , 此时 waitPollCount1此时线程 A 在执行 wait 时被中断抛出异常 waitPollCount1 并未被重置阻塞队列为空但 waitPollCount1 类状态异常
情况二
初始状态队列为空 线程 A B 执行出队动作被阻塞在 pollLock , 此时 waitPollCount2线程 C 执行入队动作可以立即执行执行完成后触发 pollLock 解除一个线程等待 notify触发的线程在 JVM 实现中是随机的假设线程 A 被解除阻塞假设线程 A 在阻塞过程中已被中断阻塞解除后 JVM 检查 interrupted 状态抛出 InterruptedException 异常此时队列中有一个元素但线程 A 仍阻塞在 pollLock 中且一直阻塞下去
以上为解除阻塞消息丢失的例子问题的根源在与异常处理。
5 解决异常问题
解决线程中断退出的问题线程校验中断状态的场景
JVM 通常只会在有限的几个场景检测线程的中断状态 wait, Thread.join, Thread.sleepJVM 在检测到线程中断状态 Thread.interrupted() 后会清除中断标志抛出 InterruptedException通常为了保证线程对中断及时响应 run 方法中需要自主检测中断标志中断线程特别是对中断比较敏感需要保持类的不变式的场景
class FairnessBoundedBlockingQueue implements Queue {// 容量protected final int capacity;// 头指针empty: head.next tail nullprotected Node head;// 尾指针protected Node tail;// guard: canPollCount, head, waitPollCountprotected final Object pollLock new Object();protected int canPollCount;protected int waitPollCount;// guard: canOfferCount, tail, waitOfferCountprotected final Object offerLock new Object();protected int canOfferCount;protected int waitOfferCount;public FairnessBoundedBlockingQueue(int capacity) {this.capacity capacity;this.canPollCount 0;this.canOfferCount capacity;this.waitPollCount 0;this.waitOfferCount 0;this.head new Node(null);this.tail head;}// 如果队列已满通过返回值标识public boolean offer(Object obj) throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException(); // 线程已中断直接退出即可防止中断线程竞争锁}synchronized(offerLock) {while(canOfferCount 0) {waitOfferCount;try {offerLock.wait();} catch (InterruptedException e) {// 触发其他线程offerLock.notify();throw e;} finally {waitOfferCount--;}}Node node new Node(obj);tail.next node;tail node;canOfferCount--;}synchronized(pollLock) {canPollCount;if (waitPollCount 0) {pollLock.notify();}}return true;}// 如果队列为空阻塞等待public Object poll() throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}Object result null;synchronized(pollLock) {while(canPollCount 0) {waitPollCount;try {pollLock.wait();} catch (InterruptedException e) {pollLock.notify();throw e;} finally {waitPollCount--;}}result head.next.value;head.next.value 0;// ignore head;head head.next;canPollCount--;}synchronized(offerLock) {canOfferCount;if (waitOfferCount 0) {offerLock.notify();}}return result;}// 省略 Node 的定义
}
以上
当等待线程中断退出时捕获中断异常通过 pollLock.notify 和 offerLock.notify 转发消息通过在 finally 中恢复状态追踪变量
通过状态变量追踪可以解决读与读之间和写与写之间的锁竞争问题。
以下考虑如果解决读与读之间和写与写之间的公平性问题。
6 解决公平性
公平性的问题的解决需要将状态变量的追踪转换为请求监视器追踪。
每个请求对应一个监视器通过内部维护一个 FIFO 队列实现公平性在队列状态变更时释放队列中的监视器
以上逻辑可以统一抽象为
boolean needToWait;
synchronized(this) {needToWait calculateNeedToWait();if (needToWait) {enqueue(monitor); // 请求对应的monitor}
}
if (needToWait) {monitor.doWait();
}
需要注意
monitor.doWait() 需要在 this 的卫式语句之外因为如果在内部 monitor.doWait 并不会释放 this锁calculateNeedToWait() 需要在 this 的守卫之内完成避免同步问题需要考虑中断异常的问题
基于以上的逻辑抽象实现公平队列
// 省略接口定义
class FairnessBoundedBlockingQueue implements Queue {// 容量protected final int capacity;// 头指针empty: head.next tail nullprotected Node head;// 尾指针protected Node tail;// guard: canPollCount, head, pollQueueprotected final Object pollLock new Object();protected int canPollCount;// guard: canOfferCount, tail, offerQueueprotected final Object offerLock new Object();protected int canOfferCount;protected final WaitQueue pollQueue new WaitQueue();protected final WaitQueue offerQueue new WaitQueue();public FairnessBoundedBlockingQueue(int capacity) {this.capacity capacity;this.canOfferCount capacity;this.canPollCount 0;this.head new Node(null);this.tail head;}// 如果队列已满通过返回值标识public boolean offer(Object obj) throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException(); // 线程已中断直接退出即可防止中断线程竞争锁}WaitNode wait null;synchronized(offerLock) {// 在有阻塞请求或者队列为空时阻塞等待if (canOfferCount 0 || !offerQueue.isEmpty()) {wait new WaitNode();offerQueue.enq(wait);} else {// continue.}}try {if (wait ! null) {wait.doWait();}if (Thread.interrupted()) {throw new InterruptedException();}} catch (InterruptedException e) {offerQueue.doNotify();throw e;}// 确保此时线程状态正常以下不会校验中断synchronized(offerLock) {Node node new Node(obj);tail.next node;tail node;canOfferCount--;}synchronized(pollLock) {canPollCount;pollQueue.doNotify();}return true;}// 如果队列为空阻塞等待public Object poll() throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}Object result null;WaitNode wait null;synchronized(pollLock) {// 在有阻塞请求或者队列为空时阻塞等待if (canPollCount 0 || !pollQueue.isEmpty()) {wait new WaitNode();pollQueue.enq(wait);} else {// ignore}}try {if (wait ! null) {wait.doWait();}if (Thread.interrupted()) {throw new InterruptedException();}} catch (InterruptedException e) {// 传递消息pollQueue.doNotify();throw e;}// 以下不会检测线程中断状态synchronized(pollLock) {result head.next.value;head.next.value 0;// ignore head;head head.next;canPollCount--;}synchronized(offerLock) {canOfferCount;offerQueue.doNotify();}return result;}class WaitQueue {WaitNode head;WaitNode tail;WaitQueue() {head new WaitNode();tail head;}synchronized void doNotify() {for(;;) {WaitNode node deq();if (node null) {break;} else if (node.doNotify()) {// 此处确保NOTIFY成功break;} else {// ignore, and retry.}}}synchronized boolean isEmpty() {return head.next null;}synchronized void enq(WaitNode node) {tail.next node;tail tail.next;}synchronized WaitNode deq() {if (head.next null) {return null;}WaitNode res head.next;head head.next;if (head.next null) {tail head; // 为空迁移tail节点}return res;}}class WaitNode {boolean released;WaitNode next;WaitNode() {released false;next null;}synchronized void doWait() throws InterruptedException {try {while (!released) {wait();} } catch (InterruptedException e) {if (!released) {released true;throw e;} else {// 如果是NOTIFY之后收到中断的信号不能抛出异常需要做RELAY处理Thread.currentThread().interrupt();}}}synchronized boolean doNotify() {if (!released) {released true;notify();// 明确释放了一个线程返回truereturn true;} else {// 没有释放新的线程返回falsereturn false;}}}// 省略 Node 的定义
}
以上
核心是替换状态追踪变量为同步节点 WaitNodeWaitNode 通过简单的同步队列组织实现 FIFO 协议每个线程等待各自的 WaitNode 监视器WaitNode 内部维持 released 状态标识线程阻塞状态是否被释放主要是为了处理中断的问题WaitQueue 本身是全同步的由于已解决了读写竞争已经读写内部竞争的问题 WaitQueue 同步并不会造成问题WaitQueue 是无界队列是一个潜在的问题但由于其只做同步的追踪而且追踪的通常是线程通常并不是问题最终的公平有界队列实现无论是入队还是出队首先卫式语句判定是否需要入队等待如果入队等待通过公平性协议等待;
当信号释放时借助读写锁同步更新队列最后同样借助读写锁触发队列更新消息
7 等待时间的问题
并发场景下等待通常会设置为限时等待 TIMED_WAITING 避免死锁或损失系统活性
实现同步队列的限时等待并没想象的那么困难
class TimeoutException extends InterruptedException {}class WaitNode {boolean released;WaitNode next;WaitNode() {released false;next null;}synchronized void doWait(long milliSeconds) throws InterruptedException {try {long startTime System.currentTimeMillis();long toWait milliSeconds;for (;;) {wait(toWait);if (released) {return;}long now System.currentTimeMillis();toWait toWait - (now - startTime);if (toWait 0) {throw new TimeoutException();}}} catch (InterruptedException e) {if (!released) {released true;throw e;} else {// 如果已经释放信号量此处不抛出异常但恢复中断状态Thread.currentThread().interrupt();}}}synchronized boolean doNotify() {if (!released) {released true;notify();return true;} else {return false;}}
由于所有的等待都阻塞在 WaitNode 监视器以上
首先定义超时异常此处只是为了方便异常处理继承 InterruptedException
此处依赖于 wait(long timeout) 的超时等待实现这通常不是问题
最后将 WaitNode 超时等待的逻辑带入到 FairnessBoundedBlockingQueue 实现中即可。
四 总结
本文通过一步步迭代最终借助 JAVA 同步原语实现初版的公平有界队列。迭代实现过程中可以看到以下几点
观念的转变将调用一个类的方法思维转换为在满足一定条件下方法才可以调用在调用前需要满足不变式调用后满足不变式由于并发的问题很难测试通常要采用卫式表达证明并发的正确性在迭代实现中会看到很多模式比如读写分离时其实可以抽象为读锁和写锁就得到了一个抽象的 Lock 的定义比如读写状态追踪可以采用 Exchanger 抽象表达另外本文的实现远非完善还需要考虑支持 Iterator 遍历、状态查询及数据迁移等操作
最后相信大家再看 JUC 的工具包实现定有不一样的体会。
原文链接 本文为阿里云原创内容未经允许不得转载。