我想建个自己的网站,免费网站后台模板,中山网站建设找丁生,百度推广一天烧多少钱JUC 中 Semaphore 的使用与原理分析#xff0c;Semaphore 也是 Java 中的一个同步器#xff0c;与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的#xff0c;那么#xff0c;Semaphore 的内部实现是怎样的呢#xff1f; Semaphore 信号量也是Java 中一个…JUC 中 Semaphore 的使用与原理分析Semaphore 也是 Java 中的一个同步器与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的那么Semaphore 的内部实现是怎样的呢 Semaphore 信号量也是Java 中一个同步容器与CountDownLatch 和 CyclicBarrier 不同之处在于它内部的计数器是递增的。为了能够一览Semaphore的内部结构我们首先要看一下Semaphore的类图类图如下所示 如上类图可以知道Semaphoren内部还是使用AQS来实现的Sync只是对AQS的一个修饰并且Sync有两个实现类分别代表获取信号量的时候是否采取公平策略。创建Semaphore的时候会有一个变量标示是否使用公平策略源码如下 public Semaphore(int permits) {sync new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {sync fair ? new FairSync(permits) : new NonfairSync(permits);}Sync(int permits) {setState(permits);} 如上面代码所示Semaphore默认使用的是非公平策略如果你需要公平策略则可以使用带两个参数的构造函数来构造Semaphore对象另外和CountDownLatch一样构造函数里面传递的初始化信号量个数 permits 被赋值给了AQS 的state状态变量也就是说这里AQS的state值表示当前持有的信号量个数。 接下来我们主要看看Semaphore实现的主要方法的源码如下 1.void acquire() 当前线程调用该方法的时候目的是希望获取一个信号量资源如果当前信号量计数个数大于 0 并且当前线程获取到了一个信号量则该方法直接返回当前信号量的计数会减少 1 。否则会被放入AQS的阻塞队列当前线程被挂起直到其他线程调用了release方法释放了信号量并且当前线程通过竞争获取到了改信号量。当前线程被其他线程调用了 interrupte方法中断后当前线程会抛出 InterruptedException异常返回。源码如下 public void acquire() throws InterruptedException {//传递参数为1说明要获取1个信号量资源sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//1如果线程被中断则抛出中断异常if (Thread.interrupted())throw new InterruptedException();//2否者调用sync子类方法尝试获取,这里根据构造函数确定使用公平策略if (tryAcquireShared(arg) 0)//如果获取失败则放入阻塞队列,然后再次尝试如果失败则调用park方法挂起当前线程doAcquireSharedInterruptibly(arg);} 如上代码可知acquire内部调用了sync的acquireSharedInterruptibly 方法后者是对中断响应的(如果当前线程被中断则抛出中断异常)尝试获取信号量资源的AQS的方法tryAcquireShared 是由 sync 的子类实现所以这里就要分公平性了这里先讨论非公平策略 NonfairSync 类的 tryAcquireShared 方法源码如下 protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}final int nonfairTryAcquireShared(int acquires) {for (;;) {//获取当前信号量值int available getState();//计算当前剩余值int remaining available - acquires;//如果当前剩余小于0或者CAS设置成功则返回if (remaining 0 ||compareAndSetState(available, remaining))return remaining;}
} 如上代码先计算当前信号量值(available)减去需要获取的值(acquires) 得到剩余的信号量个数(remaining)如果剩余值小于 0 说明当前信号量个数满足不了需求则直接返回负数然后当前线程会被放入AQS的阻塞队列当前线程被挂起。如果剩余值大于 0 则使用CAS操作设置当前信号量值为剩余值然后返回剩余值。另外可以知道NonFairSync是非公平性获取的是说先调用aquire方法获取信号量的线程不一定比后来者先获取锁。 接下来我们要看看公平性的FairSync 类是如何保证公平性的源码如下 protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available getState();int remaining available - acquires;if (remaining 0 || compareAndSetState(available, remaining))return remaining;}} 可以知道公平性还是靠 hasQueuedPredecessors 这个方法来做的以前的随笔已经讲过公平性是看当前线程节点是否有前驱节点也在等待获取该资源如果是则自己放弃获取的权力然后当前线程会被放入AQS阻塞队列否则就去获取。hasQueuedPredecessors源码如下 public final boolean hasQueuedPredecessors() {Node t tail; Node h head;Node s;return h ! t ((s h.next) null || s.thread ! Thread.currentThread());
} 如上面代码所示如果当前线程节点有前驱节点则返回true否则如果当前AQS队列为空 或者 当前线程节点是AQS的第一个节点则返回 false 其中如果 h t 则说明当前队列为空则直接返回 false如果 h !t 并且 s null 说明有一个元素将要作为AQS的第一个节点入队列(回顾下 enq 函数第一个元素入队列是两步操作首先创建一个哨兵头节点然后第一个元素插入到哨兵节点后面)那么返回 true如果 h !t 并且 s ! null 并且 s.thread ! Thread.currentThread() 则说明队列里面的第一个元素不是当前线程则返回 true。 2.void acquire(int permits) 该方法与 acquire() 不同在与后者只需要获取一个信号量值而前者则获取指定 permits 个源码如下 public void acquire(int permits) throws InterruptedException {if (permits 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);
} 3.void acquireUninterruptibly() 该方法与 acquire() 类似不同之处在于该方法对中断不响应也就是当当前线程调用了 acquireUninterruptibly 获取资源过程中包含被阻塞后其它线程调用了当前线程的 interrupt方法设置了当前线程的中断标志当前线程并不会抛出 InterruptedException 异常而返回。源码如下 public void acquireUninterruptibly() {sync.acquireShared(1);
} 4.void acquireUninterruptibly(int permits) 该方法与 acquire(int permits) 不同在于该方法对中断不响应。源码如如下 public void acquireUninterruptibly(int permits) {if (permits 0) throw new IllegalArgumentException();sync.acquireShared(permits);} 5.void release() 该方法作用是把当前 semaphore对象的信号量值增加 1 如果当前有线程因为调用 acquire 方法被阻塞放入了 AQS的阻塞队列则会根据公平策略选择一个线程进行激活激活的线程会尝试获取刚增加的信号量源码如下 public void release() {//(1)arg1sync.releaseShared(1);}public final boolean releaseShared(int arg) {//(2)尝试释放资源if (tryReleaseShared(arg)) {//(3)资源释放成功则调用park唤醒AQS队列里面最先挂起的线程doReleaseShared();return true;}return false;}protected final boolean tryReleaseShared(int releases) {for (;;) {//(4)获取当前信号量值int current getState();//(5)当前信号量值增加releases这里为增加1int next current releases;if (next current) // 移除处理throw new Error(Maximum permit count exceeded);//(6)使用cas保证更新信号量值的原子性if (compareAndSetState(current, next))return true;}} 如上面代码可以看到 release方法中对 sync.releaseShared1可以知道release方法每次只会对信号量值增加 1 tryReleaseShared方法是无限循环使用CAS保证了 release 方法对信号量递增 1 的原子性操作。当tryReleaseShared 方法增加信号量成功后会执行代码3调用AQS的方法来激活因为调用acquire方法而被阻塞的线程。 6.void release(int permits) 该方法与不带参数的不同之处在于前者每次调用会在信号量值原来基础上增加 permits而后者每次增加 1。源码如下 public void release(int permits) {if (permits 0) throw new IllegalArgumentException();sync.releaseShared(permits);
} 另外注意到这里调用的是 sync.releaseShared 是共享方法这说明该信号量是线程共享的信号量没有和固定线程绑定多个线程可以同时使用CAS去更新信号量的值而不会阻塞。 到目前已经知道了其原理接下来用一个例子来加深对Semaphore的理解例子如下 package com.hjc;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;/*** Created by cong on 2018/7/8.*/
public class SemaphoreTest {// 创建一个Semaphore实例private static volatile Semaphore semaphore new Semaphore(0);public static void main(String[] args) throws InterruptedException {ExecutorService executorService Executors.newFixedThreadPool(2);// 加入线程A到线程池executorService.submit(new Runnable() {public void run() {try {System.out.println(Thread.currentThread() over);semaphore.release();} catch (Exception e) {e.printStackTrace();}}});// 加入线程B到线程池executorService.submit(new Runnable() {public void run() {try {System.out.println(Thread.currentThread() over);semaphore.release();} catch (Exception e) {e.printStackTrace();}}});// 等待子线程执行完毕返回semaphore.acquire(2);System.out.println(all child thread over!);//关闭线程池executorService.shutdown();}
} 运行结果如下 类似于 CountDownLatch上面我们的例子也是在主线程中开启两个子线程进行执行等所有子线程执行完毕后主线程在继续向下运行。 如上代码首先首先创建了一个信号量实例构造函数的入参为 0说明当前信号量计数器为 0然后 main 函数添加两个线程任务到线程池每个线程内部调用了信号量的 release 方法相当于计数值递增一最后在 main 线程里面调用信号量的 acquire 方法参数传递为 2 说明调用 acquire 方法的线程会一直阻塞直到信号量的计数变为 2 时才会返回。 看到这里也就明白了如果构造 Semaphore 时候传递的参数为 N在 M 个线程中调用了该信号量的 release 方法那么在调用 acquire 对 M 个线程进行同步时候传递的参数应该是 MN; 对CountDownLatchCyclicBarrierSemaphored这三者之间的比较总结 1.CountDownLatch 通过计数器提供了更灵活的控制只要检测到计数器为 0而不管当前线程是否结束调用 await 的线程就可以往下执行相比使用 jion 必须等待线程执行完毕后主线程才会继续向下运行更灵活。 2.CyclicBarrier 也可以达到 CountDownLatch 的效果但是后者当计数器变为 0 后就不能在被复用而前者则使用 reset 方法可以重置后复用前者对同一个算法但是输入参数不同的类似场景下比较适用。 3.而 semaphore 采用了信号量递增的策略一开始并不需要关心需要同步的线程个数等调用 aquire 时候在指定需要同步个数并且提供了获取信号量的公平性策略。转载于:https://www.cnblogs.com/huangjuncong/p/9280646.html