背景
线程池化主要因为普通方式创建线程的弊端以及线程池的优势:
普通方式创建线程弊端
每次new Thread新建对象,性能差;
线程缺乏统一的管理,可能无限制创建线程,相互竞争,有可能占用过多的系统资源导致OOM;
缺乏更多的功能,例如定期执行,线程中断。
线程池的优势
资源可控性:使用线程池可以避免创建大量线程而导致内存的消耗,还可以有效控制最大并发线程数,提高系统利用率;
提高响应速度:线程池的创建实际上是很消耗时间和性能的,但是线程池创建好之后有任务就运行,提升响应速度;
便于管理:池化技术最突出的一个特点就是可以帮助我们对线程池里的资源进行管理,由线程池统一分配和管理;
功能丰富:提供了定时执行,定期执行,单线程和并发数控制等功能。
线程池概述
线程池创建参数
Executors的工厂方法就给我们提供了创建多种不同线程池的方法。因为这个类只是一个创建对象的工厂,并没有涉及到很多的具体实现,具体的实现由 ThreadPoolExecutor实现。
就拿newFixedThreadPool来说明线程池创建所需要的参数。newFixedThreadPool创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
corePoolSize(核心线程池大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,当任务数大于核心线程数的时候就不会再创建。在这里要注意一点,线程池刚创建的时候,其中并没有创建任何线程,而是等任务来才去创建线程,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法 ,这样才会预先创建好corePoolSize个线程或者一个线程;
maximumPoolSize(线程池最大线程数):线程池允许创建的最大线程数,如果阻塞队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界队列,此参数就没有意义了。
poolSize(线程池中已有线程数量),corePoolSize,maximumPoolSize三者关系
poolSize < corePoolSize,新增一个线程处理任务;
poolSize = corePoolSize,将任务放到阻塞队列中等待执行;
阻塞队列已满,且poolSize < maximumPoolSize,新增一个线程处理任务;
阻塞队列已满,且poolSize = maximumPoolSize,线程池已经达到极限,会根据饱和策略RejectedExecutionHandler拒绝新的任务。
handler(饱和策略):当线程池和队列都满了,说明线程池已经处于饱和状态了,那么必须采取一种策略来处理还在提交过来的新任务。这个饱和策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。共有四种饱和策略提供,我们也可以选择自己实现饱和策略。
AbortPolicy:直接丢弃并且抛出RejectedExecutionException异常;
CallerRunsPolicy:调用当前线程池的所在的线程去执行被拒绝的任务;
DiscardOldestPolicy:当任务被拒绝时,会抛弃任务队列中最旧的任务,也就是最先加入队列的,再把这个新任务添加进去;
DiscardPolicy:被线程池拒绝的任务直接丢弃,并且不抛出异常。
参考链接:https://www.jianshu.com/p/9fec2424de54
keepAliveTime(线程活动保持时间):此参数默认在线程数大于corePoolSize的情况下才会起作用, 当线程的空闲时间达到keepAliveTime的时候就会终止,直至线程数目小于corePoolSize。不过如果调用了allowCoreThreadTimeOut方法,则当线程数目小于corePoolSize的时候也会起作用。
unit(keelAliveTime的时间单位):keelAliveTime的时间单位,一共有7种。
workQueue(阻塞队列):阻塞队列,用来存储等待执行的任务,主要以下几个阻塞队列。
ArrayBlockingQueue:这是一个基于数组结构的有界阻塞队列,此队列按照FIFO的原则对元素进行排序;
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按照FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()就是使用了这个队列;
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()就使用了这个队列;
PriorityBlockingQueue:一个具有优先级的阻塞队列。
线程池执行流程
线程池的执行大致执行流程如下:
线程池源码分析
线程池的关键方法实现都是ThreadPoolExecutor来完成的,所以具体的源码分析集中对ThreadPoolExecutor类的分析。
主要变量
// ctl使用32位来表示一组复合变量: // 前3位:线程池的运行状态 // 后29位:线程池中有效的线程个数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 表示线程池数量的位数,Iteger.SIZE - 3 = 29 private static final int COUNT_BITS = Integer.SIZE - 3; //线程池最大数量,2^29 - 1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; /// 线程池状态由ctl的高三位进行存储, 共有5中状态 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 计算线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // 计算线程池线程数量 private static int workerCountOf(int c) { return c & CAPACITY; } // 根据线程池状态和线程个数计算ctl private static int ctlOf(int rs, int wc) { return rs | wc; } //用于存放线程任务的阻塞队列 private final BlockingQueue<Runnable> workQueue; //重入锁 - 保证向线程池中添加线程是线程同步的 private final ReentrantLock mainLock = new ReentrantLock(); // 线程池当中的线程集合,只有当拥有mainLock锁的时候,才可以进行访问 private final HashSet<Worker> workers = new HashSet<Worker>(); // 支持对线程进行条件性的阻塞和唤醒 private final Condition termination = mainLock.newCondition(); //创建新线程的线程工厂 private volatile ThreadFactory threadFactory; // 饱和拒绝策略 private volatile RejectedExecutionHandler handler;
上面的很多计算都涉及位运算,下面对一些运算进行说明:
CAPACITY计算
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 1 << COUNT_BITS // 1的32位2进制是 // 00000000 00000000 00000000 00000001 // 左移29位的话就是 // 00100000 00000000 00000000 00000000 // 再进行减一的操作 // 000 11111 11111111 11111111 11111111 // 也就是说线程池最大数目就是 // 000 11111 11111111 11111111 11111111
RunState计算
在计算机底层使用补码来进行表示符号数。
// 可以接受新任务并且处理已经在阻塞队列的任务 - 高三位全是1 private static final int RUNNING = -1 << COUNT_BITS; // 不接受新任务,但是处理已经在阻塞队列的任务 - 高三位全是0 private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接受新任务,也不处理阻塞队列里的任务,并且会中断正在处理的任务 - 高三位001 private static final int STOP = 1 << COUNT_BITS; // 所有任务都被中止,workerCount为0,线程状态转化为TIDYING并且调用terminated()钩子方法 - 高3位是010 private static final int TIDYING = 2 << COUNT_BITS; // 线程已经终止, terminated()钩子方法已经完成 - 高三位011 private static final int TERMINATED = 3 << COUNT_BITS; // 以RUNNING为例进行计算 // -1 << COUNT_BITS // -1的原码 // -1的反码,负数的反码是将原码除符号位以外全部取反 // 11111111 11111111 11111111 11111110 // -1的补码,负数的补码就是将反码+1 // 11111111 11111111 11111111 11111111 // 往左移29位,所以高3位全是1就是RUNNING状态 // 111 00000 00000000 00000000 00000000
runStateOf方法
获取当前线程池的运行状态。
private static int runStateOf(int c) { return c & ~CAPACITY; } // c & ~CAPACITY // ~是按位取反的意思 //&是按位与的意思 // 而CAPACITY是,高位3个0,低29位都是1,所以是 000 11111 11111111 11111111 11111111 // 取反的话就是 111 00000 00000000 00000000 00000000 // 传进来的c参数与取反的CAPACITY进行按位与操作 // 1、低位29个0进行按位与,还是29个0 // 2、高位3个1,既保持c参数的高3位 // 既高位保持原样,低29位都是0,这也就获得了线程池的运行状态runState
workerCountOf方法
获取线程池的当前有效线程数目。
private static int workerCountOf(int c) { return c & CAPACITY; } // CAPACITY的32位2进制是 000 11111 11111111 11111111 11111111 // 用入参c跟CAPACITY进行按位与操作 // 1、低29位都是1,所以保留c的低29位,也就是有效线程数 // 2、高3位都是0,所以c的高3位也是0 // 这样获取出来的便是workerCount的值
ctlOf方法
初始化ctl变量。
private static final int RUNNING = -1 << COUNT_BITS; private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int ctlOf(int rs, int wc) { return rs | wc; } // RUNNING是 // 111 00000 00000000 00000000 00000000 // ctlOf是将rs和wc进行按位或的操作 // 初始化的时候是将RUNNING和0进行按位或 // 0的32位2进制是 // 00000000 00000000 00000000 00000000 // 所以初始化的ctl是 // 111 00000 00000000 00000000 00000000
execute方法
execute方法提交了一个任务给线程池进行执行,可能是根据任务创建一个worker对象执行,或者将任务加入到阻塞队列中等待执行。
public void execute(Runnable command) { //需要执行的任务command为空,抛出空指针异常 if (command == null) throw new NullPointerException(); // 执行的流程实际上分为三步 // 1. 如果运行的线程小于corePoolSize,以用户给定的Runable对象新开一个线程去执行 // 并且执行addWorker方法会以原子性操作去检查runState和workerCount,以防止出现线程安全性问题 // 2. 如果任务能够成功添加到队列当中,我们仍需要对添加的线程进行双重检查,有可能添加的线程在 前一次检查时已经死亡,又或者在进入该方法的时候线程池关闭了 // 所以我们需要复查状态,并有必要的话需要在停止时回滚入列操作,或者在没有线程的时候新开一个线程 //3. 如果任务无法入列,那我们需要尝试新增一个线程,如果新建线程失败了,我们就知道线程可能关闭了 //获取线程池的运行状态 int c = ctl.get(); //通过workCountOf方法算workerCount值, 若小于corePoolSize, // 则新建一个线程来执行当前任务 if (workerCountOf(c) < corePoolSize) { //添加任务到worker集合当中 if (addWorker(command, true)) //成功返回 return; //失败的话再次获取线程池的运行状态 c = ctl.get(); } // 判断线程池是否正处于RUNNING状态, 是的话添加Runnable对象到workQueue队列当中 if (isRunning(c) && workQueue.offer(command)) { //再次获取线程池的状态 int recheck = ctl.get(); //再次检查状态 //线程池不处于RUNNING状态, 将任务从workQueue队列中移除, 进行回滚 if (! isRunning(recheck) && remove(command)) //拒绝任务 reject(command); //workerCount等于0, 创建线程来执行阻塞队列中的任务 else if (workerCountOf(recheck) == 0) //添加worker addWorker(null, false); } //加入阻塞队列失败, 则尝试以线程池最大线程数新开线程去执行该任务 else if (!addWorker(command, false) //拒绝任务 reject(command); }
上述代码执行流程如下:
首先判断任务是否为空,空则抛出空指针异常;
不为空则获取线程池控制状态,判断是否小于corePoolSize,若小于,则添加到worker集合当中执行,如成功,则返回,如果失败的话再接着获取线程池控制状态,因为只有状态变了才会失败(失败的原因可能是WorkerCount改变,使得其大于corePoolSize),所以重新获取线程池状态;
判断线程池是否处于运行状态,是的话则添加任务到阻塞队列,加入时也会再次获取状态并且检测状态是否处于运行状态,不处于的话则将任务从阻塞队列移除,并且拒绝任务;
如果线程池里没有了线程,则创建新的线程去阻塞队列中获取任务执行;
如果以上都没执行成功,则需要开启最大线程池里的线程来执行任务,失败的话就丢弃。
Worker内部类
Worker内部类是对执行任务线程和执行任务的封装。它继承AQS同步器并且实现了Runnable接口,所以Worker很明显就是一个可执行任务并且又可以控制中断和锁效果的类。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; // 工作线程 final Thread thread; // 初始化任务,可能为空 Runnable firstTask; // 已完成任务数量 volatile long completedTasks; // 创建并初始化第一个任务, 使用线程工厂来创建线程 // 初始化有3步 // 1. 设置AQS的同步状态为-1, 表示该对象需要被唤醒 // 2. 初始化第一个任务 // 3.调用ThreadFactory来使自身创建一个线程, 并赋值给worker的成员变量thread Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } // 重写Runnable的run方法 public void run() { //调用ThreadPoolExecutor的runWorker方法来执行任务 runWorker(this); } //代表是否独占锁,0-非独占 1-独占 protected boolean isHeldExclusively() { return getState() != 0; } //重写AQS的tryAcquire方法尝试获取锁 protected boolean tryAcquire(int unused) { //尝试将AQS的同步状态从0改为1 if (compareAndSetState(0, 1)) { //如果改变成,则将当前独占模式的线程设置为当前线程并返回true setExclusiveOwnerThread(Thread.currentThread()); return true; } //否则返回false return false; } //重写AQS的tryRelease尝试释放锁 protected boolean tryRelease(int unused) { //设置当前独占模式的线程为null setExclusiveOwnerThread(null); //设置AQS同步状态为0 setState(0); //返回true return true; } //获取锁 public void lock() { acquire(1); } //尝试获取锁 public boolean tryLock() { return tryAcquire(1); } //释放锁 public void unlock() { release(1); } //是否被独占 public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
addWorker方法
addWorker方法主要用来生成worker对象,该对象包括了要执行了任务和执行任务的线程,并向workers集合中添加worker对象,并且启动线程,开始任务的执行。
private boolean addWorker(Runnable firstTask, boolean core) { //外部循环标记 retry: // 最外层自旋 for (;;) { //获取线程池控制状态 int c = ctl.get(); //获取线程运行状态 int rs = runStateOf(c); // 1. 如果线程池runState至少已经是SHUTDOWN // 2. 有一个是false则addWorker失败 // - runState == SHUTDOWN, 即状态已经大于SHUTDOWN了 // - firstTask == null, 即传进来的任务为空, 结合上面就是runState是SHUTDOWN, 但是 // firstTask不为空, 代表线程池已经关闭了还在传任务进来 // - 队列为空, 既然任务已经为空, 队列为空, 就不需要往线程池添加任务了 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //内层自旋 for (;;) { //获取线程池的workerCount数量 int wc = workerCountOf(c); //如果workerCount超出最大值或者大于corePoolSize/maximumPoolSize //返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //通过CAS操作,使workerCount数量+1,成功则跳出循环,回到retry标记 if (compareAndIncrementWorkerCount(c)) break retry; //CAS操作失败,再次获取线程池的控制状态 c = ctl.get(); // Re-read ctl //如果当前runState不等于刚开始获取的runState,则跳出内层循环,继续外层循环 if (runStateOf(c) != rs) continue retry; //CAS由于更改workerCount而失败,继续内层循环 } } //通过以上循环,能执行到这是workerCount成功+1了 //worker开始标记 boolean workerStarted = false; //worker添加标记 boolean workerAdded = false; //初始化worker为null Worker w = null; try { //初始化一个当前Runnable对象的worker对象 w = new Worker(firstTask); //获取该worker对应的线程 final Thread t = w.thread; //如果线程不为null if (t != null) { //初始线程池的锁 final ReentrantLock mainLock = this.mainLock; //获取锁,保证添加worker的同步线程安全 mainLock.lock(); try { //获取锁后再次检查,获取线程池runState int rs = runStateOf(ctl.get()); //当runState小于SHUTDOWN或者runState等于SHUTDOWN并且firstTask为null if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //线程已存活 if (t.isAlive()) //线程未启动就存活,抛出IllegalThreadStateException异常 throw new IllegalThreadStateException(); //将worker对象添加到workers集合当中 workers.add(w); //获取workers集合的大小 int s = workers.size(); //如果大小超过largestPoolSize if (s > largestPoolSize) //重新设置largestPoolSize largestPoolSize = s; //标记worker已经被添加 workerAdded = true; } } finally { //释放锁 mainLock.unlock(); } //如果worker添加成功 if (workerAdded) { //启动线程,开始执行任务 t.start(); //标记worker已经启动 workerStarted = true; } } } finally { //如果worker没有启动成功 if (! workerStarted) //workerCount-1的操作 addWorkerFailed(w); } //返回worker是否启动的标记 return workerStarted; }
上述代码执行流程如下:
获取线程池的控制状态,进行判断,不符合则返回false,符合则下一步;
进行一个内层的自旋操作,判断workerCount是否大于上限,或者大于corePoolSize/maximumPoolSize,若不符合要求则返回false,否则对workerCount+1操作;
若workerCount + 1操作失败,再次获取线程池的控制状态,获取runState与刚开始获取的runState相比,不一致则跳出内层循环继续外层循环,否则继续内层循环;
workerCount + 1操作成功后,使用重入锁ReentrantLock来保证往workers当中添加worker实例的线程安全性,添加成功就启动该实例。
addWorkerFailed方法
addWorker方法添加worker失败,并且没有成功启动任务的时候,就会调用此方法,将任务从workers中移除,并且workerCount - 1操作。
private void addWorkerFailed(Worker w) { //重入锁 final ReentrantLock mainLock = this.mainLock; //获取锁 mainLock.lock(); try { //如果worker不为null if (w != null) //workers移除worker workers.remove(w); //通过CAS操作,workerCount-1 decrementWorkerCount(); // 执行tryTerminate尝试终止线程池 tryTerminate(); } finally { //释放锁 mainLock.unlock(); } }
tryTerminate方法
当对线程池执行了非正常成功逻辑的操作时,都会需要执行tryTerminate尝试终止线程池。
finavoid tryTerminate() { //自旋操作 for (;;) { //获取线程池控制状态 int c = ctl.get(); // 线程池处于RUNNING状态 // 线程池状态最小大于TIDYING, 已经终止 // 线程池==SHUTDOWN并且workQUeue不为空 // 直接return, 不能终止 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //如果workerCount不为0,且runState为STOP, 中断正在处理的任务 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } //获取线程池的锁, 开始终止线程池 final ReentrantLock mainLock = this.mainLock; //获取锁 mainLock.lock(); try { //通过CAS操作,设置线程池状态为TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { //设置线程池的状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //发送释放信号给在termination条件上等待的线程 termination.signalAll(); } return; } } finally { //释放锁 mainLock.unlock(); } } }
runWorker方法
runWorker由worker对象的run方法调用,达到真正执行任务的作用。执行的任务可能来自worker对象本身,或者来自阻塞队列。该方法使用一个while循环不断从阻塞队列中获取任务。
final void runWorker(Worker w) { //获取当前线程 Thread wt = Thread.currentThread(); //获取worker里的任务 Runnable task = w.firstTask; //将worker实例的任务赋值为null w.firstTask = null; // unlock方法会调用AQS的release方法 // release方法会调用具体实现类也就是Worker的tryRelease方法 // 也就是将AQS状态置为0, 允许中断 w.unlock(); //是否突然完成 boolean completedAbruptly = true; try { // worker实例的task不为空, 或者通过getTask获取的不为空 // getTask方法会从阻塞队列获取任务进行执行 while (task != null || (task = getTask()) != null) { //获取锁 w.lock(); // 满足下面两个条件将中断该线程 // 1. 获取线程池的控制状态,至少要大于STOP状态 // 2. 检查当前线程是否中断并清除中断状态,并且再次检查线程池状态是否大于STOP // 如果上述满足,检查该对象是否处于中断状态,不清除中断标记 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //中断该线程 wt.interrupt(); try { //执行任务之前调用的方法, 由子类具体实现 beforeExecute(wt, task); Throwable thrown = null; try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //执行任务之后调用的方法, 也是由子类具体实现 afterExecute(task, thrown); } } finally { ////执行完后, 设置task为null task = null; //已完成任务数+1 w.completedTasks++; //释放锁 w.unlock(); } } completedAbruptly = false; } finally { //处理并退出当前worker processWorkerExit(w, completedAbruptly); } }
上述代码的执行流程如下:
首先执行了w.unlock(),这是为了将AQS的状态改为0,因为只有getState() >= 0的时候,线程才可以被中断;
判断firstTask是否为空,为空则通过getTask()取阻塞对象中获取任务,不为空接着往下执行;
判断是否符合中断状态,符合的话设置中断标记;
执行beforeExecute(),task.run(),afterExecute()方法;
任何一个出异常都会导致任务执行的终止,进入processWorkerExit来退出任务;
正常执行的话会接着回到步骤2,继续向下执行任务。
getTask方法
当firstTask为空的时候,会通过该方法去阻塞队列来接着获取任务去执行。
private Runnable getTask() { //标志是否获取任务超时 boolean timedOut = false; //自旋操作 for (;;) { //获取线程池的控制状态 int c = ctl.get(); //获取线程池的runState int rs = runStateOf(c); // 判断线程池的状态,出现以下两种情况 // 1. runState大于等于SHUTDOWN状态 // 2. runState大于等于STOP或者阻塞队列为空 // 将会通过CAS操作,进行workerCount - 1并返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //获取线程池的workerCount int wc = workerCountOf(c); // allowCoreThreadTimeOut:是否允许core Thread超时, 默认false // workerCount是否大于核心核心线程池, 除了core thread外, 其他线程会有超时操作 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 1. wc大于maximumPoolSize或者已超时 // 2. 队列不为空时保证至少有一个任务 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 通过CAS操作,workerCount - 1 //能进行-1操作,证明wc大于maximumPoolSize或者已经超时 if (compareAndDecrementWorkerCount(c)) //workCount - 1操作成功,返回null return null; // workCount - 1操作失败,继续循环 continue; } try { // 是否允许超时将会调用不同的方法来获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //判断任务不为空返回任务 if (r != null) return r; //获取一段时间没有获取到,获取超时 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
上述代码的执行流程如下:
获取线程池控制状态和runState,判断线程池是否已经关闭或者正在关闭,是的话则workerCount - 1操作并返回null;
获取workerCount判断是否大于核心线程池,并且根据allowCoreThreadTimeOut和 wc > corePoolSize判断是否允许超时操作;
判断workerCount是否大于最大线程池数目或者已经超时,是的话进行workerCount - 1操作,操作成功则返回null,不成功则回到步骤1重新继续;
根据timed采用何种方法来获取任务,timed 位true 则用poll方法从队列获取任务,否则用take方法从队列获取任务;
判断任务是否为空,不为空则返回获取的任务,否则回到步骤1重新继续。
processWorkerExit方法
任务执行完成或出现异常中断执行的时候,调用processWorkerExit方法进行相对应的操作。
private void processWorkerExit(Worker w, boolean completedAbruptly) { // completedAbruptly:在runWorker出现, 代表是否突然完成的意思 // 也就是在执行任务过程当中出现异常,就会突然完成,设置为true // 如果是突然完成,需要通过CAS操作, workerCount - 1 // 不是突然完成,则不需要 - 1, 因为getTask方法当中已经-1 if (completedAbruptly) decrementWorkerCount(); //生成重入锁 final ReentrantLock mainLock = this.mainLock; //获取锁 mainLock.lock(); try { //线程池统计的完成任务数completedTaskCount加上worker当中完成的任务数 completedTaskCount += w.completedTasks; //从HashSet<Worker>中移除 workers.remove(w); } finally { //释放锁 mainLock.unlock(); } //因为上述操作是释放任务或线程, 所以会判断线程池状态,尝试终止线程池 tryTerminate(); //获取线程池的控制状态 int c = ctl.get(); //判断runState是否小于STOP,即是RUNNING或者SHUTDOWN //如果是RUNNING或者SHUTDOWN, 代表没有成功终止线程池 if (runStateLessThan(c, STOP)) { // 是否突然完成 // 如若不是, 代表已经没有任务可获取完成, 因为getTask当中是while循环 if (!completedAbruptly) { // allowCoreThreadTimeOut: 是否允许core thread超时,默认false // min-默认是corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //允许core thread超时并且队列不为空 //min为0, 即允许core thread超时, 这样就不需要维护核心核心线程池了 //如果workQueue不为空,则至少保持一个线程存活 if (min == 0 && ! workQueue.isEmpty()) min = 1; //如果workerCount大于min, 则表示满足所需, 可以直接返回 if (workerCountOf(c) >= min) return; } //如果是突然完成,添加一个空任务的worker线程 addWorker(null, false); } }
上述代码的执行流程如下:
首先判断线程是否突然终止,如果是突然终止,则workerCount - 1;
统计线程池完成任务数,并将worker从workers当中移除;
判断线程池状态,尝试终止线程池;
线程池没有成功终止,判断是否突然完成任务,不是则进行下一步
如允许核心线程超时,队列不为空,则至少保证一个线程存活;
添加一个空任务的worker线程。
参考资料
[1] https://www.cnblogs.com/KingJack/p/9595621.html