01 Java 并发

线程状态转换
创建后尚未启动。
可能正在运行,也可能正在等待 CPU 时间片。包含了操作系统线程状态中的 Running 和 Ready。
等待获取一个排它锁,如果其线程释放了锁就会结束此状态。
等待其它线程显式地唤醒,否则不会被分配 CPU 时间片。
进入方法 | 退出方法 |
---|---|
没有设置 Timeout 参数的 Object.wait() 方法 | Object.notify() / Object.notifyAll() |
没有设置 Timeout 参数的 Thread.join() 方法 | 被调用的线程执行完毕 |
LockSupport.park() 方法 | LockSupport.unpark(Thread) |
无需等待其它线程显式地唤醒,在一定时间之后会被系统自动唤醒。
- 调用
Thread.sleep()
方法使线程进入限期等待状态时,常常用“使一个线程睡眠”进行描述。 - 调用
Object.wait()
方法使线程进入限期等待或者无限期等待时,常常用“挂起一个线程”进行描述。
睡眠和挂起是用来描述行为,而阻塞和等待用来描述状态。
阻塞和等待的区别在于,阻塞是被动的,它是在等待获取一个排它锁。而等待是主动的,通过调用
Thread.sleep()
和 Object.wait()
等方法进入。进入方法 | 退出方法 |
---|---|
Thread.sleep() 方法 | 时间结束 |
设置了 Timeout 参数的 Object.wait() 方法 | 时间结束 / Object.notify() / Object.notifyAll() |
设置了 Timeout 参数的 Thread.join() 方法 | 时间结束 / 被调用的线程执行完毕 |
LockSupport.parkNanos() 方法 | LockSupport.unpark(Thread) |
LockSupport.parkUntil() 方法 | LockSupport.unpark(Thread) |
可以是线程结束任务之后自己结束,或者产生了异常而结束。
有三种使用线程的方法:
- 实现 Runnable 接口;
- 实现 Callable 接口;
- 继承 Thread 类。
实现 Runnable 和 Callable 接口的类只能当做一个可以在线程中运行的任务,不是真正意义上的线程,因此最后还需要通过 Thread 来调用。可以说任务是通过线程驱动从而执行的。
時雨:在 《Java 并发核心知识体系精讲》中,参考 Oracle 官方文档,标注实现多线程方式只有两种:实现 Runnable 接口和继承 Thread 类。 两个的区别:实现 Runnable 接口最终调用target.run()
方法,而继承 Thread 类整个run()
方法都被重写。 更准确的说:创建线程只有一种方式那就是构造 Thread 类,而实现线程的执行单元有两种方式。
需要实现 Runnable 接口的
run()
方法,并把 Runnable 实例传给 Thread 类,通过 Thread 调用 start()
方法来启动线程。public class MyRunnable implements Runnable {
public void run() {
// ...
}
}
public static void main(String[] args) {
MyRunnable target = new MyRunnable();
Thread thread = new Thread(target);
thread.start();
}
Thread 内的
run()
方法里会调用 target.run()
方法,target 在构造函数时传入并完成后续初始化:@Override
public void run() {
if (target != null) {
target.run();
}
}
与 Runnable 相比,Callable 可以有返回值,返回值通过 FutureTask 进行封装。
public class MyCallable implements Callable<Integer> {
public Integer call() {
return 123;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCallable mc = new MyCallable();
FutureTask<Integer> ft = new FutureTask<>(mc);
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get());
}
同样也是需要实现
run()
方法,因为 Thread 类也实现了 Runable 接口。这种方式整个 run()
方法都被重写,所以不会调用 target.run()
方法。当调用
start()
方法启动一个线程时,虚拟机会将该线程放入就绪队列中等待被调度,当一个线程被调度时会执行该线程的 run()
方法。public class MyThread extends Thread {
public void run() {
// ...
}
}
public static void main(String[] args) {
MyThread mt = new MyThread();
mt.start();
}
实现接口会更好一些,因为:
- Java 不支持多重继承,因此继承了 Thread 类就无法继承其它类,但是可以实现多个接口;
- 类可能只要求可执行就行,继承整个 Thread 类开销过大;
- 继承 Thread 每次执行任务都要新建一个线程,而通过实现接口可以利用线程池节省开销。
因为线程池内部创建线程也是使用 new Thread(),所以不算新的创建线程方式,观点错误。
Callable 和 FutureTask 本质上也是实现 Runnable 接口,观点错误。

Callable
定时器本质也是使用 new Thread(),所以不算新的创建线程方式,观点错误。
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
System.out.println(System.currentTimeMillis());
}
}, 1000, 1000);
Executor 管理多个异步任务的执行,而无需显式地管理线程的生命周期。这里的异步是指多个任务的执行互不干扰,不需要进行同步操作。
主要有三种 Executor:
CachedThreadPool
:一个任务创建一个线程;FixedThreadPool
:所有任务只能使用固定大小的线程;SingleThreadExecutor
:相当于大小为 1 的FixedThreadPool
。
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executorService.execute(new MyRunnable());
}
executorService.shutdown();
}
守护线程是程序运行时在后台提供服务的线程,不属于程序中不可或缺的部分。
当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程。
main()
属于非守护线程。在线程启动之前使用
setDaemon()
方法可以将一个线 程设置为守护线程。public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.setDaemon(true);
}
Thread.sleep(millisec)
方法会休眠当前正在执行的线程,millisec 单位为毫秒。sleep()
可能会抛出 InterruptedException
,因为异常不能跨线程传播回 main() 中,因此必须在本地进行处理。线程中抛出的其它异常也同样需要在本地进行处理。public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
线程礼让,对静态方法
Thread.yield()
的调用声明了当前线程已经完成了生命周期中最重要的部分,可以切换给其它线程来执行。该方法只是对线程调度器的一个建议,而且也只是建议具有相同优先级的其它线程可以运行。public void run() {
Thread.yield();
}
线程优先级为 [0, 10],默认优先级是 5,优先级更高只是让线程争抢到 CPU 时间片的概率更大,并不一定优先级更高的线程一定先被执行。通过调用
setPriority()
和 getPriority()
设置和获取线程优先级。一个线程执行完毕之后会自动结束,如果在运行过程中发生异常也会提前结束。
如何正确停止线程:使用 interrupt 来通知,而不是强制。
通过调用一个线程的
interrupt()
来中断该线程,如果该线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException
,从而提前结束该线程。但是不能中断 I/O 阻塞和 synchronized 锁阻塞。对于以下代码,在
main()
中启动一个线程之后再中断它,由于线程中调用了 Thread.sleep()
方法,因此会抛出一个 InterruptedException
,从而提前结束线程,不执行之后的语句。public class InterruptExample {
private static class MyThread1 extends Thread {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new MyThread1();
thread1.start();
thread1.interrupt();
}
}
java.lang.InterruptedException: sleep interrupted at
java.lang.Thread.sleep(Native Method) at
InterruptExample.lambda$main$0(InterruptExample.java:5) at
InterruptExample$$Lambda$1/713338599.run(Unknown Source) at
java.lang.Thread.run(Thread.java:745)
如果一个线程的
run()
方法执行一个无限循环,并且没有执行 sleep()
等会抛出 InterruptedException
的操作,那么调用线程的 interrupt()
方法就无法使线程提前结束。但是调用
interrupt()
方法会设置线程的中断标记,此时调用 interrupted()
方法会返回 true。因此可以在循环体中使用该方法来判断线程是否处于中断状态,从而提前结束线程。時雨:如果使用的是实现 Runnable 接口启动新线程,则需要使用
Thread.currentThread().isInterrupted()
方法判断线程是否处于中断状态。時雨:Java 设计 sleep 函数时,如果 sleep 响应了中断,便会把
interrupted()
标记位清除,所以如果循环内 sleep,即使在循环条件中判断 interrupted()
也不能停止后续循环。public class InterruptExample {
private static class MyThread2 extends Thread {
@Override
public void run() {
while (!interrupted()) {
// ..
}
System.out.println("Thread end");
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread2 = new MyThread2();
thread2.start();
thread2.interrupt();
}
}
Thread end
時雨:中断最佳实践:
- 1.优先方法签名抛出异常而不是 try/catch 捕获,让上层自行处理;
- 2.如果无法或不想抛出异常,try/catch 捕获后需要手动恢复中断
Thread.currentThread().interrupt();
,让上层可以获取中断状态。
为什么不用废弃的
stop()
停止线程:因为 stop()
方法停止线程会让线程运行到一半突然停止,没办法完成一个基本单位的操作,造成脏数据。为什么不用 volatile 设置 boolean 标记位的方法停止线程:在某些情况,volatile 标记位可以起到和
interrupted()
一样的效果,但是在长时间阻塞的情况下,比如生产者/消费者模式,使用 ArrayBlockingQueue
做容器,在判断完成进入循环体内后却阻塞了,不进入再判断,无法中断线程。int num = 0;
ArrayBlockingQueue storage = new ArrayBlockingQueue(10);
try {
while (num <= 10000 && !canceled) {
if (num % 100 == 0) {
// 在此阻塞,不进入下次循环判断,即使 canceled 置为 true 也无法停止运行
storage.put(num);
}
num++;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("生成者停止运行");
}
修复方式只需要将
volatile
标记位判断改为 interrupted()
判断即可。注意:
interrupted
是静态方法,该方法获取并重置线程中断状态,目标线程是当前运行的线程而不是调用的的线程。public class Demo1 implements Runnable {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Demo1());
thread.start();
thread.interrupt();
System.out.println(thread.isInterrupted()); // true
System.out.println(thread.interrupted()); // false 目标是主线程而不是 thread 线程
System.out.println(Thread.interrupted()); // false
System.out.println(thread.isInterrupted()); // true
thread.join();
}
@Override
public void run() {
while (true) {
}
}
}
调用 Executor 的
shutdown()
方法会等待线程都执行完毕之后再关闭,但是如果调用的是 shutdownNow()
方法,则相当于调用每个线程的 interrupt()
方法。以下使用 Lambda 创建线程,相当于创建了一个匿名内部线程。
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
try {
Thread.sleep(2000);
System.out.println("Thread run");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdownNow();
System.out.println("Main run");
}
Main run java.lang.InterruptedException: sleep interrupted at
java.lang.Thread.sleep(Native Method) at
ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9) at
ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
如果只想中断 Executor 中的一个线程,可以通过使用
submit()
方法来提交一 个线程,它会返回一个 Future<?> 对象,通过调用该对象的 cancel(true)
方法就可以中断线程。Future<?> future = executorService.submit(() -> {
// ..
});
future.cancel(true);
Java 提供了两种锁机制来控制多个线程对共享资源的互斥访问,第一个是 JVM 实现的 synchronized,而另一个是 JDK 实现的 ReentrantLock。
1. 同步一个代码块
public void func() {
synchronized (this) {
// ...
}
}
它只作用于同一个对象,如果调用两个对象上的同步代码块,就不会进行同步。
对于以下代码,使用 ExecutorService 执行了两个线程,由于调用的是同一个对象的同步代码块,因此这两个线程会进行同步,当一个线程进入同步语句块时,另一个线程就必须等待。
public class SynchronizedExample {
public void func1() {
synchronized (this) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e1.func1());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
对于以下代码,两个线程调用了不同对象的同步代码块,因此这两个线程就不需要同步。从输出结果可以看出,两个线程交叉执行。
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e2.func1());
}
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
2. 同步一个方法
public synchronized void func () {
// ...
}
它和同步代码块一样,作用于同一个对象。
3. 同步一个类
public void func() {
synchronized (SynchronizedExample.class) {
// ...
}
}
作用于整个类,也就是说两个线程调用同一个类的不同对象上的这种同步语句,也会进行同步。
public class SynchronizedExample {
public void func2() {
synchronized (SynchronizedExample.class) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func2());
executorService.execute(() -> e2.func2());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
4. 同步一个静态方法
public synchronized static void fun() {
// ...
}
作用于整个类。
ReentrantLock 是 java.util.concurrent(J.U.C)包中的锁。
public class LockExample {
private Lock lock = new ReentrantLock();
public void func() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
} finally {
lock.unlock(); // 确保释放锁,从而避免发生死锁。
}
}
}
public static void main(String[] args) {
LockExample lockExample = new LockExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> lockExample.func());
executorService.execute(() -> lockExample.func());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
1. 锁的实现
synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。
2. 性能
新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等,synchronized 与 ReentrantLock 大致相同。
3. 等待可中断
当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。
ReentrantLock 可中断,而 synchronized 不行。
4. 公平锁
公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁。
synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的。
5. 锁绑定多个条件
一个 ReentrantLock 可以同时绑定多个 Condition 对象。
除非需要使用 ReentrantLock 的高级功能,否则优先使用 synchronized。这是因为 synchronized 是 JVM 实现的一种锁机制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。并且使用 synchronized 不用担心没有释放锁而导致死锁问题,因为 JVM 会确保锁的释放。
当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。
在线程中调用另一个线程的
join()
方法,会将当前线程挂起,而不是忙等待,直到目标线程结束。对于以下代码,虽然 b 线程先启动,但是因为在 b 线程中调用了 a 线程的
join()
方法,b 线程会等待 a 线程结束才继续执行,因此最后能够保证 a 线程的输出先于 b 线程的输出。public class JoinExample {
private class A extends Thread {
@Override
public void run() {
System.out.println("A");
}
}
private class B extends Thread {
private A a;
B(A a) {
this.a = a;
}
@Override
public void run() {
try {
a.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("B");
}
}
public void test() {
A a = new A();
B b = new B(a);
b.start();
a.start();
}
}
public static void main(String[] args) {
JoinExample example = new JoinExample();
example.test();
}
A
B
调用
wait()
使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify()
或者 notifyAll()
来唤醒挂起的线程。它们都属于 Object 的一部分,而不属于 Thread。只能用在同步方法或者同步控制块中使用,否则会在运行时抛出
IllegalMonitorStateException
。使用
wait()
挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify()
或者 notifyAll()
来唤醒挂起的线程,造成死锁 。public class WaitNotifyExample {
public synchronized void before() {
System.out.println("before");
notifyAll();
}
public synchronized void after() {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after");
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
WaitNotifyExample example = new WaitNotifyExample();
executorService.execute(() -> example.after());
executorService.execute(() -> example.before());
}
before after
wait() 和 sleep() 的区别
wait()
是 Object 的方法,而sleep()
是 Thread 的静态方法;wait()
会释放锁,sleep()
不会。
java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用
await()
方法使线程等待,其它线程调用 signal()
或 signalAll()
方法唤醒等待的线程。相比于
wait()
这种等待方式,await()
可以指定等待的条件,因此更加灵活。使用 Lock 来获取一个 Condition 对象。
public class AwaitSignalExample {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void before() {
lock.lock();
try {
System.out.println("before");
condition.signalAll();
} finally {
lock.unlock();
}
}
public void after() {
lock.lock();
try {
condition.await();
System.out.println("after");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
AwaitSignalExample example = new AwaitSignalExample();
executorService.execute(() -> example.after());
executorService.execute(() -> example.before());
}
before after
java.util.concurrent(J.U.C)大大提高了并发性能,AQS 被认为是 J.U.C 的核心。
AQS(AbstractQueuedSynchronizer),即队列同步器。它是构建锁或者其他同步组件的基础框架(如 ReentrantLock、ReentrantReadWriteLock、Semaphore 等),JUC 并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。它是 JUC 并发包中的核心基础组件。
用来控制一个或者多个线程等待多个线程。
维护了一个计数器 cnt,每次调用
countDown()
方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await()
方法而在等待的线程就会被唤醒。
CountDownLatch
public class CountdownLatchExample {
public static void main(String[] args) throws InterruptedException {
final int totalThread = 10;
CountDownLatch countDownLatch = new CountDownLatch(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("run..");
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("end");
executorService.shutdown();
}
}
run..run..run..run..run..run..run..run..run..run..end
用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。
和 CountdownLatch 相似,都是通过维护计数器来实现的。线程执行
await()
方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await()
方法而在等待的线程才能继续执行。CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器通过调用
reset()
方法可以循环使用,所以它才叫做循环屏障。CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会执行一次。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}

CyclicBarrier
public class CyclicBarrierExample {
public static void main(String[] args) {
final int totalThread = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("before..");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.print("after..");
});
}
executorService.shutdown();
}
}
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..
Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。
以下代码模拟了对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10。
public class SemaphoreExample {
public static void main(String[] args) {
final int clientCount = 3;
final int totalRequestCount = 10;
Semaphore semaphore = new Semaphore(clientCount);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
executorService.execute(()->{
try {
semaphore.acquire();
System.out.print(semaphore.availablePermits() + " ");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
});
}
executorService.shutdown();
}
}
2 1 2 2 2 2 2 1 2 2
在介绍 Callable 时我们知道它可以有返回值,返回值通过
Future<V>
进行封装。FutureTask 实现了 RunnableFuture 接口,该接口继承自 Runnable 和 Future<V>
接口,这使得 FutureTask 既可以当做一个任务执行,也可以有返回值。public interface RunnableFuture<V> extends Runnable, Future<V> {}
public class FutureTask<V> implements RunnableFuture<V> {}