开发区网站制作公司,腾讯云网站备案吗,汇鑫网站建设,2016用什么网站程序做流量在多线程编程中我们会遇到很多需要使用线程同步机制去解决的并发问题#xff0c;而这些同步机制就是多线程编程中影响正确性和运行效率的重中之重。这不禁让我感到好奇#xff0c;这些同步机制是如何实现的呢#xff1f;好奇心是进步的源泉#xff0c;就让我们一起来揭开同…在多线程编程中我们会遇到很多需要使用线程同步机制去解决的并发问题而这些同步机制就是多线程编程中影响正确性和运行效率的重中之重。这不禁让我感到好奇这些同步机制是如何实现的呢好奇心是进步的源泉就让我们一起来揭开同步机制源码的神秘面纱吧。在本文中我们会从JDK中大多数同步机制的共同基础AbstractQueuedSynchronizer类开始说起然后通过源码了解我们最常用的两个同步类可重入锁ReentrantLock和闭锁CountDownLatch的具体实现。通过这篇文章我们将可以了解到ReentrantLock和CountDownLatch两个常用同步类的源代码实现并且掌握阅读其他基于AQS实现的同步工具类源码的能力甚至可以利用AQS写出自己的同步工具类。阅读这篇文章需要了解基本的线程同步机制有兴趣的读者可以参考一下这篇文章兜里有辣条多线程中那些看不见的陷阱zhuanlan.zhihu.com同步机制的核心——AQS同步机制源码初探ReentrantLock是我们常用的一种可重入互斥锁是synchronized关键字的一个很好的替代品。互斥指的就是同一时间只能有一个线程获取到这个锁而可重入是指如果一个线程再次获取一个它已经持有的互斥锁那么仍然会成功。这个类的源码在JDK的java.util.concurrent包下我们可以在IDE中点击类名跳转到具体的类定义比如下面就是在我的电脑上跳转之后看到的ReentrantLock类的源代码。在这里我们可以看到在ReentrantLock类中还包含了一个继承自AbstractQueuedSynchronizer类的内部类而且有一个该内部类Sync类型的字段sync。实际上ReentrantLock类就是通过这个内部类对象来实现线程同步的。如果打开CountDownLatch的源代码我们会发现这个类里也同样有一个继承自AbstractQueuedSynchronizer类的子类Sync并且也有一个Sync类型的字段sync。在java.util.concurrent包下的大多数同步工具类的底层都是通过在内部定义一个AbstractQueuedSynchronizer类的子类来实现的包括我们在本文中没提到的许多其他常用类也是如此比如读写锁ReentrantReadWriteLock、信号量Semaphore等。AQS是什么那么这个AbstractQueuedSynchronizer类也就是我们所说的AQS到底是何方神圣呢这个类首先像我们上面提到的是大多数多线程同步工具类的基础。它内部包含了一个对同步器的等待队列其中包含了所有在等待获取同步器的线程在这个等待队列中的线程将会在同步器释放时被唤醒。比如一个线程在获取互斥锁失败时就会被放入到等待队列中等待被唤醒这也就是AQS中的Q——“Queued”的由来。而类名中的第一个单词Abstract是因为AQS是一个抽象类它的使用方法就是实现继承它的子类然后使用这个子类类型的对象。在这个子类中我们会通过重写下列的五个方法中的一部分或者全部来指定这个同步器的行为策略boolean tryAcquire(int arg)独占式获取同步器独占式指同一时间只能有一个线程获取到同步器boolean tryRelease(int arg)独占式释放同步器boolean isHeldExclusively()同步器是否被当前线程独占式地持有int tryAcquireShared(int arg)共享式获取同步器共享式指的是同一时间可能有多个线程同时获取到同步器但是可能会有数量的限制boolean tryReleaseShared(int arg)共享式释放同步器。这五个方法之所以能指定同步器的行为则是因为AQS中的其他方法就是通过对这五个方法的调用来实现的。比如在下面的acquire方法中就调用了tryAcquire来获取同步器并且在被调用的acquireQueued方法内部也是通过tryAcquire方法来循环尝试获取同步器的。public final void acquire(int arg) {// 1. 调用tryAcquire方法尝试获取锁 // 2. 如果获取失败(tryAcquire返回false)则调用addWaiter方法将当前线程保存到等待队列中 // 3. 之后调用acquireQueued方法来循环执行“获取同步器 - 获取失败休眠 - 被唤醒重新获取”过程 // 直到成功获取到同步器返回false或者被中断返回true if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 如果acquireQueued方法返回true说明线程被中断了 // 所以调用selfInterrupt方法中断当前线程 selfInterrupt();}下面我们就来看看在ReentrantLock和CountDownLatch两个类中定义的AQS子类到底是如何重写这五个方法的。CountDownLatch的实现CountDownLatch是一种典型的闭锁比如我需要使用四个线程完成四种不同的计算然后把四个线程的计算结果相加后返回这种情况下主线程就需要等待四个完成不同任务的工作线程完成之后才能继续执行。那么我们就可以创建一个初始的count值为4的CountDownLatch然后在每个工作线程完成任务时都对这个CountDownLatch执行一个countDown操作这样CountDownLatch中的count值就会减1。当count值减到0时主线程就会从阻塞中恢复然后将四个任务的结果相加后返回。下面是CountDownLath的几个常用方法void await()等待操作如果count值目前已经是0了那么就直接返回否则就进入阻塞状态等待count值变为0void countDown()减少计数操作会让count减1。调用多次countDown()方法让count值变为0之后被await()方法阻塞的线程就可以继续执行了。了解了CountDownLatch的基本用法之后我们就来看看这个闭锁到底是怎么实现的首先我们来看一下CountDownLatch中AQS的子类内部类Sync的定义。CountDownLatch的内部Sync类下面的代码是CountDownLatch中AQS的子类Sync的定义Sync是CountDownLatch类中的一个内部类。在这个类中重写了AQS的tryAcquireShared和tryReleaseShared两个方法这两个都是共享模式需要重写的方法因为CountDownLatch在count值为0时可以被任意多个线程同时获取成功所以应该实现共享模式的方法。在CountDownLatch的Sync中使用了AQS的state值用来存放count值在初始化时会把state值初始化为n。然后在调用tryReleaseShared时会将count值减1但是因为这个方法可能会被多个线程同时调用所以要用CAS操作保证更新操作的原子性就像我们用AtomicInteger一样。在CAS失败时我们需要通过重试来保证把state减1如果CAS成功时即使有许多线程同时执行这个操作最后的结果也一定是正确的。在这里tryReleaseShared方法的返回值表示这个释放操作是否可以让等待中的线程成功获取同步器所以只有在count为0时才能返回true。tryAcquireShared方法就比较简单了直接返回state是否等于0即可因为只有在CountDownLatch中的count值为0时所有希望获取同步器的线程才能获取成功并继续执行。如果count不为0那么线程就需要进入阻塞状态等到count值变为0才能继续执行。private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID 4982264981922014374L;// 构造器初始化count值 // 在这个子类中把count值保存到了AQS的state中 Sync(int count) {setState(count);}// 获取当前的count值 int getCount() {return getState();}// 获取操作在state为0时会成功否则失败 // tryAcquireShared失败时线程会进入阻塞状态等待获取成功 protected int tryAcquireShared(int acquires) {return (getState() 0) ? 1 : -1;}// 对闭锁执行释放操作减小计数值 protected boolean tryReleaseShared(int releases) {// 减小coun值在count值归零时唤醒等待的线程 for (;;) {int c getState();// 如果计数已经归零则直接释放失败 if (c 0)return false;// 将计数值减1 int nextc c-1;// 为了线程安全以CAS循环尝试更新 if (compareAndSetState(c, nextc))return nextc 0;}}}CounDownLatch对Sync类对象的使用看了CountDownLatch中的Sync内部类定义之后我们再来看看CountDownLatch是如何使用这个内部类的。在CountDownLatch的构造器中初始化CountDownLatch对象时会同时在其内部初始化保存一个Sync类型的对象到sync字段用于之后的同步操作。并且传入Sync类构造器的count一定会大于等于0。public CountDownLatch(int count) {if (count 0) throw new IllegalArgumentException(count 0);this.sync new Sync(count);}有了Sync类型的对象之后我们在await()方法里就可以直接调用sync的acquireSharedInterruptibly方法来获取同步器并陷入阻塞等待count值变为0了。在AQS的acquireSharedInterruptibly方法中会在调用我们重写的tryAcquireShared方法获取失败时进入阻塞状态直到CountDownLatch的count值变为0时才能成功获取到同步器。public void await() throws InterruptedException {// 调用sync对象的获取方法来进入锁等待 sync.acquireSharedInterruptibly(1);}而在CountDownLatch的另一个减少count值的重要方法countDown()中我们同样是通过调用sync上的方法来实现具体的同步功能。在这里AQS的releaseShared(1)方法中同样会调用我们在Sync类中重写的tryReleaseShared方法来执行释放操作并在tryReleaseShared方法返回true时去唤醒等待队列中的阻塞等待线程让它们在count值为0时能够继续执行。public void countDown() {sync.releaseShared(1);}从上文中可以看出CoundDownLatch中的各种功能都是通过内部类Sync来实现的而这个Sync类就是一个继承自AQS的子类。通过在内部类Sync中重写了AQS的tryAcquireShared和tryReleaseShared两个方法我们就指定了AQS的行为策略使其能够符合我们对CountDownLatch功能的期望。这就是AQS的使用方法下面我们来看一个大家可能会更熟悉的例子来进一步了解AQS在独占模式下的用法。ReentrantLock的实现可重入锁ReentrantLock可以说是我们的老朋友了从最早的synchronized关键字开始我们就开始使用类似的功能了。可重入锁的特点主要有两点同一时间只能有一个线程持有如果我想保护一段代码同一时间只能被一个线程所访问比如对一个队列的插入操作。那么如果有一个线程已经获取了锁之后在修改队列了那么其他也想要修改队列的线程就会陷入阻塞等待之前的这个线程执行完成。同一线程可以对一个锁重复获取成功多次而如果一个线程对同一个队列执行了两个插入操作那么第二次获取锁时仍然会成功而不会被第一次成功获取到的锁所阻塞。ReentrantLock类的常用操作主要有三种获取锁一个线程一旦获取锁成功后就会阻塞其他线程获取同一个锁的操作所以一旦获取失败那么当前线程就会被阻塞最简单的获取锁方法就是调用public void lock()方法2. 释放锁获取锁之后就要在使用完之后释放它否则别的线程都将会因无法获取锁而被阻塞所以我们一般会在finally中进行锁的释放操作可以通过调用ReentrantLock对象的unlock方法来释放锁3. 获取条件变量条件变量是和互斥锁搭配使用的一种非常有用的数据结构有兴趣的读者可以通过《从0到1实现自己的阻塞队列(上)》这篇文章来了解条件变量具体的使用方法我们可以通过Condition newCondition()方法来获取条件变量对象然后调用条件变量对象上的await()、signal()、signalAll()方法来进行使用ReentrantLock的内部Sync类在ReentrantLock类中存在两种AQS的子类一个实现了非公平锁一个实现了公平锁。所谓的“公平”指的就是获取互斥锁成功返回的时间会和获取锁操作发起的时间顺序一致例如有线程A已经持有了互斥锁当线程B、C、D按字母顺序获取锁并进入等待线程A释放锁后一定是线程B被唤醒线程B释放锁后一定是C先被唤醒。也就是说锁被释放后对等待线程的唤醒顺序和获取锁操作的顺序一致。而且如果在这个过程中有其他线程发起了获取锁操作因为等待队列中已经有线程在等待了那么这个线程一定要排到等待队列最后去而不能直接抢占刚刚被释放还未被刚刚被唤醒的线程锁持有的锁。下面我们同样先看一下ReentrantLock类中定义的AQS子类Sync的具体源代码。下面是上一段说到的非公平Sync类和公平Sync类两个类的共同父类Sync的带注释源代码里面包含了大部分核心功能的实现。虽然下面包含了该类完整的源代码但是我们现在只需要关心三个核心操作也是我们在独占模式下需要重写的三个AQS方法tryAcquire、tryRelease和isHeldExclusively。建议在看完文章之后再回来回顾该类中其他的方法实现直接跳过其他的方法当然也是完全没有问题的。abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID -5179523762034025860L;/*** 实现Lock接口的lock方法子类化的主要原因是为了非公平版本的快速实现*/abstract void lock();/*** 执行非公平的tryLock。tryAcquire方法在子类中被实现但是两者都需要非公平版本的trylock方法实现。*/final boolean nonfairTryAcquire(int acquires) {final Thread current Thread.currentThread();int c getState();// 如果锁还未被持有 if (c 0) {// 通过CAS尝试获取锁 if (compareAndSetState(0, acquires)) {// 如果锁获取成功则将锁持有者改为当前线程并返回true setExclusiveOwnerThread(current);return true;}}// 锁已经被持有则判断锁的持有者是否是当前线程 else if (current getExclusiveOwnerThread()) {// 可重入锁如果锁的持有者是当前线程那就在state上加上新的获取数 int nextc c acquires;// 判断新的state值有没有溢出 if (nextc 0) // overflow throw new Error(Maximum lock count exceeded);// 将新的state更新为新的值因为可以进入这段代码的只有一个线程 // 所以不需要线程安全措施 setState(nextc);return true;}return false;}// 重写了AQS的独占式释放锁方法 protected final boolean tryRelease(int releases) {// 计算剩余的锁持有量 // 因为只有当前线程持有该锁的情况下才能执行这个方法所以不需要做多线程保护 int c getState() - releases;// 如果当前线程未持有锁则直接抛出错误 if (Thread.currentThread() ! getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free false;// 如果锁持有数已经减少到0则释放该锁并清空锁持有者 if (c 0) {free true;setExclusiveOwnerThread(null);}// 更新state值只有state值被设置为0才是真正地释放了锁 // 所以setState和setExclusiveOwnerThread之间不需要额外的同步措施 setState(c);return free;}// 当前线程是否持有该锁 protected final boolean isHeldExclusively() {return getExclusiveOwnerThread() Thread.currentThread();}// 创建对应的条件变量 final ConditionObject newCondition() {return new ConditionObject();}// 从外层传递进来的方法// 获取当前的锁持有者 final Thread getOwner() {return getState() 0 ? null : getExclusiveOwnerThread();}// 获取锁的持有计数 // 如果当前线程持有了该锁则返回state值否则返回0 final int getHoldCount() {return isHeldExclusively() ? getState() : 0;}// 判断锁是否已经被持有 final boolean isLocked() {return getState() ! 0;}}实际的tryAcquire方法将在公平Sync类与非公平Sync类两个子类中实现但是这两个子类都需要调用父类Sync中的非公平版本的tryAcquire——nonfairTryAcquire方法。在这个方法中我们主要做两件事当前锁还未被人持有。在ReentrantLock中使用AQS的state来保存锁的状态state等于0时代表锁没有被任何线程持有如果state大于0那么就代表持有者对该锁的重复获取次数如果当前锁还未被线程持有那么就会通过compareAndSetState来原子性地修改state值修改成功则需要设置当前线程为锁的持有线程并返回true代表获取成功否则就返回2. 锁已被当前线程持有在锁已被当前线程持有的情况下就需要将state值加1代表持有者线程对锁的重复获取次数。而对于独占式释放同步器的tryRelease方法则在父类Sync中直接实现了两个公平/非公平子类调用的都是同一段代码。首先只有锁的持有者才能释放锁所以如果当前线程不是所有者线程在释放操作中就会抛出异常。如果释放操作会将持有计数清零那么当前线程就不再是该锁的持有者了锁会被完全释放而锁的所有者会被设置为null。最后Sync会将减掉入参中的释放数之后的新持有计数更新到AQS的state中并返回锁是否已经被完全释放了。isHeldExclusively方法比较简单它只是检查锁的持有者是否是当前线程。非公平Sync类的实现Sync的两个公平/非公平子类的实现比较简单下面是非公平版本子类的源代码。在非公平版本的实现中调用lock方法首先会尝试通过CAS修改AQS的state值来直接抢占锁如果抢占成功就直接将持有者设置为当前线程如果抢占失败就调用acquire方法走正常流程来获取锁。而在acquire方法中就会调用子类中的tryAcquire方法并进一步调用到上文提到的父类中的nonfairTryAcquire方法来完成锁获取操作。static final class NonfairSync extends Sync {private static final long serialVersionUID 7316153563782823691L;/*** 执行锁操作。尝试直接抢占如果失败的话就回到正常的获取流程进行*/final void lock() {// 尝试直接抢占 if (compareAndSetState(0, 1))// 抢占成功设置锁所有者 setExclusiveOwnerThread(Thread.currentThread());else// 抢占失败走正常获取流程 acquire(1);}// 实现AQS方法使用nonfairTryAcquire实现 protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}公平Sync类的实现而在公平版本的Sync子类FairSync中为了保证成功获取到锁的顺序一定要和发起获取锁操作的顺序一致所以自然不能在lock方法中进行CAS方式的抢占只能老老实实调用acquire方法走正式流程。而acquire方法最终就会调用子类中定义的tryAcquire来真正获取锁。在tryAcquire方法中代码主要处理了两种情况当前锁还没有被线程锁持有只有在确保等待队列为空的情况下才能尝试用CAS方式直接抢占锁而在等待队列不为空的情况下最后返回了false之后acquire方法中的代码会将当前线程放入到等待队列中阻塞等待锁的释放。这就保证了在获取锁时已经有线程等待的情况下任何线程都要进入等待队列去等待获取锁而不能直接对锁进行获取。2. 当前线程已经持有了该锁如果当前线程已经是该锁的持有者了那么就会在state值上加上本次的获取数量来更新锁的重复获取次数并返回true代表获取锁成功。static final class FairSync extends Sync {private static final long serialVersionUID -3000897897090466540L;// 直接使用acquire进行获取锁操作 final void lock() {acquire(1);}/*** 公平版本的tryAcquire方法。不要授予访问权限除非是递归调用或者没有等待线程或者这是第一个调用*/protected final boolean tryAcquire(int acquires) {final Thread current Thread.currentThread();int c getState();// 如果锁没有被持有 if (c 0) {// 为了实现公平特性所以只有在等待队列为空的情况下才能直接抢占 // 否则只能进入队列等待 if (!hasQueuedPredecessors() compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 如果锁已被持有且当前线程就是持有线程 else if (current getExclusiveOwnerThread()) {// 计算新的state值 int nextc c acquires;// 如果锁计数溢出则抛出异常 if (nextc 0)throw new Error(Maximum lock count exceeded);// 设置state状态值 setState(nextc);return true;}return false;}}ReentrantLock对Sync类对象的使用最后我们来看看ReentrantLock类中的lock()、unlock()、newCondition方法对Sync类对象的使用方式。首先是在构造器中根据入参指定的公平/非公平模式创建不同的内部Sync类对象如果是公平模式就是用FairSync类如果是非公平模式就是用NonfairSync类。public ReentrantLock(boolean fair) {sync fair ? new FairSync() : new NonfairSync();}然后在互斥锁的锁定方法lock()中ReentrantLock直接使用Sync类中的lock方法来实现了锁的获取功能。public void lock() {// 调用sync对象的lock方法实现 sync.lock();}在unlock()方法中也是一样的情况ReentrantLock直接依赖Sync类对象来实现这个功能。public void unlock() {// 调用了sync对象的release方法实现 sync.release(1);}最后一个创建条件变量的方法则直接依赖于AQS中定义的方法我们在ReentranctLock的Sync类中并不需要做任务额外的工作AQS就能为我们做好所有的事情。public Condition newCondition() {// 调用了sync对象继承自AQS的newCondition方法实现 return sync.newCondition();}通过ReentrantLock的例子我们能够更明显地感受到这些基于AQS实现同步功能的类中并不需要做太多额外的工作大多数操作都是通过直接调用Sync类对象上的方法来实现的。只要定义好了继承自AQS的子类Sync并通过Sync类重写几个AQS的关键方法来指定AQS的行为策略就可以实现风格迥异的各种同步工具类了。总结在这篇文章中我们从AQS的基本概念说起简单介绍了AQS的具体用法然后通过CountDownLatch和ReentrantLock两个常用的多线程同步工具类的源码来具体了解了AQS的使用方式。我们不仅可以完全弄明白这两个线程同步类的实现原理与细节而且最重要的是找到了AQS这个幕后大BOSS。通过AQS我们不仅可以更容易地阅读并理解其他同步工具类的使用与实现而且甚至可以动手开发出我们自己的自定义同步工具类。到了这里这一系列多线程编程相关的技术文章就接近尾声了。后续我还会发布一篇囊括这个系列所有内容的总结性文章里面会对多线程编程相关的知识脉络做一次全面的梳理然后将每个知识点链接到具体阐释这个主题的文章中去。让读者可以在宏观和微观两个层面理解多线程编程的原理与技巧帮助大家建立完整的Java多线程理论与实践知识体系。有兴趣的读者可以关注一下后续的文章感谢大家的支持。