niceyoo博客 niceyoo博客

如何关闭线程池?会创建不会关闭?调用关闭方法时线程池里的线程如何反应?

in Java基础 文章转载请注明来源!

前言

相信大家在面试的时候经常会遇到「线程池」相关的问题,比如:

  1. 什么是线程池?线程池的优点?
  2. 有哪几种创建线程池的方式?
  3. 四种创建线程池的使用场景?
  4. 线程池的底层原理?
  5. 线程池相关的参数,比如CorePoolSize、maximunPoolSize、keepAliveTime等等
  6. 为什么阿里巴巴不允许线程池使用Executors去创建?
  7. 如何合理的设置线程池参数的等等等等.....

如上这些问题,如果看过线程池源码的小伙伴,基本就能回答上来了,即便是看面试题也能说上个一二,但是当真的问你,如何关闭线程池?你能回答上来吗~

其实这个问题直接关乎到你到底用没用过线程池,可以想象一下,当面试官问你用没用过线程池,如果你回答用过,还头头是道的说了一下如何如何创建,各个有哪些使用场景,底层有哪些参数等等,但此时问你,线程池如何关闭呢?两种关闭方法有什么区别呢?调用线程池关闭方法时线程池里的线程会有什么反应呢?

这时,会不会很尴尬呢~,哦,会创建不会关闭呢。

正文,文末总结

今天我们不关心其他问题,就看如何关闭线程池,防止被面试官打个措手不及。

关闭线程池有两个方法,分别是:shutdown()、shutdownNow()

  • shutdownNow():调用该方法后,首先将线程池的状态设置为 stop,线程池拒绝接受新任务的提交,然后尝试停止所有正在执行或者暂停任务的线程(也就是线程池里现有的任务也不再执行),并返回等待执行任务的列表。。
  • shutdown():调用该方法后,会将线程池的状态设置为 shutdown,线程池拒绝接受新任务的提交,同时等待线程池内的任务执行完毕之后再关闭线程池。

看完这两个方法的解释,有一个小的结论:调用两个方法后都不会再接收新的任务,调用 shutdownNow() 会 “立刻” 停止线程池里所有的线程(注意,这里的立刻用的双引号,后面会否定这个立刻的),会返回等待执行的任务列表;调用 shutdown() 则会等待线程池里的任务执行完毕之后再关闭线程池,无返回值。

无码无真相,我们通过代码去看看这两个方法(JDK1.8 + ThreadPoolExecutor):

1、shutdownNow()
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();  1、检查线程,是否有权限修改
        advanceRunState(STOP);  2、修改线程池的状态为STOP状态
        interruptWorkers();     3、遍历线程池里的所有工作线程,然后调用线程的interrupt方法
        tasks = drainQueue();   4、将队列里还没有执行的任务放到列表里,返回给调用方
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

我们主要关注 try/catch 里的 4 行代码:

  1. checkShutdownAccess() :检查线程,是否有权限修改
  2. advanceRunState(STOP):修改线程池的状态为STOP状态
  3. interruptWorkers():遍历线程池里的所有工作线程,然后调用线程的interrupt方法
  4. drainQueue():将队列里还没有执行的任务放到列表里,返回给调用方

这里我们额外看一下第三步 interruptWorkers() 方法,这可能是一些熟悉的东西:

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();  这是这段代码的重点
    } finally {
        mainLock.unlock();
    }
}

可以看到一个 for 循环,然后调用 interruptIfStarted() 方法, 还是不熟悉怎么办?没关系的,我们接着往下看 interruptIfStarted() 方法:

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();  这是这段代码的重点
        } catch (SecurityException ignore) {
        }
    }
}

看到没,终究还是调的线程的 interrupt() 方法,不熟悉如何关闭线程的小伙伴可以移步这里 > 如何暂停一个正在运行的线程?

先临时总结一下 shutdownNow() 方法的执行逻辑:将线程池状态修改为 STOP,然后遍历线程池里的工作线程,逐个调用线程的 interrupt() 方法来中断线程,因为是调用的 interrupt() 方法,所以线程并不会立刻执行结束,只是给线程设置了标志位,至于什么时候真的中断线程需要看 getTask() 方法的返回是否为 null 了(后面详看 getTask() 方法)。

新的问题来了,我们再来看调用 shutdownNow() 方法后,线程池的线程会做如何反应,此时我们需要看一下线程池里的 runWorker() 方法,先给不太了解线程池运行过程的小伙伴补充一下流程:

线程池调用execute提交任务 —> 创建Worker(设置属性thead、firstTask)—> worker.thread.start() —> 实际上调用的是 worker.run() —> 线程池的runWorker(worker) —> worker.firstTask.run()

总之,运行线程池的就是在 runWorker() 方法里:

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock(); // allow interrupts
  boolean completedAbruptly = true; // 标志是不是用户任务异常导致终止的
  try {
      // 这里通过循环,不断地取任务来执行,getTask() 是会阻塞的
      while (task != null || (task = getTask()) != null) {
          w.lock();
          // stop 状态时不接受新任务,不执行已经加入任务队列的任务,还中断正在执行的任务
          // 所以对于 stop 状态以上是要中断线程的
          // (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)确保线程中断标志位为true且是stop状态以上,接着清除了中断标志
          // !wt.isInterrupted()则再一次检查保证线程需要设置中断标志位
          if ((runStateAtLeast(ctl.get(), STOP) ||
               (Thread.interrupted() &&
                runStateAtLeast(ctl.get(), STOP))) &&
              !wt.isInterrupted())
              wt.interrupt();
          try {
              beforeExecute(wt, task);// 回调方法,给子类具体实现
              Throwable thrown = null;
              try {
                  task.run(); // 执行我们提交给线程池的任务
              } catch (RuntimeException x) {
                  thrown = x; throw x;
              } catch (Error x) {
                  thrown = x; throw x;
              } catch (Throwable x) {
                  thrown = x; throw new Error(x);
              } finally {
                  afterExecute(task, thrown);//回调方法,给子类具体实现
              }
          } finally {
              task = null;// 置空,如果进入下一个循环可以继续取任务
              w.completedTasks++;// 完成数+1
              w.unlock();
          }
      }
      completedAbruptly = false;// 标记不是用户任务异常引起的
  } finally {
      processWorkerExit(w, completedAbruptly);
  }
}

正常情况下,线程池里的线程,通过 while 循环不停的执行任务,其中 task.run() 方法就是执行任务的关键代码,当我们调用了 shutdownNow() 方法时,task.run() 方法里面正处于IO阻塞时,则会导致报错,如果 task.run() 方法里正在正常执行,则不受影响,继续执行完这个任务。

还有一种情况,getTask() 方法返回 null 时,也会导致线程的退出。

private Runnable getTask() {
      boolean timedOut = false; // 取任务是否超时
      for (;;) {
          int c = ctl.get();
          int rs = runStateOf(c);
          // 这个状态判断挺重要的,起到线程池关闭作用
          if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
              decrementWorkerCount();// 线程数量减一
              return null;// 这里返回null,意味着一个线程会退出
          }
          int wc = workerCountOf(c);
          // 这里可以看出核心线程在空闲的时候也是可以设置被回收的
          // timed为true将要有时间限制地取任务
          boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

          if ((wc > maximumPoolSize || (timed && timedOut))
              && (wc > 1 || workQueue.isEmpty())) {
              // 大于最大限制线程数或超过空闲时间,并且当前线程数大于1或队列为空
              if (compareAndDecrementWorkerCount(c))
                  return null;// 说明线程数减一成功,返回null,意味着一个线程会退出
              continue;// 上面线程数减一失败,说明线程数量已被抢先改变,继续循环,
          }

          try {
                  // 从队列中读取任务
              Runnable r = timed ?
                  workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                  workQueue.take();
              if (r != null)
                  return r;
              timedOut = true;// 用于下一次循环中
          } catch (InterruptedException retry) {
              timedOut = false;
          }
      }
  }

这个 getTask() 过程就是,当我们调用 shutdownNow() 方法时,如果线程正处于从队列中读取任务( Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); )而发生阻塞,这样会导致抛出 InterruptedException 异常,但是这个异常被 try/catch 捕获掉了,同时设置了 timedOut 标志位,线程将会继续进入下一个 for 循环里继续执行。

但因为 shutdownNow() 方法将线程状态设置为 STOP,所以当执行到下一个 for 循环的第一个 if语句 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) 时,STOP 满足 >= SHUTDOWN,而 STOP 也满足 >= STOP,所以这个地方就尤为重要了,这个时候会进入 if 语句体中,返回 null ,然后线程退出。

至此,总结一下,调用 shutdownNow() 方法时,程池里的线程会有什么反应?

会有两种情况退出线程。

当我们调用 shutdownNow() 方法时,如果线程池正在 getTask() 方法中执行,就会通过 for 循环进入 if 语句,判断条件是 标志位 >= SHUTDOWN,或者 标志位 >= STOP,因为符合条件所以会返回 null,然后线程退出。

while (task != null || (task = getTask()) != null)

再就是线程执行提交任务到线程池时而处于阻塞状态,就会导致报错抛出 InterruptedException 异常;处于正常运行状态下则会执行完当前任务,然后通过 getTask() 方法返回 null 来退出。

2、shutdown()
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();     1、检查线程,是否有权限修改
        advanceRunState(SHUTDOWN); 2、修改线程池的状态为STOP状态
        interruptIdleWorkers();    3、遍历线程池里的所有工作线程,然后调用线程的interrupt方法
        onShutdown();              4、留给子类具体实现,如 ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

从上边 shutdownNow() 捋下来后,会发现 shutdown() 方法非常的相似:

  1. checkShutdownAccess():检查线程,是否有权限修改
  2. advanceRunState(SHUTDOWN):修改线程池的状态为STOP状态
  3. interruptIdleWorkers():遍历线程池里的所有工作线程,然后调用线程的interrupt方法
  4. onShutdown():留给子类具体实现,如 ScheduledThreadPoolExecutor

具体方法细节就不重复了,大致过程就是:shutdown() 方法会修改线程状态为 SHUTDOWN 状态,然后调用 interruptIdleWorkers() 方法来中断空闲线程,这个过程也是同样的遍历线程池里的工作线程,逐个调用线程的 interrupt() 方法,至于什么时候真的中断线程需要看 getTask() 方法的返回是否为 null 了。

然后就是:调用 shutdown() 方法时,程池里的线程会有什么反应?

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

在 getTask() 里的 if 判断中,由于线程池被 shutdown() 方法修改为 SHUTDOWN 状态,SHUTDOWN >= SHUTDOWN 条件成立,而逻辑且 & 的后半条件只有在队列为空,getTask() 方法才会返回 null,然后线程退出。

最后总结

关闭线程池有两种方法,分别是 shutdown()、shutdownNow(),调用两者都会让线程池不再接受新的任务,并且他们的原理都是遍历线程池中的工作线程,然后逐个调用线程的 inputter() 方法来中断线程,而两者的区别是,调用 shutdownNow() 会将线程池设置为 STOP 状态,该方法会返回等待执行的任务列表,而调用 shutdown() 方法会将线程池设置为 SHUTDOWN 状态,无返回值。

调用了shutdown()、shutdown() 时,线程池里的线程会有什么反应?

会有两种情况退出线程。

  1. 当我们调用关闭线程池方法时,如果线程池正在 getTask() 方法中执行,就会通过 for 循环进入 if 语句,判断线程池状态是否满足中断线程,如果满足就会返回 null,然后线程退出。
  2. 再就是线程执行提交任务到线程池时而处于阻塞状态时,就会导致报错抛出 InterruptedException 异常,线程退出;处于正常运行状态下则会执行完当前任务,然后通过 getTask() 方法返回 null 来退出。

额外补充一:

无论是 shutdownNow() 还是 shutdown(),由于原理都是调用单个线程的 interrupt() 方法,所以并不是直接就结束线程池的,而是通知线程池接下来的做法,但具体什么时间执行就不知道了,如何判断线程池真的关闭了可以调用 isTerminaed() 方法,返回 true 则表示关闭成功。

额外补充二:

如果需要同步等待线程池彻底关闭后才继续往下执行,需要调用 awaitTermination() 方法进行同步等待。其实实际开发过程中这种情景也是存在的,我在这举个简单的例子:

组合文件下载,就是用户有2个及以上的文件下载需求,但是为了考虑用户体验,希望最终返回给用户的是一个压缩包文件(内嵌好几个小文件),所以为了快速首先是在后台采用了多线程的方式下载文件,然后用 awaitTermination() 方法同步等待,将最终的结果压缩为一个文件返回给用户。

博客园持续更新,欢迎大家订阅关注,未来,我们一起成长。

本文首发于博客园:https://www.cnblogs.com/niceyoo/p/13657538.html

jrotty WeChat Pay

微信打赏

jrotty Alipay

支付宝打赏

文章二维码

扫描二维码,在手机上阅读!

本文基于《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权
文章链接:https://sscai.club/index.php/archives/131/ (转载时请注明本文出处及文章链接)

线程池
发表新评论
前篇 后篇
雷姆
拉姆