【高并发】深度解析ScheduledThreadPoolExecutor类的源代码( 三 )

compareTo方法的主要作用就是对各延迟任务进行排序,距离下次执行时间靠前的任务就排在前面 。
delayedExecute方法delayedExecute方法是ScheduledThreadPoolExecutor类中延迟执行任务的方法,源代码如下所示 。
private void delayedExecute(RunnableScheduledFuture<?> task) { //如果当前线程池已经关闭 //则执行线程池的拒绝策略 if (isShutdown())reject(task); //线程池没有关闭 else {//将任务添加到阻塞队列中super.getQueue().add(task);//如果当前线程池是SHUTDOWN状态//并且当前线程池状态下不能执行任务//并且成功从阻塞队列中移除任务if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))//取消任务的执行 , 但不会中断执行中的任务task.cancel(false);else//调用ThreadPoolExecutor类中的ensurePrestart()方法ensurePrestart(); }}可以看到在delayedExecute方法内部调用了canRunInCurrentRunState方法,canRunInCurrentRunState方法的源码实现如下所示 。
boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown);}可以看到canRunInCurrentRunState方法的逻辑比较简单,就是判断线程池当前状态下能够执行任务 。
另外 , 在delayedExecute方法内部还调用了ThreadPoolExecutor类中的ensurePrestart()方法,接下来,我们看下ThreadPoolExecutor类中的ensurePrestart()方法的实现,如下所示 。
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize)addWorker(null, true); else if (wc == 0)addWorker(null, false);}在ThreadPoolExecutor类中的ensurePrestart()方法中,首先获取当前线程池中线程的数量 , 如果线程数量小于corePoolSize则调用addWorker方法传递null和true,如果线程数量为0,则调用addWorker方法传递null和false 。
关于addWork()方法的源码解析,大家可以参考【高并发专题】中的《高并发之——通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程》一文,这里,不再赘述 。
reExecutePeriodic方法reExecutePeriodic方法的源代码如下所示 。
void reExecutePeriodic(RunnableScheduledFuture<?> task) { //线程池当前状态下能够执行任务 if (canRunInCurrentRunState(true)) {//将任务放入队列super.getQueue().add(task);//线程池当前状态下不能执行任务,并且成功移除任务if (!canRunInCurrentRunState(true) && remove(task))//取消任务task.cancel(false);else//调用ThreadPoolExecutor类的ensurePrestart()方法ensurePrestart(); }}总体来说reExecutePeriodic方法的逻辑比较简单 , 但是,这里需要注意和delayedExecute方法的不同点:调用reExecutePeriodic方法的时候已经执行过一次任务,所以 , 并不会触发线程池的拒绝策略;传入reExecutePeriodic方法的任务一定是周期性的任务 。
onShutdown方法onShutdown方法是ThreadPoolExecutor类中的钩子函数,它是在ThreadPoolExecutor类中的shutdown方法中调用的,而在ThreadPoolExecutor类中的onShutdown方法是一个空方法,如下所示 。
void onShutdown() {}ThreadPoolExecutor类中的onShutdown方法交由子类实现,所以ScheduledThreadPoolExecutor类覆写了onShutdown方法 , 实现了具体的逻辑,ScheduledThreadPoolExecutor类中的onShutdown方法的源码实现如下所示 。
@Overridevoid onShutdown() { //获取队列 BlockingQueue<Runnable> q = super.getQueue(); //在线程池已经调用shutdown方法后,是否继续执行现有延迟任务 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); //在线程池已经调用shutdown方法后,是否继续执行现有定时任务 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); //在线程池已经调用shutdown方法后,不继续执行现有延迟任务和定时任务 if (!keepDelayed && !keepPeriodic) {//遍历队列中的所有任务for (Object e : q.toArray())//取消任务的执行if (e instanceof RunnableScheduledFuture<?>)((RunnableScheduledFuture<?>) e).cancel(false);//清空队列q.clear(); } //在线程池已经调用shutdown方法后,继续执行现有延迟任务和定时任务 else {//遍历队列中的所有任务for (Object e : q.toArray()) {//当前任务是RunnableScheduledFuture类型if (e instanceof RunnableScheduledFuture) {//将任务强转为RunnableScheduledFuture类型RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;//在线程池调用shutdown方法后不继续的延迟任务或周期任务//则从队列中删除并取消任务if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||t.isCancelled()) {if (q.remove(t))t.cancel(false);}}} } //最终调用tryTerminate()方法 tryTerminate();}

推荐阅读