📋 Java 并发编程知识整理
并发编程其实是一个很宏大的话题,并不是通过一两篇文章就能叙述详尽的。本文尽量涉及到并发编程的所有基础知识,但对于很多知识点,如果需要进行深入了解,仅仅通过本文仍是不够的。另外,有许多知识其实通过阅读源码的方式更加容易理解。推荐读者在阅读本文时,能够自己动手探究一下源码,这样能够深化认识,加深印象,而不至于仅仅停留在表面。
温馨提示:本文内容较多,请合理安排时间。
并发与并行
什么是并发
在操作系统中,并发是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行。
什么是并行
并行(Parallel),在多核系统中,当一个CPU在执行一个进程时,另一个CPU可以执行另一个进程,两个进程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行。只有在多CPU的情况中,才会发生并行。否则,看似同时发生的事情,其实都是并发执行的。
并发与并行的区别
并发是指在一段时间内宏观上多个程序同时运行。并行指的是同一个时刻,多个任务确实真的在同时运行。
线程
线程的实现
实现线程主要有三种方式:
- 继承 Thread 类,并重写其 run 方法。
- 实现 Runnable 接口,并实现其 run 方法。
- 实现 Callable 接口,并实现其 call 方法。
线程的状态
-
新建状态:
使用 new 关键字和 Thread 类或其子类建立一个线程对象后,该线程对象就处于新建状态。它保持这个状态直到程序 start() 这个线程。 -
就绪状态:
当线程对象调用了 start() 方法之后,该线程就进入就绪状态。就绪状态的线程处于就绪队列中,要等待JVM里线程调度器的调度。 -
运行状态:
如果就绪状态的线程获取 CPU 资源,就可以执行 run(),此时线程便处于运行状态。处于运行状态的线程最为复杂,它可以变为阻塞状态、就绪状态和死亡状态。 -
阻塞状态:
如果一个线程执行了sleep(睡眠)、suspend(挂起)等方法,失去所占用资源之后,该线程就从运行状态进入阻塞状态。在睡眠时间已到或获得设备资源后可以重新进入就绪状态。可以分为三种:
- 等待阻塞:运行状态中的线程执行 wait() 方法,使线程进入到等待阻塞状态。
- 同步阻塞:线程在获取 synchronized 同步锁失败(因为同步锁被其他线程占用)。
- 其他阻塞:通过调用线程的 sleep() 或 join() 发出了 I/O 请求时,线程就会进入到阻塞状态。当sleep() 状态超时,join() 等待线程终止或超时,或者 I/O 处理完毕,线程重新转入就绪状态。
-
死亡状态:
一个运行状态的线程完成任务或者其他终止条件发生时,该线程就切换到终止状态。
优先级
Java线程可以有优先级的设定,高优先级的线程比低优先级的线程有更高的几率得到执行。
- 当线程的优先级没有指定时,所有线程都携带普通优先级。
- 优先级可以用从1-10的范围指定。10表示最高优先级,1表示最低优先级,5是普通优先级。
- 优先级最高的线程在执行时被给予优先,但是不能保证线程在启动时就进入运行状态。
- 与在线程池中等待运行机会的线程相比,当前正在运行的线程可能总是拥有更高的优先级。
- 由调度程序决定哪一个线程被执行。
- setPriority() 用来设定线程的优先级。
- 在线程开始方法被调用之前,线程的优先级应该被设定。
- 可以使用常量,如MIN_PRIORITY,MAX_PRIORITY,NORM_PRIORITY来设定优先级。
线程优先级具有继承性,A线程启动B线程,B线程的优先级和A线程的优先级是一样的。
高优先级的线程总是大部分先执行完,并不是高优先级的完全先执行完。线程的优先级和执行顺序无关,优先级高的线程执行快。
线程调度
Java 提供一个线程调度器来监视和控制 Runnable 状态的线程。线程的调度策略采用抢占式,优先级高的线程比优先级低的线程优先执行。在优先级相同的情况下,按照先到先服务的原则。每个Java程序都有一个默认的主线程,就是通过 JVM 启动的第一个线程。对于应用程序,主线程执行的是 main() 方法。对于 Applet 主线程是指浏览器加载并执行小应用程序的那一个线程。
子线程是由应用程序创建的线程。
还有一种线程称为守护线程(Daemon),这是一种用于监视其他线程工作的服务线程,优先级为最低。
创建线程的多种方式
在 jdk1.5 之前,创建线程就只有两种方式,即继承 java.lang.Thread 类和实现 java.lang.Runnable 接口;而在 JDK1.5 以后,增加了两个创建线程的方式,即实现 java.util.concurrent.Callable 接口和线程池。
创建线程方式:
-
继承 Thread 类
public class ThreadTest { public static void main(String[] args) { // 设置线程名字 Thread.currentThread().setName("main thread"); MyThread myThread = new MyThread(); myThread.setName("子线程:"); // 开启线程 myThread.start(); for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName() + i); } } } class MyThread extends Thread { @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + i); } } }
-
实现 Runnable 接口
public class RunnableTest { public static void main(String[] args) { // 设置线程名字 Thread.currentThread().setName("main thread:"); Thread thread = new Thread(new MyRunnable());// 提示:不要显式创建线程,请使用线程池。 thread.setName("子线程:"); // 开启线程 thread.start(); for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName() + i); } } } class MyRunnable implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + i); } } }
-
实现 Callable 接口
public class CallableTest { public static void main(String[] args) { // 执行Callable方式,需要FutureTask实现实现,用于接收运算结果 FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable()); new Thread(futureTask).start();// 提示:不要显式创建线程,请使用线程池。 // 接收线程运算后的结果 try { Integer sum = futureTask.get(); System.out.println(sum); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } class MyCallable implements Callable<Integer> { @Override public Integer call() { int sum = 0; for (int i = 0; i < 100; i++) { sum += i; } return sum; } }
-
自定义线程池【推荐】
线程池提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应速度。
public class ThreadPoolExecutorTest { public static void main(String[] args) { // 核心线程数 int corePoolSize = 3; // 最大线程数 int maximumPoolSize = 6; // 超过 corePoolSize 线程数量的线程最大空闲时间 long keepAliveTime = 2; // 以秒为时间单位 TimeUnit unit = TimeUnit.SECONDS; // 创建工作队列,用于存放提交的等待执行任务 BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2); // 创建线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.AbortPolicy()); // ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(10); ThreadPool threadPool = new ThreadPool(); for (int i = 0; i < 5; i++) { // 为线程池分配任务 threadPoolExecutor.submit(threadPool); } // 关闭线程池 threadPoolExecutor.shutdown(); } } class ThreadPool implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + ":" + i); } } }
阿里巴巴Java开发规范不推荐直接使用
Executors.newFixedThreadPool()
的方式创建线程池。
守护线程
在Java中有两类线程:用户线程、守护线程(也称为服务线程)。
上文已提到,守护线程是一种用于监视其他线程工作的服务线程,因此,只要存在任何一个未结束的用户线程,守护线程就会一直运行,直到所有用户线程都结束后,守护线程才会随着 JVM 一同结束。用户线程与守护线程几乎没有区别,它们之间唯一的区别只在于线程退出的时机。
守护线程并非只有虚拟机内部提供,用户在编写程序时也可以自己设置守护线程。
Thread daemonThread = new Thread();
daemonThread.setDaemon(true);// 设置线程为守护线程
daemonThread.isDaemon();// 判断线程是否是守护线程
用户在设置守护线程时,应当注意:
daemonThread.setDaemon(true)
必须在thread.start()
之前执行,否则会抛出 IllegalThreadStateException 异常,这是因为程序无法把一个正在运行的线程设置为守护线程。- 在守护线程中产生的新线程也是守护线程。
- 并非所有的应用都可以分配给守护线程进行服务,如读写操作和计算逻辑。
JRE 判断程序是否执行结束的标准是所有的前台线程(用户线程)是否执行完毕,而不关注后台线程(守护线程)的状态,所以经常会出现前台线程结束,程序就执行完毕,并关闭守护线程,但实际上守护线程并未执行完成。因此我们不应当用守护线程去访问固有资源,如文件、数据库等,因为它可能在任何时候发生中断。
守护线程不依赖于终端,但是依赖于系统。
守护线程的优先级比较低,垃圾回收线程就是一个经典的守护线程。
线程与进程
-
进程
每个进程都有独立的代码和数据空间(进程上下文),进程间的切换会有较大的开销,一个进程包含一个或多个线程。(进程是资源分配的最小单位)
多进程是指操作系统能同时运行多个任务(程序)。
-
线程
同一类线程共享代码和数据空间,每个线程有独立的运行栈和程序计数器,线程切换开销小。(线程是cpu调度的最小单位)
多线程是指在同一程序中有多个顺序流在执行。
每个正在系统上运行的程序都是一个进程。每个进程包含一到多个线程。进程也可能是整个程序或者是部分程序的动态执行。线程是一组指令的集合,或者是程序的特殊段,它可以在程序里独立执行。
在Java中,一个应用程序可以包含多个线程。每个线程执行特定的任务,并可与其他线程并发执行多线程使系统的空转时间最少,提高CPU利用率、多线程编程环境用方便的模型隐藏CPU在任务间切换的事实在Java程序启动时,一个线程立刻运行,该线程通常称为程序的主线程。
主线程的重要性体现在两个方面:
- 主线程是产生其他子线程的线程。
- 通常主线程必须最后完成执行,因为它需要执行各种关闭动作。
线程的同步与互斥
互斥是指不同线程通过竞争进入临界区(共享的数据和硬件资源),为了防止访问冲突,同一时间只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。
线程的同步关系是指多个线程彼此合作,通过一定的逻辑关系来共同完成一个任务,它在互斥的基础上(大多数情况),通过其它机制实现访问者按照某种逻辑顺序对资源进行访问。
同步其实已经实现了互斥,所以同步是一种更为复杂的互斥。互斥是一种特殊的同步。
总的来说,同步与互斥的区别在于:
- 互斥是通过竞争对资源的独占使用,线程彼此之间不需要知道对方的存在,执行顺序是一个乱序。
- 同步是协调多个相互关联线程合作完成任务,彼此之间知道对方存在,执行顺序往往是有序的。
同步与互斥的关系同样适用于描述不同进程之间的关系。
线程池
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。
使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。
为什么使用线程池?
多线程运行时间,系统不断的启动和关闭新线程,成本非常高,会过渡消耗系统资源,以及过渡切换线程的危险,从而可能导致系统资源的崩溃。使用线程池能很好地解决这一问题。
四种常见的线程池
-
CachedThreadPool
缓存线程池,没有核心线程,普通线程数量为
Integer.MAX_VALUE
(可以理解为无限),线程闲置60s后回收,任务队列使用SynchronousQueue
这种无容量的同步队列。[适用于任务量大但耗时低的场景]{.wavy}。使用示例:
public class Test { public static void main(String[] args) { // 1. 创建缓存线程池 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // 2. 创建线程任务 Runnable task = () -> System.out.println(Thread.currentThread().getName() + " ----> running");// use Lambda // 3. 向线程池提交任务 for (int i = 0; i < 5; i++) { cachedThreadPool.execute(task); } } }
-
ScheduledThreadPool
定时线程池,指定核心线程数量,普通线程数量无限,线程执行完任务立即回收,任务队列为延时阻塞队列。这是一个比较特别的线程池,[适用于执行定时或周期性的任务]{.wavy}。
使用示例:
public class Test { public static void main(String[] args) { // 1. 创建定时线程池 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);// 核心线程数:5 // 2. 创建线程任务 Runnable task = () -> System.out.println(Thread.currentThread().getName() + " ----> running");// use Lambda // 3. 向线程池提交任务 scheduledThreadPool.schedule(task, 2, TimeUnit.SECONDS);// 延迟2s后执行任务 scheduledThreadPool.scheduleAtFixedRate(task, 50, 2000, TimeUnit.MILLISECONDS);// 延迟50ms后,每隔2000ms执行任务 } }
-
SingleThreadPool
单线程线程池。特点是线程池中只有一个线程(核心线程),线程执行完任务立即回收,使用有界阻塞队列(容量未指定,则使用默认值
Integer.MAX_VALUE
)。使用示例:
public class Test { public static void main(String[] args) { // 1. 创建单线程线程池 ExecutorService singleThreadPool = Executors.newSingleThreadScheduledExecutor(); // 2. 创建线程任务 Runnable task = () -> System.out.println(Thread.currentThread().getName() + " ----> running");// use Lambda // 3. 向线程池提交任务 singleThreadPool.execute(task); } }
-
FixedThreadPool
固定容量线程池,其特点是最大线程数就是核心线程数,意味着线程池只能创建核心线程,
keepAliveTime
为0,即线程执行完任务立即回收。任务队列未指定容量,代表使用默认值Integer.MAX_VALUE
。[适用于需要控制并发线程的场景]{.wavy}。使用示例:
public class Test { public static void main(String[] args) { // 1. 创建固定容量线程池 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); // 2. 创建线程任务 Runnable task = () -> System.out.println(Thread.currentThread().getName() + " ----> running");// use Lambda // 3. 向线程池提交任务 fixedThreadPool.execute(task); } }
自己设计线程池
线程池设计示例:
public class ThreadPool {
int threadPoolSize; // 线程池大小
final LinkedList<Runnable> tasks = new LinkedList<>(); // 任务容器
public ThreadPool() {
threadPoolSize = 5;
// 预先创建多个线程
System.out.println("Create " + threadPoolSize + " thread(s) and pre-start them:");
synchronized (tasks) {
for (int i = 0; i < threadPoolSize; i++) {
new TaskConsumeThread("Thread-" + i).start();
}
}
}
public void add(Runnable r) {
synchronized (tasks) {
tasks.add(r);
// 当有任务进入时,唤醒线程池中所有线程
tasks.notify();
}
}
class TaskConsumeThread extends Thread {
public TaskConsumeThread(String name) {
super(name);
}
Runnable task;
@Override
public void run() {
System.out.println(this.getName() + " is pre-started.");
while (true) {
synchronized (tasks) {
while (tasks.isEmpty()) {
try {
tasks.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
task = tasks.removeLast();
tasks.notifyAll();
}
System.out.println("\n" + this.getName() + " received a task:");
task.run();
}
}
}
}
测试调用:
public class Test {
public static void main(String[] args) {
ThreadPool pool = new ThreadPool();
for (int i = 0; i < 8; i++) {
Runnable task = () -> System.out.println(Thread.currentThread().getName() + " is dealing with it!");
pool.add(task);
try {
Thread.sleep((long) (Math.random() * 2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
submit() 和 execute()
线程池框架提供了两种方式提交任务,根据不同的业务需求选择不同的方式。
-
execute()
函数的定义:public interface Executor { void execute(Runnable command); }
-
submit()
函数的定义:public interface ExecutorService extends Executor { ... <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); ... }
submit() 和 execute() 的区别?
-
接收参数不同。
execute() 只能接收 Runnable 类型的参数,而 submit() 函数还可以接受 Callable 类型参数。
-
返回值不同。
submit() 的所有方法都有返回值 Future,而 execute() 函数的返回值为空。
-
异常处理。
submit() 更方便对异常进行处理,它不会抛出异常而是把异常保存在成员变量中,在
FutureTask.get
阻塞获取的时候再把异常抛出来。而 execute() 直接抛出异常之后,线程就死掉了。
线程池原理
线程池状态
线程池和线程一样拥有自己的状态,在 ThreadPoolExecutor 类中定义了一个 volatile 变量通过 runState 来表示线程池的状态,线程池有五种状态,分别为RUNNING、SHURDOWN、STOP、TIDYING、TERMINATED。
- 线程池创建后处于 RUNNING 状态。
- 调用 shutdown 后处于 SHUTDOWN 状态,此时线程池不能接受新的任务,会等待缓冲队列的任务完成。
- 调用 shutdownNow 后处于 STOP 状态,线程池不能接受新的任务,并尝试终止正在执行的任务。
- 当所有的任务已终止,线程池任务数量为 0 ,线程池会变为 TIDYING 状态,并且会执行钩子函数
terminated()
。 - 当线程池处在 TIDYING 状态时,并执行完
terminated()
之后,线程池就会变更为 TERMINATED 状态。
线程池原理
预先启动一些线程,线程无限循环从任务队列中获取一个任务进行执行,直到线程池被关闭。如果某个线程因为执行某个任务发生异常而终止,那么重新创建一个新的线程而已,如此反复。
线程池工作流程:
执行逻辑说明:
- 判断核心线程数是否已满,核心线程数大小和 corePoolSize 参数有关,未满则创建线程执行任务。
- 若核心线程数已满,则判断队列是否已满,队列是否已满和 workQueue 参数有关,未满则加入队列中。
- 若核心线程池和队列都已满,则判断线程池是否已满,线程池是否已满和 maximumPoolSize 参数有关,未满则创建线程执行任务。
- 若核心线程池、队列和线程池都已满,则采用拒绝策略处理无法执执行的任务,拒绝策略和 handler 参数有关。
为什么不允许使用Executors创建线程池
阿里巴巴 Java 开发手册:不允许使用 Executors 创建线程池?
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
合理使用线程池可以带来以下好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。
一般来说线程池的创建最好通过 new ThreadPoolExecutor
来创建,这样方便控制参数。而使用 Executors 创建的线程池容易造成内存溢出{.wavy .success}。
记住线程池的工作流程,并且观察 Executors.newCachedThreadPool()
源代码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
当一个任务提交时,核心线程数为 0 ,则不创建核心线程,而 SynchronousQueue
是一个不存储元素的队列(可以理解为队列永远是满的),所以最终会创建非核心线程来执行任务。而对于非核心线程,会站其空闲 60s 后被回收,但 Integer.MAX_VALUE
又非常大,在资源有限的情况下,就容易导致内存溢出的异常。
而 newSingleThreadExecutor 和 newFixedThreadPool 均是使用 LinkedBlockingQueue
队列,而这个队列的长度指定为 Integer.MAX_VALUE
,即无界队列,因此往队列中可以插入无限多的任务,在资源有限的时候容易引起内存溢出的异常。
总结:
FixedThreadPool 和 SingleThreadExecutor,允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,导致内存溢出。
CachedThreadPool,允许创建的线程数为 Integer.MAX_VALUE 可能会创建大量的线程,导致内存移出。
解决方法:参考上文创建线程的多种方式部分。
线程安全
什么是线程安全
多个线程访问同一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行,也不需要进行额外的同步,或者在调用方进行任何其他操作,调用这个对象的行为都可以获得正确的结果,那么这个对象就是线程安全的。
或者说,一个类或者程序所提供的接口对于线程来说是原子操作或者多个线程之间的切换不会导致该接口的执行结果存在二义性,也就是说我们不用考虑同步的问题。
quote from 百度百科_线程安全
多级缓存和一致性问题
什么是 CPU 缓存
CPU 缓存是位于 CPU 与内存之间的临时存储器,它的容量比内存小的多,但是交换速度却比内存要快得多。CPU 缓存一般直接跟 CPU 芯片集成或位于主板总线互连的独立芯片上。
随着硬件的发展,主存的读取速度远远跟不上 CPU 的频率,这样,在处理器时钟周期内,CPU 常常需要等待主存,因此导致 CPU 不必要的浪费。而 cache 的出现正是为了解决缓存 CPU 和主存之间的速度不匹配的问题。
CPU往往需要重复处理相同的数据、重复执行相同的指令,如果这部分数据、指令CPU能在CPU缓存中找到,CPU就不需要从内存或硬盘中再读取数据、指令,从而减少了整机的响应时间。所以,缓存的意义满足以下两种局部性原理:
- 时间局部性(Temporal Locality):如果一个信息项正在被访问,那么在近期它很可能还会被再次访问。
- 空间局部性(Spatial Locality):如果一个存储器的位置被引用,那么将来他附近的位置也可能被引用。
CPU 的三级缓存
在 Windows 环境中查看主机的三级缓存:
随着多核 CPU 的发展,CPU 缓存通常分成了三个级别: L1
, L2
, L3
。级别越小越接近 CPU,所以速度也更快,同时也代表着容量越小。在每个 CPU 内核上都存在两个一级缓存,一个是用于存储数据的 L1d Cache(Data Cache),另一个是用于存储指令的 L1i Cache(Instruction Cache)。以及缓存最小,也最接近 CPU,二三级则相对更大,距 CPU 也相对较远。一般来说,每级缓存的命中率大概都在 80% 左右,也就是说全部数据量的 80% 都可以在一级缓存中找到。剩下未命中的 20% 则从二级缓存中查找,如果三级缓存中都为找到,才从主存中读取。
如上图所示,一个核拥有两个 L1 缓存,一个 L2 缓存;在同一个 CPU 插槽之间的所有核共享一个 L3 缓存。
带有高速缓存 CPU 执行计算的流程:
- 将程序和数据从硬盘加载到内存中。
- 将程序和数据从内存加载到缓存中(目前多三级缓存,数据加载顺序:L3 -> L2 -> L1)。
- CPU将缓存中的数据加载到寄存器中,并进行运算。
- CPU会将数据刷新回缓存,并在一定的时间周期之后刷新回内存。
缓存一致性协议(MESI)
多核 CPU 情况下,存在多个 L1 缓存,要保证缓存数据的一致,不让数据产生混乱。其常见的解决方案是缓存一致性(MESI)。
除了 MESI 之外,还有锁住总线也可用于保证缓存数据的一致。
MESI (Modified Exclusive Shared Or Invalid
)也成为伊利诺斯协议,它是一种广泛使用的支持写回策略的缓存一致性协议。为了保证多个 CPU 缓存中共享数据的一致性,定义了缓存行(Cache Line)四种状态,而CPU对缓存行的四种操作可能会产生不一致的状态,因此缓存控制器监听到本地操作和远程操作的时候,需要对地址一致的缓存行的状态进行一致性修改,从而保证数据在多个缓存之间保持一致性。
缓存行(Cache Line):它是存储缓存数据的单元,最常见的缓存行大小是 64 个字节。
当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享。
MESI 中的每个缓存行都有4个状态,分别是 Exclusive
、 Modified
、 Shared
和 Invalid
。
-
Modified(修改)
表示改缓存行中的内容被修改了,和内存中的数据不一致,数据只存在当前 Cache 中。
缓存行必须时刻监听所有试图读取该缓存行相对就主存的操作,这种操作必须在缓存将该缓存行写回主存并将状态变成S(共享)状态之前被延迟执行。
当被写回主存之后,该缓存行的状态会变成独享(exclusive)状态。
-
Exclusive(独享、互斥)
表示改缓存行中的数据和内存中的一致,数据只存在当前 Cache 中。
缓存行也必须监听其它缓存读主存中该缓存行的操作,一旦有这种操作,该缓存行需要变成S(共享)状态。
当 CPU 修改该缓存行中内容时,该状态可以变成 Modified 状态。
-
Shared(共享)
表示该缓存行中单数据和内存中的一致,数据存在于多个 Cache 中。
缓存行也必须监听其它缓存使该缓存行无效或者独享该缓存行的请求,并将该缓存行变成无效(Invalid)。
当有一个 CPU 修改该缓存行时,其它CPU中该缓存行可以被作废(变成无效状态 Invalid)。
-
Invalid(无效)
表示该缓存行中的内容无效。
MESI 状态转换图:
并发编程三大特性
并发编程的目标是充分的利用处理器的每一个核,以达到最高的处理性能,原子性、可见性和有序性是 Java 并发编程的三大特性。
要想并发程序正确地执行,必须要保证原子性、可见性以及有序性。只要有一个没有被保证,就有可能会导致程序运行不正确。
-
可见性
可见性是指当一个线程修改了共享变量后,其他线程能够立即看见这个修改。
CPU 中有一块缓存区叫做高速缓存,如果线程修改了某个变量的值,那么,程序会先将修改过的值放入缓存区,然后在满足一定条件时,该数据才会从缓存区同步到主存区。而只有同步到主存区后,其他线程才能“看到”这个修改后的值。
要保证可见性,就需要使用
volatile
关键字,它会将高速缓存区中被修改的结果刷新到主存中,然后再读取主存中的数据,这里使用到一种机制,叫做总线嗅探机制。这里我尝试画了一张图,来解释如何保证可见性:
解释:
总线嗅探机制会嗅探通过锁总线的数据,如果该数据只被一个核心引用,那么,该缓存行就会标记为独享状态(E),而当锁总线嗅探到多个线程同时引用该数据,那么 CPU 就会通过广播机制,通知所有引用该数据的核心,将对应的缓存行状态变更为共享状态(S),如果共享的数据被其中某一线程(核心)修改,那么此时,该数据相对于对其进行修改的线程来说,就会变更为修改状态(M),这时修改状态的数据刷新到主存区必然会再次经过锁总线,锁总线嗅探到数据发生了修改,就会再次通知其他所有引用该数据的核心,将各自原本引用数据对应的缓存行变更为无效状态(I)。而如果其他核心想要再次读取该数据,发现缓存行处于无效状态,就会重新从主内存中将这个变量加载到它们各自的缓存中。
可见性是保障多线程操作中数据一致性和结果正确性的基石,多线程环境下影响变量可见性的因素:
- 指令重排。
- 线程调度(切换)。
- 工作内存和主内存没有及时刷新。
JVM 保障可见性的方法:final、volatile、锁。
-
原子性
原子操作是指一个操作不会被线程调度机制打断,一旦开始,就一直运行到结束,中间不会有任何线程切换。
原子性可以保障读取到的某个属性的值是由一个线程写入的。 变量不会在同一时刻受到多个线程同时写入造成干扰。
基本类型的单次读写操作是原子的,但是复合操作如:
int i=0;i++
,就是非原子性的。原子性是拒绝多线程操作的,不论是多核还是单核,具有原子性的量,同一时刻只能有一个线程来对它进行操作!因此,在线程执行过程中,不允许其他并行线程对该变量进行读取和写入的操作。如果发生竞争,则其他线程必须等待。
使用 volatile 关键字可以保证变量单次操作的原子性,但要想在多线程环境下保证原子性,volatile 却是无法保证复合操作的原子性的,这时就需要通过锁机制、
synchronized
来确保。 -
有序性
由于 CPU 计算速度的发展远远高于内存读写速度,因此出现性能浪费,但即使采用了高速缓存来抵消内存访问时带来的延迟,这种情况仍旧十分突出。因此,为了尽量减少 cache 的等待时间,CPU 采用了指令级并行重排序来提高执行效率,这也被称为 CPU 乱序执行,而有序性则是指程序指令按照预期的顺序执行而非乱序执行。
虽然处理器会对指令进行重排序,但是它会保证程序最终结果会和代码顺序执行结果相同,这是因为处理器在进行重排序时是会考虑指令之间的数据依赖性,这样优化了效率,而且对单个线程的执行结果也不会产生影响。
但是,在多线程环境下,指令重排序就无法保障执行结果的正确性了,这时就需要使用到 synchronized 关键字和 Lock 接口,它们都可以保证了多线程环境下线程间操作的有序性:
- synchronized 关键字是由“一个变量在同一时刻只允许一条线程对其进行锁操作”这条规则获得的,这条规则决定了持有同一个锁的两个同步块只能串行地进入。
- Lock 接口的作用域 synchronized 关键字相似,但它提供了更加广泛的锁定操作。
volatile 关键字也是可以保证有序性的,因为它禁止了指令重排序。
关于三大特性,这篇文章也讲得很好:【高并发】如何解决可见性和有序性问题?
指令重排结果的正确性
as-if-serial 语义
as-if-serial
是指所有的指令都可以为了优化而被重排序,但是必须保证最终执行的结果和重排序之前的结果是一致的,其主要遵守的规则是重排序不破坏数据间的依赖关系。
as-if-serial 有效地保证了单线程环境下指令重排序执行结果的正确性。
happens-before 原则
先行发生原则(happens-before
)是 JMM
用来规定两个操作之间的偏序关系,这两个操作是可以跨线程的。happens-before 中确定了8条规则,如果两个操作之间的关系可以从其中推导出来,就说明这两个操作是有序的。
happens-before 并不限制指令重排,但是需要保证指令重排结果与按 happens-before 关系来执行的结果一致。happens-before 原则保证了前后两个操作间不会被重排序且后者对前者的内存是可见的。
happens-before 的八条原则:
-
程序次序规则(Program Order Rule)
在一个线程内,按照控制流顺序,书写在前面的操作先行发生于书写在后面的操作。
注意,这里说的是控制流顺序而不是程序代码顺序,因为要考虑到分支、循环等结构。
-
管程锁定规则(Monitor Lock Rule)
一个 unlock 操作先行发生于后面对同一个锁的 lock 操作。
这里必须强调的是“同一个锁”,而“后面”指的是时间上的先后顺序。
-
volatile 变量规则(Volatile Variale Rule)
对于一个 volatile 变量的写操作先行发生于这个变量的读操作。
这里的“后面”同样是指时间上的先后。
-
线程启动规则(Thread Start Rule)
Thread 对象是
start()
方法先行发生于此线程的每一个动作。 -
线程终止规则(Thread Termination Rule)
线程中的所有操作都先行发生于对此线程的中止检测。
我们可以通过
Thread::join()
方法是否结束、Thread::isAlive()
的返回值等手段检测线程是否已经终止执行。 -
线程中断规则(Thread Interruption Rule)
对线程的 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生。
可以通过
Thread::interrupt()
检测到是否有中断发生。 -
对象终结规则(Finalizer Rule)
一个对象的初始化完成(构造函数执行结束)先行发生于它的 finalize() 方法的开始。
-
传递性(Transitivity)
如果操作A先行发生于操作B,操作B先行发生于操作C,那么就可以得出操作A先行发生于操作C的结论。
这一条规则是建立在前面7条规则基础上,满足任意一条即可使用。
锁
锁与 synchronized 同步块一样,是一种线程同步机制。自 Java5 开始, java.util.concurrent.locks
包提供了另一种方式实现线程同步机制—— Lock 。
死锁、活锁和饥饿
-
死锁
多个线程相互占用对方的资源的锁,而又相互等对方释放锁,此时若无外力干预,这些线程则一直处理阻塞的假死状态,形成死锁。
死锁产生的四个必要条件:
- 互斥性:线程对资源的占有是排他性的,一个资源只能被一个线程占有,直到释放。
- 请求和保持条件:一个线程对请求被占有资源发生阻塞时,对已经获得的资源不释放。
- 不剥夺:一个线程在释放资源之前,其他的线程无法剥夺占用。
- 循环等待:发生死锁时,线程进入死循环,永久阻塞。
死锁的原因:
-
竞争不可抢占性资源。
-
竞争可消耗资源引起死锁。
-
进程推进顺序不当。
进程在运行过程中,请求和释放资源的顺序不当,也会导致产生进程死锁。
避免死锁的方法:
- 破坏“请求和保持”条件。
- 破坏“不可抢占”条件。
- 破坏“循环等待”条件。
-
活锁
活锁与死锁正好相反,它是指多个线程相互拿到对方资源的锁之后,却又相互谦让,都主动将资源释放给别的线程使用,这样这个资源在多个线程之间跳动而又得不到执行,这就是活锁。
-
饥饿
对于不同优先级的多个线程,优先级高的线程能够插队并优先执行,但如果优先级高的线程一直抢占(或某一线程一直独占)优先级低线程的资源,就会导致优先级低线程无法得到执行,这就是饥饿。
死锁程序示例:
public class DeadLockDemo {
public static void main(String[] args) {
Object lockA = new Object();
Object lockB = new Object();
new Thread(new Runnable() {
@Override
public void run() {
String name = Thread.currentThread().getName();
synchronized (lockA) {
System.out.println(name + " got lockA, want LockB");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(name + " got lockB");
System.out.println(name + ": say Hello!");
}
}
}
}, "线程-A").start();
new Thread(new Runnable() {
@Override
public void run() {
String name = Thread.currentThread().getName();
synchronized (lockB) {
System.out.println(name + " got lockB, want LockA");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockA) {
System.out.println(name + " got lockA");
System.out.println(name + ": say Hello!");
}
}
}
}, "线程-B").start();
}
}
公平锁与非公平锁
公平锁是指多个线程在等待同一个锁时,必须按照申请锁的先后顺序来一次获得锁。
相反的非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。
公平锁的好处是等待锁的线程不会饿死,但是整体效率相对低一些;非公平锁的好处是整体效率相对高一些,但是有些线程可能会饿死或者说很早就在等待锁,但要等很久才会获得锁。其中的原因是公平锁是严格按照请求锁的顺序来排队获得锁的,而非公平锁是可以抢占的,即如果在某个时刻有线程需要获取锁,而这个时候刚好锁可用,那么这个线程会直接抢占,而这时阻塞在等待队列的线程则不会被唤醒。
对于 Java ReentrantLock
而言,通过构造函数可指定该锁是否是公平锁,默认是非公平锁,例如:new ReentrantLock(true)
为公平锁。
Synchronized 也是一种非公平锁,由于其并不像 ReentrantLock
是通过 AQS 的来实现线程调度,所以并没有任何办法使其变成公平锁。
共享锁与排他锁
共享锁(Share Lock)又称读锁,是读取操作创建的锁。其他用户可以并发读取数据,但任何事务都不能对数据进行修改(获取数据上的排他锁),直到已释放所有共享锁。
如果事务T对数据A加上共享锁后,则其他事务只能对A再加共享锁,不能加排他锁。获准共享锁的事务只能读数据,不能修改数据。
当使用 SELECT ... LOCK IN SHARE MODE;
进行查询时,Mysql 会对查询结果中的每行都加共享锁,当没有其他线程对查询结果集中的任何一行使用排他锁时,可以成功申请共享锁,否则会被阻塞。其他线程也可以读取使用共享锁的表,而且这些线程读取的是同一个版本的数据。
排他锁(Exclusive Lock)又称写锁,如果事务T对数据A加上排他锁后,则其他事务不能再对A加任任何类型的封锁。获准排他锁的事务既能读数据,又能修改数据。
当使用 SELECT ... FOR UPDATE
进行查询时,Mysql 会对查询结果中的每行都加排他锁,当没有其他线程对查询结果集中的任何一行使用排他锁时,可以成功申请排他锁,否则会被阻塞。
乐观锁与悲观锁
乐观锁与悲观锁不是指具体的什么类型的锁,而是指看待并发同步的角度(态度)。
悲观锁
悲观锁指的是对数据被外界(包括本系统的其他事务和外部系统的事务)修改持保守态度,因此,在整个数据处理过程中,将数据处于锁定状态。
当我们要对一个数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。这种借助数据库锁机制在修改数据之前先锁定,再修改的方式被称之为悲观并发控制(又名“悲观锁”,Pessimistic Concurrency Control,缩写“PCC”)。如果一个事务执行的操作对某行数据应用了锁,那只有当这个事务把锁释放,其他事务才能够执行与该锁冲突的操作。
悲观并发控制主要用于数据争用激烈的环境,以及发生并发冲突时使用锁保护数据的成本要低于回滚事务的成本的环境中。
悲观锁的实现,往往依靠数据库提供的锁机制,在数据库中,悲观锁的流程如下:
- 在对任意记录进行修改前,先尝试为该记录加上排他锁(exclusive locking)。
- 如果加锁失败,说明该记录正在被修改,那么当前查询可能要等待或者抛出异常。
- 如果成功加锁,那么就可以对记录做修改,事务完成后就会解锁。
- 其间如果有其他对该记录做修改或加排他锁的操作,都会等待我们解锁或直接抛出异常。
在 MySQL InnoDB 中使用悲观锁:
要使用悲观锁,我们必须关闭 MySQL 数据库的自动提交属性,因为 MySQL 默认使用 autocommit 模式,也就是说,当你执行一个更新操作后,MySQL 会立刻将结果进行提交,关闭自动提交属性:
set autocommit=0;
。
-- 0.开始事务
begin;-- 或 begin work; 或 start transaction;
-- 1.查询出商品信息
select status from t_goods where id = 1 for update;
-- 2.根据商品信息生成订单
insert into t_orders (id, goods_id) values (null, 1);
-- 3.修改商品status为2
update t_goods set status = 2;
-- 4.提交事务
commit;-- 或 commit work;
这种使用 select ... for update
的方式,通过开启排他锁的方式实现悲观锁是一种比较典型的悲观锁策略,如果出现并发修改操作,其他事务必须等待本次事务提交之后才能执行,这样就保证了当前数据不会被其他事务修改。
注意:使用
select ... for update
会将数据锁住,但是我们需要注意一些锁的级别,MySQL InnoDB 默认使用行级锁,行级锁都是基于索引的,如果一条SQL语句用不到索引是不会使用行级锁的,就会使用表级锁把整张表锁住。
悲观锁的优缺点:
悲观并发控制实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。但是在效率方面,处理加锁的机制会让数据库产生额外的开销,还有增加产生死锁的机会;另外,在只读型事务处理中由于不会产生冲突,也没必要使用锁,这样做只能增加系统负载;还有会降低了并行性,一个事务如果锁定了某行数据,其他事务就必须等待该事务处理完才可以处理那行数。
乐观锁
乐观并发控制(又名“乐观锁”,Optimistic Concurrency Control,缩写“OCC”)。
乐观锁是相对悲观锁而言的,乐观锁假设认为数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则让返回用户错误的信息,让用户决定如何去做。
相对于悲观锁,乐观锁在对数据库进行处理的时候,并不会使用数据库提供的锁机制。一般的实现乐观锁的方式就是记录数据版本。
数据版本:为数据增加的一个版本标识。当读取数据时,将版本标识的值一同读出,数据每更新一次,同时对版本标识进行更新。当我们提交更新的时候,判断数据库表对应记录的当前版本信息与第一次取出来的版本标识进行比对,如果数据库表当前版本号与第一次取出来的版本标识值相等,则予以更新,否则认为是过期数据。
实现数据版本有两种方式,第一种是使用版本号,第二种是使用时间戳。
使用版本号实现乐观锁:
-- 1.查询出商品信息
select (status, status, version) from t_goods where id=1
-- 2.根据商品信息生成订单
-- 3.修改商品status为2
update t_goods set status=2,version=version+1 where id=1 and version=#{version};
但是,使用乐观锁时,还需要注意进行粒度控制,如果并发率较高,则应当优化查询,减小乐观锁的粒度。
乐观锁的优缺点:
乐观并发控制相信事务之间的数据竞争的概率是比较小的,因此尽可能直接做下去,直到提交的时候才去锁定,所以不会产生任何锁和死锁。
乐观锁与悲观锁的选择
对于乐观锁与悲观锁,需要根据实际的业务场景进行选择:
- 乐观锁并未真正加锁,效率高。一旦锁的粒度掌握不好,更新失败的概率就会比较高,容易发生业务失败。
- 悲观锁依赖数据库锁,效率低。更新失败的概率比较低。
随着互联网三高架构的提出,悲观锁在生产环境中的应用频率越来越低。
阻塞锁与非阻塞锁
-
阻塞锁
多个线程同时调用同一个方法时,所有线程都将排队等待。没有获得锁的线程进入阻塞状态进行等待,等待的线程在获得响应的信号(唤醒或时间)时,才可以进入线程的准备就绪状态,准备就绪状态的所有线程,通过竞争,进入运行状态。
// 阻塞锁示例 public class Lock { private boolean isLocked = false; public synchronized void lock() throws InterruptedException { while(isLocked) { // 当其他线程进来,即处于等待阻塞状态 wait(); } isLocked = true; } public synchronized void unlock() { isLocked = false; notify(); } }
但是,当被调用的方法耗时较长且等待线程较多的时候,线程等待的时间就会很长,对于一些要求响应时间的系统来说,这种情况是不能容忍的,因此,需要让线程进入非阻塞状态,线程在未获取到锁时立刻返回,并告知用户稍后重试。
-
非阻塞锁
当多个线程竞争同一把锁时,其中某一线程成功获得锁,其他线程判断未获取到锁,则直接返回。
// 非阻塞锁示例 public class Lock { private boolean isLocked = false; public synchronized boolean lock() throws InterruptedException { if(isLocked) { // 当没有拿到锁,立即返回,线程不阻塞 return false; } isLocked = true; return true; } public synchronized void unlock() { isLocked = false; } }
自旋锁、互斥锁与读写锁
自旋锁
在 Java 中,自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗 CPU。
JDK6 中已经变为默认开启自旋锁,并且引入了自适应的自旋锁。自适应意味着自旋的时间不在固定了,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。
自旋是在轻量级锁中使用的,在重量级锁中,线程不使用自旋。
互斥锁
相交进程之间的关系主要有两种:同步和互斥。
- 互斥,是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。
- 同步,是指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源。
所谓互斥锁就是指一次最多只能有一个线程持有的锁。在 JDK 中 synchronized 和 JUC 的 Lock 就是互斥锁。
互斥锁的特点:一次只能一个线程拥有互斥锁,其他线程只有等待。
读写锁
读写锁实际是一种特殊的自旋锁,它把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。这种锁相对于自旋锁而言,能提高并发性,因为在多处理器系统中,它允许同时有多个读者来访问共享资源,最大可能的读者数为实际的逻辑 CPU 数。写者是排他性的,一个读写锁同时只能有一个写者或多个读者(与CPU数相关),但不能同时既有读者又有写者。
在读写锁保持期间也是抢占失效的。如果读写锁当前没有读者,也没有写者,那么写者可以立刻获得读写锁,否则它必须自旋在那里,直到没有任何写者或读者;如果读写锁没有写者,那么读者可以立即获得该读写锁,否则读者必须自旋在那里,直到写者释放该读写锁。
类锁与对象锁
类锁
类锁是加持在类上的,使用 synchronized static
或者 synchronized(Xxx.class)
方法使用的锁都是类锁,因为 class 和静态方法在系统中只会产生一份,所以在单系统环境中使用类锁是线程安全的。
public class TestLock {
public synchronized static void methodName1() {
System.out.println("类锁方式一");
}
public void methodName2() {
synchronized (TestLock.class) {
System.out.println("类锁方式二");
}
}
}
对象锁
synchronized 修饰非静态的方法和 synchronized(this)
使用的都是对象锁,一个系统可以有多个对象实例,所以使用对象锁不是线程安全的,除非保证一个系统该类型的对象只会创建一个(通常使用单例模式)才能保证线程安全。
单例模式保证对象安全:
// 单例模式-双重检查锁
public class TestLock {
private static TestLock testLock = null;
// 私有化构造函数不允许new来产生对象
private TestLock() {
}
// 保证一个系统只会创建一个对象实例
public synchronized static TestLock getInstance() {
if (testLock == null) {
synchronized (TestLock.class) {
if (testLock == null) {
testLock = new TestLock();
}
}
}
return testLock;
}
}
内置锁与显式锁
在 Java 并发编程中,可以将锁分为两类:内置锁(隐式锁,sychronized)和显式锁(lock)。
所谓的显示和隐式就是在使用的时候,使用者要不要手动写代码去获取锁和释放锁的操作。
内置锁的使用:
// 同步普通方法,其锁对象是:this
public synchronized void add(int t) {
this.v += t;
}
// 同步静态方法,其锁对象是:当前类的Class对象
public static synchronized void sub(int t) {
this.v -= t;
}
// 同步代码块,其锁对象是:synchronized所修饰的obj
public int decrementAndGet() {
synchronized(obj) {
return --v;
}
}
显式锁的使用:
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
lock.lock();// 手动获取锁
try {
// ...
} finally {
lock.unlock();// 手动释放锁
}
}
在使用 lock 的时候,使用者需要手动的获取和释放锁,如果没有释放锁,就有可能导致出现死锁的现象,通常需要配合 try/finally
语句块来完成。
内置锁和显式锁的区别:
-
使用方式不同。
-
等待是否可中断:
synchronized 不可中断,除非抛出异常或者正常运行完成。
lock 可以中断,中断方式:
- 调用设置超时方法
tryLock(long timeout ,timeUnit unit)
。 - 调用
lockInterruptibly()
、interrupt()
方法 。
- 调用设置超时方法
-
加锁时是否可以公平:
synchronized 是非公平锁。
lock 可以指定是否可以公平,默认为非公平所,指定方式:
new ReentrantLock(true)
,true 表示公平锁。 -
是否可以精确唤醒线程。
synchronized 要么随机唤醒一个线程,要么是唤醒所有等待的线程。
lock 可以用分组来实现唤醒需要唤醒的线程,它可以精确地唤醒线程。
可重入锁与不可重入锁
可重入锁也叫递归锁,是指在外层函数获得锁之后,内层递归函数仍然可以获取到该锁,即线程可以进入任何一个它已经拥有锁的代码块。在JAVA环境下 ReentrantLock 和 synchronized 都是可重入锁。可重入锁最大的作用是避免死锁。其实现原理是通过为每个锁关联一个请求计数器和一个占有它的线程,当计数为 0 时,认为锁是未被占有的;线程请求一个未被占有的锁时,JVM 将记录锁的占有者,并且将请求计数器置为 1 。如果同一个线程再次请求这个锁,计数器将递增;每次占用线程退出同步块,计数器值将递减。直到计数器为 0 时锁被释放。
可重入锁在释放锁前,线程可再次进入锁方法,通过这种方式,可以保证在递归的环境下,不会出现死锁的现象。
不可重入锁(自旋锁),线程在释放锁前不可再次进入锁方法,也就是说,获得一次锁就只能进入一次锁方法,再次进入则需要重新获取锁。
无锁、偏向锁、轻量级锁和重量级锁
Java 中每个对象都可作为锁,锁有四种级别,按照量级从轻到重分为:无锁、偏向锁、轻量级锁和重量级锁,并且,锁只能升级不能降级。
下图来源于网络,请点击放大进行查看:
无锁
无锁是指线程通过无限循环来执行更新操作,如果执行成功就退出循环,如果执行失败(有其他线程更新了值),则继续执行,直到成功为止。CAS 操作就属于无锁。如果从性能的角度来看,无锁状态的性能是非常高的。
偏向锁
引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次 CAS 原子指令,而偏向锁只需要在置换 ThreadID 的时候依赖一次 CAS 原子指令。
当只有一个线程竞争锁时,我们既不需要阻塞,也不需要自旋,因为只有一个线程在竞争,我们只需要判断该偏向锁中的 ThreadID 是否为当前线程即可。如果是,就执行同步代码,反之,就尝试使用 CAS 修改 ThreadID,修改成功执行同步代码,不成功就将偏向锁膨胀为轻量级锁。
轻量级锁
获取轻量锁的过程与偏向锁不同,竞争锁的线程首先需要拷贝对象头中的 Mark Word
到帧栈的锁记录中。拷贝成功后使用 CAS 操作尝试将对象的 Mark Word 更新为指向当前线程的指针。如果这个更新动作成功了,那么这个线程就拥有了该对象的锁。如果更新失败,那么意味着有多个线程在竞争。
当竞争线程尝试占用轻量级锁失败多次之后(使用自旋)轻量级锁就会膨胀为重量级锁,重量级线程指针指向竞争线程,竞争线程也会阻塞,等待轻量级线程释放锁后唤醒。
轻量级锁的前提假设是对于绝大部分的锁,在整个同步周期内都是不存在竞争的,通过CAS操作来避免时候互斥锁的开销。
重量级锁
重量级锁的加锁、解锁过程和轻量级锁差不多,区别在于:重量级锁在竞争失败后,线程阻塞,释放锁后,唤醒阻塞的线程,不使用自旋锁,它不会那么消耗 CPU,所以适合用在同步块执行时间长的情况下。
Compare and Swap
CAS 是
Compare and Swap
的缩写,意为比较并转换,它是解决多线程并行情况下使用锁造成性能损耗的一种机制,CAS 操作包含三个操作数——内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。CAS 有效地说明了“我认为位置V应该包含值A;如果包含该值,则将B放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。quote from 百度百科_CAS
数据库相关锁机制
分布式锁
分布式锁,是控制分布式系统之间同步访问共享资源的一种方式。 在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。
quote from 维基百科_分布式锁
CAP 原则:任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。
也就是说单纯的 Java Api 并不能提供分布式锁的能力。目前主流的分布式锁主要有三种:数据库、缓存(Redis等)、Zookeeper 等中间件。
基于数据库的实现
-
基于数据库表
实现分布式锁最简单的方式,就是直接创建一张锁表,然后通过操作该表中的数据来实现。当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。
CREATE TABLE `methodLock` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键', `method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名', `desc` varchar(1024) NOT NULL DEFAULT '备注信息', `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成', PRIMARY KEY (`id`), UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
当我们想要锁住某个方法时,则执行如下 SQL :
insert into methodLock(method_name, desc) values ('methodName', 'desc');
在这里的示例中,由于对字段 method_name 进行了唯一性约束,所以,当多个请求调用 methodName 方法时,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。当方法执行完毕之后,需要释放锁,则执行如下 SQL :
delete from methodLock where method_name = 'methodName';
但这种实现方式存在较大问题:
- 这把锁依赖与数据库的可用性,一旦数据库宕机就会导致业务不可用。
- 这把锁没有失效时间,一旦解锁失败就会导致锁记录一直存在,其他线程无法再次获得锁。
- 这把锁只能是非阻塞的,因为数据的插入操作一旦执行失败就会直接报错,而不会等待锁的释放。
- 这把锁是非重入的,同一线程获得锁后,无法再次获得锁,因为数据库中已存在相同记录。
当然,对于这些问题,也可以有针对性的方案,例如:数据库同步保证数据可用、定时任务防止锁滞留、循环重试使线程等待、记录额外信息实现锁重入。
-
基于数据库排他锁
除了通过增删表记录来实现分布式锁外,还可以借助数据中自带的锁来实现。
在查询语句时增加
for update
,数据库就会在查询的过程中为数据库表增加排他锁,当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。我们可以认为获得排它锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,通过
commit()
提交事务,即可释放锁。这里特指 Mysql 的 InnoDB 引擎,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁,如果希望使用行级锁,则需要给对应的字段添加唯一索引。
相比增删表记录的方式实现分布式锁,这种方法可以有效地解决无法释放锁和锁阻塞的问题。
但这里还可能存在另外一个问题,虽然我们对 method_name 使用了唯一索引,并且显示使用 for update 来使用行级锁。但是,MySql 会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁。
此外,我们要使用排他锁来进行分布式锁的 lock,那么一个排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆。
基于缓存的实现
这里 redis 为例。
使用 redis 缓存实现分布式锁,需要使用到如下命令:
-
setnx
setnx key value
当且仅当 key 不存在时,将 key 设置为 value 字符串,若设置成功,则返回 1 。若 key 已存在,则不进行操作直接返回 0 。 -
expire
expire key timeout
为 key 设置一个超时时间,单位为 second ,超过这个时间会自动释放,可以避免死锁。 -
delete
delete key
删除 key 。
redis 分布式锁实现的思想:
- 获取锁的时候,使用 setnx 加锁,并使用 expire 命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的 value 值为一个随机生成的 UUID,通过此在释放锁的时候进行判断。
- 获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
- 释放锁的时候,通过 UUID 判断是不是该锁,若是该锁,则执行 delete 进行锁释放。
可以使用缓存来代替数据库来实现分布式锁,这个可以提供更好的性能,同时,很多缓存服务都是集群部署的,可以避免单点问题。并且很多缓存服务都提供了可以用来实现分布式锁的方法,比如 redis 的 setnx 方法。并且,这些缓存服务也都提供了对数据的过期自动删除的支持,可以直接设置超时时间来控制锁的释放。
但缓存实现分布式锁也有其缺陷,即通过超时时间来控制锁的失效时间并不是十分靠谱。
基于 Zookeeper 的实现
[❗TODO]{.label .danger} 由于还不了解 zookeeper ,等详细了解之后再进行补充。
monitor
Monitor 其实是一种同步工具,或者说是同步机制,它通常被描述成一个对象,主要特点是:
- 同步:对象内的所有方法都互斥的执行。同一个 Monitor 只有一个运行许可,任何线程进入任何一个方法都需要获取该许可,并在离开时归还。
- 协作:Monitor 通常提供
signal
机制,允许正持有许可的线程暂时放弃许可,等待某个监视条件成立后,当前线程就可以通知正在等待这个条件的线程,让它可以重新获得运行许可。
在 Monitor Object 模式中,主要有四种类型的参与者:
- 监视者对象(Monitor Object):负责公共的接口方法,这些公共的接口方法会在多线程的环境下被调用执行。
- 同步方法:这些方法是监视者对象所定义的,它保证在任一事件内只有一个同步方法能够执行。
- 监控锁(Monitor Lock):每一个监视者对象都会拥有一把监视锁。
- 监控条件(Monitor Condition):同步方法使用监控锁和监控条件来决定方法是否需要阻塞或重新执行。
实际上 Java Object 类本身就是监视者对象,Java 对于 Monitor Object 模式做了内建的支持。每个 Object 都带了一把看不见的锁,通常称为内部锁、 Monitor 锁,或者 Instrinsic Lock,这把锁就是监控锁,并且通过 wait()
、notify()
、notifyAll()
方法构成监控条件。
关于 wait、notify、notifyAll 将在本文后续部分进行更加详细的说明。
锁的优化
关于锁的优化的方法和思路,主要可以从两个层面进行,其一是在虚拟机层面对锁进行优化,它主要是在偏向锁、轻量级锁和重量级锁方面进行优化。而另一层面则是在开发者可控的代码层面进行优化,主要有以下几种思路和方法:
-
减少锁持有的时间。
一个线程持有锁的时间越长,其他竞争线程等待的时间就越长,因此,为了提高锁的性能,只需要在必要时才进行同步,这样就能明显减小线程持有锁的时间。
优化前的同步代码块:
public synchronized void syncMethod() { // ... 其他代码片段 mutextMethod(); // ... 其他代码片段 }
优化后的同步代码块:
public void syncMethod() { // ... 其他代码片段 synchronized(this) { mutextMethod(); } // ... 其他代码片段 }
-
减小锁粒度。
将大对象(这个对象可能会被很多线程访问),拆成小对象,大大增加并行度,降低锁竞争。降低了锁的竞争,偏向锁,轻量级锁成功率才会提高。
最典型的减小锁粒度的案例就是
ConcurrentHashMap
,他只锁住了 Hash 桶中的某一个桶,而不像 HashTable 一样锁住整个对象。 -
锁分离。
最常见的锁分离就是读写锁 ReadWriteLock,根据功能进行分离成读锁和写锁,这样读读不互斥,读写互斥,写写互斥,即保证了线程安全,又提高了性能。
读写分离思想可以延伸,只要操作互不影响,锁就可以分离。
-
锁粗化。
为了保证多线程的执行效率,通常情况下会要求每个线程持有锁的时间尽量短,一旦使用完公共资源后,就应当立即释放锁。但是,也应当考虑不同的场景,如果一个线程对于同一个锁不断进行请求、同步和释放操作,其本身也会消耗宝贵的资源。
优化前的同步代码块:
public void syncMethod() { for (int i = 0; i < len; i++) { synchronized(lock) { // 频繁获取锁 // ... do sth. } } }
优化后的同步代码块:
public void syncMethod() { synchronized(lock) { // 获取锁频率减小 for (int i = 0; i < len; i++) { // ... do sth. } } }
注意:锁粗化的操作,需要根据实际情况进行考虑,被纳入同一个锁进行处理的多个操作,应当尽量保证它们能够在较短的时间内执行完毕,否则就会导致其他竞争线程等待时间过长。
-
锁消除。
锁消除是在编译器级别进行的,在即时编译时,如果发现不可能被共享的对象,则可以消除这些对象的锁操作。
这些在即时编译时出现的不可能被共享的对象,不一定是我们在编写程序时为其进行了加锁的操作,也有可能是在 JDK 的实现中就包含有加锁的操作,比如
Vector
和StringBuffer
类中的很多方法都是加锁的。当我们在一些不会有线程安全的情况下使用这些类的方法时,达到某些条件时,编译器会将锁消除来提高性能。
synchronized
synchronized 是 Java 中的关键字,在上文中介绍锁的相关概念时已经反复使用到,它是一种同步锁。它修饰的对象主要有以下几种:
- 修饰代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号括起来的代码,作用的对象是调用这个代码块的对象。
- 修饰方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象。
- 修饰静态方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象。
- 修饰类,其作用的范围是synchronized后面括号括起来的部分,作用主的对象是这个类的所有对象。
synchronized 的底层实现
针对一个 synchronized 修饰的代码块进行反编译,得到如下结果:
可以观察到,synchronized
关键字在经过 Javac 编译之后,会在同步块的前后形成 monitorenter
和 monitorexit
两个字节码指令。
根据《Java虚拟机规范》的要求:
- 在执行
monitorenter
指令时,首先要去尝试获取对象的锁(获取对象锁的过程,其实是获取 monitor 对象的所有权的过程)。 - 如果这个对象没被锁定,或者当前线程已经持有了那个对象的锁,就把锁的计数器的值增加一。
- 而在执行
monitorexit
指令时会将锁计数器减一,一旦计数器的值为零,锁随即就被释放了。 - 如果获取对象锁失败,那当前线程就应当被阻塞等待,直到请求锁定的对象被持有它的线程释放为止。
由此可以看出 synchronized 的实现原理:synchronized 的语义底层是通过一个 monitor 的对象来完成,其实 wait/notify 等方法也依赖于 monitor 对象,这就是为什么只有在同步的块或者方法中才能调用 wait/notify 等方法,否则会抛出 java.lang.IllegalMonitorStateException
的异常的原因。
synchronized 修饰的其他对象也是同理,尽管它们有可能并不是显式地调用 monitorenter 和 monitorexit 指令。
反编译文件之所以有两个 monitorexit 指令,是因为它们分别用于正常退出和异常退出。
synchronized 与 lock 的关系
参考上文
锁 >> 内置锁与显式锁
部分。
不使用 synchronized 如何实现一个线程安全的单例
借助 CAS(AtomicReference)实现单例模式:
public class Singleton {
private static final AtomicReference<Singleton> INSTANCE = new AtomicReference<>();
public Singleton() {}
public static Singleton getInstance() {
for (;;) {
Singleton instance = INSTANCE.get();
if (null != instance) {
return instance;
}
instance = new Singleton();
if (INSTANCE.compareAndSet(null, instance)) {
return instance;
}
}
}
}
用 CAS 的好处在于不需要使用传统的锁机制来保证线程安全,CAS 是一种基于忙等待的算法,依赖底层硬件的实现,相对于锁它没有线程切换和阻塞的额外消耗,可以支持较大的并行度。
CAS 的一个重要缺点在于如果忙等待一直执行不成功(一直在死循环中),会对 CPU 造成较大的执行开销。
synchronized 和原子性、可见性和有序性之间的关系
-
synchronized 与原子性
在 Java 中,为了保证原子性,提供了两个高级的字节码指令
monitorenter
和monitorexit
。这两个字节码指令,在 Java 中对应的关键字就是 synchronized 。通过 monitorenter 和 monitorexit 指令,可以保证被 synchronized 修饰的代码在同一时间只能被一个线程访问,在锁未释放之前,无法被其他线程访问到。因此,在Java中可以使用 synchronized 来保证方法和代码块内的操作是原子性的。
-
synchronized 与可见性
被 synchronized 修饰的代码,在开始执行时会加锁,执行完成后会进行解锁。而为了保证可见性,有这样一条规则:对一个变量解锁之前,必须先把此变量同步回主存中。这样解锁后,后续线程就可以访问到被修改后的值。
所以,synchronized 关键字锁住的对象,其值是具有可见性的。
-
synchronized 与有序性
由于 synchronized 修饰的代码,同一时间只能被同一线程访问。那么也就是单线程执行的。所以,可以保证其有序性。
volatile
volatile
通常被比喻成“轻量级的 synchronized
”,也是 Java 并发编程中比较重要的一个关键字。和 synchronized 不同,volatile 是一个变量修饰符,只能用来修饰变量,无法修饰方法及代码块等。
volatile 的用法比较简单,只需要在声明一个可能被多线程同时访问的变量时,使用 volatile 修饰即可:
public class Singleton {
private volatile static Singleton singleton;
private Singleton () {}
public static Singleton getSingleton() {
if (singleton == null) {
synchronized (Singleton.class) {
if (singleton == null) {
singleton = new Singleton();
}
}
}
return singleton;
}
}
volatile 的实现原理
对于 volatile 变量,当对 volatile 变量进行写操作的时候,JVM 会向处理器发送一条 lock 前缀的指令,将这个缓存中的变量回写到系统主存中。而在多处理器下,但单纯写回到内存也并不能使其他处理器的缓存数据得到更新,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议。
缓存一致性协议
在前文中已有详细说明,此处不再赘述。
总之,如果一个变量被 volatile 所修饰的话,在每次数据变化之后,其值都会被强制刷入主存。而其他处理器的缓存由于遵守了缓存一致性协议,也会把这个变量的值从主存加载到自己的缓存中,这就保证了一个 volatile 在并发编程中,其值在多个缓存中是可见的。
volatile 和原子性、可见性和有序性之间的关系
-
volatile 与原子性
要保证原子性,就需要通过字节码指令 monitorenter 和 monitorexit,但是 volatile 和这两个指令之间是没有任何关系的。所以,volatile 是不能保证原子性的。
只有在部分场景中,可以使用 volatile 来代替 synchronized:
- 运算结果并不依赖变量的当前值,或者能够确保只有单一的线程会修改变量的值。
- 变量不需要与其他状态变量共同参与不变约束。
-
volatile 与可见性
Java 中的 volatile 关键字提供了一个功能,那就是被其修饰的变量在被修改后可以立即同步到主内存,被其修饰的变量在每次使用之前都从主内存刷新。因此,可以使用 volatile 来保证多线程操作时变量的可见性。
关键词:缓存一致性协议,总线嗅探机制。
-
volatile 与有序性
volatile 可以禁止指令重排,这就保证了代码的程序会严格按照代码的先后顺序执行,这就保证了有序性。
关键词:内存屏障。
内存屏障
内存屏障(Memory Barrier),也称内存栅栏(Memory Fence),内存栅障,屏障指令等,是一类同步屏障指令,是 CPU 或编译器在对内存随机访问的操作中的一个同步点,使得此点之前的所有读写操作都执行后才可以开始执行此点之后的操作。
内存屏障要求达到当前执行点之前的所有操作,必须与主内存进行同步,才能够继续执行,而内存屏障之后的读操作都可以获得同步屏障之前的写操作的结果。因此,对于敏感的程序块,在写操作之后、读操作之前可以插入内存屏障。
通过 volatile 标记,可以解决编译器层面的可见性与重排序问题。而内存屏障则解决了硬件层面的可见性与重排序问题。
JVM 指令重排和 CPU 指令重排
[🔔 FIXME]{.label .warning} 暂未找到相关资料,网上大多都是将 CPU 和编译器的指令重排一并讨论,但并未说明两者区别。可以确定的是,CPU 指令重排是在硬件层面实现的,它在出厂时就已设置完成。而 JVM 指令重排是在 Java 虚拟机层面进行的。两者实现的方式基本相同。
由于缺乏相关资料,这一部分内容有所缺失,如果后续有了更加明确的了解,将会补充。如果读者朋友们有所补充,欢迎留言。
synchronized 与 volatile
synchronized 关键字能保证并发编程中不会出现原子性、可见性和有序性问题,而 volatile 只能保证可见性和有序性。那么,volatile 有什么存在的必要呢?
synchronized 本质上是一种加锁机制,但所有的锁都存在以下缺点:
-
有性能损耗
尽管 JDK 曾对 synchronized 进行了诸如适应性自旋、锁消除、锁粗化、轻量级锁和偏向锁等优化,但它仍旧存在一定的性能损耗。并且,这些方法都是通过避免对 Monitor 进行加锁当方法来控制的,但是并非所有情况都能通过这种方法进行优化,况且,优化行为本身也需要一定的耗时。
总之,加锁和解锁的过程是需要一定的时耗的。而 volatile 变量读操作的性能和普通变量几乎无差别,而 volatile 写操作由于需要插入内存屏障所以会慢一些,但即便如此,volatile 插入内存屏障的开销在大多数场景下也比锁的开销要低。
-
产生阻塞
synchronized 实现的锁本质上是一种阻塞锁,也就是说多个线程要排队访问同一个共享对象。
而 volatile 是 Java 虚拟机提供的一种轻量级同步机制,他是基于内存屏障实现的。说到底,他并不是锁,所以他不会有 synchronized 带来的阻塞和性能损耗的问题。
此外,volatile 的另外一个好处是禁止指令重排,在有些情况下,需要结合使用 volatile 和 synchronized 才能达到最大的性能效果。
线程相关方法
start 与 run
应当使用 Thread
类中的 start()
方法来启动一个线程。
-
start 方法
Java 线程通过 start 方法来启动,通过这个方法实现了多线程的运行,它无须等待 run 方法体代码执行完毕,就可以直接继续执行后续代码。通过 start 方法来启动一个线程,这时这个线程处于就绪(可运行)状态,并没有真正运行,一旦这个线程获得了 CPU 时间片,它才开始执行 run 方法,这里的 run 方法称为线程体,它包含了要执行的这个线程的内容,run 方法运行结束,线程即终止。
-
run 方法
run 方法只是 Thread 类中的一个普通方法,如果直接调用该方法,程序就只会有一个主线程运行,其执行路径只有一条,并且程序要等待 run 方法体执行完毕后才会执行后续代码。如果使用 run 方法来启动线程,它就是同步执行,而非异步,如果这样启动一个线程,那么就失去了线程的意义。
sleep 与 wait
在多线程环境中,Thread.sleep(long)
和 Thread.wait(long)
都可以在程序的调用处将线程阻塞指定毫秒数并返回,但是它们之间也有如下不同:
-
使用限制
使用 sleep 方法可以让让当前线程休眠,时间一到当前线程继续往下执行,在任何地方都能使用,但需要捕获
InterruptedException
异常。而使用 wait 方法则必须放在 synchronized 块里面,同样需要捕获 InterruptedException 异常,并且需要获取对象的锁。
-
使用场景
sleep 一般用于当前线程休眠,或者轮循暂停操作,wait 则多用于多线程之间的通信。
-
所属类
sleep 是 Thread 类的静态本地方法,wait 则是 Object 类的本地方法。
sleep 是让当前线程休眠,不涉及到对象类,也不需要获得对象的锁,所以是线程类的方法。wait 是让获得对象锁的线程实现等待,前提是要楚获得对象的锁,所以是类的方法。
-
释放锁
wait 可以释放当前线程对 lock 对象锁的持有,而 sleep 则不会。
Object lock = new Object(); synchronized (lock) { try { lock.wait(3000L); Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } }
-
线程切换
sleep 会让出 CPU 执行时间且强制上下文切换,而 wait 则不一定,wait 后可能还是有机会重新竞争到锁继续执行的。
notify 与 notifyAll
notify()
和 notifyAll()
方法都是用来唤醒 wait
的线程。
- 如果线程调用了对象的 wait 方法,那么线程便会处于该对象的等待池中,等待池中的线程不会去竞争该对象的锁。
- 当有线程调用了对象的 notifyAll 方法或 notify 方法,被唤醒的的线程便会进入该对象的锁池中,锁池中的线程会去竞争该对象锁。
- 优先级高的线程竞争到对象锁的概率大,假若某线程没有竞争到该对象锁,它还会留在锁池中,唯有线程再次调用 wait 方法,它才会重新回到等待池中。
所谓唤醒线程,其实可以说是将线程由等待池移动到锁池,notify 只会唤醒(移动)某一个线程(具体是哪个由 JVM 决定),notifyAll 则会唤醒(移动)所有等待的线程。
“notify 可能会导致死锁,而 notifyAll 则不会。”注意,这句话并不完全准确,notify 可能导致的问题和死锁十分相似,并非真正意义上的死锁。
notify 可能会导致的问题(可称其为生产者消费者问题):
这里假定有几种对象,分别是:
- 生产者:负责生产资源,生产的资源将放入到公共资源池中。
- 消费者:负责消费资源,消费的资源将从公共资源池中移出。
- 公共资源池:有限容量,负责临时存放资源。
- 等待池:生产者和消费者的临时等待区。
当消费速度大于生产速度时,公共资源池中的资源很快就会被消耗空,此时消费者若是发现公共资源池中没有资源,消费者就会进入等待池中。同样,当生产速度大于消费速度时,公共资源池中的容量很快就会被占满,此时生产者若是发小公共资源池中没有可用空间,生产者就会进入等待池中。
如果以上两种情况都先后出现,那么,等待池中就会同时存在生产者和消费者。
当公共资源池为空时,生产者生产出新的资源,就会通知等待池的对象继续消费,但此时等待池中既有生产者也有消费者,如果使用 notify 通知,那么被唤醒的对象可能仍然是生产者,如果多次唤醒的都是生产者,最终导致生产者过剩,公共资源池占满,那么,此时所有生产者也将进入等待池。这时的情况就是:公共资源池占满,但无人消费,生产者因为资源池占满,也无法继续生产。
同理,当公共资源池占满时,消费者消费一定资源后,资源池出现空闲,想要通知等待池的生产者继续生产,但如果接收到通知的始终都是消费者,那么,公共资源池最终将被耗空,此时等待池的生产者没有被通知继续生产,消费者又没有可继续消费的资源,那么此时就出现了这种情况:公共资源池为空,但无人生产,消费者因为资源池为空,也无法继续消费。
出现上面的问题是因为使用了 notify 通知,它不会识别线程类型,并且只会唤醒某一个线程,但如果使用 notifyAll 来进行通知,将会唤醒所有线程,那么,无论是在资源占满还是资源耗空单情况下,都会有新唤醒的生产者或消费者进行处理,而不会导致生产者和消费者同时罢工的情况。
如何解决生产者与消费者问题?可以参考这篇文章。
ThreadLocal
ThreadLocal
被称为线程变量,意思是 ThreadLocal 中填充的变量属于当前线程,该变量对其他线程而言是隔离的,也就是说该变量是当前线程独有的变量。ThreadLocal 为变量在每个线程中都创建了一个副本,那么每个线程可以访问自己内部的副本变量。
对于同一个 ThreadLocal 所包含的对象,在不同的线程中有不同的副本,且该副本只能由当前线程使用,多线程之间并不共享。
ThreadLocal 提供了线程本地的实例。它与普通变量的区别在于,每个使用该变量的线程都会初始化一个完全独立的实例副本。ThreadLocal 变量通常使用 private static
进行修饰。当一个线程结束时,它所使用的所有 ThreadLocal 相对的实例副本都可被回收。
总的来说,ThreadLocal 适用于变量在线程间隔离而在方法或类间共享的场景。
ThreadLocal 与 synchronized 区别
ThreadLocal<T>
其实是与线程绑定的一个变量。ThreadLocal 和 Synchonized 都用于解决多线程并发访问,但是它们之间却有本质上的区别:
- Synchronized 用于线程间的数据共享,而 ThreadLocal 则用于线程间的数据隔离。
- Synchronized 是利用锁的机制,使变量或代码块在某一时该只能被一个线程访问,而 ThreadLocal 为每一个线程都提供了变量的副本,使得每个线程在某一时间访问到的并不是同一个对象,这样就隔离了多个线程对数据的数据共享。
向 ThreadLocal 存储数据,实际上是存储在它内部的 Map 中,并且 ThreadLocal 会将这个 Map 关联到当前线程。
ThreadLocal 的使用
public class ThreadLocalDemo {
private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
static void doRemove(String str) {
System.out.println(str + " remove " + threadLocal.get() + " from ThreadLocal.");
threadLocal.remove();
}
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
System.out.println("Set vlaueA into ThreadLocal.");
ThreadLocalDemo.threadLocal.set("value A");
}, "A").start();
Thread.sleep(1000);
new Thread(() -> {
System.out.println("Set vlaueB into ThreadLocal.");
ThreadLocalDemo.threadLocal.set("value B");
doRemove("Thread-B");
System.out.println("ThreadLocal after remove: " + threadLocal.get());
}, "B").start();
}
}
// 执行结果:
// Set vlaueA into ThreadLocal.
// Set vlaueB into ThreadLocal.
// Thread-B remove value B from ThreadLocal.
// ThreadLocal after remove: null
从示例中可以观察到,两个线程之间分别保留有各自线程的 ThreadLocal 副本,线程之间对 ThreadLocal 副本的修改互不影响。
ThreadLocal 原理
可以通过查看 ThreadLoal 相关方法当源码来理解它的实现原理:
-
ThreadLocal 的 set 方法
// ThreadLocal.set() 源码 public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } ThreadLocalMap getMap(Thread t) { return t.threadLocals; } void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); }
可以看到,ThreadLocal 的 set 方法,其内部会创建一个
ThreadLocalMap
对象(或从当前线程获取),并将当前线程的作为 key ,将传入的内容作为 value 进行存储。 -
ThreadLocal 的 get 方法
// ThreadLocal.get() 源码 public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } private T setInitialValue() { T value = initialValue(); Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); return value; }
从这里可以看到,ThreadLocal 的 get 方法,实际上是从 ThreadLocal 内部的 ThreadLocalMap 中根据当前线程作为 key 进行取值。
-
ThreadLocal 的 remove 方法
// ThreadLocal.remove() 源码 public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this); }
ThreadLocal 的 remove 方法,也是直接根据当前线程删除其对应在 ThreadLocalMap 中的数据。
ThreadLocal 底层数据结构
上面提到,ThreadLocal 本质上是将当前线程作为 key ,传入的值作为 value 存储在一个 ThreadLocalMap 中,而 ThreadLocalMap 的部分源码如下:
// ThreadLocalMap 部分源码内容
private Entry[] table;
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
}
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
可以看到,ThreadLocalMap 内部使用的是一个继承于 WeakReference 的 Entry 来存储数据的。
ThreadLocal 导致内存泄露
如果 ThreadLocal 使用不当,可能会导致内存泄露。
ThreadLocalMap 是 ThreadLocal 的内部类,它没有实现 Map 接口,而是用独立的方式实现了 Map 的功能,其内部的 Entry 也独立实现。
和 HashMap 的最大的不同在于,ThreadLocalMap 结构非常简单,没有 next 引用,也就是说 ThreadLocalMap 中解决 Hash 冲突的方式并非链表的方式,而是采用线性探测的方式。在 ThreadLocalMap 中,虽然也是用 Entry 来保存结构数据,但是这个 Entry 中 key 只能是 ThreadLocal 对象。而这个 Entry 继承自 WeakReference
(弱引用,生命周期只能存活到下一次GC前),但是只有 key 是弱引用,而 value 是强引用,这就导致 ThreadLocal 在没有外部对象强引用时,当发生 GC 时,使用弱引用的 key 会被回收,而使用强引用的 value 则会存活下来。
当线程没有结束,但是 ThreadLocal 已被回收,就可能导致线程中存在 ThreadLocalMap<null, Object>
的键值对,从而造成内存泄露。
简单来说,ThreadLocal 可能导致内存泄露,是因为 ThreadLocalMap 的 key 使用了弱引用,而 value 使用到是强引用。
使用 ThreadLocal 时,为了防止内存泄露,通常有两种手段:
- 使用完线程共享变量后,显示调用
ThreadLocalMap.remove()
方法来清除线程共享变量。 - JDK 建议 ThreadLocal 定义为
private static
,这样 ThreadLocal 的弱引用问题则不存在了。
ThreadLocal 常见使用场景
ThreadLocal 适用于如下两种场景:
- 每个线程需要有自己单独的实例。
- 实例需要在多个方法中共享,但不希望被多线程共享。
场景一:存储用户 Session
场景二:数据库连接,处理数据库事务
场景三:数据跨层传递
场景四:Spring 使用 ThreadLocal 解决线程安全问题
Spring 采用 Threadlocal 的方式,来保证单个线程中的数据库操作使用的是同一个数据库连接,同时,采用这种方式可以使业务层使用事务时不需要感知并管理 connection 对象,通过传播级别,巧妙地管理多个事务配置之间的切换,挂起和恢复。
Spring 框架里面就是用的 ThreadLocal 来实现这种隔离,主要是在 TransactionSynchronizationManager 这个类里面。
并发包
JUC
(java.util.concurrent)是 Java 并发编程的核心包,这个包的主要结构如下:
- atomic:原子包。基础数据类型的原子操作类型包,对基础类型进行了封装,用于基础类型并发下的原子计算。
- lock:锁包。多线程处理相同数据时,使用锁来保证数据库的准确性的最常用的方式,这个包下是锁相关的类。
- 其他类:JDK 原生中把其他类合并存放,并未区分类包。
并发包详细内容,可以参考Java8 API
同步容器与并发容器
同步容器:通过 synchronized 关键字修饰的容器,保证同一时刻内只有一个线程在使用容器,从而使得容器线程安全。
并发容器:允许多线程同时使用容器,并且保证线程安全。而为了达到尽可能提高并发,JUC 采用了多种优化方式来提高并发容器的执行效率,核心的就是:锁、CAS(无锁)、COW(读写分离)、分段锁。
常用的同步容器:
-
Vector
Vector 和 ArrayList 一样实现了 List 接口,其对于数组的各种操作和 ArrayList 一样,区别在于 Vertor 在可能出现线程不安全的所有方法都用 synchronized 进行了修饰。
-
Stack
Stack 是 Vertor 的子类,Stack 实现的是先进后出的栈。在出栈入栈等操作都进行了 synchronized 修饰。
-
HashTable
HashTable 实现了 Map 接口,它实现的功能 HashMap 基本一致(HashTable 不可存 null,而 HashMap 的键和值都可以存 null)。区别在于HashTable 使用了 synchronized 修饰了方法。
-
Collections 提供的同步集合类
List list = Collections.synchronizedList(new ArrayList())
Set set = Collections.synchronizedSet(new HashSet())
Map map = Collections.synchronizedMap(new HashMap())
Collections 通过代理模式对原本的操作加上了 synchronized 同步,而 synchronized 的同步粒度太大,导致在多线程处理的效率很低。所以在 JDK1.5 的时候推出了并发包下的并发容器,来应对多线程下容器处理效率低的问题。
常用的并发容器:
-
CopyOnWriteArrayList
CopyOnWriteArrayList 相当于实现了线程安全的 ArrayList,它的机制是在对容器有写入操作时,copy 出一份副本数组,完成操作后将副本数组引用赋值给容器。底层是通过 ReentrantLock 来保证同步。但它通过牺牲容器的一致性来换取容器的高并发效率(在 copy 期间读到的是旧数据)。所以不能在需要强一致性的场景下使用。
-
CopyOnWriteArraySet
CopyOnWriteArraySet 和 CopyOnWriteArrayList 原理一样,它是实现了 CopyOnWrite 机制的 Set 集合。
-
ConcurrentHashMap
ConcurrentHashMap 相当于实现了线程安全的 HashMap。其中的key是无序的,并且 key 和 value 都不能为 null。在 JDK8 之前, ConcurrentHashMap 采用了分段锁机制来提高并发效率,只有在操作同一分段的键值对时才需要加锁。到了 JDK8 之后,摒弃了锁分段机制,改为利用 CAS 算法。
-
ConcurrentSkipListMap
ConcurrentSkipListMap 相当于实现了线程安全的 TreeMap。其中的 key 是有序的,并且 key 和 value 都不能为 null。它采用的跳跃表的机制来替代红黑树。为什么不继续使用红黑树呢?因为红黑树在插入或删除节点的时候需要旋转调整,导致需要控制的粒度较大。而跳跃表使用的是链表,利用无锁 CAS 机制实现高并发线程安全。
-
ConcurrentSkipListSet
ConcurrentSkipListSet 和 ConcurrentSkipListMap 原理一样,它是实现了高并发线程安全的 TreeSet。
常用的 Queue 类型:
-
ArrayBlockingQueue
ArrayBlockingQueue 是采用数组实现的有界阻塞线程安全队列。如果向已满的队列继续塞入元素,将导致当前的线程阻塞。如果向空队列获取元素,那么将导致当前线程阻塞。采用 ReentrantLock 来保证在并发情况下的线程安全。
-
LinkedBlockingQueue
LinkedBlockingQueue 是一个基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。访问与移除操作是在队头进行,添加操作是在队尾进行,并分别使用不同的锁进行保护,只有在可能涉及多个节点的操作才同时对两个锁进行加锁。
-
PriorityBlockingQueue
PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序升序排列,也可以自定义类实现
compareTo()
方法来指定元素排序规则。 -
DelayQueue
DelayQueue 是一个内部使用优先级队列实现的无界阻塞队列。同时元素节点数据需要等待多久之后才可被访问。取数据队列为空时等待,有数据但延迟时间未到时超时等待。
-
SynchronousQueue
SynchronousQueue 没有容量,是一个不存储元素的阻塞队列,会直接将元素交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。相当于一条容量为 1 的传送带。
-
LinkedTransferQueue
LinkedTransferQueue 是一个有链表组成的无界传输阻塞队列。它集合了 ConcurrentLinkedQueue、SynchronousQueue、LinkedBlockingQueue 等优点。具体机制较为复杂。
-
LinkedBlockingDeque
LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。
-
ConcurrentLinkedQueue
上面的七种队列类型都是阻塞型,而 ConcurrentLinkedQueue 则是线程安全的无界非阻塞队列,其底层数据结构是使用单向链表实现,入队和出队操作都是使用 CAS 来保证线程安全。
Thread
Thread 类是 Java 中的线程类,它实现了 Runnable 接口,在 Thread 类中,关系到线程运行状态的有以下几个方法:
-
start
用于启动一个线程。
-
run
这个方法不需要用户来调用,当通过 start 方法启动线程后,线程在获得 CPU 时间片时,会自动执行 run 方法。注意,继承 Thread 类必须重写 run 方法,方法体中定义要执行的任务内容。
-
sleep
相当于让线程睡眠,交出 CPU,让 CPU 去执行其他任务,但需要注意,sleep 方法并不会释放锁。
当线程睡眠时间满后,不一定会立即得到执行,因为此时可能 CPU 正在执行其他的任务。所以说调用 sleep 方法相当于让线程进入阻塞状态。
-
yield
调用 yield 方法会让当前线程交出 CPU 权限,让 CPU 去执行其他的线程。它跟 sleep 方法类似,同样不会释放锁。但是 yield 不能控制具体的交出 CPU 的时间,另外,yield 方法只能让拥有相同优先级的线程有获取 CPU 执行时间的机会。
注意,调用 yield 方法并不会让线程进入阻塞状态,而是让线程重回就绪状态,它只需要等待重新获取 CPU 执行时间,这一点是和 sleep 方法不一样的。
-
join
实际上调用 join 方法是调用了 Object 的 wait 方法,wait 方法会让线程进入阻塞状态,并且会释放线程占有的锁,并交出 CPU 执行权限,由于 wait 方法会让线程释放对象锁,所以 join 方法同样会让线程释放对一个对象持有的锁。
-
interrupt
单独调用 interrupt 方法可以使得处于阻塞状态的线程抛出一个异常,因此,它可以用来中断一个正处于阻塞状态的线程;另外,可以通过 interrupt 方法和
isInterrupted()
方法来停止正在运行的线程。 -
stop:已废弃,它是一个不安全的方法。
-
destory:已废弃。
Runnable
Runnable 接口是线程辅助类,仅定义了一个方法 run 方法。
Runnable 的使用方法:
- 实现 Runnable 接口。
- 重写 run 方法。
- 创建 runnable 实例。
- 创建 Thread 实例。
- 将 Runnable 实例放入 Thread 实例中。
- 通过线程实例控制线程的行为,在运行时会调用 Runnable 接口中的 run 方法。
注意:Java 中真正能创建新线程的只有 Thread 类对象,通过实现 Runnable 的方式,最终还是通过 Thread 类对象来创建线程。
Callable
参考前文创建线程的多种方式
部分。
ReentrantLock
参考前文可重入锁与不可重入锁
部分。
ReentrantReadWriteLock
ReentrantReadWriteLock 是 Lock 的另一种实现方式,从前文我们已经知道 ReentrantLock 是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock 允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。
参考前文自旋锁、互斥锁与读写锁
部分。
Atomic*
Semaphore
Semaphore
通常被称为信号量,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。
它主要有以下常用方法:
acquire()
:获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。acquire(int permits)
:获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。acquireUninterruptibly()
:获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)。tryAcquire()
:尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。tryAcquire(long timeout, TimeUnit unit)
:尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。release()
:释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。hasQueuedThreads()
:等待队列里是否还存在等待线程。getQueueLength()
:获取等待队列里阻塞的线程数。drainPermits()
:清空令牌把可用令牌数置为0,返回清空令牌的数量。availablePermits()
:返回可用的令牌数量。
Semaphore 的实现原理:
Semaphore 提供了两个构造方法,用于对其进行初始化:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* permits:信号数量
* fair:是否是公平锁(默认非公平)
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
通过 acquire 方法获取信号时,其内部会调用 AQS 中的 acquireSharedInterruptibly()
方法:
// java.util.concurrent.Semaphore.class
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.class
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
这个方法会调用 tryAcquireShared()
尝试获取信号,如果没有获取到信号,则会将当前线程加入等待队列并挂起。而 Semaphore 的内部类 NonfairSync 和 FairSync 都对 tryAcquireShared 方法进行了重写:
abstract static class Sync extends AbstractQueuedSynchronizer {
// ...
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
// ...
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
}
这时,公平锁与非公平锁在实现上有所区别,非公平锁直接使用 CAS 来尝试获取信号。而公平锁则会先调用 hasQueuedPredecessors()
方法,判断队列中是否有等待线程,如果队列中没有等待线程,才会使用 CAS 尝试获取信号,如果有则返回 -1,这时 acquire 就会继续执行 doAcquireSharedInterruptibly()
方法:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);// 1
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {// 2
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 3
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这个方法主要进行了以下逻辑:
- 封装一个 Node 节点,并加入队列尾部。
- 在无限循环中,如果当前节点是头节点,就尝试获取信号。
- 如果不是头节点,在经过节点状态判断后,挂起当前线程。
而 semaphore.release()
释放信号则经历了如下逻辑:
// java.util.concurrent.Semaphore.class
public void release() {
sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.class
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// java.util.concurrent.Semaphore.class
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
可以看到,释放信号时也是使用 CAS 算法,并且将可用信号 state 加一。
CountDownLatch
CountDownLatch
是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信。
CountDownLatch 能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行,它使用一个计数器进行实现,计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为 0 时,表示所有的线程都已经完成一些任务,然后在 CountDownLatch 上等待的线程就可以恢复执行接下来的任务。
CountDownLatch 的用法:
-
某一线程在开始运行前等待 n 个线程执行完毕。
通过
new CountDownLatch(n)
来初始化计数器,每当有一个任务线程执行完毕,就执行countdownLatch.countDown()
将计数器减一,当计数器的值变为 0 时,在 CountDownLatch 上await()
的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
-
实现多个线程开始执行任务的最大并行性。
这里需要强调的是并行性,而不是并发。其做法是初始化一个共享的
new CountDownLatch(1)
,多个线程在开始执行任务前,首先countdownlatch.await()
,当主线程调用countdownLatch.countDown()
时,计算器变为 0 ,多个线程就会被同时唤醒,并开始并行执行任务。有点类似于多个线程进行赛跑,当初始化
new CountDownLatch(1)
时,相当于参与赛跑的运动员(线程)进入预备状态,当执行countdownLatch.countDown()
就相当于发号枪响起,多个运动员(线程)就开始赛跑。
CountDownLatch 的不足:它是一次性的,计算器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。
CountDownLatch 使用示例:
public class CountdownLatchTest {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(3);
final CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
Runnable runnable = () -> {
try {
System.out.println("子线程【" + Thread.currentThread().getName() + "】开始执行");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("子线程【" + Thread.currentThread().getName() + "】执行完毕");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
service.execute(runnable);
}
try {
System.out.println("主线程" + Thread.currentThread().getName() + "等待子线程执行完成...");
latch.await();// 阻塞当前线程,等待所有子线程执行完毕
System.out.println("子线程执行完毕。");
System.out.println("主线程【" + Thread.currentThread().getName() + "】开始执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ConcurrentHashMap
ConcurrentHashMap 通常会与 HashMap 和 HashTable 放在一起进行比较。
在非并发环境下,我们通常使用 HashMap 来作为存储 key/value 的容器,但是,HashMap 在多线程环境下却是存在线程安全问题的,而要处理这个问题,主要有以下几种方式:
-
使用
Collections.synchronizedmap(new HashMap<>())
。它其实是对 HashMap 的一个包装,但是这种方式只适用于并发量较小的情况。它返回一个
SynchronizedMap
对象,而在这个对象中的所有操作,都被synchronized
关键字修饰,并且每个方法获取的都是同一把锁 mutex,因此,所有操作之间是互斥的,从而减少了并发量。同理,对于 List 和 Set,Collections 也提供了类似的方法,参见上文
同步容器与并发容器
部分。 -
使用 HashTable。
HashTable 是一个线程安全的类,它使用 synchronized 来锁住整张 Hash 表来实现线程安全,即每次锁住整张表让线程独占。
-
使用 ConcurrentHashMap。
前两者在实现上均使用了 synchronized 关键字,性能和效率上都明显低于 ConcurrentHashMap,ConcurrentHashMap 允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对 hash 表的不同部分进行的修改。ConcurrentHashMap 内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的 Hashtable,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。
有些方法需要跨段,比如
size()
和containsValue()
,它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序(按顺序防止了死锁)锁定所有段,操作完毕后,又按顺序释放所有段的锁。
ConcurrentHashMap 实现原理:
ConcurrentHashMap 使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。
其中,Segment 继承了 ReentrantLock,表明每个段都可以当做一个锁,这样对每个段中的数据需要进行同步操作的话都是使用每个 Segment 容器对象自身的锁来实现的。只有需要对全局进行改变时才会锁定所有的段。
虽然 ConcurrentHashMap 的底层数据结构,和方法的实现细节和 HashMap 大体一致,但两者在类结构上却没有任何关联。
Executors
Executor、Executors、ExecutorService之间的关系:
- Executors 是 JUC 包下的一个类,主要用于提供线程池相关的操作,它提供了一系列工厂方法用于创建线程池,返回的线程池都实现了 ExecutorService 接口。Executors 可用于创建四类线程池,这在前文
四种常见的线程池
部分已有详细说明。 - 而 Executor 则是 Java 线程池的超级接口,它提供一个
void execute(Runnable command)
方法,这个接口我们一般不直接使用,而是用它的继承接口 ExecutorService。 - ExecutorService 接口继承自 Executor ,它有两个实现类,分别是:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 。
参考
参考内容来源于网络,本文不保证参考链接的长期有效性,以及参考内容的原创性。
- Java工程师成神之路
- https://www.hollischuang.com/archives/934
- https://blog.csdn.net/shimiso/article/details/8964414
- https://blog.csdn.net/weixin_42981419/article/details/86162071
- https://blog.csdn.net/mu_wind/article/details/113806680
- https://www.cnblogs.com/h--d/p/14179144.html
- https://ld246.com/article/1576070925464
- https://www.jianshu.com/p/9c5a7d21c02f
- https://honeypps.com/java/locks-in-java
- https://www.hollischuang.com/archives/1716
- https://www.jianshu.com/p/6fe4bc3374a2
- https://www.hollischuang.com/archives/3928
- https://blog.csdn.net/u010445301/article/details/111322569
- https://my.oschina.net/hosee/blog/639352
著作権声明
本記事のリンク:https://www.chinmoku.cc/dev/java/basic/java-concurrency/
本博客中的所有内容,包括但不限于文字、图片、音频、视频、图表和其他可视化材料,均受版权法保护。未经本博客所有者书面授权许可,禁止在任何媒体、网站、社交平台或其他渠道上复制、传播、修改、发布、展示或以任何其他方式使用此博客中的任何内容。