正如先前看到的,ThreadPoolExecutor有四个构造函数,其中最简单的一个都有五个参数,然而它们最终调用的其实都是七个参数的那个构造函数:
|
|
构造函数做的事很简单,对各个参数根据参数传入的值进行赋值,每个参数的意义如下:
- corePoolSize :线程池的核心线程数
- maximumPoolSize :线程池允许的最大线程数
- keepAliveTime:线程的保活时间。这个保活时间有一个前提,它针对的是超出corePoolSize的空闲线程,也就是说,只有corePoolSize条线程之外的空闲线程,如果超过keepAliveTime的时间没有任务运行,就会被销毁,这个参数主要应用在cache线程池中
- unit:表示keepAliveTime的时间单位
- workQueue:任务队列,主要用来存储已经提交但还未执行的任务,不同线程池的排队策略不一样
- threadFactory:线程工厂,用来创建线程池中的线程,通常采用默认的就好
- handler:通常叫做拒绝策略,1、在线程池已经关闭的情况下 2、任务太多导致最大线程数和任务队列已经饱和,无法再接收新的任务 。在上面两种情况下,只要满足其中一种时,在使用execute()来提交新的任务时将会拒绝,而默认的拒绝策略是抛一个RejectedExecutionException异常
ThreadPoolExecutor对象构造好后,便可以向线程池提交任务,线程提交的方法有execute和submit,这里先说execute方法:
|
|
源码上这个方法内有一坨注释,实际上ThreadPoolExecutor的execute方法的执行策略已经在那堆注释里说清楚了,不过这里我们先把注释去掉,自己从源码上看出到底是什么策略。
首先,在对传入的runnable对象进行了一次判空之后,调用了ctl的get方法,ctl是一个AutomicInteger类对象,AutomicInteger是一个提供原子操作的Integer的类,也就是说它本身是线程安全的。这个变量虽然名字起得我不是很懂,但是却是线程池中很重要的一个变量,我们看一下这个变量的声明以及与这个变量相关的一些代码:
|
|
如果要彻底看懂上面的代码,我们还需要补一下Java基础中的位运算符和移位运算符,上面这段代码中用到的位运算符有&(与)、|(或)和~(非),用到的移位运算符有<<(左移运算符)、>>(右移运算符)。
- ‘&’运算:两个二进制数字对应位相与,只有同为1的位才得1,其余情况的位为0
- ‘|’运算:两个二进制数字对应位相或,只要有一位为1,则最后得到的数对应的位就为1
- ‘~’运算:对一个二进制数进行非运算,则原先是1的位变为0,原先是0的位变为1
- ‘^’运算:对两个二进制数进行异或运算,对应位置上的两个二进制数相同则结果中该位为0,相异则结果中该位为1.这个运算没出现在上面的代码中,但是和与、或、非属于一个系列的运算,所以一起放在这里补充
- ‘<<’运算:x<<num,表示x数字所表示的二进制数字整体左移num位,右边空出的位置由0补足,注意这种移位运算是有限制的,比如上面的代码中做移位运算的是int值,所以转换为二进制之后只有32位,如果有为1的位左移到了第32位,则会变成符号位,使整个数字变成负数,而移出32位之外的位也就是移出去了,不会再作为这个数字的一部分进行位运算
- ‘>>’运算:x>>num,表示x数字的二进制数字整体右移num位,移出位的删除,左边空出的位置分两种情况,如果数字本身是正数,则用0补足,如果是负数,则用1补足
了解了这一块基础知识后,再来看上面的代码,首先Integer.SIZE获取的是int值的大小,也就是32位,所以COUNT_BITS也就是29,CAPACITY使用了左移运算,对1的二进制左移29后减1,过程如下:
00000000000000000000000000000001
||左移29位后
00100000000000000000000000000000
||减1
00011111111111111111111111111111(CAPACITY值)
可以看到,CAPACITY最终是一个取低29位为1的二进制数,再看下面,分别是对-1、0、1、2、3五个数进行了左移29位的运算,得到了RUNNING\SHUTDOWN\STOP\TIDYING\TERMINATED五个表示运行状态的值,可以看到这五个与运行状态有关的值的后29位都是0,差异都在高3位上。
再看后面的三个方法,runStateOf方法传入一个int值,然后返回该值与CAPACITY的非值相与的结果。CAPACITY前面已经运算出来,是低29位为1的int类型二进制数,所以CAPACITY的非值就是高3位为1、低29位为0的二进制数,所以传入的int值c与CAPACITY的非值相与,最终会得到c值的高3位的值,对应前面分析的五个对应运行状态的值不难发现,高三位确实是这个int值用来存储运行状态的,所以这个方法自然可以返回一个用来表示运行状态的值。
同理,workerCountOf方法也是传入一个int值c,但是返回的是c与CAPACITY值直接相与的结果,也就是返回c值低29位的值,根据方法名不难判断,c值低29位是用来存储线程数量的。
|
|
我们看到还有一个ctlOf方法,传入两个int值相或,这个方法是干嘛的呢?回头看ctl的定义处,调用过这个方法:
|
|
作为原子变量的初始化的参数,该方法传入了RUNNING常量和0,RUNNING是表示运行状态的,那么很明显,0就是表示初始化时工作线程的数量只有0条,那么这个方法的作用也就是将运行状态和工作线程数量合到一个int值中,前3位存储运行状态,后29位存储线程数量,这就是ctl这个变量精妙的设计了。
线程池的运行状态可以先记住三个:
- RUNNING状态:线程池正常运行,可以接受新的任务并处理队列中的任务;
- SHUTDOWN状态:不再接受新的任务,但是会执行队列中的任务;
- STOP状态:不再接受新任务,不处理队列中的任务
了解了上面的内容之后,再回头看execute方法:
|
|
这里我们先假设是第一次调用线程池的execute方法,所以此时ctl的get方法返回的就是一开始通过ctlOf(RUNNING, 0)初始化的AutoMaticInteger值赋值给c,后面做了一个if判断,通过workerCountOf方法获取到c中存储的工作线程数,这里当然是0,然后与初始化ThreadPoolExecutor时传入的corePoolSize比较大小,如果小于corePoolSize则进行下一层判断,判断时调用addWorker方法,传入execute的参数Runnable对象和一个波尔值。我们看下addWorker方法:
|
|
这个方法是线程池的核心方法,也是一个相当难懂的方法,因为没有可以省略的代码所以不得不整个一坨贴过来,接下来我们将这个方法大卸八块,一部分一部分的解析这个方法。
方法中首先是一个被标记为retry的代码块,代码块中是一个没有限制语句的for循环,第一层for循环中,首先通过原子整数变量ctl的get方法获取到原子整数,然后通过runStateOf方法获取到这个整数值高三位存储的线程池运行状态值rs,接下来进行了一个if判断:
|
|
这个if语句包含的逻辑判断比较复杂,也是通过这个if语句实现了不能向线程池中添加线程的策略。将整个if语句拆开来看可以分为下面几种情况:
- rs>=SHUTDOWN、且rs!=SHUTDOWN时会直接返回false,即rs>SHUTDOWN时,也就是线程池状态为STOP\TIDYING\TERMINATED中的任意一个值都会直接在这里返回false;
- rs==SHUTDOWN、且firstTask!=null时直接返回false,firstTask是调用execute方法时传入的Runnable对象,就是说如果线程池为SHUTDOWN状态,此时直接调用execute方法向线程池添加不为null的Runnable对象会直接返回false,即SHUTDOWN状态的线程池不允许再添加不为null的任务;
- rs==SHUTDOWN、且firstTask为null、且workQueue为empty时直接返回false,workQueue是任务队列,就是说,如果任务队列中没有还未执行的任务,此时由execute方法传入为null的Runnable对象firstTask会直接返回false,但是反过来看,也就是说如果workQueue不为空、rs为SHUTDOWN,此时传入为null的firstTask是不会被返回false的,这是为什么呢?后面的源码会给出解答。
总结来说,线程池运行状态>SHUTDOWN、线程池运行状态为SHUTDOWN且execute进来的Runnable对象不为空、线程池运行状态为SHUTDOWN且execute进来的Runnable为空但任务队列为空这三种情况下该方法会直接返回false,不会进行接下来的操作。
反过来,如果线程池状态为RUNNING,或者线程池状态为SHUTDOWN但传入的firstTask为空且任务队列不为空时,才能运行到这个if代码段的后面的代码。
那么假设我们已经运行过了前面的if判断,接下来又是一个没有限制条件的for循环:
|
|
在这个第二层for循环里,首先通过原子整数获取到了存储在低29位的工作线程数wc,然后又进行了一个if判断,判断语句是一个或运算连接的两个判断条件:
- wc >= CAPACITY
- wc >= (core ? corePoolSize : maximumPoolSize)
由于是或运算符,两个条件只要有一个为真即会进入if语句,然后被返回false,我们看一下这两个条件。
CAPACITY我们前面已经得到,是一个低29位全部为1的二进制int值,由于低29位用来存储线程数量,其实这里CAPACITY也就代表着存储线程数量的最大值,也就是说,如果获取到的存储的线程数量大于等于存储线程数的最大值就直接返回false,这里好理解,作为一个存储容器自然要有一个自己所能存储的最大值;
第二个条件用到了core,core是传入addWorker方法的第二个参数,在execute方法中,第一次用到addWorker方法时传入了true:
|
|
而后面用到这个方法的时候传入了false:
|
|
结合core和corePoolSize的命名,不难看出这两个量之间的联系,事实上,core这个波尔值被用来选择当前是使用corePoolSize还是maximumPoolSize来与当前存储的工作线程数量wc进行比较,如果传入true,则wc与corePoolSize进行比较,否则与maximumPoolSize进行比较,而如果wc大于后面根据core取到的值 ,则直接返回false。
这里由于我们是假设在第一次使用线程池的execute方法,所以此时工作线程的数量为0,execute方法通过workerCountOf方法获取到的数量小于corePoolSize,所以可以跳过这个if语句运行到下面语句:
|
|
我们看一下compareAndIncrementWorkerCount方法:
|
|
很简单的一个方法,只是调用了原子整数变量ctl的compareAndSet方法,并传入当前的ctl值和加一后的ctl值。这个方法是做什么的呢?简单来说,这个方法调用的是原子整数类内部的Unsafe对象的compareAndSwapInt方法,这个方法是一个CAS方法,即“比较并转换”方法,这是一个直接使用CPU进行的原子操作方法,最终达到的效果就是使当前compareAndSet方法中传入的第一个参数值线程安全的变为传入的第二个参数的值,这里也就是使原来的值加一,放到上下文中来说也就是使工作线程数加一,然后返回操作的结果(一个波尔值),执行成功后就返回true,也就执行到if判断内的语句中,执行break retry语句,跳出整个由retry标记的for循环, 如果第一个值没有更新成第二个值,则返回false。
而如果compareAndIncrementWorkerCount方法返回false,则首先会再通过ctl的get方法读取一次ctl的值,然后获取ctl中现在保存的线程池运行状态,如果和之前在最外层循环刚开始的时候获取的运行状态不一样,则重新开始最外层的循环,否则就重新开始第二层for循环,即用来对工作线程数量加一的循环。
总的来说,addWorker方法中的两层for循环,第一层中用来对当前线程池的运行状态进行判断,运行状态不符合条件则直接返回false,运行到第二层for循环时说明当前线程池是RUNNING状态或者为SHUTDOWN但可以添加为null的Runnable的状态,所以第二层for循环就是用来为存储的工作线程数量加一。
工作线程数量成功加一后,则跳出二层for循环,可以看到后面运行到了一个Worker类对象,目前我们还在addWorker方法里,那么这个Worker类自然也就是这个方法的主角了,我们看一下这个类:
|
|
为了看的清楚,这里也是将关系不大的代码和注释删掉,只留下了主要代码,可以看到,Worker类本身是实现了Runnable接口的,而Worker类的构造函数又传入了一个Runnable对象,也就是firstTask:
|
|
可以看出,Worker是作为Runnable的实现类,对传入构造函数的Runnable对象进行了二次封装,将传入的Runnable对象赋值给自身的firstTask,并通过构造ThreadPoolExecutor对象时传入的ThreadFactory对象构造一个新的Thread对象赋值给自己的thread属性。
关于Worker类的其他代码,这里先不深究,我们继续走addWorker方法,为了看的清楚,这里把addWorker方法的后半部分放在下面:
|
|
在将两个波尔值设为false之后,初始化了一个Worker对象,传入了firstTask,根据我们前面看到的Worker的构造方法,我们已经知道经过这一行初始化Worker对象的代码,firstTask已经被赋值给了Worker对象w的firstTask,而w内部也通过ThreadPoolExecutor构造函数中初始化的ThreadFactory获得了一个Thread线程对象。
继续往下走,一个局部变量t获得了w中初始化的Thread对象t,在对t进行判空后,获取了一个ReentrantLock 对象,ReentrantLock是一个实现了Lock接口的锁类,这里代替synchronize进行加锁操作。
加锁操作后,try语句里面首先重新对线程池的运行状态进行了一次检查性的获取,在确认线程池的状态是RUNNING或SHUTDOWN&&firstTask为null的情况后,如果检测到当前Worker中的thread对象t已经是运行状态则抛出错误,否则就将前面初始化好的Worker对象添加到workers里面,workers是一个存储Worker对象的HashSet对象,添加完后对largestPoolSize进行一个更新,将worderAdded变为true,最后在finally中解锁。
最后根据编程true的workerAdded,启动当前worker对象中的线程,并将workerStarted变为true,finally块中,如果判断到workerStarted不为true则调用addWorkerFailed方法,最后将workerStarted返回。
强行把后半部分的流程流水账走下来。。。其实简单说下来,这还是在addWorker方法里面,addWorker方法前半部分就是两层没有限制的for循环,第一层对线程池的运行状态进行了一个判断,判断为可以添加线程的状态后,第二层将工作线程数加一,而跳出这两层for循环的addWorker的后半部分,就是使这个工作线程加一落到实处,具体操作也就是通过Worker类封装传入的Runnable对象并新建一个线程,然后将这个封装好的Worker对象添加到workers这个hashSet里,然后使这个worker中的thread线程对象运行起来。
那么这个线程池中最核心的addWorker方法就说完了吗?并没有,通过上面的分析我们看到,addWorker最后会在向workers中成功添加一个Worker对象后,直接启动这个worker对象中的thread线程对象,通过前面对Worker的构造函数的分析我们已经知道,Worker的构造函数除了将传入的Runnable对象赋值给自己的firstTask对象,还通过ThreadFactory初始化了Worker对象自己的thread线程对象,而新建线程对象时作为参数传入newThread方法的就是当前Worker对象,所以addWorker方法最后调用worker对象的thread的start方法运行线程,最终便调用到addWorker方法中新建的worker对象的run方法,而Worker类的run方法调用了runWorker方法:
|
|
addWorker方法的可怕之处就在于,你认为它结束了的时候,又可以给你引出下一坨代码。
可以看到,方法里首先获取了当前运行的线程对象wt,然后获取了传入的Worker对象中的Runnable对象task,获取当前运行的线程对象其实就是为了后面可以对该线程进行interrupt操作,这里不是重点,我们看对task的操作。
我们看到,在try块中有一个while循环,循环条件是task != null || (task = getTask()) != null,如果还记得一开始对addWorker方法进行的分析,不难记得addWorker在前半部分的第一层循环中对线程池的运行状态做了一个判断,而在线程池状态为SHUTDOWN状态时,如果参数传入的firstTask为null且任务队列不为空,还是可以成功的将为null的Runnable对象运行到addWorker方法的后半部分代码,不知道当时有没有对这个地方存有疑惑,为什么需要添加为null的firstTask呢?答案就在这里了。
如果我们execute时传入的Runnable对象为null,且此时线程池运行状态为SHUTDOWN,任务队列不为空,则此处如果要运行到while循环里面的代码,就要调用到getTask方法:
|
|
简单来说,这个方法的作用就是处理任务队列中的任务,具体代码的意义写在上面代码的注释里,最后根据timed的取值对任务队列进行poll或take操作,如果取出的Runnable对象不为null就直接返回。
//这里不太清晰,之后再回头看一次
在取到task之后,其实就是让task这个Runnable对象run起来,所以runWorker方法所做的事就是先运行起传入的不为null的Runnable对象,运行完之后再通过getTask从任务队列中取出新的task执行run方法。
这里要注意的是在每个task执行前后,分别执行了beforeExecute和afterExecute方法,这两个方法是空方法,如果要自定义线程池使用,这两个方法是可以覆写来进行扩展操作的。
至此,addWorker方法大体上走下来了,之所以说大体上是因为有些细节还没有走通,之后还会再回过头来走第二遍,一个这么健壮庞大的框架确实不是走一遍能吃下的。接下来会再回到execute方法,往下走addWorker方法后面的逻辑。