铁锤的IT奇妙探险 Java Back-End Coder

一文详解线程基础


线程的实现,如何正确停止,六种状态的相互转换,本文都会一一讲解。

线程基础

线程的实现

线程实现的方式本质上有两种:实现Runnable接口或者继承Thread

public class RunnableThread implements Runnable {

    @Override
    public void run() {
        System.out.println('用实现Runnable接口实现线程');
    }
}

public class ExtendsThread extends Thread {

    @Override
    public void run() {
        System.out.println('用Thread类实现线程');
    }
}

还有线程池Callable也可以用来创建线程,他们本质上也是通过前两种基本方式实现。

Runnable的两大缺陷

  1. 不能返回一个返回值
  2. 不能抛出 checked Exception

原因:接口的定义中就规定了返回类型为void且没有抛出任何异常

public interface Runnable {
   public abstract void run();
}

Runnable和Callable的区别

  1. 方法名,Callable规定的执行方法是call(),而Runnable规定的执行方法是 run();
  2. 返回值,Callable的任务执行后有返回值,而Runnable的任务执行后是没有返回值的;
  3. 抛出异常,call()方法可抛出异常,而run()方法是不能抛出受检查异常的;
  4. 和 Callable 配合的有一个Future类,通过 Future的isDone 可以了解任务执行情况,或者通过cancel()取消任务的执行,通过get()获取任务执行的结果,这些功能都是 Runnable 做不到的,Callable 的功能要比 Runnable 强大。
  5. Thread类初始化时不接受Callable作为参数。

线程的停止

interrupt()

一旦调用某个线程的interrupt()之后,这个线程的中断标记位就会被设置成true。每个线程都有这样的标记位,当线程执行时,应该定期检查这个标记位,可以通过Thread.currentThread().isInterrupt()来判断,如果标记位被设置成true,就说明有程序想终止该线程。

sleep期间被中断

  1. 如果 sleep、wait 等可以让线程进入阻塞的方法使线程休眠了,而处于休眠中的线程被中断,那么线程是可以感受到中断信号的,并且会抛出一个InterruptedException异常,同时清除中断信号,将中断标记位设置成 false。
  2. 该异常的处理:try/catch或者在方法签名中throws,在catch中一定要处理异常(可以再次中断,在 catch 语句块中调用Thread.currentThread().interrupt()函数),否则中断信号可能会被隐藏

错误的停止方式

  1. stop():stop()会直接把线程停止,这样就没有给线程足够的时间来处理停止前保存数据的逻辑,会导致出现数据完整性等问题。
  2. suspend():suspend()和resume()而言,它们的问题在于如果线程调用suspend(),它并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,这样就容易导致死锁问题,因为这把锁在线程被resume()之前,是不会被释放的。
  3. volatile修饰标记位:对于处于阻塞状态的线程,可能出现无法进入下一次标记位值判断的情况,这样就无法感受中断信号。

线程的六种状态

  • New调用start()变成Runnable
  • Runnable状态进入Blocked状态只有一种可能,就是进入 synchronized保护的代码时没有抢到monitor锁,获得锁后就能转回Runnable
  • Blocked仅仅针对synchronized的monitor锁,如果线程在获取ReentrantLock的锁时没有抢到该锁就会进入Waiting状态,因为本质上它执行了LockSupport.park()方法,所以会进入Waiting状态。同样,Object.wait()和Thread.join()也会让线程从Runnable进入Waiting状态。
  • 只有join的线程执行完毕或者执行了LockSupport.unpark(),Waiting状态才能返回Runnable

  • 其他线程调用notify()或notifyAll()来唤醒处于Waiting状态的线程,它会直接进入Blocked状态,因为唤醒Waiting线程的线程如果调用notify()或notifyAll(),要求必须首先持有该monitor锁,所以处于Waiting状态的线程被唤醒时拿不到该锁,就会进入Blocked状态,直到执行了notify()/notifyAll()的唤醒它的线程执行完毕并释放monitor锁,才可能轮到它去抢夺这把锁,如果它能抢到,就会从Blocked状态回到Runnable状态。(Timed Waiting状态同理)
  • Runnable状态调用sleep(m)、wait(m)、join(m)、parkUntil(m)时会进入Timed Waiting状态
  • 对于Timed Waiting而言,如果它的超时时间到了且能直接获取到锁/join的线程运行结束/被中断/调用了LockSupport.unpark(),会直接恢复到Runnable状态

wait/notify/notifyAll方法

  1. 为什么wait必须在synchronized保护的同步代码中使用?
    class BlockingQueue {
     Queue<String> buffer = new LinkedList<String>();
     public void give(String data) {
         buffer.add(data);
         notify();  // Since someone may be waiting in take
     }
     public String take() throws InterruptedException {
         while (buffer.isEmpty()) {
             wait();
         }
         return buffer.remove();
     }
    }
    
    • wait方法所在的take方法没有被synchronized保护,所以它的while判断和wait方法无法构成原子操作,那么此时整个程序就很容易出错。
    • 采用while循环的结构是为了处理虚假唤醒,这样即便被虚假唤醒了,也会再次检查while里面的条件,如果不满足条件,就会继续wait,也就消除了虚假唤醒的风险。
  • 为什么 wait/notify/notifyAll 被定义在 Object 类中,而 sleep 定义在 Thread 类中?

因为 Java 中每个对象都有一把称之为 monitor 监视器的锁,由于每个对象都可以上锁,这就要求在对象头中有一个用来保存锁信息的位置。这个锁是对象级别的,而非线程级别的,wait/notify/notifyAll也都是锁级别的操作,它们的锁属于对象,所以把它们定义在Object类中是最合适,因为Object类是所有对象的父类。

wait和sleep方法的异同?

  • 相同:都可以让线程阻塞;都可以响应interrupt中断,并抛出InterruptedException异常 不同:
  • wait 方法必须在 synchronized 保护的代码中使用,而 sleep 方法并没有这个要求
  • 在同步代码中执行 sleep 方法时,并不会释放 monitor 锁,但执行 wait 方法时会主动释放 monitor 锁
  • sleep 方法中会要求必须定义一个时间,时间到期后会主动恢复,而对于没有参数的 wait 方法而言,意味着永久等待,直到被中断或被唤醒才能恢复,它并不会主动恢复
  • wait/notify 是 Object 类的方法,而 sleep 是 Thread 类的方法

生产者消费者模式的实现

  • 生产者和消费者之间通过一个阻塞队列作为传输媒介,阻塞队列为空时,消费者线程进入阻塞状态,阻塞队列为满时,生产者线程进入阻塞状态。
  • 那么什么时候阻塞线程需要被唤醒呢?有两种情况。第一种情况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。另一种情况是如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产,这便是对生产者消费者模式的简单介绍。

BlockingQueue

public static void main(String[] args) {
 
  BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
 //生产者
 Runnable producer = () -> {
    while (true) {
          queue.put(new Object());//生产数据
  }
   };
 
new Thread(producer).start();
new Thread(producer).start();
 //消费者
Runnable consumer = () -> {
      while (true) {
           queue.take();//消费数据
}
   };
new Thread(consumer).start();
new Thread(consumer).start();
}

以上便是利用 BlockingQueue 实现生产者消费者模式的代码。虽然代码非常简单,但实际上 ArrayBlockingQueue 已经在背后完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。

Condition

public class MyBlockingQueueForCondition {
 
   private Queue queue;
   private int max = 16;
   private ReentrantLock lock = new ReentrantLock();
   private Condition notEmpty = lock.newCondition();
   private Condition notFull = lock.newCondition();
 
 
   public MyBlockingQueueForCondition(int size) {
       this.max = size;
       queue = new LinkedList();
   }
 
   public void put(Object o) throws InterruptedException {
       lock.lock();
       try {
           while (queue.size() == max) {
				//生产者阻塞并释放线程
               notFull.await();
           }
           queue.add(o);
			//唤醒阻塞的消费者线程
           notEmpty.signalAll();
       } finally {
           lock.unlock();
       }
   }
 
   public Object take() throws InterruptedException {
       lock.lock();
       try {
           while (queue.size() == 0) {
               notEmpty.await();
           }
           Object item = queue.remove();
           notFull.signalAll();
           return item;
       } finally {
           lock.unlock();
       }
   }
}

  • 为什么在take()方法中使用 while( queue.size() **== 0 ) 检查队列状态,而不能用 if( queue.size() == 0 )?(虚假唤醒**)
  • 假设有两个消费者,第一个消费者调用take()时,发现队列为空,便进入等待状态并释放 Lock 锁;第二个消费者拿到锁并执行if( queue.size() == 0 ),也发现队列为空,于是进入等待状态;而此时生产者生产了一个数据,便会唤醒两个消费者线程,只有一个线程可以拿到锁,并执行 queue.remove 操作,另外一个线程因为没有拿到锁而卡在被唤醒的地方(notEmpty.await()刚执行完),而第一个线程执行完操作后会在 finally 中通过 unlock解锁,此时第二个线程可以拿到锁,执行notEmpty.await()之后操作,即调用queue.remove,然而这个时候队列已经为空了,所以会抛出 NoSuchElementException异常,这不符合我们的逻辑。而如果用 while 做检查,当第一个消费者被唤醒得到锁并移除数据之后,第二个线程在执行remove前仍会进行 while 检查,发现此时依然满足 queue.size() == 0 的条件,就会继续执行 await 方法,避免了获取的数据为 null 或抛出异常的情况。

wait/notify

实现原理同Condition非常类似

class MyBlockingQueue {
 
   private int maxSize;
   private LinkedList<Object> storage;
 
   public MyBlockingQueue(int size) {
       this.maxSize = size;
       storage = new LinkedList<>();
   }
 
   public synchronized void put() throws InterruptedException {
       while (storage.size() == maxSize) {
           wait();
       }
       storage.add(new Object());
       notifyAll();
   }
 
   public synchronized void take() throws InterruptedException {
       while (storage.size() == 0) {
           wait();
       }
       System.out.println(storage.remove());
       notifyAll();
   }
}

实现代码

/**
* 描述:     wait/notify形式实现生产者消费者模式
*/
public class WaitStyle {
 
   public static void main(String[] args) {
       MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
       Producer producer = new Producer(myBlockingQueue);
       Consumer consumer = new Consumer(myBlockingQueue);
       new Thread(producer).start();
       new Thread(consumer).start();
   }
}
 //生产者线程
class Producer implements Runnable {
 
   private MyBlockingQueue storage;
 
   public Producer(MyBlockingQueue storage) {
       this.storage = storage;
   }
 
   @Override
   public void run() {
       for (int i = 0; i < 100; i++) {
           try {
               storage.put();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
}
 //消费者线程
class Consumer implements Runnable {
 
   private MyBlockingQueue storage;
 
   public Consumer(MyBlockingQueue storage) {
       this.storage = storage;
   }
 
   @Override
   public void run() {
       for (int i = 0; i < 100; i++) {
           try {
               storage.take();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
}

Condition 和 wait/notify的关系

我们比较两者实现阻塞队列的put()方法

public void put(Object o) throws InterruptedException {
   lock.lock();
   try {
      while (queue.size() == max) {
         condition1.await();
      }
      queue.add(o);
      condition2.signalAll();
   } finally {
      lock.unlock();
   }
}

public synchronized void put() throws InterruptedException {
   while (storage.size() == maxSize) {
      this.wait();
   }
   storage.add(new Object());
   this.notifyAll();
}
  • Condition的lock.lock() 对应进入 synchronized 方法;
  • condition.await() 对应 object.wait()
  • condition.signalAll() 对应 object.notifyAll();
  • lock.unlock() 对应退出 synchronized 方法
  • Condition 就是用来代替相对应的 Object 的 wait/notify/notifyAll,所以在用法和性质上几乎都一样。

Similar Posts

Comments