SpringBoot使用线程池之ThreadPoolTaskExecutor和ThreadPoolExecutor
文章目录
前言
一般的,我们提及线程池,最先想到的自然是java中的Executors,它作为一个线程池的工具类,提供给我们几种常见的线程池。稍微深入的童鞋应该也知道,它的底层实现,是使用了java中的ThreadPoolExecutor,而且一般在面试的时候,面试官问到的也会是这个ThreadPoolExecutor。
我的这篇文章说的是在SpringBoot中使用线程池,自然也和它有关(底层代码实现也是它)。因此先来讲讲ThreadPoolExecutor的基本特点吧!
预热:ThreadPoolExecutor
首先我们看看它的继承关系:
构造器
这里是它的参数最多的一个构造器:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
几个参数的含义分别是
corePoolSize
:要保留在池中的线程数,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut
;maximumPoolSize
:池中允许的最大线程数;keepAliveTime
:当线程数大于核心数时,这是多余空闲线程在终止前等待新任务的最长时间;unit
:keepAliveTime
参数的时间单位;workQueue
:用于在执行任务之前保存任务的队列。 这个队列将只保存execute
方法提交的Runnable
任务;threadFactory
:执行程序创建新线程时使用的工厂;handler
:执行被阻塞时使用的处理程序,因为达到了线程边界和队列容量;
其中,ThreadFactory
的默认实现是:Executors.defaultThreadFactory()
。
RejectedExecutionHandler
的默认实现是AbortPolicy
,内部使用的是抛出异常的方式。当然,在企业中,为了不丢失任务,CallerRunsPolicy
用的也是很多的。
CallerRunsPolicy
的功能是:被拒绝任务的处理程序,它直接在execute
方法的调用线程中运行被拒绝的任务,除非执行程序已关闭,在这种情况下任务将被丢弃。
四种拒绝策略
ThreadPoolExecutor.AbortPolicy
处理程序在拒绝时抛出运行时RejectedExecutionException
。
ThreadPoolExecutor.CallerRunsPolicy
调用execute自身的线程运行任务。 这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度。
ThreadPoolExecutor.DiscardPolicy
无法执行的任务被简单地丢弃。
ThreadPoolExecutor.DiscardOldestPolicy
如果执行器没有关闭,工作队列头部的任务会被丢弃,然后重试执行(可能会再次失败,导致重复执行)。
工作流程
当一个新的任务提交给线程池时,线程池的处理步骤:
-
首先判断核心线程数是否已满,如果没满,则调用一个线程处理Task任务,如果已满,则执行步骤(2)。
-
这时会判断阻塞队列是否已满,如果阻塞队列没满,就将Task任务加入到阻塞队列中等待执行,如果阻塞队列已满,则执行步骤(3)。
-
判断是否大于最大线程数,如果小于最大线程数,则创建线程执行Task任务,如果大于最大线程数,则执行步骤(4)。
-
这时会使用淘汰策略来处理无法执行的Task任务。
正题:ThreadPoolTaskExecutor
那现在进入正题,ThreadPoolTaskExecutor 这个类是Spring-Context支持的一个,专门用于Spring环境的线程池。其底层是在ThreadPoolExecutor的基础上包装一层,使得与Spring的整合更加方便。
继承关系
这是根据Idea生成的一个继承关系:
成员变/常量
内部的成员变量有:
可以看到,确实依赖的是ThreadPoolExecutor。
初始化方法
其中有一个初始化用的方法:
public void initialize() {
if (logger.isDebugEnabled()) {
logger.debug("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (!this.threadNamePrefixSet && this.beanName != null) {
setThreadNamePrefix(this.beanName + "-");
}
this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
可以看到这里调用的是initializeExecutor(this.threadFactory, this.rejectedExecutionHandler)
,那么我们再来看看这个方法做了什么?其实是初始化了一个ThreadPoolExecutor
!
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
Runnable decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
所以,它俩的关系,你明白了吗?就是这么暧昧!
高潮:SpringBoot中使用ThreadPoolTaskExecutor
首先确定你的java版本是1.8及其以上!
创建SpringBoot项目
然后创建一个最简单的SpringBoot项目:
依赖只需要:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
</dependencies>
文件列表
启动类并未使用,直接用测试类测试即可达到效果!
配置属性文件
thread-pool.config.corePoolSize = 10
thread-pool.config.maxPoolSize = 100
thread-pool.config.queueCapacity = 200
thread-pool.config.threadNamePrefix = MyThread-
# CallerRunsPolicy
thread-pool.config.rejectedExecutionHandler=
配置类
package org.feng.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* SpringBoot 装配线程池
* @author FengJinSong
*/
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig implements AsyncConfigurer {
private static final String EXECUTOR_NAME = "asyncExecutor";
@Value("${thread-pool.config.corePoolSize:10}")
private Integer corePoolSize;
@Value("${thread-pool.config.maxPoolSize:100}")
private Integer maxPoolSize;
@Value("${thread-pool.config.queueCapacity:200}")
private Integer queueCapacity;
@Value("${thread-pool.config.threadNamePrefix:AsyncThread-}")
private String threadNamePrefix;
@Value("${thread-pool.config.rejectedExecutionHandler:CallerRunsPolicy}")
private String rejectedExecutionHandler;
@Bean(name = EXECUTOR_NAME)
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 核心线程数
threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
// 最大线程数
threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
// 阻塞队列容量
threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
// 待任务在关机时完成--表明等待所有线程执行完
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
// 线程名称前缀
threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix);
// 设置拒绝策略
threadPoolTaskExecutor.setRejectedExecutionHandler(getRejectedExecutionHandler(rejectedExecutionHandler));
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (throwable, method, obj) -> {
log.error("[ThreadPool Exception]:Message [{}], Method [{}]", throwable.getMessage(), method.getName());
for (Object param : obj) {
log.error("Parameter value [{}] ", param);
}
};
}
/**
* 根据传入的参数获取拒绝策略
* @param rejectedName 拒绝策略名,比如 CallerRunsPolicy
* @return RejectedExecutionHandler 实例对象,没有匹配的策略时,默认取 CallerRunsPolicy 实例
*/
public RejectedExecutionHandler getRejectedExecutionHandler(String rejectedName){
Map<String, RejectedExecutionHandler> rejectedExecutionHandlerMap = new HashMap<>(16);
rejectedExecutionHandlerMap.put("CallerRunsPolicy", new ThreadPoolExecutor.CallerRunsPolicy());
rejectedExecutionHandlerMap.put("AbortPolicy", new ThreadPoolExecutor.AbortPolicy());
rejectedExecutionHandlerMap.put("DiscardPolicy", new ThreadPoolExecutor.DiscardPolicy());
rejectedExecutionHandlerMap.put("DiscardOldestPolicy", new ThreadPoolExecutor.DiscardOldestPolicy());
return rejectedExecutionHandlerMap.getOrDefault(rejectedName, new ThreadPoolExecutor.CallerRunsPolicy());
}
}
测试
package org.feng;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;
@SpringBootTest
class SpringbootDemoApplicationTests {
@Resource
ThreadPoolTaskExecutor asyncExecutor;
@Test
void contextLoads() {
// 控制台输出:MyThread-1-666
CompletableFuture.runAsync(() -> {
System.out.println(String.join("-", Thread.currentThread().getName(), "666"));
}, asyncExecutor);
}
}
结语:个人骚话(可看可不看鸭)
关于这些东西,都不能仅限制在能看懂代码,要尝试思考,灵活运用,及其核心的思想是如何的。
最后,各位,如果本文对你多少有点 学到了 的样子,烦请高抬贵手点个赞吧,求求了。