线程的实现,如何正确停止,六种状态的相互转换,本文都会一一讲解。
线程基础
线程的实现
线程实现的方式本质上有两种:实现Runnable
接口或者继承Thread
类
public class RunnableThread implements Runnable {
@Override
public void run() {
System.out.println('用实现Runnable接口实现线程');
}
}
public class ExtendsThread extends Thread {
@Override
public void run() {
System.out.println('用Thread类实现线程');
}
}
还有线程池
和Callable
也可以用来创建线程,他们本质上也是通过前两种基本方式实现。
Runnable的两大缺陷
- 不能返回一个返回值
- 不能抛出 checked Exception
原因:接口的定义中就规定了返回类型为void且没有抛出任何异常
public interface Runnable {
public abstract void run();
}
Runnable和Callable的区别
- 方法名,Callable规定的执行方法是call(),而Runnable规定的执行方法是 run();
- 返回值,Callable的任务执行后有返回值,而Runnable的任务执行后是没有返回值的;
- 抛出异常,call()方法可抛出异常,而run()方法是不能抛出受检查异常的;
- 和 Callable 配合的有一个Future类,通过 Future的
isDone
可以了解任务执行情况,或者通过cancel()
取消任务的执行,通过get()
获取任务执行的结果,这些功能都是 Runnable 做不到的,Callable 的功能要比 Runnable 强大。 - Thread类初始化时不接受Callable作为参数。
线程的停止
interrupt()
一旦调用某个线程的interrupt()
之后,这个线程的中断标记位就会被设置成true
。每个线程都有这样的标记位,当线程执行时,应该定期检查这个标记位,可以通过Thread.currentThread().isInterrupt()
来判断,如果标记位被设置成true,就说明有程序想终止该线程。
sleep期间被中断
- 如果 sleep、wait 等可以让线程进入阻塞的方法使线程休眠了,而处于休眠中的线程被中断,那么线程是可以感受到中断信号的,并且会抛出一个
InterruptedException
异常,同时清除中断信号,将中断标记位设置成 false。 - 该异常的处理:try/catch或者在方法签名中throws,在catch中一定要处理异常(可以再次中断,在 catch 语句块中调用Thread.currentThread().interrupt()函数),否则中断信号可能会被隐藏
错误的停止方式
stop()
:stop()会直接把线程停止,这样就没有给线程足够的时间来处理停止前保存数据的逻辑,会导致出现数据完整性等问题。suspend()
:suspend()和resume()而言,它们的问题在于如果线程调用suspend(),它并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,这样就容易导致死锁问题,因为这把锁在线程被resume()之前,是不会被释放的。volatile
修饰标记位:对于处于阻塞状态的线程,可能出现无法进入下一次标记位值判断的情况,这样就无法感受中断信号。
线程的六种状态
New
调用start()变成Runnable
- 从
Runnable
状态进入Blocked
状态只有一种可能,就是进入 synchronized保护的代码时没有抢到monitor锁,获得锁后就能转回Runnable
Blocked
仅仅针对synchronized的monitor锁,如果线程在获取ReentrantLock的锁时没有抢到该锁就会进入Waiting
状态,因为本质上它执行了LockSupport.park()方法,所以会进入Waiting
状态。同样,Object.wait()和Thread.join()也会让线程从Runnable
进入Waiting
状态。-
只有
join
的线程执行完毕或者执行了LockSupport.unpark(),Waiting
状态才能返回Runnable
。 - 其他线程调用notify()或notifyAll()来唤醒处于
Waiting
状态的线程,它会直接进入Blocked
状态,因为唤醒Waiting
线程的线程如果调用notify()或notifyAll(),要求必须首先持有该monitor锁,所以处于Waiting状态的线程被唤醒时拿不到该锁,就会进入Blocked
状态,直到执行了notify()/notifyAll()的唤醒它的线程执行完毕并释放monitor锁,才可能轮到它去抢夺这把锁,如果它能抢到,就会从Blocked
状态回到Runnable
状态。(Timed Waiting状态同理) Runnable
状态调用sleep(m)、wait(m)、join(m)、parkUntil(m)时会进入Timed Waiting
状态- 对于
Timed Waiting
而言,如果它的超时时间到了且能直接获取到锁/join的线程运行结束/被中断/调用了LockSupport.unpark(),会直接恢复到Runnable
状态
wait/notify/notifyAll方法
- 为什么wait必须在synchronized保护的同步代码中使用?
class BlockingQueue { Queue<String> buffer = new LinkedList<String>(); public void give(String data) { buffer.add(data); notify(); // Since someone may be waiting in take } public String take() throws InterruptedException { while (buffer.isEmpty()) { wait(); } return buffer.remove(); } }
- wait方法所在的take方法没有被synchronized保护,所以它的while判断和wait方法无法构成原子操作,那么此时整个程序就很容易出错。
- 采用while循环的结构是为了处理虚假唤醒,这样即便被虚假唤醒了,也会再次检查while里面的条件,如果不满足条件,就会继续wait,也就消除了虚假唤醒的风险。
- 为什么 wait/notify/notifyAll 被定义在 Object 类中,而 sleep 定义在 Thread 类中?
因为 Java 中每个对象都有一把称之为 monitor 监视器的锁,由于每个对象都可以上锁,这就要求在对象头中有一个用来保存锁信息的位置。这个锁是对象级别的,而非线程级别的,wait/notify/notifyAll也都是锁级别的操作,它们的锁属于对象,所以把它们定义在Object类中是最合适,因为Object类是所有对象的父类。
wait和sleep方法的异同?
- 相同:都可以让线程阻塞;都可以响应interrupt中断,并抛出InterruptedException异常 不同:
- wait 方法必须在 synchronized 保护的代码中使用,而 sleep 方法并没有这个要求
- 在同步代码中执行 sleep 方法时,并不会释放 monitor 锁,但执行 wait 方法时会主动释放 monitor 锁
- sleep 方法中会要求必须定义一个时间,时间到期后会主动恢复,而对于没有参数的 wait 方法而言,意味着永久等待,直到被中断或被唤醒才能恢复,它并不会主动恢复
- wait/notify 是 Object 类的方法,而 sleep 是 Thread 类的方法
生产者消费者模式的实现
- 生产者和消费者之间通过一个阻塞队列作为传输媒介,阻塞队列为空时,消费者线程进入阻塞状态,阻塞队列为满时,生产者线程进入阻塞状态。
- 那么什么时候阻塞线程需要被唤醒呢?有两种情况。第一种情况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。另一种情况是如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产,这便是对生产者消费者模式的简单介绍。
BlockingQueue
public static void main(String[] args) {
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
//生产者
Runnable producer = () -> {
while (true) {
queue.put(new Object());//生产数据
}
};
new Thread(producer).start();
new Thread(producer).start();
//消费者
Runnable consumer = () -> {
while (true) {
queue.take();//消费数据
}
};
new Thread(consumer).start();
new Thread(consumer).start();
}
以上便是利用 BlockingQueue 实现生产者消费者模式的代码。虽然代码非常简单,但实际上 ArrayBlockingQueue 已经在背后完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。
Condition
public class MyBlockingQueueForCondition {
private Queue queue;
private int max = 16;
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public MyBlockingQueueForCondition(int size) {
this.max = size;
queue = new LinkedList();
}
public void put(Object o) throws InterruptedException {
lock.lock();
try {
while (queue.size() == max) {
//生产者阻塞并释放线程
notFull.await();
}
queue.add(o);
//唤醒阻塞的消费者线程
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (queue.size() == 0) {
notEmpty.await();
}
Object item = queue.remove();
notFull.signalAll();
return item;
} finally {
lock.unlock();
}
}
}
- 为什么在take()方法中使用 while( queue.size() **== 0 ) 检查队列状态,而不能用 if( queue.size() == 0 )?(虚假唤醒**)
- 假设有两个消费者,第一个消费者调用take()时,发现队列为空,便进入等待状态并释放 Lock 锁;第二个消费者拿到锁并执行if( queue.size() == 0 ),也发现队列为空,于是进入等待状态;而此时生产者生产了一个数据,便会唤醒两个消费者线程,只有一个线程可以拿到锁,并执行
queue.remove
操作,另外一个线程因为没有拿到锁而卡在被唤醒的地方(notEmpty.await()
刚执行完),而第一个线程执行完操作后会在 finally 中通过unlock
解锁,此时第二个线程可以拿到锁,执行notEmpty.await()之后操作,即调用queue.remove,然而这个时候队列已经为空了,所以会抛出NoSuchElementException
异常,这不符合我们的逻辑。而如果用 while 做检查,当第一个消费者被唤醒得到锁并移除数据之后,第二个线程在执行remove前仍会进行 while 检查,发现此时依然满足 queue.size() == 0 的条件,就会继续执行await
方法,避免了获取的数据为 null 或抛出异常的情况。
wait/notify
实现原理同Condition非常类似
class MyBlockingQueue {
private int maxSize;
private LinkedList<Object> storage;
public MyBlockingQueue(int size) {
this.maxSize = size;
storage = new LinkedList<>();
}
public synchronized void put() throws InterruptedException {
while (storage.size() == maxSize) {
wait();
}
storage.add(new Object());
notifyAll();
}
public synchronized void take() throws InterruptedException {
while (storage.size() == 0) {
wait();
}
System.out.println(storage.remove());
notifyAll();
}
}
实现代码
/**
* 描述: wait/notify形式实现生产者消费者模式
*/
public class WaitStyle {
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
Producer producer = new Producer(myBlockingQueue);
Consumer consumer = new Consumer(myBlockingQueue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
//生产者线程
class Producer implements Runnable {
private MyBlockingQueue storage;
public Producer(MyBlockingQueue storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
storage.put();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//消费者线程
class Consumer implements Runnable {
private MyBlockingQueue storage;
public Consumer(MyBlockingQueue storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
storage.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Condition 和 wait/notify的关系
我们比较两者实现阻塞队列的put()
方法
public void put(Object o) throws InterruptedException {
lock.lock();
try {
while (queue.size() == max) {
condition1.await();
}
queue.add(o);
condition2.signalAll();
} finally {
lock.unlock();
}
}
public synchronized void put() throws InterruptedException {
while (storage.size() == maxSize) {
this.wait();
}
storage.add(new Object());
this.notifyAll();
}
- Condition的
lock.lock()
对应进入synchronized
方法; condition.await()
对应object.wait()
;condition.signalAll()
对应object.notifyAll()
;lock.unlock()
对应退出synchronized
方法- Condition 就是用来代替相对应的 Object 的 wait/notify/notifyAll,所以在用法和性质上几乎都一样。