问题

在我们的系统中,使用了这样的配置来开启异步操作:

<task:annotation-driven executor="executor"
    scheduler="scheduler" />

<task:executor id="executor" pool-size="16-128"
    keep-alive="60" rejection-policy="CALLER_RUNS" queue-capacity="1000" />

客户端开启异步代码

@Async()
public Future<Result4Calculate> calculateByLendId(intdid) {
    // 标记1
    // 调用REST服务;监控调用时间。
}

获取Future后的处理

try {
    keplerOverdue = summay4Overdue.get(5, TimeUnit.SECONDS);
    // 后续处理
} catch (Exception e) {
    // 标记2
    // 异常报警
}

 

在这种配置下,客户端在标记1处监控到的调用时间普遍在4s以内(平均时间不到1s,个别峰值请求会突破5s,全天超过5s的请求不到10个)。然而,在标记2处捕获到的超时异常却非常多(一天接近700+)。

问题出在哪儿?

 

分析

上述问题相关代码的调用时序如下图所示。

线程超时问题时序图

其中,rest client 与rest server间的交互时间可以明确监控到,用时超过5s的非常少。但是,get方法却经常抛出超时异常。经过初步分析,问题出现在ThreadPoolTaskExecutor的任务调度过程中。

任务调度逻辑

使用task:executor注解得到的bean是ThreadPoolTaskExecutor的实例。这个类本身并不做调度,而是将调度工作委托给了ThreadPoolExecutor。后者的任务调度代码如下:

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

通过其中的注释,我们可以知道它的核心调度逻辑如下(省略了一些检查等方法):

  1. 如果正在运行的线程数量小于corePoolSize(最小线程数),则尝试启动一个新线程,并将当入参command作为该线程的第一个task。否则进入步骤二。
  2. 如果没有按步骤1执行,那么尝试把入参command放入workQueue中。如果能成功入队,做后续处理;否则,进入步骤三。
  3. 如果没有按步骤2执行,那么将尝试创建一个新线程,然后做后续处理。

  简单的说,当向ThreadPoolExecutor提交一个任务时,它会优先交给线程池中的现有线程;如果暂时没有可用的线程,那么它会将任务放到队列中;一般只有在队列满了的时候(导致无法成功入队),才会创建新线程来处理队列中的任务。

顺带一说,任务入队后,在某些条件下也会创建新线程。但新线程不会立即执行当前任务,而是从队列中获取一个任务并开始执行。  

汇总分析

综上所述,我们可以确定以下信息:

  1. 根据系统配置,ThreadPoolExecutor中的corePoolSize = 16。

  2. 当并发数超过16时,ThreadPoolExecutor会按照步骤二进行任务调度,即把任务放入队列中,但没有及时创建新线程来执行这个任务。
    这一点是推测。但同时,通过日志中的线程名称确认的线程池内线程数量没有增长。日志中,异步线程的id从executor-1、executor-2一直到executor-16,但17及以上的都没有出现过。

  3. 队列中的任务出现积压、时间累积,导致某一个任务超时后,后续大量任务都超时。但是超时并没有阻止任务执行;任务仍然会继续通过rest client调用rest server,并被监控代码记录下时间。
    任务在队列中积压、累积,是引发一天数百次异常、报警的原因。而监控代码并未监控到任务调度的时间,因此都没有出现超时。

由此可知,出现超时问题时,线程池队列的情况是这个样子的: 线程池队列tu  

解决方案

初步考虑方案有三:

提高初始线程数。

提高步并发的初始线程数(如将16-128调整为32-128)。以此减少新任务进入队列的几率。 但是这个方案只是降低队列积压的风险,并不解决问题。

关闭队列。

将队列大小调整为0,以此保证每一个新任务都有一个新线程来执行。 这个方案的问题在于,并发压力大时,可能导致线程不够用。此时的异步调用会根据rejection-policy="CALLER_RUNS"的配置而变为同步调用。

更换线程池。

使用优先创建新线程(而非优先入队列)的线程池。 改动最大的方案。