Java并发编程知识点

Ari的小跟班 / 2023-08-24 / 原文

并发编程实践

参考

  • 《深入理解 Java 虚拟机》
  • 《实战 Java 高并发程序设计》
  • Guide to the Volatile Keyword in Java - Baeldung:Guide to the Volatile Keyword in Java | Baeldung
  • 理解 Java 中的 ThreadLocal - 技术小黑屋:理解Java中的ThreadLocal - 技术小黑屋 (droidyue.com)
  • ThreadLocal (Java Platform SE 8 ) - Oracle Help Center:ThreadLocal (Java Platform SE 8 ) (oracle.com)
下面的编程实践中,启动线程的方式都是放到线程池中,我用枚举把线程池作为一个单例对象(正好可以复习一下枚举类型的单例写法)。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 通过枚举类型来实现线程池的单例
 */
public enum enumWay {
    INSTANCE;

    private int corePoolSize = 5;
    private int maximumPoolSize = 10;
    private long keepAliveTime = 10;
    private TimeUnit unit = TimeUnit.SECONDS;
    private BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10);
    private ThreadPoolExecutor threadPoolExecutor;
    enumWay(){
        threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue);
    }
    public ThreadPoolExecutor getThreadPoolExecutor(){
        return this.threadPoolExecutor;
    }
}

生产者消费者模型

​ 有好几种写法,这里我就介绍传统的sychronized+wait+notify以及ReentrantLock+Condition的方式,第一个是用JVM底层的机制实现的,第二个是API层面的。

使用sychronized+wait+notify

对于执行条件的判断一定要用while循环来判断,不能是if,因为获取锁之后是从wait()函数往下运行的,因为有多个消费者,试想:现在有两个消费者A和B,队列为空,消费者B拿到锁进来发现队列为空,于是wait,释放锁并等待生产者抢到锁之后生产了一个产品到队列中,随后,消费者A拿到锁之后消费了一个饼释放锁了,释放锁完了,消费者B马上就拿到了锁,这个时候如果是if,则会往下执行消费操作,但是明明是空的,它不应该消费,会导致错误,这个时候如果用while,就会再次判断了,发现为空,则再次wait

import mianshi.Design_parttern_practice.singleton.enumWay;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

public class producer_and_consumer {
    private static final int size = 10;

    private static final Object lockOfQueue = new Object();//队列操作锁

    private static final AtomicInteger currentSize = new AtomicInteger(0);

    static class producer implements Runnable{
        @Override
        public void run() {
            for(int i = 0;i < 100;i++){
                synchronized (lockOfQueue){//synchronized是一种隐形的抢锁
                    try {
                        while(currentSize.get() >= size){//如果满了
                            lockOfQueue.wait();
                        }
                        System.out.println("生产者" + Thread.currentThread().getName() + "生产完毕了" +
                                ",现在有" + currentSize.addAndGet(1));
                        lockOfQueue.notifyAll();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    static class consumer implements Runnable{
        @Override
        public void run() {
            for(int i = 0;i < 100;i++){
                synchronized (lockOfQueue){
                    try {
                        while (currentSize.get() <= 0){

                            lockOfQueue.wait();
                        }
                        System.out.println("消费者:" + Thread.currentThread().getName()+"正在消费," +
                                "消费完毕后还有多少个" + currentSize.addAndGet(-1));
                        lockOfQueue.notifyAll();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = enumWay.INSTANCE.getThreadPoolExecutor();
        threadPoolExecutor.execute(new producer());
        threadPoolExecutor.execute(new consumer());
        threadPoolExecutor.execute(new producer());
        threadPoolExecutor.execute(new consumer());
        threadPoolExecutor.shutdown();//用于在生产者、消费者结束后,结束线程池从而退出psvm
    }
}

使用ReetrantLock+Condition

​ 使用Condition可以实现更精确的控制并发顺序和条件。

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class producer_and_consumer2 {
    private static Integer count = 0;
    private static final Integer FULL = 10;
    //创建一个锁对象
    private static Lock lock = new ReentrantLock();
    //创建两个条件变量,一个为缓冲区非满,一个为缓冲区非空
    private static final Condition producerCondition = lock.newCondition();
    private static final Condition consumerCondition = lock.newCondition();

    static class threadProducer implements Runnable{

        @Override
        public void run() {
            for(int i = 0;i < 100;i++){
                lock.lock();
                try {
                    while(count >= FULL){
                        producerCondition.await();
                    }
                    count++;
                    //生产
                    System.out.println(Thread.currentThread().getName() + "生产者生产商品完成,现在有"
                            + count + "个产品");
                    consumerCondition.signalAll();
                }catch (Exception e){
                    e.printStackTrace();
                }
                lock.unlock();
            }
        }
    }

    static class threadConsumer implements Runnable{
        @Override
        public void run() {
            for(int i = 0;i < 100;i++){
                lock.lock();
                try {
                    while(count <= 0){
                        consumerCondition.await();
                    }
                    //生产
                    count--;
                    System.out.println(Thread.currentThread().getName() + "消费者消费商品完成,现在有"
                            + count + "个产品");
                    producerCondition.signalAll();
                }catch (Exception e){
                    e.printStackTrace();
                }
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = enumWay.INSTANCE.getThreadPoolExecutor();
        threadPoolExecutor.execute(new threadConsumer());
        threadPoolExecutor.execute(new threadConsumer());
        threadPoolExecutor.execute(new threadProducer());
        threadPoolExecutor.execute(new threadProducer());
        threadPoolExecutor.shutdown();
    }
}

轮流打印ABC

sychronized写法

import mianshi.Design_parttern_practice.singleton.enumWay;
import java.util.concurrent.ThreadPoolExecutor;
public class alternately_print_ABC2 {
    private final static Object object = new Object();
    private static int count = 0;
    static class threadPrintA implements Runnable{
        @Override
        public void run() {
            for(int i = 0;i < 100;i++){
                synchronized (object){
                    try {
                        while(count % 3 != 0){
                            object.wait();
                        }
                        System.out.println(count + "A");
                        count++;
                        object.notifyAll();
                    }catch (Exception e){
                        e.printStackTrace();
                    }

                }
            }
        }
    }

    static class threadPrintB implements Runnable{

        @Override
        public void run() {
            for(int i = 0;i < 100;i++){
                synchronized (object){
                    try {
                        while(count % 3 != 1){
                            object.wait();
                        }
                        System.out.println(count + "B");
                        count++;
                        object.notifyAll();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    static class threadPrintC implements Runnable{

        @Override
        public void run() {
            for(int i = 0;i < 100;i++){
                synchronized (object){
                    try {
                        while(count % 3 != 2){
                            object.wait();
                        }
                        System.out.println(count + "C");
                        count++;
                        object.notifyAll();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = enumWay.INSTANCE.getThreadPoolExecutor();
        threadPoolExecutor.execute(new threadPrintA());
        threadPoolExecutor.execute(new threadPrintB());
        threadPoolExecutor.execute(new threadPrintC());
        threadPoolExecutor.shutdown();
    }
}

ReetrantLock+Condition

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class alternately_print_ABC {
    private static final Lock lock = new ReentrantLock();
    private static final Condition conditionA = lock.newCondition();
    private static final Condition conditionB = lock.newCondition();
    private static final Condition conditionC = lock.newCondition();

    private static int count = 0;
    static class threadPrintingA implements Runnable{
        @Override
        public void run() {
            for(int i = 0;i < 100;i++){
                //先拿到锁
                lock.lock();
                try {
                    while(count % 3 != 0){
                        conditionA.await();//没到自己打印
                    }
                    System.out.println(count + "A");
                    count++;
                    conditionB.signal();//唤醒B
                }catch (Exception e){
                    e.printStackTrace();
                }
                lock.unlock();
            }
        }
    }

    static class threadPrintingB implements Runnable{
        @Override
        public void run() {
            for(int i = 0;i < 100;i++){
                //先拿到锁
                lock.lock();

                try {
                    while(count % 3 != 1){
                        conditionB.await();//等待下一次唤醒B来打印B
                    }
                    System.out.println(count + "B");
                    count++;
                    conditionC.signal();//唤醒C

                }catch (Exception e){
                    e.printStackTrace();
                }
                lock.unlock();

            }
        }
    }

    static class threadPrintingC implements Runnable{
        @Override
        public void run() {
            for(int i = 0;i < 100;i++){
                lock.lock();
                try {
                    while(count % 3 != 2){
                        conditionC.await();//等待下一次唤醒C来打印C
                    }
                    System.out.println(count + "C");
                    count++;
                    conditionA.signal();//唤醒C
                }catch (Exception e){
                    e.printStackTrace();
                }
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = enumWay.INSTANCE.getThreadPoolExecutor();
        threadPoolExecutor.execute(new threadPrintingA());
        threadPoolExecutor.execute(new threadPrintingB());
        threadPoolExecutor.execute(new threadPrintingC());

        threadPoolExecutor.shutdown();
    }
}

Java 并发常见面试题总结

什么是线程和进程?

何为进程?

​ 进程是程序的一次执行过程,是系统运行程序的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。

​ 在 Java 中,当我们启动 main 函数时其实就是启动了一个 JVM 的进程,而 main 函数所在的线程就是这个进程中的一个线程,也称主线程。如下图所示,在 Windows 中通过查看任务管理器的方式,我们就可以清楚看到 Windows 当前运行的进程(.exe 文件的运行)。

何为线程?

​ 线程与进程相似,但线程是一个比进程更小执行单位。一个进程在其执行的过程中可以产生多个线程。与进程不同的是同类的多个线程共享进程的方法区资源,但每个线程有自己的程序计数器虚拟机栈本地方法栈,所以系统在产生一个线程,或是在各个线程之间作切换工作时,负担要比进程小得多,也正因为如此,线程也被称为轻量级进程。

​ Java 程序天生就是多线程程序,我们可以通过 JMX 来看看一个普通的 Java 程序有哪些线程,代码如下。

public class MultiThread {
	public static void main(String[] args) {
		// 获取 Java 线程管理 MXBean
	ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
		// 不需要获取同步的 monitor 和 synchronizer 信息,仅获取线程和线程堆栈信息
		ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
		// 遍历线程信息,仅打印线程 ID 和线程名称信息
		for (ThreadInfo threadInfo : threadInfos) {
			System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.getThreadName());
		}
	}
}

上述程序输出如下(输出内容可能不同,不用太纠结下面每个线程的作用,只用知道 main 线程执行 main 方法即可):

[5] Attach Listener //添加事件
[4] Signal Dispatcher // 分发处理给 JVM 信号的线程
[3] Finalizer //调用对象 finalize 方法的线程
[2] Reference Handler //清除 reference 线程
[1] main //main 线程,程序入口

​ 从上面的输出内容可以看出:一个 Java 程序的运行是 main 线程和多个其他线程同时运行

请简要描述线程与进程的关系,区别及优缺点?

从 JVM 角度说进程和线程之间的关系。

图解进程和线程的关系

​ 下图是 Java 内存区域,通过下图我们从 JVM 的角度来说一下线程和进程之间的关系。

​ 从上图可以看出:一个进程中可以有多个线程,多个线程共享进程的方法区 (JDK1.8 之后的元空间)*资源,但是每个线程有自己的*程序计数器虚拟机栈本地方法栈

总结: 线程是进程划分成的更小的运行单位。线程和进程最大的不同在于基本上各进程是独立的,而各线程则不一定,因为同一进程中的线程极有可能会相互影响。线程执行开销小,但不利于资源的管理和保护;而进程正相反。

下面是该知识点的扩展内容!

下面来思考这样一个问题:为什么程序计数器虚拟机栈本地方法栈是线程私有的呢?为什么堆和方法区是线程共享的呢?

程序计数器为什么是私有的?

程序计数器主要有下面两个作用:

  1. 字节码解释器通过改变程序计数器来依次读取指令,从而实现代码的流程控制,如:顺序执行、选择、循环、异常处理。
  2. 在多线程的情况下,程序计数器用于记录当前线程执行的位置,从而当线程被切换回来的时候能够知道该线程上次运行到哪儿了。

​ 需要注意的是,如果执行的是 native 方法,那么程序计数器记录的是 undefined 地址,只有执行的是 Java 代码时程序计数器记录的才是下一条指令的地址。

​ 所以,程序计数器私有主要是为了线程切换后能恢复到正确的执行位置

虚拟机栈和本地方法栈为什么是私有的?

  • 虚拟机栈: 每个 Java 方法在执行的同时会创建一个栈帧用于存储局部变量表操作数栈常量池引用等信息。从方法调用直至执行完成的过程,就对应着一个栈帧在 Java 虚拟机栈中入栈和出栈的过程。
  • 本地方法栈: 和虚拟机栈所发挥的作用非常相似,区别是: 虚拟机栈为虚拟机执行 Java 方法 (也就是字节码)服务,而本地方法栈则为虚拟机使用到的 Native 方法服务。 在 HotSpot 虚拟机中和 Java 虚拟机栈合二为一。

所以,为了保证线程中的局部变量不被别的线程访问到,虚拟机栈和本地方法栈是线程私有的

一句话简单了解堆和方法区

​ 堆和方法区是所有线程共享的资源,其中堆是进程中最大的一块内存,主要用于存放新创建的对象 (几乎所有对象都在这里分配内存),方法区主要用于存放已被加载的类信息常量静态变量即时编译器编译后的代码等数据。

并发与并行的区别

  • 并发:两个及两个以上的作业在同一 时间段 内执行。
  • 并行:两个及两个以上的作业在同一 时刻 执行。

最关键的点是:是否是 同时 执行

同步和异步的区别

  • 同步 : 发出一个调用之后,在没有得到结果之前, 该调用就不可以返回,一直等待。
  • 异步 :调用在发出之后,不用等待返回结果,该调用直接返回。

为什么要使用多线程呢?

先从总体上来说:

  • 从计算机底层来说: 线程可以比作是轻量级的进程,是程序执行的最小单位,线程间的切换和调度的成本远远小于进程。另外,多核 CPU 时代意味着多个线程可以同时运行,这减少了线程上下文切换的开销。
  • 从当代互联网发展趋势来说: 现在的系统动不动就要求百万级甚至千万级的并发量,而多线程并发编程正是开发高并发系统的基础,利用好多线程机制可以大大提高系统整体的并发能力以及性能。

再深入到计算机底层来探讨:

  • 单核时代: 在单核时代多线程主要是为了提高单进程利用 CPU 和 IO 系统的效率。 假设只运行了一个 Java 进程的情况,当我们请求 IO 的时候,如果 Java 进程中只有一个线程,此线程被 IO 阻塞则整个进程被阻塞。CPU 和 IO 设备只有一个在运行,那么可以简单地说系统整体效率只有 50%。当使用多线程的时候,一个线程被 IO 阻塞,其他线程还可以继续使用 CPU。从而提高了 Java 进程利用系统资源的整体效率。
  • 多核时代: 多核时代多线程主要是为了提高进程利用多核 CPU 的能力。举个例子:假如我们要计算一个复杂的任务,我们只用一个线程的话,不论系统有几个 CPU 核心,都只会有一个 CPU 核心被利用到。而创建多个线程,这些线程可以被映射到底层多个 CPU 上执行,在任务中的多个线程没有资源竞争的情况下,任务执行的效率会有显著性的提高,约等于(单核时执行时间/CPU 核心数)。

使用多线程可能带来什么问题?

​ 并发编程的目的就是为了能提高程序的执行效率提高程序运行速度,但是并发编程并不总是能提高程序运行速度的,而且并发编程可能会遇到很多问题,比如:内存泄漏死锁线程不安全等等。

如何创建线程?(重要!!)

一、继承Thread类创建

​ 通过继承Thread类并且重写其run(),run方法中即线程执行任务。创建后的子类通过调用 start() 方法即可执行线程方法。通过继承Thread实现的线程类,多个线程间无法共享线程类的实例变量。(需要创建不同Thread对象,自然不共享)。创建线程对象实例,调用start方法开启线程(而非run()方法)

​ 比如有个flag共享变量,但是因为是用继承Thread类的方式来实现继承的,所以每个线程都不能共享得到flag,尽管加了volatile关键字。

public class extendThread extends Thread{
    public volatile int flag = 0;
    public int getFlag() {
        return flag;
    }
    //重写run方法
    @Override
    public void run() {
        System.out.println("现在的flag是"+getFlag());
        flag++;
        System.out.println("修改后的flag是"+getFlag());
    }

    public static void main(String[] args) throws InterruptedException {
        extendThread myThread = new extendThread();
        for (int i = 0;i<5;i++){
            new extendThread().start();
        }
    }
}

二、实现Runnable接口

​ 该方法需要先定义一个类实现Runnable接口,并重写该接口的 run() 方法,此run方法是线程执行体。接着创建 Runnable实现类的对象,作为创建Thread对象的参数target此Thread对象才是真正的线程对象。通过实现Runnable接口的线程类,是互相共享资源的。

public class runnableThread implements Runnable{
    public int flag = 0;
    public int getFlag() {return flag;}
    @SneakyThrows
    @Override
    public void run() {
        System.out.println("现在的flag是"+getFlag());
        flag++;
        System.out.println("修改后的flag是"+getFlag());
    }

    public static void main(String[] args) throws InterruptedException {
        runnableThread myThread1 = new runnableThread();
        for (int i = 0;i<5;i++){
            new Thread(myThread1).start();//每次传入的都是同一个对象,可以共享变量
        }
        System.out.println("最后的结果");
        System.out.println(myThread1.getFlag());
    }
}

运行结果:

​ 可以发现,共享了结果,但是不保证操作的原子性,最后得到的结果是4(每次运行的结果可能会有所不同),并不是5.如果我们用AtomicInteger类来修饰的话,就可以保证程序的可复现性,以及确保每次程序的运行得到的结果都是一致的

public volatile AtomicInteger flag = new AtomicInteger(0);
public AtomicInteger getFlag() {
    return flag;
}

flag.getAndIncrement();//自增

三、 使用Callable和Future创建线程

​ 从继承Thread类和实现Runnable接口可以看出,上述两种方法都不能有返回值,且不能声明抛出异常。而Callable接口则实现了此两点,Callable接口如同Runable接口的升级版,其提供的call()方法将作为线程的执行体,同时允许有返回值

​ 但是Callable对象不能直接作为Thread对象的target,因为Callable接口是 Java 5 新增的接口,不是Runnable接口的子接口。

​ 对于这个问题的解决方案,就引入 Future接口,此接口可以接受call() 的返回值,Future接口是Runnable接口的子接口,可以作为Thread对象的target 。并且, Future 接口提供了一个实现类:FutureTask 。
​ FutureTask实现了Future接口、Runnable接口 ,可以作为 Thread对象的target。

public class CallableTest {
    public static void main(String[] args) {
        CallableTest callableTest = new CallableTest() ;
        //因为Callable接口是函数式接口,可以使用Lambda表达式
        FutureTask task = new FutureTask(()->{
            int i = 0 ;
            for(;i<10;i++){
                System.out.println(Thread.currentThread().getName() + "的循环变量i的值 :" + i);
            }
            return i;
        });
        new Thread(task,"有返回值的线程").start();
        try{
            System.out.println("子线程返回值 : " + task.get());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

四、创建线程池

  1. 创建线程池(推荐使用ThreadPoolExecutor)
  2. 调用execute方法并传入参数(Runnable或Thread接口对象)来启动线程
  3. 关闭服务

具体代码看下面的线程池part的示例代码。

如何中断线程的执行?

参考 JAVA并发编程(八)之interrupt方法_自考的程序员的博客-CSDN博客_java的interrupt方法

​ 使用stop()方法是不合适的,该方法已经废弃,因为该方法不安全(直接暴力停止可能会产生不一致的问题),且JDK提倡的中断方式是中断的用法应该是只修改一些变量以指示目标线程应该停止运行。且目标线程应该定期检查此变量,如果变量指示要停止运行,则从其run方法有序返回,如果需要等待很长时间,就抛出中断异常来中断等待。

​ 所以就引出了正确的中断方式interrupt()方法:

​ interrrupt 含义:字面意思中断此线程,但实际上只是将调用线程的中断标志设置为true,仅此而已。被设置中断标志的线程将继续正常运行,不受影响。

public static void main(String[] args) {
    Thread myThread = new Thread(()->{
        for(int i = 0;i < 5;i++){
            System.out.println(Thread.currentThread().getName()+"::::"+i);
        }
    },"我的线程哈哈");
    myThread.start();

    System.out.println(myThread.getName()+"的中断状态是"+myThread.isInterrupted());
    myThread.interrupt();
    System.out.println(myThread.getName()+"的中断状态是"+myThread.isInterrupted());
    System.out.println("main方法执行结束");

}

​ 运行结果如下:

​ 分析结果可知线程thread对象的中断状态已经为true,但依旧不会影响线程的执行;isInterrupted()方法:返回当前线程的中断状态。那么我们就可以通过访问isInterrupted()方法来得到,改进一下的代码就是:

public static void main(String[] args) {
    Thread myThread = new Thread(()->{
        for(int i = 0;i < 5;i++){
            if(Thread.currentThread().isInterrupted()){
                break;//如果是中断状态,那么就直接break返回,不要继续执行了
            }
            System.out.println(Thread.currentThread().getName()+"::::"+i);
        }
    },"我的线程哈哈");
    myThread.start();
    System.out.println(myThread.getName()+"的中断状态是"+myThread.isInterrupted());
    myThread.interrupt();
    System.out.println(myThread.getName()+"的中断状态是"+myThread.isInterrupted());
    System.out.println("main方法执行结束");
}

​ 运行结果如下:

为什么线程的sleep()或者wait()都要抛出受检查的异常?

​ 如下图所示:

​ 这是因为,如果线程处于阻塞状态(不管是sleep()方法引起的还是wait()方法引起的),那么调用了interrupt()方法之后就会抛出InterruptedException方法,强制进程醒过来,因为中断线程应当是一个同步的操作。

public static void main(String[] args) throws InterruptedException {
    Thread myThread = new Thread(()->{
        for(int i = 0;i < 5;i++){
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName()+"捕获到了异常"+e.toString());
                e.printStackTrace();
                break;//捕获到了中断异常就可以直接跳出循环了
            }
            System.out.println(Thread.currentThread().getName()+"::::"+i);
        }
    },"我的线程哈哈");
    myThread.start();
    System.out.println(myThread.getName()+"的中断状态是"+myThread.isInterrupted());
    Thread.sleep(1500);//让线程可以执行完一轮
    myThread.interrupt();
    System.out.println(myThread.getName()+"的中断状态是"+myThread.isInterrupted());
    System.out.println("main方法执行结束");
}

说说线程的生命周期和状态?

Java 线程在运行的生命周期中的指定时刻只可能处于下面 6 种不同状态的其中一个状态:

  • NEW: 初始状态,线程被创建出来但没有被调用 start()
  • RUNNABLE: 运行状态,线程被调用了 start()等待运行的状态。
  • BLOCKED :阻塞状态,需要等待锁释放。
  • WAITING:等待状态,表示该线程需要等待其他线程做出一些特定动作(通知或中断)。
  • TIME_WAITING:超时等待状态,可以在指定的时间后自行返回而不是像 WAITING 那样一直等待。
  • TERMINATED:终止状态,表示该线程已经运行完毕。

线程在生命周期中并不是固定处于某一个状态而是随着代码的执行在不同状态之间切换。

Java 线程状态变迁图(图源:挑错 |《Java 并发编程的艺术》中关于线程状态的三处错误):

​ 由上图可以看出:线程创建之后它将处于 NEW(新建) 状态,调用 start() 方法后开始运行,线程这时候处于 READY(可运行) 状态。可运行状态的线程获得了 CPU 时间片(timeslice)后就处于 RUNNING(运行) 状态。

​ 在操作系统层面,线程有 READY 和 RUNNING 状态;而在 JVM 层面,只能看到 RUNNABLE 状态(图源:HowToDoInJava:Java Thread Life Cycle and Thread States),所以 Java 系统一般将这两个状态统称为 RUNNABLE(运行中) 状态 。

为什么 JVM 没有区分这两种状态呢? (摘自:Java 线程运行怎么有第六种状态? - Dawell 的回答 ) 现在的时分(time-sharing)多任务(multi-task)操作系统架构通常都是用所谓的“时间分片(time quantum or time slice)”方式进行抢占式(preemptive)轮转调度(round-robin 式)。这个时间分片通常是很小的,一个线程一次最多只能在 CPU 上运行比如 10-20ms 的时间(此时处于 running 状态),也即大概只有 0.01 秒这一量级,时间片用后就要被切换下来放入调度队列的末尾等待再次调度。(也即回到 ready 状态)。线程切换的如此之快,区分这两种状态就没什么意义了。

  • 当线程执行 wait()方法之后,线程进入 WAITING(等待) 状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态。并非只是检测锁。
  • TIMED_WAITING(超时等待) 状态相当于在等待状态的基础上增加了超时限制,比如通过 sleep(long millis)方法或 wait(long millis)方法可以将线程置于 TIMED_WAITING 状态。当超时时间结束后,线程将会返回到 RUNNABLE 状态。
  • 当线程进入 synchronized 方法/块或者调用 wait 后(被 notify)重新进入 synchronized 方法/块,但是锁被其它线程占有,这个时候线程就会进入 BLOCKED(阻塞) 状态。
  • 线程在执行完了 run()方法之后将会进入到 TERMINATED(终止) 状态。

相关阅读:线程的几种状态你真的了解么?

什么是上下文切换?

​ 线程在执行过程中会有自己的运行条件状态(也称上下文),比如上文所说到过的程序计数器栈信息等。当出现如下情况的时候,线程会从占用 CPU 状态中退出。

  • 主动让出 CPU,比如调用了 sleep(), wait() 等。
  • 时间片用完,因为操作系统要防止一个线程或者进程长时间占用 CPU 导致其他线程或者进程饿死。
  • 调用了阻塞类型的系统中断,比如请求 IO,线程被阻塞。
  • 被终止或结束运行

​ 这其中前三种都会发生线程切换,线程切换意味着需要保存当前线程的上下文,留待线程下次占用 CPU 的时候恢复现场。并加载下一个将要占用 CPU 的线程上下文。这就是所谓的 上下文切换。上下文切换是现代操作系统的基本功能,因其每次需要保存信息恢复信息,这将会占用 CPU,内存等系统资源进行处理,也就意味着效率会有一定损耗,如果频繁切换就会造成整体效率低下。

什么是线程死锁?如何避免死锁?

认识线程死锁

​ 线程死锁描述的是这样一种情况:多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。如下图所示,线程 A 持有资源 2,线程 B 持有资源 1,他们同时都想申请对方的资源,所以这两个线程就会互相等待而进入死锁状态。

下面通过一个例子来说明线程死锁,代码模拟了上图的死锁的情况 (代码来源于《并发编程之美》):

public class DeadLockDemo {
    private static Object resource1 = new Object();//资源 1
    private static Object resource2 = new Object();//资源 2

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (resource1) {
                System.out.println(Thread.currentThread() + "get resource1");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread() + "waiting get resource2");
                synchronized (resource2) {
                    System.out.println(Thread.currentThread() + "get resource2");
                }
            }
        }, "线程 1").start();

        new Thread(() -> {
            synchronized (resource2) {
                System.out.println(Thread.currentThread() + "get resource2");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread() + "waiting get resource1");
                synchronized (resource1) {
                    System.out.println(Thread.currentThread() + "get resource1");
                }
            }
        }, "线程 2").start();
    }
}

Output

Thread[线程 1,5,main]get resource1
Thread[线程 2,5,main]get resource2
Thread[线程 1,5,main]waiting get resource2
Thread[线程 2,5,main]waiting get resource1

​ 线程 A 通过 synchronized (resource1) 获得 resource1监视器锁,然后通过Thread.sleep(1000);让线程 A 休眠 1s 为的是让线程 B 得到执行然后获取到 resource2 的监视器锁。线程 A 和线程 B 休眠结束了都开始企图请求获取对方的资源,然后这两个线程就会陷入互相等待的状态,这也就产生了死锁。

上面的例子符合产生死锁的四个必要条件:

  1. 互斥条件:该资源任意一个时刻只由一个线程占用
  2. 请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放
  3. 不剥夺条件:线程已获得的资源在未使用完之前不能被其他线程强行剥夺只有自己使用完毕后才释放资源。
  4. 循环等待条件:若干线程之间形成一种头尾相接的循环等待资源关系。

如何预防和避免线程死锁?

如何预防死锁? 破坏死锁的产生的必要条件即可:

  1. 破坏请求与保持条件 :一次性申请所有的资源。(只有能拿到所有资源的时候才拿,只要资源中有一个拿不到,就不拿,哲学家就餐问题)
  2. 破坏不剥夺条件 :占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源。
  3. 破坏循环等待条件 :靠按序申请资源来预防。按某一顺序申请资源,释放资源则反序释放。破坏循环等待条件。

如何避免死锁?

避免死锁就是在资源分配时,借助于算法(比如银行家算法)对资源分配进行计算评估,使其进入安全状态。

安全状态 指的是系统能够按照某种线程推进顺序(P1、P2、P3.....Pn)来为每个线程分配所需资源,直到满足每个线程对资源的最大需求,使每个线程都可顺利完成。称 <P1、P2、P3.....Pn> 序列为安全序列。

​ 我们对线程 2 的代码修改成下面这样就不会产生死锁了。先申请资源1再申请资源2,破幻循环等待条件。

new Thread(() -> {
            synchronized (resource1) {
                System.out.println(Thread.currentThread() + "get resource1");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread() + "waiting get resource2");
                synchronized (resource2) {
                    System.out.println(Thread.currentThread() + "get resource2");
                }
            }
        }, "线程 2").start();

​ 线程 1 首先获得到 resource1 的监视器锁,这时候线程 2 就获取不到了。然后线程 1 再去获取 resource2 的监视器锁,可以获取到。然后线程 1 释放了对 resource1、resource2 的监视器锁的占用,线程 2 获取到就可以执行了。这样就破坏了破坏循环等待条件,因此避免了死锁。

sleep() 方法和 wait() 方法对比

共同点 :两者都可以暂停线程的执行。

区别

  • sleep() 方法没有释放锁,而 wait() 方法释放了锁
  • wait() 通常被用于线程间交互/通信sleep()通常被用于暂停执行
  • wait() 方法被调用后,线程不会自动苏醒,需要别的线程调用同一个对象上的 notify()或者 notifyAll() 方法。sleep()方法执行完成后,线程会自动苏醒,或者也可以使用 wait(long timeout) 超时后线程会自动苏醒。
  • sleep()Thread 类的静态本地方法wait() 则是 Object 类的本地方法。为什么这样设计呢?

为什么 wait() 方法不定义在 Thread 中?

wait() 是让获得对象锁的线程实现等待,会自动释放当前线程占有的对象锁。每个对象(Object)都拥有对象锁,既然要释放当前线程占有的对象锁并让其进入 WAITING 状态,自然是要操作对应的对象Object)而非当前的线程(Thread)。

​ 类似的问题:为什么 sleep() 方法定义在 Thread 中?因为 sleep() 是让当前线程暂停执行,不涉及到对象类,也不需要获得对象锁。

可以直接调用 Thread 类的 run 方法吗?

​ 这是另一个非常经典的 Java 多线程面试问题,而且在面试中会经常被问到。很简单,但是很多人都会答不上来!

​ new 一个 Thread,线程进入了新建状态。调用 start()方法,会启动一个线程并使线程进入了就绪状态,当分配到时间片后就可以开始运行了start()执行线程的相应准备工作然后自动执行 run() 方法的内容,这是真正的多线程工作。 但是,直接执行 run() 方法,会把 run() 方法当成一个 main 线程下的普通方法去执行,并不会在某个线程中执行它,所以这并不是多线程工作。

总结: 调用 start() 方法方可启动线程并使线程进入就绪状态,直接执行 run() 方法的话不会以多线程的方式执行

volatile 关键字

如何保证变量的可见性?

​ 在 Java 中,volatile 关键字可以保证变量的可见性,如果我们将变量声明为 volatile ,这就指示 JVM,这个变量是共享且不稳定的,每次使用它都到主存中进行读取。

​ 到主内存中去取:

volatile 关键字其实并非是 Java 语言特有的,在 C 语言里也有,它最原始的意义就是禁用 CPU 缓存。如果我们将一个变量使用 volatile 修饰,这就指示 编译器,这个变量是共享且不稳定的,每次使用它都到主存中进行读取。volatile 关键字能保证数据的可见性,但不能保证数据的原子性synchronized 关键字两者都能保证。

如何禁止指令重排序?

在 Java 中,volatile 关键字除了可以保证变量的可见性,还有一个重要的作用就是防止 JVM 的指令重排序。 如果我们将变量声明为 volatile ,在对这个变量进行读写操作的时候,会通过插入特定的 内存屏障 的方式来禁止指令重排序。

​ 在 Java 中,Unsafe 类提供了三个开箱即用的内存屏障相关的方法,屏蔽了操作系统底层的差异:

public native void loadFence();
public native void storeFence();
public native void fullFence();

​ 理论上来说,你通过这个三个方法也可以实现和volatile禁止重排序一样的效果,只是会麻烦一些。

​ 下面我以一个常见的面试题为例讲解一下 volatile 关键字禁止指令重排序的效果。

​ 面试中面试官经常会说:“单例模式了解吗?来给我手写一下!给我解释一下双重检验锁方式实现单例模式的原理呗!”

双重校验锁实现对象单例(线程安全)

public class Singleton {
    private volatile static Singleton uniqueInstance;
    private Singleton() {
    }
    public static Singleton getUniqueInstance() {
       //先判断对象是否已经实例过,没有实例化过才进入加锁代码
        if (uniqueInstance == null) {
            //类对象加锁
            synchronized (Singleton.class) {
                if (uniqueInstance == null) {
                    uniqueInstance = new Singleton();
                }
            }
        }
        return uniqueInstance;
    }
}

uniqueInstance 采用 volatile 关键字修饰也是很有必要的, uniqueInstance = new Singleton(); 这段代码其实是分为三步执行:

  1. uniqueInstance 分配内存空间
  2. 初始化 uniqueInstance
  3. uniqueInstance 指向分配的内存地址

​ 但是由于 JVM 具有指令重排的特性,执行顺序有可能变成 1->3->2。指令重排在单线程环境下不会出现问题,但是在多线程环境下会导致一个线程获得还没有初始化的实例。例如,线程 T1 执行了 1 和 3,此时 T2 调用 getUniqueInstance() 后发现 uniqueInstance 不为空,因此返回 uniqueInstance,但此时 uniqueInstance 还未被初始化。

volatile 可以保证原子性么?

volatile 关键字能保证变量的可见性,但不能保证对变量的操作是原子性的。

我们通过下面的代码即可证明:

public class VolatoleAtomicityDemo {
    public volatile static int inc = 0;
    public void increase() {
        inc++;
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        VolatoleAtomicityDemo volatoleAtomicityDemo = new VolatoleAtomicityDemo();
        for (int i = 0; i < 5; i++) {
            threadPool.execute(() -> {
                for (int j = 0; j < 500; j++) {
                    volatoleAtomicityDemo.increase();
                }
            });
        }
        // 等待1.5秒,保证上面程序执行完成
        Thread.sleep(1500);
        System.out.println(inc);
        threadPool.shutdown();
    }
}

​ 正常情况下,运行上面的代码理应输出 2500。但你真正运行了上面的代码之后,你会发现每次输出结果都小于 2500。为什么会出现这种情况呢?不是说好了,volatile 可以保证变量的可见性嘛!

​ 也就是说,如果 volatile 能保证 inc++ 操作的原子性的话。每个线程中对 inc 变量自增完之后,其他线程可以立即看到修改后的值。5 个线程分别进行了 500 次操作,那么最终 inc 的值应该是 5*500=2500。

​ 很多人会误认为自增操作 inc++ 是原子性的,实际上,inc++ 其实是一个复合操作,包括三步:

  1. 读取 inc 的值。
  2. 对 inc 加 1。
  3. 将 inc 的值写回内存。

volatile无法保证这三个操作是具有原子性的,有可能导致下面这种情况出现:

  1. 线程 1 对 inc 进行读取操作之后,还未对其进行修改。线程 2 又读取了 inc的值并对其进行修改(+1),再将inc 的值写回内存。
  2. 线程 2 操作完毕后,线程 1 对 inc的值进行修改(+1),再将inc 的值写回内存。

​ 这也就导致两个线程分别对 inc 进行了一次自增操作后,inc 实际上只增加了 1。其实,如果想要保证上面的代码运行正确也非常简单,利用 synchronizedLock或者AtomicInteger都可以。

使用 synchronized 改进:

public synchronized void increase() {
    inc++;
}

使用 AtomicInteger 改进:

public AtomicInteger inc = new AtomicInteger();

public void increase() {
    inc.getAndIncrement();
}

使用 ReentrantLock 改进:

Lock lock = new ReentrantLock();
public void increase() {
    lock.lock();
    try {
        inc++;
    } finally {
        lock.unlock();
    }
}

synchronized 关键字

说一说自己对于 synchronized 关键字的了解

synchronized 翻译成中文是同步的的意思,主要解决的是多个线程之间访问资源的同步性,可以保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行

​ 在 Java 早期版本中,synchronized 属于 重量级锁,效率低下。 因为监视器锁(monitor)是依赖于底层的操作系统的 Mutex Lock 来实现的,Java 的线程是映射到操作系统的原生线程之上的。如果要挂起或者唤醒一个线程,都需要操作系统帮忙完成,而操作系统实现线程之间的切换时需要从用户态转换到内核态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高。

​ 不过,在 Java 6 之后,Java 官方对从 JVM 层面对 synchronized 较大优化,所以现在的 synchronized 锁效率也优化得很不错了。JDK1.6 对锁的实现引入了大量的优化,如自旋锁适应性自旋锁锁消除锁粗化偏向锁轻量级锁等技术来减少锁操作的开销。所以,你会发现目前的话,不论是各种开源框架还是 JDK 源码都大量使用了 synchronized 关键字。

如何使用 synchronized 关键字?

synchronized 关键字最主要的三种使用方式:

  1. 修饰实例方法
  2. 修饰静态方法
  3. 修饰代码块

1、修饰实例方法 (锁当前对象实例)

​ 给当前对象实例加锁,进入同步代码前要获得 当前对象实例的锁

synchronized void method() {
    //业务代码
}

2、修饰静态方法 (锁当前类)

​ 给当前类加锁,会作用于类的所有对象实例 ,进入同步代码前要获得 当前 class 的锁。这是因为静态成员不属于任何一个实例对象,归整个类所有,不依赖于类的特定实例,被类的所有实例共享。

synchronized static void method() {
    //业务代码
}

​ 静态 synchronized 方法和非静态 synchronized 方法之间的调用互斥么?

​ 不互斥!如果一个线程 A 调用一个实例对象的非静态 synchronized 方法,而线程 B 需要调用这个实例对象所属类的静态 synchronized 方法,是允许的,不会发生互斥现象,因为访问静态 synchronized 方法占用的锁是当前类的锁,而访问非静态 synchronized 方法占用的锁是当前实例对象锁

3、修饰代码块 (锁指定对象/类)

对括号里指定的对象/类加锁:

  • synchronized(object) 表示进入同步代码块前要获得 给定对象的锁
  • synchronized(类.class) 表示进入同步代码块前要获得 给定 Class 的锁
synchronized(this) {
    //业务代码
}

总结:

  • synchronized 关键字加到 static 静态方法和 synchronized(class) 代码块上都是是给 Class 类上锁;
  • synchronized 关键字加到实例方法上是给对象实例上锁;
  • 尽量不要使用 synchronized(String a) 因为 JVM 中,字符串常量池具有缓存功能

构造方法可以使用 synchronized 关键字修饰么?

先说结论:构造方法不能使用 synchronized 关键字修饰。构造方法本身就属于线程安全的,不存在同步的构造方法一说。

讲一下 synchronized 关键字的底层原理

synchronized 关键字底层原理属于 JVM 层面。

synchronized 同步语句块的情况

public class SynchronizedDemo {
    public void method() {
        synchronized (this) {
            System.out.println("synchronized 代码块");
        }
    }
}

​ 通过 JDK 自带的 javap 命令查看 SynchronizedDemo 类的相关字节码信息:首先切换到类的对应目录执行 javac SynchronizedDemo.java 命令生成编译后的 .class 文件,然后执行javap -c -s -v -l SynchronizedDemo.class

​ 从上面我们可以看出:synchronized 同步语句块的实现使用的是 monitorentermonitorexit 指令,其中 monitorenter 指令指向同步代码块的开始位置,monitorexit 指令则指明同步代码块的结束位置。当执行 monitorenter 指令时,线程试图获取锁也就是获取 对象监视器 monitor 的持有权。

​ 在 Java 虚拟机(HotSpot)中,Monitor 是基于 C++实现的,由ObjectMonitor实现的。每个对象中都内置了一个 ObjectMonitor对象。

wait/notify等方法也依赖于monitor对象(ObjectMonitor),这就是为什么只有在同步的块或者方法中才能调用wait/notify等方法(因为需要调用ObjectMonitor内部的一些方法),否则会抛出java.lang.IllegalMonitorStateException的异常的原因。

​ 在执行monitorenter时,会尝试获取对象的锁,如果锁的计数器为 0 则表示锁可以被获取,获取后将锁计数器设为 1 也就是加 1

​ 对象锁的的拥有者线程才可以执行 monitorexit 指令来释放锁。在执行 monitorexit 指令后,将锁计数器设为 0,表明锁被释放,其他线程可以尝试获取锁。

​ 如果获取对象锁失败,那当前线程就要阻塞等待,直到锁被另外一个线程释放为止。


​ Q1:这里的.class文件就直接使用了monitorentermonitorexit,这不就是重量级锁的实现逻辑吗?那么锁升级体现在哪里呢?

答:

​ 首先,需要弄清楚的是,monitorenter monitorexit 字节码指令是 Java 对象监视器(monitor)操作的指令。在 Java 编译器层面,同步方法和 synchronized 块都是通过 monitorentermonitorexit 指令来实现同步的。

​ 关于偏向锁、轻量级锁和重量级锁的过渡,这其实是在 Java 虚拟机(JVM)运行时发生的,而不是在编译阶段。在编译后的 .class 文件中,只会有 monitorenter monitorexit 指令,具体的锁升级和降级逻辑是由 JVM 在运行时根据实际情况来执行的

​ 具体来说,偏向锁、轻量级锁和重量级锁的实现是基于 Java 对象的 Mark Word 和 JVM 运行时的锁策略。当 JVM 执行 monitorenter 和 monitorexit 指令时,会根据当前对象的 Mark Word 状态来决定使用偏向锁、轻量级锁或者重量级锁。

​ 例如,当一个线程试图获取一个处于匿名偏向状态的对象的锁时,JVM 会使用偏向锁逻辑。如果有竞争发生,JVM 可能会尝试升级为轻量级锁或重量级锁。这些锁状态的转换是在 JVM 运行时根据具体情况进行的,而与编译后的 .class 文件中的 monitorentermonitorexit 指令无关。

​ 简而言之,偏向锁、轻量级锁和重量级锁的过渡逻辑是在 JVM 运行时执行的,而不是在编译阶段。.class 文件中的 monitorenter monitorexit 指令只负责同步,锁的具体实现由 JVM 在运行时根据对象状态来决定

synchronized 修饰方法的的情况

public class SynchronizedDemo2 {
    public synchronized void method() {
        System.out.println("synchronized 方法");
    }
}

synchronized 修饰的方法并没有 monitorenter 指令和 monitorexit 指令,取得代之的确实是 ACC_SYNCHRONIZED 标识,该标识指明了该方法是一个同步方法。JVM 通过该 ACC_SYNCHRONIZED 访问标志来辨别一个方法是否声明为同步方法,从而执行相应的同步调用。如果是实例方法,JVM 会尝试获取实例对象的锁。如果是静态方法,JVM 会尝试获取当前 class 的锁。

​ 其实这就意味着,一个类的同一个对象的不同方法(非静态方法,且已被Sychronzied修饰),是不可以并行执行的。

总结

synchronized 同步语句块的实现使用的是 monitorentermonitorexit 指令,其中 monitorenter 指令指向同步代码块的开始位置monitorexit 指令则指明同步代码块的结束位置synchronized 修饰的方法并没有 monitorenter 指令和 monitorexit 指令,取得代之的确实是 ACC_SYNCHRONIZED 标识,该标识指明了该方法是一个同步方法

不过两者的本质都是对对象监视器 monitor 的获取。

相关推荐:Java 锁与线程的那些事 - 有赞技术团队。🧗🏻 进阶一下:学有余力的小伙伴可以抽时间详细研究一下对象监视器 monitor

JDK1.6 之后的 synchronized 详解

​ JDK1.6 对锁的实现引入了大量的优化,如偏向锁、轻量级锁、自旋锁、适应性自旋锁、锁消除、锁粗化等技术来减少锁操作的开销。

​ 锁主要存在四种状态,依次是:无锁状态偏向锁状态轻量级锁状态重量级锁状态,他们会随着竞争的激烈而逐渐升级。注意锁可以升级不可降级,这种策略是为了提高获得锁和释放锁的效率。

一、对象存储结构

​ 在 HotSpot 虚拟机中,对象在内存中的布局分为三块区域对象头实例数据对齐填充

对象头中包含两部分:MarkWord类型指针。如果是数组对象的话,对象头还有一部分是存储数组的长度

​ 多线程下 synchronized 的加锁就是对同一个对象的对象头中的 MarkWord 中的变量进行CAS操作

对象头的组成:

  1. Mark Word :用于存储对象自身的运行时数据,如 HashCode、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID等等。占用内存大小与虚拟机位长一致(32位JVM -> MarkWord是32位,64位JVM -> MarkWord是64位)。
  2. 类型指针:虚拟机通过这个指针确定该对象是哪个类的实例。
  3. 对象头长度(仅当对象是数组对象时存在)

​ 如果是数组对象的话,虚拟机用3个字宽(32/64bit + 32/64bit + 32/64bit)存储对象头,如果是普通对象的话,虚拟机用2字宽存储对象头(32/64bit + 32/64bit)。

​ 在运行期间,Mark Word中存储的数据会随着锁标志位的变化而变化,在32位虚拟机中,不同状态下的组成如下:

​ 在64位JVM下,其Mark Word组成如下图所示:

​ 其中线程ID表示持有偏向锁线程的ID,Epoch表示偏向锁的时间戳,偏向锁和轻量级锁是在jdk1.6引入的。当对象刚创建时,其mark word中的Thread ID字段为空:这意味着没有线程获得了该对象的偏向锁当对象刚被创建时,系统会自动使其进入匿名偏向状态。如果一个对象一直没有被线程访问并获取锁,只是访问,那么对象就一直处于匿名偏向的状态

二、偏向锁

​ 引⼊偏向锁的⽬的:在没有多线程竞争的情况下,尽量减少不必要的轻量级锁的执⾏。轻量级锁的获取及释放依赖 多次CAS原⼦指令,⽽偏向锁只依赖⼀次CAS原⼦指令。但在多线程竞争时,需要进⾏偏向锁撤销步骤,因此其撤 销的开销必须⼩于节省下来的CAS开销,否则偏向锁并不能带来收益。JDK 1.6中默认开启偏向锁,可以通过-XX:- UseBiasedLocking来禁⽤偏向锁。

1.进入偏向锁:

​ 先说结论,简单来说在JVM开启偏向锁功能(JVM参数-XX:+UseBiasedLocking)时,只有对象处于匿名偏向状态时(即没有其他线程已经获取了这个偏向锁)或者无锁状态时,线程获取这个对象(偏向锁状态)才能直接成功,否则会进行“偏向锁撤销”(在撤销过程中,也可以获得偏向锁,当且仅当源线程已死亡或者已经释放了该偏向锁)或者“轻量级锁逻辑”。

​ 如果当前锁已偏向其他线程||epoch值过期||class偏向模式关闭||获取偏向锁的过程中存在并发冲突,都会进⼊到 InterpreterRuntime::monitorenter ⽅法, 在该⽅法中会进⾏偏向锁撤销 和升级。流程如下图所示:


2.既然对象已创建就是匿名偏向的状态,那无锁状态去哪里了呢?

匿名偏向的对象才能进⼊偏向锁模式。JVM启动时会延时初始化偏向锁,默认是4000ms。初始化后会将所有加载 的Klass的prototype header修改为匿名偏向样式。当创建⼀个对象时,会通过Klass的prototype_header来初始化该对象的对象头。

​ 简单的说,偏向锁初始化结束后,后续所有对象的对象(在创建时)头都为匿名偏向样式,在此之前创建的对象则为无锁状态。匿名偏向的对象才能进⼊偏向锁模式。而对于无锁状态的锁对象,如果有竞争,会直接进⼊到 轻量级锁。这也是为什么JVM启动前4秒对象在发生锁竞争时会直接进⼊到轻量级锁的原因。当然在其他情况下也会让对象转为无锁状态,比如轻量级锁释放时,CAS失败后,膨胀至重量级锁后执行exit命令,会变成无锁状态。

那么为什么要延迟初始化呢?

​ JVM启动时必不可免会有⼤量sync的操作,⽽偏向锁并不是都有利的。如果开启了偏向锁,会发⽣⼤量锁撤销和锁升级操作,⼤⼤降低JVM启动效率。

​ 因此,我们可以明确地说,只有锁对象处于匿名偏向状态,线程才能拿到到我们通常意义上的偏向锁。而处于无锁状态的锁对象,只能进入到轻量级锁状态


3.偏向锁的撤销和释放:

​ 像刚刚说的,假设在JVM初始化偏向锁之后,对象一被创建,该对象就处于匿名偏向状态,当线程获取了一个处于匿名偏向状态的对象锁时,该对象会从匿名偏向变为偏向锁状态。当这个线程执行完毕后,对象不会自动从偏向锁状态转回匿名偏向状态。偏向锁会在该线程再次访问这个锁时提供性能优势。只要偏向锁被设置,就会保持与特定线程的偏向,直到撤销偏向锁操作发生。

​ 而撤销偏向锁是当另一个线程试图锁定一个已经被偏向锁保护的对象时发生的。在这种情况下,JVM会尝试撤销原来线程的偏向锁,并将锁状态升级。撤销偏向锁的过程如下:

  1. 暂停拥有偏向锁的线程(原线程)。
  2. 检查原线程是否仍然存活,如果已经死亡,将对象头恢复为无锁状态
  3. 如果原线程仍然存活,检查它是否仍然持有锁。如果是,则将锁状态升级为轻量级锁;如果原线程已经释放锁,那么将对象头恢复为无锁状态匿名偏向状态
  4. 恢复原线程的执行。

​ 偏向锁的撤销 (revoke)是⼀个很特殊的操作, 为了执⾏撤销操作, 需要等待全局安全点(SafePoint) , 此时所有的⼯作线 程都停⽌了字节码的执⾏。

​ 偏向锁的释放实际上并不是真正的释放锁,因为偏向锁并没有进行传统意义上的锁获取和释放操作。当线程执行完同步块或同步方法后,它不需要显式地释放锁,它不会修改对象头中的Mark Word。而是在释放锁的过程中,线程会从自己的Java虚拟机栈中移除对应的Lock Record偏向锁依然会继续偏向原线程,直到发生锁竞争。这种设计有助于提高性能,因为在该线程再次访问相同的锁时,不需要重新获取锁。

​ Q1:在撤销偏向锁的过程中,第三步,如何判断原线程是否还持有该对象的偏向锁呢?(假设线程1先持有了对象A的偏向锁,现在线程2来获取对象A的偏向锁了)

​ 答案是通过Java虚拟机栈(每个线程的栈都是独立、互不干扰的)的锁记录(Lock Record)对象的Monitor(监视器)来实现的。当一个线程获得偏向锁时,它会在上创建一个锁记录(Lock Record),并将对象的Monitor指针保存到锁记录中。同时,Monitor也会保存指向该线程的Java虚拟机栈的指针

​ 因此,当线程2试图获取对象A的偏向锁时,它可以通过以下步骤来检查线程1是否还持有偏向锁:

  1. 检查对象A的Monitor中的指针是否指向线程1的Java虚拟机栈
  2. 如果指向线程1的Java虚拟机栈,则需要遍历线程1的Java虚拟机栈,查找是否存在一个锁记录(Lock Record),该锁记录中的Monitor指针指向对象A。

​ 如果找到了一个匹配的锁记录(Lock Record),说明线程1仍然持有对象A的偏向锁;如果没有找到匹配的锁记录,说明线程1已经释放了偏向锁。


4.批量重偏向和撤销

​ 在偏向锁的加锁解锁过程中,当只有⼀个线程反复进⼊同步块时,偏向锁带来的性能开销基本可以忽 略,但是当有其他线程尝试获得锁时(发生竞争),就需要等到 safe point时将偏向锁撤销为⽆锁状态或升级为轻量级/重量级锁

​ 因此,JVM中增加了⼀种批量重偏向/撤销的机制以减少锁撤销的开销,⽽mark word中的epoch也是在这⾥被⼤量应⽤,这⾥不展开说明。但⽆论怎么优化,偏向锁的撤销仍有⼀定不可避免的成本。如果业务场景存在⼤量 多线程竞争,那偏向锁的存在不仅不能提⾼性能,⽽且会导致性能下降(偏向锁并不都有利,jdk15默认不开启)


5.无锁状态在什么情况下会产生?

​ 假设已经开启了JVM的偏向锁。那么会在下面3种情况产生无锁对象:

  1. JVM启动阶段延迟初始化偏向锁:JVM启动时会有一个延迟初始化偏向锁的时间,默认为4000ms。在此期间创建的对象会以无锁状态创建。当延迟初始化时间结束后,后续创建的对象将直接处于匿名偏向状态。
  2. 偏向锁撤销:当有第二个线程尝试获取已经被偏向的对象锁时,偏向锁需要被撤销。这个过程中,锁状态可能会从偏向锁状态变成无锁状态(但这种情况很少发生,当原线程已经死亡时会发生,通常情况下会直接转为轻量级锁或者重量级锁)。
  3. 重量级锁释放后恢复为无锁状态:在重量级锁被释放时,锁的状态可能会恢复为无锁状态。这种情况下,该对象锁将可以被其他线程获取。再次竞争锁时,锁会尝试转为偏向锁状态。

三、轻量级锁

​ 引⼊轻量级锁的⽬的:在多线程交替执⾏同步块的情况下,尽量避免重量级锁使⽤的操作系统互斥量带来的开销, 但是如果多个线程在同⼀时刻进⼊临界区,会导致轻量级锁膨胀升级重量级锁,所以轻量级锁的出现并非是要替代重量级锁。

​ 轻量级锁的逻辑流程,结合了偏向锁的逻辑:

1.获取轻量级锁

​ 获取轻量级锁时,首先会检查对象的标记字(mark word)。

​ a. 如果对象处于无锁状态(正常情况下是这样,即对象的Mark Word 的状态为“01”,表示当前无线程对该对象持有锁),线程会在其栈帧中创建一个名为锁记录(lock record)的空间,将对象的原始标记字(Mark Word)拷贝到这个锁记录中。这个空间用于存储对象标记字的拷贝。

​ b. 然后,线程使用CAS(compare-and-swap)操作尝试将对象的标记字更新为指向锁记录的指针。如果CAS操作成功,表示线程获取轻量级锁成功;如果失败,表示存在其他线程竞争该锁,会通过自旋来尝试重新获取锁,自旋的次数有限制(取决于JVM得到具体实现和运行时的参数),当自旋次数达到限制后,会将轻量级锁膨胀为重量级锁

2.释放轻量级锁

​ 在获取锁,并且执行完毕后,会进行锁的释放,会使用CAS来把Mark Word替换成原来的Mark Word(原来的Mark Word记录在Lock Record中),如果CAS失败,即解锁时有竞争,会调用 inflate 方法进行重量级锁膨胀,升级到到重量级锁后再执行exit方法,恢复为无锁状态

四、重量级锁

​ 重量级锁通过对象内部的监视器(monitor)实现,其依赖于底层操作系统的Mutex Lock实现,需要额外的⽤户态到内核态切换的开销。

1.获取重量级锁的逻辑

​ 重量级锁的逻辑图为:

​ 重量级锁通常在轻量级锁竞争失败时才会被尝试。在获取轻量级锁失败,并且CAS失败达到一定次数时,会调用inflate函数进行锁膨胀逻辑:

inflate其中是⼀个for循环,主要是为了处理多线程同时调⽤inflate的情况。然后会根据锁对象的状态进行不同的处理:

  1. 已经是重量级状态,说明膨胀已经完成,返回并继续执行ObjectMonitor::enter方法。
  2. 如果是轻量级锁则需要进行膨胀操作。
  3. 如果是膨胀中状态,则进行忙等待。
  4. 如果是⽆锁状态则需要进行膨胀操作。

2.轻量级锁膨胀的流程:

  1. 调⽤omAlloc获取⼀个可用的ObjectMonitor对象。在omAlloc方法中会先从线程私有的 monitor集合omFreeList中分配对象,如果omFreeList中已经没有monitor对象,则从JVM全局的gFreeList中分配一批monitoromFreeList中.
  2. 通过CAS尝试将Mark Word设置为markOopDesc:INFLATING,标识当前锁正在膨胀中。如果CAS失败,说明同⼀时刻其它线程已经将Mark Word设置markOopDesc:INFLATING,当前线程进行自旋等待膨胀完成,膨胀完成后会进行获取重量级锁的操作即ObjectMonitor::enter
  3. 如果CAS成功,设置monitor的各个字段:设置 monitor 的header字段为displaced mark word,owner字段为Lock Record,obj字段为锁对象等。
  4. 设置锁对象头的mark word为重量级锁状态,指向第⼀步分配的monitor对象

​ 需要注意的是:当锁膨胀inflate执行完并返回对应的ObjectMonitor时,并不表示该线程竞争到了锁,真正的锁竞争发生在ObjectMonitor::enter 方法中。

3.锁竞争发生的ObjectMonitor::enter 方法:

  1. 当前是无锁、锁重入,简单操作后返回。
  2. 当前线程是之前持有轻量级锁的线程,则为首次进入,设置recursions为1,owner为当前线程,该线程 成功获得锁并返回。
  3. 先自旋尝试获得锁,尽可能减少同步操作带来的开销。
  4. 调用EnterI方法。

ObjectMonitor竞争失败的线程,通过⾃旋执行ObjectMonitor::EnterI方法等待锁的释放,它的大致原理为:

​ 当⼀个线程尝试获得重量级锁且没有竞争到时,该线程会被封装成⼀个ObjectWaiter对象插⼊到cxq的队列的队首,然后调用park函数挂起当前线程,进⼊BLOCKED状态。当线程释放锁时,会根据唤醒策略,从cxq或 EntryList中挑选⼀个线程unpark 唤醒。

​ 如果线程获得锁后调用Object#wait方法,则会将线程加⼊到WaitSet 中,进⼊WAITINGTIMED_WAITING状态。当被Object#notify唤醒后,会将线程从WaitSet移动到cxq或 EntryList中去,进入BLOCKED状态。

​ 需要注意的是,当调用⼀个锁对象的waitnotify方法时,若当前锁的状态是偏向锁或轻量级锁则会先膨胀成重量级锁。因为这些方法需要与内置的条件队列一起使用,而内置的条件队列与重量级锁的实现紧密相关。

4.重量级锁的释放:

​ 当某个持有锁的线程执行完同步代码块时,会进行锁的释放。在HotSpot中,通过退出monitor的方式实现锁的释放,并通知被阻塞的线程,具体实现位于ObjectMonitor::exit方法中。


​ 总结关于锁升级的几个注意点:

  • 再次强调,无锁状态只能升级为轻量级锁,匿名偏向状态才能进入到偏向锁
  • 偏向锁并不都有利,其适⽤于单个线程重入的场景,原因为:偏向锁的撤销需要进⼊ safepoint ,开销较大。需要进⼊ safepoint 是由于:偏向锁的撤销需要对锁对象的lock record进行操作,而lock record到其他线程的栈帧中遍历寻找。在非safepoint,栈帧是动态的,会引⼊更多的问题。目前看来,偏向锁存在的价值是为历史遗留的Collection类如Vector和HashTable等做优化,迟早药丸。Java 15中默认不开启。
  • 执行Object类的hashcode方法,偏向锁撤销并且锁会膨胀为轻量级锁或者重量锁。执行Object类的 wait/notify/notifyall ⽅法,偏向锁撤销并直接膨胀成重量级锁。
    • 关于HashCode方法,在无锁状态下,Mark Word中可以存储对象的identity hash code值。当对象的hashCode()方法(非用户自定义)第一次被调用时,JVM会生成对应的identity hash code值(以jdk8为例,JVM默认的计算identity hash code的方式得到的是一个随机数),并将该值存储到Mark Word中。后续如果该对象的hashCode()方法再次被调用则不会再通过JVM进行计算得到,而是直接从Mark Word中获取。只有这样才能保证多次获取到的identity hash code的值是相同的。
    • 我们还知道,对于轻量级锁,获取锁的线程栈帧中有锁记录(Lock Record)空间,用于存储Mark Word的拷贝,官方称之为Displaced Mark Word,该拷贝中可以包含identity hash code,所以轻量级锁可以和identity hash code共存;对于重量级锁,ObjectMonitor类里有字段可以记录非加锁状态下的mark word,其中也可以存储identity hash code的值,所以重量级锁也可以和identity hash code共存。
    • 对于偏向锁,在线程获取偏向锁时,会用Thread ID和epoch值覆盖identity hash code所在的位置。如果一个对象的hashCode()方法已经被调用过一次之后,这个对象还能被设置偏向锁么?答案是不能。因为如果可以的化,那Mark Word中的identity hash code必然会被偏向线程Id给覆盖,这就会造成同一个对象前后两次调用hashCode()方法得到的结果不一致。
    • 总结,HotSpot VM的锁实现机制是:
      • 当一个对象已经计算过identity hash code,它就无法进入偏向锁状态;
      • 当一个对象当前正处于偏向锁状态,并且需要计算其identity hash code的话,则它的偏向锁会被撤销,并且锁会膨胀为轻量级锁或者重量锁;
      • 轻量级锁的实现中,会通过线程栈帧的锁记录存储Displaced Mark Word;重量锁的实现中,ObjectMonitor类里有字段可以记录非加锁状态下的mark word,其中可以存储identity hash code的值。
  • 重量级锁,会将线程放进等待队列,等待操作系统调度。而偏向锁和轻量级锁,未交由操作系统调度,依然处于用户态,只是采用CAS无锁竞争的⽅式获取锁。CAS通过Unsafe类中compareAndSwap方法,jni调用C++方法, 通过汇编指令锁住cpu中的北桥信号。
  • 轻量级锁适⽤于两个线程的交替执行场景:线程A进⼊轻量级锁,退出同步代码块并释放锁,会将锁对象恢复为无锁状态;线程B再进⼊锁,发现为无锁状态,会cas尝试获取该锁对象的轻量级锁。如果有竞争,则直接膨胀为 重量级锁,没有自旋操作
  • 许多文章声称⼀个对象关联到⼀个monitor,这个说法不够准确。
    • 如果对象已经是重量级锁了,对象头的确指向 了⼀个 monitor 。
    • 但对于正在膨胀的锁,会先从线程私有的 monitor 集合omFreeList中分配对象。如果 omFreeList 中已经没有 monitor 对象,再从JVM全局的 gFreeList 中分配⼀批 monitor 到 omFreeList 中。

synchronized 和 volatile 的区别?

synchronized 关键字和 volatile 关键字是两个互补的存在,而不是对立的存在!

  • volatile 关键字是线程同步轻量级实现,所以 volatile性能肯定比synchronized关键字要好 。但是 volatile 关键字只能用于变量synchronized 关键字可以修饰方法以及代码块 。
  • volatile 关键字能保证数据的可见性,但不能保证数据的原子性synchronized 关键字两者都能保证。
  • volatile关键字主要用于解决变量在多个线程之间的可见性,而 synchronized 关键字解决的是多个线程之间访问资源的同步性

synchronized 和 ReentrantLock 的区别

两者都是可重入锁

“可重入锁” 指的是自己可以再次获取自己的内部锁。比如一个线程获得了某个对象的锁,此时这个对象锁还没有释放,当其再次想要获取这个对象的锁的时候还是可以获取的,如果是不可重入锁的话,就会造成死锁。同一个线程每次获取锁,锁的计数器都自增 1,所以要等到锁的计数器下降为 0 时才能释放锁。

synchronized 依赖于 JVM 而 ReentrantLock 依赖于 API

synchronized 是依赖于 JVM 实现的,前面我们也讲到了 虚拟机团队在 JDK1.6 为 synchronized 关键字进行了很多优化,但是这些优化都是在虚拟机层面实现的,并没有直接暴露给我们ReentrantLock 是 JDK 层面实现的(也就是 API 层面,需要 lock() 和 unlock() 方法配合 try/finally 语句块来完成),所以我们可以通过查看它的源代码,来看它是如何实现的。

ReentrantLock 比 synchronized 增加了一些高级功能

相比synchronizedReentrantLock增加了一些高级功能。主要来说主要有三点:

  • 等待可中断 : ReentrantLock提供了一种能够中断等待锁的线程的机制,通过 lock.lockInterruptibly() 来实现这个机制。也就是说正在等待的线程可以选择放弃等待,改为处理其他事情。
  • 可实现公平锁 : ReentrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁ReentrantLock默认情况是非公平的,可以通过 ReentrantLock类的ReentrantLock(boolean fair)构造方法来制定是否是公平的。
  • 可实现选择性通知(锁可以绑定多个条件): synchronized关键字与wait()notify()/notifyAll()方法相结合可以实现等待/通知机制ReentrantLock类当然也可以实现,但是需要借助于Condition接口与newCondition()方法。

Condition是 JDK1.5 之后才有的,它具有很好的灵活性,比如可以实现多路通知功能也就是在一个Lock对象中可以创建多个Condition实例(即对象监视器),线程对象可以注册在指定的Condition中,从而可以有选择性的进行线程通知,在调度线程上更加灵活。 在使用notify()/notifyAll()方法进行通知时,被通知的线程是由 JVM 选择的,用ReentrantLock类结合Condition实例可以实现“选择性通知” ,这个功能非常重要,而且是 Condition 接口默认提供的。而synchronized关键字就相当于整个 Lock 对象中只有一个Condition实例,所有的线程都注册在它一个身上。如果执行notifyAll()方法的话就会通知所有处于等待状态的线程这样会造成很大的效率问题,而Condition实例的signalAll()方法 只会唤醒注册在该Condition实例中的所有等待线程。

如果你想使用上述功能,那么选择 ReentrantLock 是一个不错的选择。性能已不是选择标准。

ThreadLocal

ThreadLocal 有什么用?

​ 通常情况下,我们创建的变量是可以被任何一个线程访问并修改的。如果想实现每一个线程都有自己的专属本地变量该如何解决呢?JDK 中自带的ThreadLocal类正是为了解决这样的问题。 ThreadLocal类主要解决的就是让每个线程绑定自己的值,可以将ThreadLocal类形象的比喻成存放数据的盒子,盒子中可以存储每个线程的私有数据。

​ 如果你创建了一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的本地副本,这也是ThreadLocal变量名的由来。他们可以使用 get()set() 方法来获取默认值或将其值更改为当前线程所存的副本的值,从而避免了线程安全问题。再举个简单的例子:两个人去宝屋收集宝物,这两个共用一个袋子的话肯定会产生争执,但是给他们两个人每个人分配一个袋子的话就不会出现这样的问题。如果把这两个人比作线程的话,那么 ThreadLocal 就是用来避免这两个线程竞争的。

如何使用 ThreadLocal?

相信看了上面的解释,大家已经搞懂 ThreadLocal 类是个什么东西了。下面简单演示一下如何在项目中实际使用 ThreadLocal

import java.text.SimpleDateFormat;
import java.util.Random;

public class ThreadLocalExample implements Runnable{
     // SimpleDateFormat 不是线程安全的,所以每个线程都要有自己独立的副本
    private static final ThreadLocal<SimpleDateFormat> formatter = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMdd HHmm"));

    public static void main(String[] args) throws InterruptedException {
        ThreadLocalExample obj = new ThreadLocalExample();
        for(int i=0 ; i<10; i++){
            Thread t = new Thread(obj, ""+i);
            Thread.sleep(new Random().nextInt(1000));
            t.start();
        }
    }
    @Override
    public void run() {
        System.out.println("Thread Name= "+Thread.currentThread().getName()+" default Formatter = "+formatter.get().toPattern());
        try {
            Thread.sleep(new Random().nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //formatter pattern is changed here by thread, but it won't reflect to other threads
        formatter.set(new SimpleDateFormat());
        System.out.println("Thread Name= "+Thread.currentThread().getName()+" formatter = "+formatter.get().toPattern());
    }
}

​ 输出结果 :

Thread Name= 0 default Formatter = yyyyMMdd HHmm
Thread Name= 0 formatter = yy-M-d ah:mm
Thread Name= 1 default Formatter = yyyyMMdd HHmm
Thread Name= 2 default Formatter = yyyyMMdd HHmm
Thread Name= 1 formatter = yy-M-d ah:mm
Thread Name= 3 default Formatter = yyyyMMdd HHmm
Thread Name= 2 formatter = yy-M-d ah:mm
Thread Name= 4 default Formatter = yyyyMMdd HHmm
Thread Name= 3 formatter = yy-M-d ah:mm
Thread Name= 4 formatter = yy-M-d ah:mm
Thread Name= 5 default Formatter = yyyyMMdd HHmm
Thread Name= 5 formatter = yy-M-d ah:mm
Thread Name= 6 default Formatter = yyyyMMdd HHmm
Thread Name= 6 formatter = yy-M-d ah:mm
Thread Name= 7 default Formatter = yyyyMMdd HHmm
Thread Name= 7 formatter = yy-M-d ah:mm
Thread Name= 8 default Formatter = yyyyMMdd HHmm
Thread Name= 9 default Formatter = yyyyMMdd HHmm
Thread Name= 8 formatter = yy-M-d ah:mm
Thread Name= 9 formatter = yy-M-d ah:mm

​ 从输出中可以看出,虽然 Thread-0 已经改变了 formatter 的值,但 Thread-1 默认格式化值与初始化值相同,其他线程也一样。

​ 上面有一段代码用到了创建 ThreadLocal 变量的那段代码用到了 Java8 的知识,它等于下面这段代码,如果你写了下面这段代码的话,IDEA 会提示你转换为 Java8 的格式(IDEA 真的不错!)。因为 ThreadLocal 类在 Java 8 中扩展,使用一个新的方法withInitial(),将 Supplier 功能接口作为参数。

private static final ThreadLocal<SimpleDateFormat> formatter = new ThreadLocal<SimpleDateFormat>(){
    @Override
    protected SimpleDateFormat initialValue(){
        return new SimpleDateFormat("yyyyMMdd HHmm");
    }
};

ThreadLocal 原理了解吗?

Thread类源代码入手。

public class Thread implements Runnable {
    //......
    //与此线程有关的ThreadLocal值。由ThreadLocal类维护
    ThreadLocal.ThreadLocalMap threadLocals = null;

    //与此线程有关的InheritableThreadLocal值。由InheritableThreadLocal类维护
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    //......
}

​ 从上面Thread类 源代码可以看出Thread 类中有一个 threadLocals 和 一个 inheritableThreadLocals 变量,它们都是 ThreadLocalMap 类型的变量,我们可以把 ThreadLocalMap 理解为ThreadLocal 类实现的定制化的 HashMap。默认情况下这两个变量都是 null,只有当前线程调用 ThreadLocal 类的 setget方法时才创建它们,实际上调用这两个方法的时候,我们调用的是ThreadLocalMap类对应的 get()set()方法。

ThreadLocal类的set()方法

public void set(T value) {
    //获取当前请求的线程    
    Thread t = Thread.currentThread();
    //取出 Thread 类内部的 threadLocals 变量(哈希表结构)
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 将需要存储的值放入到这个哈希表中
        map.set(this, value);
    else
        createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

​ 通过上面这些内容,我们足以通过猜测得出结论:最终的变量是放在了当前线程的 ThreadLocalMap 中,并不是存在 ThreadLocal 上,ThreadLocal 可以理解为只是ThreadLocalMap的封装,传递了变量值。 ThrealLocal 类中可以通过Thread.currentThread()获取到当前线程对象后,直接通过getMap(Thread t)可以访问到该线程ThreadLocalMap对象。

每个Thread中都具备一个ThreadLocalMap,而ThreadLocalMap可以存储以ThreadLocal为 key ,Object 对象为 value 的键值对。

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    //......
}

​ 比如我们在同一个线程中声明了两个 ThreadLocal 对象的话, Thread内部都是使用仅有的那个ThreadLocalMap 存放数据的,ThreadLocalMap的 key 就是 ThreadLocal对象,value 就是 ThreadLocal 对象调用set方法设置的值。

ThreadLocal 数据结构如下图所示:

ThreadLocalMapThreadLocal的静态内部类。

ThreadLocal 内存泄露问题是怎么导致的?

ThreadLocalMap 中使用的 key 为 ThreadLocal弱引用,而 value 是强引用。所以,如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。(什么是弱引用?什么是强引用?Java中的强引用、软引用、弱引用、虚引用_少年.的博客-CSDN博客_java的强引用和弱引用以及 java的弱引用_理解Java中的弱引用(Weak Reference)_汪希旧时光的博客-CSDN博客)

​ 这样一来,ThreadLocalMap 中就会出现 key 为 null 的 Entry。假如我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。ThreadLocalMap 实现中已经考虑了这种情况,在调用 set()get()remove() 方法的时候,会清理掉 key 为 null 的记录。使用完 ThreadLocal方法后 最好手动调用remove()方法。

static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

弱引用介绍:

​ 如果一个对象只具有弱引用,那就类似于可有可无的生活用品。弱引用与软引用的区别在于:只具有弱引用的对象拥有更短暂的生命周期。在垃圾回收器线程扫描它所管辖的内存区域的过程中,一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。不过,由于垃圾回收器是一个优先级很低的线程, 因此不一定会很快发现那些只具有弱引用的对象

​ 弱引用可以和一个引用队列(ReferenceQueue)联合使用,如果弱引用所引用的对象被垃圾回收,Java 虚拟机就会把这个弱引用加入到与之关联的引用队列中。

线程池

为什么要用线程池?

池化技术想必大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

线程池提供了一种限制和管理资源(包括执行一个任务)的方式。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

​ 这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

实现 Runnable 接口和 Callable 接口的区别

Runnable自 Java 1.0 以来一直存在,但Callable仅在 Java 1.5 中引入,目的就是为了来处理Runnable不支持的用例。Runnable 接口 不会返回结果或抛出检查异常,但是 Callable 接口 可以。所以,如果任务不需要返回结果抛出异常推荐使用 Runnable 接口 ,这样代码看起来会更加简洁。

​ 工具类 Executors 可以实现将 Runnable 对象转换成 Callable 对象。(Executors.callable(Runnable task)Executors.callable(Runnable task, Object result))。

Runnable.java

@FunctionalInterface
public interface Runnable {
   /**
    * 被线程执行,没有返回值也无法抛出异常
    */
    public abstract void run();
}

Callable.java

@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算结果,或在无法这样做时抛出异常。
     * @return 计算得出的结果
     * @throws 如果无法计算结果,则抛出异常
     */
    V call() throws Exception;
}

执行 execute()方法和 submit()方法的区别是什么呢?

  1. execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
  2. submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Futureget()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

我们以 AbstractExecutorService 接口 中的一个 submit 方法为例子来看看源代码:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

上面方法调用的 newTaskFor 方法返回了一个 FutureTask 对象。

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

我们再来看看execute()方法:

public void execute(Runnable command) {
  ...
}

如何创建线程池(重要!!)

​ 《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

Executors 返回线程池对象的弊端如下:

  • FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM(Out Of Memory)。
  • CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

方式一:通过构造方法实现

方式二:通过 Executor 框架的工具类 Executors 来实现(不推荐)

我们可以创建三种类型的 ThreadPoolExecutor:

  • FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

对应 Executors 工具类中的方法如图所示:

ThreadPoolExecutor 类分析

ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。

/**
 * 用给定的初始参数创建一个新的ThreadPoolExecutor。
 */
public ThreadPoolExecutor(int corePoolSize,
                      int maximumPoolSize,
                      long keepAliveTime,
                      TimeUnit unit,
                      BlockingQueue<Runnable> workQueue,
                      ThreadFactory threadFactory,
                      RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
            throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

下面这些对创建 非常重要,在后面使用线程池的过程中你一定会用到!所以,务必拿着小本本记清楚。

ThreadPoolExecutor构造函数重要参数分析

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 核心线程数定义了最小可以同时运行的线程数量
  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数:

  1. keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  2. unit : keepAliveTime 参数的时间单位。
  3. threadFactory :executor 创建新线程的时候会用到。
  4. handler :饱和策略。关于饱和策略下面单独介绍一下。

ThreadPoolExecutor 饱和策略

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy 抛出 RejectedExecutionException拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy 调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy 不处理新任务,直接丢弃掉。不会给你任何通知包括抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy 此策略将丢弃最早的未处理的任务请求。

​ 举个例子: Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 ThreadPoolExecutor 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了)

一个简单的线程池 Demo

为了让大家更清楚上面的面试题中的一些概念,我写了一个简单的线程池 Demo。

首先创建一个 Runnable 接口的实现类(当然也可以是 Callable 接口,我们上面也说了两者的区别。)

MyRunnable.java

import java.util.Date;
/**
 * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
 * @author shuang.kou
 */
public class MyRunnable implements Runnable {
    private String command;
    public MyRunnable(String s) {this.command = s;}
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return this.command;
    }
}

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

ThreadPoolExecutorDemo.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;
    public static void main(String[] args) {

        //使用阿里巴巴推荐的创建线程池的方式,通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            //创建 MyRunnable 对象(MyRunnable 类实现了Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            //执行Runnable
            executor.execute(worker);
        }
        //终止线程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}

可以看到我们上面的代码指定了:

  1. corePoolSize: 核心线程数为 5。
  2. maximumPoolSize :最大线程数 10
  3. keepAliveTime : 等待时间为 1L。
  4. unit: 等待时间的单位为 TimeUnit.SECONDS。
  5. workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
  6. handler:饱和策略为 CallerRunsPolicy

Output:

pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020

线程池原理分析

​ 承接 上一节,我们通过代码输出结果可以看出:线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会)

现在,我们就分析上面的输出内容来简单分析一下线程池原理。

为了搞懂线程池的原理,我们需要首先分析一下 execute方法。 在上一节中的 Demo 中我们使用 executor.execute(worker)来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:

// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c) {
    return c & CAPACITY;
}

private final BlockingQueue<Runnable> workQueue;

public void execute(Runnable command) {
    // 如果任务为null,则抛出异常。
    if (command == null)
        throw new NullPointerException();
    // ctl 中保存的线程池当前的一些状态信息
    int c = ctl.get();

    //  下面会涉及到 3 步 操作
    // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
    // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
    // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
        if (!isRunning(recheck) && remove(command))
            reject(command);
            // 如果当前线程池为空就新创建一个线程并执行。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
    else if (!addWorker(command, false))
        reject(command);
}

通过下图可以更好的对上面这 3 步做一个展示,下图是我为了省事直接从网上找到,原地址不明。

​ 现在,让我们在回到上一节我们写的 Demo, 现在是不是很容易就可以搞懂它的原理了呢?没搞懂的话,也没关系,可以看看我的分析:

​ 我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的5个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。

addWorker 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。

	// 全局锁,并发操作必备
    private final ReentrantLock mainLock = new ReentrantLock();
    // 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合
    private int largestPoolSize;
    // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
    private final HashSet<Worker> workers = new HashSet<>();
    //获取线程池状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //判断线程池的状态是否为 Running
    private static boolean isRunning(int c) {return c < SHUTDOWN;}

    /**
     * 添加新的工作线程到线程池
     * @param firstTask 要执行
     * @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小
     * @return 添加成功就返回true否则返回false
     */
   private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //这两句用来获取线程池的状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
               //获取线程池中工作的线程的数量
                int wc = workerCountOf(c);
                // core参数为true的话表明队列也满了,线程池大小变为 maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               //原子操作将workcount的数量加1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果线程的状态改变了就再次执行上述操作
                c = ctl.get();
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 标记工作线程是否启动成功
        boolean workerStarted = false;
        // 标记工作线程是否创建成功
        boolean workerAdded = false;
        Worker w = null;
        try {

            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
              // 加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                   //获取线程池状态
                    int rs = runStateOf(ctl.get());
                   //rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中
                  //(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
                   // firstTask == null证明只新建线程而不执行任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                       //更新当前工作线程的最大容量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                      // 工作线程是否启动成功
                        workerAdded = true;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                //// 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
                if (workerAdded) {
                    t.start();
                  /// 标记线程启动成功
                    workerStarted = true;
                }
            }
        } finally {
           // 线程启动失败,需要从工作线程中移除对应的Worker
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

​ 更多关于线程池源码分析的内容推荐这篇文章:硬核干货:4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理

Atomic 原子类

介绍一下 Atomic 原子类

Atomic 翻译成中文是原子的意思。在化学上,我们知道原子是构成一般物质的最小单位,在化学反应中是不可分割的。在我们这里 Atomic 是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。

​ 所以,所谓原子类说简单点就是具有原子/原子操作特征的类。并发包 java.util.concurrent 的原子类都存放在java.util.concurrent.atomic下,如下图所示。

JUC 包中的原子类是哪 4 类?

基本类型

使用原子的方式更新基本类型

  • AtomicInteger:整型原子类
  • AtomicLong:长整型原子类
  • AtomicBoolean:布尔型原子类

数组类型

使用原子的方式更新数组里的某个元素

  • AtomicIntegerArray:整型数组原子类
  • AtomicLongArray:长整型数组原子类
  • AtomicReferenceArray:引用类型数组原子类

上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerArray 为例子来介绍。

AtomicIntegerArray 类常用方法

public final int get(int i) //获取 index=i 位置元素的值
public final int getAndSet(int i, int newValue)//返回 index=i 位置的当前的值,并将其设置为新值:newValue
public final int getAndIncrement(int i)//获取 index=i 位置元素的值,并让该位置的元素自增
public final int getAndDecrement(int i) //获取 index=i 位置元素的值,并让该位置的元素自减
public final int getAndAdd(int i, int delta) //获取 index=i 位置元素的值,并加上预期的值
boolean compareAndSet(int i, int expect, int update) //如果输入的数值等于预期值,则以原子方式将 index=i 位置的元素值设置为输入值(update)
public final void lazySet(int i, int newValue)//最终 将index=i 位置的元素设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
AtomicIntegerArray 常见方法使用
import java.util.concurrent.atomic.AtomicIntegerArray;
public class AtomicIntegerArrayTest {
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		int temvalue = 0;
		int[] nums = { 1, 2, 3, 4, 5, 6 };
		AtomicIntegerArray i = new AtomicIntegerArray(nums);
		for (int j = 0; j < nums.length; j++) {
			System.out.println(i.get(j));
		}
		temvalue = i.getAndSet(0, 2);
		System.out.println("temvalue:" + temvalue + ";  i:" + i);
		temvalue = i.getAndIncrement(0);
		System.out.println("temvalue:" + temvalue + ";  i:" + i);
		temvalue = i.getAndAdd(0, 5);
		System.out.println("temvalue:" + temvalue + ";  i:" + i);
	}

}

引用类型

  • AtomicReference:引用类型原子类
  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA (在下面有介绍)问题。
  • AtomicMarkableReference :原子更新带有标记位的引用类型。该类将 boolean 标记与引用关联起来,
引用类型原子类介绍

​ 基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。

​ 上面三个类提供的方法几乎相同,所以我们这里以 AtomicReference 为例子来介绍.

AtomicReference 类使用示例

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {
	public static void main(String[] args) {
		AtomicReference<Person> ar = new AtomicReference<Person>();
		Person person = new Person("SnailClimb", 22);
		ar.set(person);
		Person updatePerson = new Person("Daisy", 20);
		ar.compareAndSet(person, updatePerson);

		System.out.println(ar.get().getName());
		System.out.println(ar.get().getAge());
	}
}
class Person {
	private String name;
	private int age;
	public Person(String name, int age) {
		super();
		this.name = name;
		this.age = age;
	}
	public String getName() {return name;}
	public void setName(String name) {this.name = name;}
	public int getAge() {return age;}
	public void setAge(int age) {this.age = age;}
}

​ 上述代码首先创建了一个 Person 对象,然后把 Person 对象设置进 AtomicReference 对象中,然后调用 compareAndSet 方法,该方法就是通过 CAS 操作设置 ar。如果 ar 的值为 person 的话,则将其设置为 updatePerson。实现原理与 AtomicInteger 类中的 compareAndSet 方法相同。运行上面的代码后的输出结果如下:

Daisy
20
AtomicStampedReference 类使用示例
import java.util.concurrent.atomic.AtomicStampedReference;
public class AtomicStampedReferenceDemo {
    public static void main(String[] args) {
        // 实例化、取当前值和 stamp 值
        final Integer initialRef = 0, initialStamp = 0;
        final AtomicStampedReference<Integer> asr = new AtomicStampedReference<>(initialRef, initialStamp);
        System.out.println("currentValue=" + asr.getReference() + ", currentStamp=" + asr.getStamp());

        // compare and set
        final Integer newReference = 666, newStamp = 999;
        final boolean casResult = asr.compareAndSet(initialRef, newReference, initialStamp, newStamp);
        System.out.println("currentValue=" + asr.getReference()
                + ", currentStamp=" + asr.getStamp()
                + ", casResult=" + casResult);

        // 获取当前的值和当前的 stamp 值
        int[] arr = new int[1];
        final Integer currentValue = asr.get(arr);
        final int currentStamp = arr[0];
        System.out.println("currentValue=" + currentValue + ", currentStamp=" + currentStamp);

        // 单独设置 stamp 值
        final boolean attemptStampResult = asr.attemptStamp(newReference, 88);
        System.out.println("currentValue=" + asr.getReference()
                + ", currentStamp=" + asr.getStamp()
                + ", attemptStampResult=" + attemptStampResult);

        // 重新设置当前值和 stamp 值
        asr.set(initialRef, initialStamp);
        System.out.println("currentValue=" + asr.getReference() + ", currentStamp=" + asr.getStamp());

        // [不推荐使用,除非搞清楚注释的意思了] weak compare and set
        // 困惑!weakCompareAndSet 这个方法最终还是调用 compareAndSet 方法。[版本: jdk-8u191]
        // 但是注释上写着 "May fail spuriously and does not provide ordering guarantees,
        // so is only rarely an appropriate alternative to compareAndSet."
        // todo 感觉有可能是 jvm 通过方法名在 native 方法里面做了转发
        final boolean wCasResult = asr.weakCompareAndSet(initialRef, newReference, initialStamp, newStamp);
        System.out.println("currentValue=" + asr.getReference()
                + ", currentStamp=" + asr.getStamp()
                + ", wCasResult=" + wCasResult);
    }
}

输出结果如下:

currentValue=0, currentStamp=0
currentValue=666, currentStamp=999, casResult=true
currentValue=666, currentStamp=999
currentValue=666, currentStamp=88, attemptStampResult=true
currentValue=0, currentStamp=0
currentValue=666, currentStamp=999, wCasResult=true
AtomicMarkableReference 类使用示例
import java.util.concurrent.atomic.AtomicMarkableReference;

public class AtomicMarkableReferenceDemo {
    public static void main(String[] args) {
        // 实例化、取当前值和 mark 值
        final Boolean initialRef = null, initialMark = false;
        final AtomicMarkableReference<Boolean> amr = new AtomicMarkableReference<>(initialRef, initialMark);
        System.out.println("currentValue=" + amr.getReference() + ", currentMark=" + amr.isMarked());

        // compare and set
        final Boolean newReference1 = true, newMark1 = true;
        final boolean casResult = amr.compareAndSet(initialRef, newReference1, initialMark, newMark1);
        System.out.println("currentValue=" + amr.getReference()
                + ", currentMark=" + amr.isMarked()
                + ", casResult=" + casResult);

        // 获取当前的值和当前的 mark 值
        boolean[] arr = new boolean[1];
        final Boolean currentValue = amr.get(arr);
        final boolean currentMark = arr[0];
        System.out.println("currentValue=" + currentValue + ", currentMark=" + currentMark);

        // 单独设置 mark 值
        final boolean attemptMarkResult = amr.attemptMark(newReference1, false);
        System.out.println("currentValue=" + amr.getReference()
                + ", currentMark=" + amr.isMarked()
                + ", attemptMarkResult=" + attemptMarkResult);

        // 重新设置当前值和 mark 值
        amr.set(initialRef, initialMark);
        System.out.println("currentValue=" + amr.getReference() + ", currentMark=" + amr.isMarked());

        // [不推荐使用,除非搞清楚注释的意思了] weak compare and set
        // 困惑!weakCompareAndSet 这个方法最终还是调用 compareAndSet 方法。[版本: jdk-8u191]
        // 但是注释上写着 "May fail spuriously and does not provide ordering guarantees,
        // so is only rarely an appropriate alternative to compareAndSet."
        // todo 感觉有可能是 jvm 通过方法名在 native 方法里面做了转发
        final boolean wCasResult = amr.weakCompareAndSet(initialRef, newReference1, initialMark, newMark1);
        System.out.println("currentValue=" + amr.getReference()
                + ", currentMark=" + amr.isMarked()
                + ", wCasResult=" + wCasResult);
    }
}

输出结果如下:

currentValue=null, currentMark=false
currentValue=true, currentMark=true, casResult=true
currentValue=true, currentMark=true
currentValue=true, currentMark=false, attemptMarkResult=true
currentValue=null, currentMark=false
currentValue=true, currentMark=true, wCasResult=true

🐛 修正(参见:issue#626) : AtomicMarkableReference 不能解决 ABA 问题。

/**
AtomicMarkableReference是将一个boolean值作是否有更改的标记,本质就是它的版本号只有两个,true和false,
修改的时候在这两个版本号之间来回切换,这样做并不能解决ABA的问题,只是会降低ABA问题发生的几率而已
@author : mazh
@Date : 2020/1/17 14:41
*/
public class SolveABAByAtomicMarkableReference {

       private static AtomicMarkableReference atomicMarkableReference = new AtomicMarkableReference(100, false);
        public static void main(String[] args) {
            Thread refT1 = new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                atomicMarkableReference.compareAndSet(100, 101, atomicMarkableReference.isMarked(), !atomicMarkableReference.isMarked());
                atomicMarkableReference.compareAndSet(101, 100, atomicMarkableReference.isMarked(), !atomicMarkableReference.isMarked());
            });
            Thread refT2 = new Thread(() -> {
                boolean marked = atomicMarkableReference.isMarked();
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                boolean c3 = atomicMarkableReference.compareAndSet(100, 101, marked, !marked);
                System.out.println(c3); // 返回true,实际应该返回false
            });
            refT1.start();
            refT2.start();
        }
    }

CAS ABA 问题

  • 描述: 第一个线程取到了变量 x 的值 A,然后巴拉巴拉干别的事,总之就是只拿到了变量 x 的值 A。这段时间内第二个线程也取到了变量 x 的值 A,然后把变量 x 的值改为 B,然后巴拉巴拉干别的事,最后又把变量 x 的值变为 A (相当于还原了)。在这之后第一个线程终于进行了变量 x 的操作,但是此时变量 x 的值还是 A,所以 compareAndSet 操作是成功。

  • 例子描述(可能不太合适,但好理解): 年初,现金为零,然后通过正常劳动赚了三百万,之后正常消费了(比如买房子)三百万。年末,虽然现金零收入(可能变成其他形式了),但是赚了钱是事实,还是得交税的!

  • 代码例子(以AtomicInteger为例)

import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerDefectDemo {
    public static void main(String[] args) {
        defectOfABA();
    }
    static void defectOfABA() {
        final AtomicInteger atomicInteger = new AtomicInteger(1);
        Thread coreThread = new Thread(
                () -> {
                    final int currentValue = atomicInteger.get();
                    System.out.println(Thread.currentThread().getName() + " ------ currentValue=" + currentValue);
                    // 这段目的:模拟处理其他业务花费的时间
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    boolean casResult = atomicInteger.compareAndSet(1, 2);
                    System.out.println(Thread.currentThread().getName()
                            + " ------ currentValue=" + currentValue
                            + ", finalValue=" + atomicInteger.get()
                            + ", compareAndSet Result=" + casResult);
                }
        );
        coreThread.start();

        // 这段目的:为了让 coreThread 线程先跑起来
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Thread amateurThread = new Thread(
                () -> {
                    int currentValue = atomicInteger.get();
                    boolean casResult = atomicInteger.compareAndSet(1, 2);
                    System.out.println(Thread.currentThread().getName()
                            + " ------ currentValue=" + currentValue
                            + ", finalValue=" + atomicInteger.get()
                            + ", compareAndSet Result=" + casResult);

                    currentValue = atomicInteger.get();
                    casResult = atomicInteger.compareAndSet(2, 1);
                    System.out.println(Thread.currentThread().getName()
                            + " ------ currentValue=" + currentValue
                            + ", finalValue=" + atomicInteger.get()
                            + ", compareAndSet Result=" + casResult);
                }
        );
        amateurThread.start();
    }
}

输出内容如下:

Thread-0 ------ currentValue=1
Thread-1 ------ currentValue=1, finalValue=2, compareAndSet Result=true
Thread-1 ------ currentValue=2, finalValue=1, compareAndSet Result=true
Thread-0 ------ currentValue=1, finalValue=2, compareAndSet Result=true

对象的属性修改类型

  • AtomicIntegerFieldUpdater:原子更新整型字段的更新器
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器
  • AtomicReferenceFieldUpdater:原子更新引用类型字段的更新器

​ 要想原子地更新对象的属性需要两步。第一步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。第二步,更新的对象属性必须使用 public volatile 修饰符。

​ 上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerFieldUpdater为例子来介绍。

AtomicIntegerFieldUpdater 类使用示例
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class AtomicIntegerFieldUpdaterTest {
	public static void main(String[] args) {
		AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");

		User user = new User("Java", 22);
		System.out.println(a.getAndIncrement(user));// 22
		System.out.println(a.get(user));// 23
	}
}
class User {
	private String name;
	public volatile int age;
	public User(String name, int age) {
		super();
		this.name = name;
		this.age = age;
	}
	public String getName() {return name;}
	public void setName(String name) {this.name = name;}
	public int getAge() {return age;}
	public void setAge(int age) {this.age = age;}
}

基本数据类型原子类的优势

​ 通过一个简单例子带大家看一下基本数据类型原子类的优势

① 多线程环境不使用原子类保证线程安全(基本数据类型)

class Test {
        private volatile int count = 0;
        //若要线程安全执行执行count++,需要加锁
        public synchronized void increment() {count++;}
        public int getCount() {return count;}
}

② 多线程环境使用原子类保证线程安全(基本数据类型)

class Test2 {
        private AtomicInteger count = new AtomicInteger();
        public void increment() {
                  count.incrementAndGet();
        }
      //使用AtomicInteger之后,不需要加锁,也可以实现线程安全。
       public int getCount() {
                return count.get();
        }
}

讲讲 AtomicInteger 的使用

AtomicInteger 类常用方法

public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

AtomicInteger 类的使用示例

使用 AtomicInteger 之后,不用对 increment() 方法加锁也可以保证线程安全。

class AtomicIntegerTest {
    private AtomicInteger count = new AtomicInteger();
    //使用AtomicInteger之后,不需要对该方法加锁,也可以实现线程安全。
    public void increment() {
        count.incrementAndGet();
    }

    public int getCount() {
        return count.get();
    }
}

能不能给我简单介绍一下 AtomicInteger 类的原理

  • 浅谈AtomicInteger实现原理

  • Java实现CAS的原理

AtomicInteger 类的部分源码:

// setup to use Unsafe.compareAndSwapInt for updates(更新操作时提供“比较并替换”的作用)
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
    try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

​ AtomicInteger 类主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升。

​ CAS 的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值。UnSafe 类的 objectFieldOffset() 方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址。另外 value 是一个 volatile 变量,在内存中可见,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值

AQS

AQS 介绍

​ AQS 的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。

​ AQS 是一个用来构建同步器的框架,使用 AQS 能简单且高效地构造出大量应用广泛的同步器,比如我们提到的 ReentrantLockSemaphore,其他的诸如 ReentrantReadWriteLockSynchronousQueueFutureTask 等等皆是基于 AQS 的。当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器。

AQS 原理分析

AQS 原理这部分参考了部分博客,在 6.2 节末尾放了链接。

在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参加,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。

下面大部分内容其实在 AQS 类注释上已经给出了,不过是英语看着比较吃力一点,感兴趣的话可以看看源码。

AQS 原理概览

AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

CLH(Craig,Landin and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。

看个 AQS(AbstractQueuedSynchronizer)原理图:

​ AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。

private volatile int state;//共享变量,使用volatile修饰保证线程可见性

​ 状态信息通过 protected 类型的 getState,setState,compareAndSetState 进行操作

//返回同步状态的当前值
protected final int getState() {
    return state;
}
//设置同步状态的值
protected final void setState(int newState) {
    state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS 对资源的共享方式

AQS 定义两种资源共享方式

1.Exclusive(独占):只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁:

(1)公平锁:按照线程在队列中的排队顺序,先到者先拿到锁

(2)非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的

说明:下面这部分关于 ReentrantLock 源代码内容节选自:一行一行源码分析清楚 AbstractQueuedSynchronizer (二)_Javadoop ,这是一篇很不错文章,推荐阅读。

下面来看 ReentrantLock 中相关的源代码:

ReentrantLock 默认采用非公平锁,因为考虑获得更好的性能,通过 boolean 来决定是否用公平锁(传入 true 用公平锁)。

/** Synchronizer providing all implementation mechanics */
private final Sync sync;
public ReentrantLock() {
    // 默认非公平锁
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock 中公平锁的 lock 方法

static final class FairSync extends Sync {
    final void lock() {
        acquire(1);
    }
    // AbstractQueuedSynchronizer.acquire(int arg)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

非公平锁的 lock 方法:

static final class NonfairSync extends Sync {
    final void lock() {
        // 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    // AbstractQueuedSynchronizer.acquire(int arg)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 这里没有对阻塞队列进行判断
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

总结:公平锁和非公平锁只有两处不同:

  1. 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
  2. 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面

​ 公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。

2.Share(共享):多个线程可同时执行,如 CountDownLatchSemaphoreCyclicBarrierReadWriteLock 我们都会在后面讲到。

ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时对某一资源进行。不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。

AQS 底层使用了模板方法模式

​ 同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

  1. 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放)
  2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。

AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:

protected boolean tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected int tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。

什么是钩子方法呢? 钩子方法是一种被声明在抽象类中的方法,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现

​ 除了上面提到的钩子方法之外,AQS 类中的其他方法都是 final ,所以无法被其他类重写。

​ 以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock() 时,会调用 tryAcquire() 独占该锁并将 state+1 。此后,其他线程再 tryAcquire() 时就会失败,直到 A 线程 unlock()state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。

​ 再以 CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown() 一次,state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 state=0 ),会 unpark() 主调用线程,然后主调用线程就会从 await() 函数返回,继续后余动作。

​ 一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock

推荐两篇 AQS 原理和相关源码分析的文章:

Semaphore(信号量)

synchronizedReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。

​ 示例代码如下:

public class SemaphoreExample1 {
  // 请求的数量
  private static final int threadCount = 550;

  public static void main(String[] args) throws InterruptedException {
    // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
    ExecutorService threadPool = Executors.newFixedThreadPool(300);
    // 一次只能允许执行的线程数量。
    final Semaphore semaphore = new Semaphore(20);

    for (int i = 0; i < threadCount; i++) {
      final int threadnum = i;
      threadPool.execute(() -> {// Lambda 表达式的运用
        try {
          semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20个
          test(threadnum);
          semaphore.release();// 释放一个许可
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }

      });
    }
    threadPool.shutdown();
    System.out.println("finish");
  }

  public static void test(int threadnum) throws InterruptedException {
    Thread.sleep(1000);// 模拟请求的耗时操作
    System.out.println("threadnum:" + threadnum);
    Thread.sleep(1000);// 模拟请求的耗时操作
  }
}

​ 执行 acquire() 方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个 release 方法增加一个许可证,这可能会释放一个阻塞的 acquire() 方法。然而,其实并没有实际的许可证这个对象,Semaphore 只是维持了一个可获得许可证的数量。 Semaphore 经常用于限制获取某种资源的线程数量。

​ 当然一次也可以一次拿取和释放多个许可,不过一般没有必要这样做:

semaphore.acquire(5);// 获取5个许可,所以可运行线程数量为20/5=4
test(threadnum);
semaphore.release(5);// 释放5个许可

​ 除了 acquire() 方法之外,另一个比较常用的与之对应的方法是 tryAcquire() 方法,该方法如果获取不到许可就立即返回 false。

Semaphore 有两种模式,公平模式和非公平模式。

  • 公平模式: 调用 acquire() 方法的顺序就是获取许可证的顺序,遵循 FIFO;
  • 非公平模式: 抢占式的。

Semaphore 对应的两个构造方法如下:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式

issue645 补充内容:SemaphoreCountDownLatch 一样,也是共享锁的一种实现。它默认构造 AQS 的 state 为 permits。当执行任务的线程数量超出 permits,那么多余的线程将会被放入阻塞队列 Park,并自旋判断 state 是否大于 0。只有当 state 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 release() 方法,release() 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。 如此,每次只有最多不超过 permits 数量的线程能自旋成功,便限制了执行任务线程的数量。

CountDownLatch (倒计时器)

CountDownLatch 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。

CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown() 方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当调用 await() 方法的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 方法就会一直阻塞,也就是说 await() 方法之后的语句不会被执行。然后,CountDownLatch 会自旋 CAS 判断 state == 0,如果 state == 0 的话,就会释放所有等待的线程,await() 方法之后的语句得到执行。

CountDownLatch 的两种典型用法

1、某一线程在开始运行前等待 n 个线程执行完毕。

​ 将 CountDownLatch 的计数器初始化为 n (new CountDownLatch(n)),每当一个任务线程执行完毕,就将计数器减 1 。(countdownlatch.countDown()),当计数器的值变为 0 时,在 CountDownLatch 上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

2、实现多个线程开始执行任务的最大并行性。

​ 注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 (new CountDownLatch(1)),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为 0,多个线程同时被唤醒。

CountDownLatch 的使用示例

public class CountDownLatchExample1 {
  // 请求的数量
  private static final int threadCount = 550;

  public static void main(String[] args) throws InterruptedException {
    // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
    ExecutorService threadPool = Executors.newFixedThreadPool(300);
    final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    for (int i = 0; i < threadCount; i++) {
      final int threadnum = i;
      threadPool.execute(() -> {// Lambda 表达式的运用
        try {
          test(threadnum);
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        } finally {
          countDownLatch.countDown();// 表示一个请求已经被完成
        }

      });
    }
    countDownLatch.await();
    threadPool.shutdown();
    System.out.println("finish");
  }

  public static void test(int threadnum) throws InterruptedException {
    Thread.sleep(1000);// 模拟请求的耗时操作
    System.out.println("threadnum:" + threadnum);
    Thread.sleep(1000);// 模拟请求的耗时操作
  }
}

​ 上面的代码中,我们定义了请求的数量为 550,当这 550 个请求被处理完成之后,才会执行System.out.println("finish");

​ 与 CountDownLatch 的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用 CountDownLatch.await() 方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

​ 其他 N 个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的 count 值就减 1。所以当 N 个线程都调用了这个方法,count 的值等于 0,然后主线程就能通过 await()方法,恢复执行自己的任务。

​ 再插一嘴:CountDownLatchawait() 方法使用不当很容易产生死锁,比如我们上面代码中的 for 循环改为:

for (int i = 0; i < threadCount-1; i++) {
.......
}

​ 这样就导致 count 的值没办法等于 0(最后会减的只剩1),然后就会导致一直等待。

CountDownLatch 的不足

CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

CountDownLatch 相常见面试题

  • CountDownLatch 怎么用?应用场景是什么?
  • CountDownLatchCyclicBarrier 的不同之处?
  • CountDownLatch 类中主要的方法?

CyclicBarrier(循环栅栏)

CyclicBarrierCountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂强大。主要应用场景和 CountDownLatch 类似。

CountDownLatch 的实现是基于 AQS 的,而 CycliBarrier 是基于 ReentrantLock(ReentrantLock 也属于 AQS 同步器)和 Condition 的。

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await() 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞

​ 再来看一下它的构造函数:

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

​ 其中,parties 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候打开栅栏,让所有线程通过。

CyclicBarrier 的应用场景

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。

CyclicBarrier 的使用示例
public class CyclicBarrierExample2 {
  // 请求的数量
  private static final int threadCount = 550;
  // 需要同步的线程数量
  private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

  public static void main(String[] args) throws InterruptedException {
    // 创建线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(10);

    for (int i = 0; i < threadCount; i++) {
      final int threadNum = i;
      Thread.sleep(1000);
      threadPool.execute(() -> {
        try {
          test(threadNum);
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        } catch (BrokenBarrierException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      });
    }
    threadPool.shutdown();
  }

  public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
    System.out.println("threadnum:" + threadnum + "is ready");
    try {
      /**等待60秒,保证子线程完全执行结束*/
      cyclicBarrier.await(60, TimeUnit.SECONDS);
    } catch (Exception e) {
      System.out.println("-----CyclicBarrierException------");
    }
    System.out.println("threadnum:" + threadnum + "is finish");
  }

}

运行结果,如下:

threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......

​ 可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await() 方法之后的方法才被执行。另外,CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。示例代码如下:

public class CyclicBarrierExample3 {
  // 请求的数量
  private static final int threadCount = 550;
  // 需要同步的线程数量
  private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
    System.out.println("------当线程数达到之后,优先执行------");
  });

  public static void main(String[] args) throws InterruptedException {
    // 创建线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(10);

    for (int i = 0; i < threadCount; i++) {
      final int threadNum = i;
      Thread.sleep(1000);
      threadPool.execute(() -> {
        try {
          test(threadNum);
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        } catch (BrokenBarrierException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      });
    }
    threadPool.shutdown();
  }

  public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
    System.out.println("threadnum:" + threadnum + "is ready");
    cyclicBarrier.await();
    System.out.println("threadnum:" + threadnum + "is finish");
  }

}

​ 运行结果,如下:

threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
------当线程数达到之后,优先执行------
threadnum:4is finish
threadnum:0is finish
threadnum:2is finish
threadnum:1is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
------当线程数达到之后,优先执行------
threadnum:9is finish
threadnum:5is finish
threadnum:6is finish
threadnum:8is finish
threadnum:7is finish
......

CyclicBarrier 源码分析

​ 当调用 CyclicBarrier 对象调用 await() 方法时,实际上调用的是 dowait(false, 0L)方法。 await() 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 parties 的值时,栅栏才会打开,线程才得以通过执行。

public int await() throws InterruptedException, BrokenBarrierException {
  try {
    	return dowait(false, 0L);
  } catch (TimeoutException toe) {
   	 throw new Error(toe); // cannot happen
  }
}

dowait(false, 0L)

// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。
private int count;
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
    final ReentrantLock lock = this.lock;
    // 锁住
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        // 如果线程中断了,抛出异常
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // cout减1
        int index = --count;
        // 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行await 方法之后的条件
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 将 count 重置为 parties 属性的初始化值
                // 唤醒之前等待的线程
                // 下一波执行开始
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

​ 总结:CyclicBarrier 内部通过一个 count 变量作为计数器,count 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减一。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。

CyclicBarrier 和 CountDownLatch 的区别

​ 下面这个是国外一个大佬的回答:

CountDownLatch 是计数器,只能使用一次,而 CyclicBarrier 的计数器提供 reset 功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。我们来从 jdk 作者设计的目的来看,javadoc 是这么描述它们的:

CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;) CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)

​ 对于 CountDownLatch 来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。

CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。

AQS 组件总结

  • Semaphore(信号量)-允许多个线程同时访问: synchronizedReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源(和操作系统里的信号量有相似之处)。
  • CountDownLatch (倒计时器): CountDownLatch 是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。
  • CyclicBarrier(循环栅栏): CyclicBarrierCountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await() 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

用过 CountDownLatch 么?什么场景下用的?

CountDownLatch 的作用就是 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。之前在项目中,有一个使用多线程读取多个文件处理的场景,我用到了 CountDownLatch 。具体场景是下面这样的:

​ 我们要读取处理 6 个文件,这 6 个任务都是没有执行顺序依赖的任务,但是我们需要返回给用户的时候将这几个文件的处理的结果进行统计整理。

​ 为此我们定义了一个线程池和 count 为 6 的CountDownLatch对象 。使用线程池处理读取任务,每一个线程处理完之后就将 count-1,调用CountDownLatch对象的 await()方法,直到所有文件读取完之后,才会接着执行后面的逻辑

伪代码是下面这样的:

public class CountDownLatchExample1 {
    // 处理文件的数量
    private static final int threadCount = 6;
    public static void main(String[] args) throws InterruptedException {
        // 创建一个具有固定线程数量的线程池对象(推荐使用构造方法创建)
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadnum = i;
            threadPool.execute(() -> {
                try {
                    //处理文件的业务操作
                    //......
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //表示一个文件已经被完成
                    countDownLatch.countDown();
                }

            });
        }
        countDownLatch.await();
        threadPool.shutdown();
        System.out.println("finish");
    }
}

有没有可以改进的地方呢?

​ 可以使用 CompletableFuture 类来改进!Java8 的 CompletableFuture 提供了很多对多线程友好的方法,使用它可以很方便地为我们编写多线程程序,什么异步、串行、并行或者等待所有线程执行完任务什么的都非常方便。

CompletableFuture<Void> task1 =
    CompletableFuture.supplyAsync(()->{
        //自定义业务操作
    });
......
CompletableFuture<Void> task6 =
    CompletableFuture.supplyAsync(()->{
    //自定义业务操作
    });
......
CompletableFuture<Void> headerFuture=CompletableFuture.allOf(task1,.....,task6);

try {
    headerFuture.join();
} catch (Exception ex) {
    //......
}
System.out.println("all done. ");

上面的代码还可以继续优化,当任务过多的时候,把每一个 task 都列出来不太现实,可以考虑通过循环来添加任务。

//文件夹位置
List<String> filePaths = Arrays.asList(...)
// 异步处理所有文件
List<CompletableFuture<String>> fileFutures = filePaths.stream()
    .map(filePath -> doSomeThing(filePath))
    .collect(Collectors.toList());
// 将他们合并起来
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
    fileFutures.toArray(new CompletableFuture[fileFutures.size()])
);