SpringBoot使用线程池之ThreadPoolTaskExecutor和ThreadPoolExecutor
文章目录
前言
一般的,我们提及线程池,最先想到的自然是java中的Executors,它作为一个线程池的工具类,提供给我们几种常见的线程池。稍微深入的童鞋应该也知道,它的底层实现,是使用了java中的ThreadPoolExecutor,而且一般在面试的时候,面试官问到的也会是这个ThreadPoolExecutor。
我的这篇文章说的是在SpringBoot中使用线程池,自然也和它有关(底层代码实现也是它)。因此先来讲讲ThreadPoolExecutor的基本特点吧!
预热:ThreadPoolExecutor
首先我们看看它的继承关系:
 
构造器
这里是它的参数最多的一个构造器:
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
 
几个参数的含义分别是
corePoolSize:要保留在池中的线程数,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut;maximumPoolSize:池中允许的最大线程数;keepAliveTime:当线程数大于核心数时,这是多余空闲线程在终止前等待新任务的最长时间;unit:keepAliveTime参数的时间单位;workQueue:用于在执行任务之前保存任务的队列。 这个队列将只保存execute方法提交的Runnable任务;threadFactory:执行程序创建新线程时使用的工厂;handler:执行被阻塞时使用的处理程序,因为达到了线程边界和队列容量;
其中,ThreadFactory 的默认实现是:Executors.defaultThreadFactory()。
RejectedExecutionHandler的默认实现是AbortPolicy,内部使用的是抛出异常的方式。当然,在企业中,为了不丢失任务,CallerRunsPolicy用的也是很多的。
 CallerRunsPolicy的功能是:被拒绝任务的处理程序,它直接在execute方法的调用线程中运行被拒绝的任务,除非执行程序已关闭,在这种情况下任务将被丢弃。
四种拒绝策略
ThreadPoolExecutor.AbortPolicy
处理程序在拒绝时抛出运行时RejectedExecutionException 。
ThreadPoolExecutor.CallerRunsPolicy
调用execute自身的线程运行任务。 这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度。
ThreadPoolExecutor.DiscardPolicy
无法执行的任务被简单地丢弃。
ThreadPoolExecutor.DiscardOldestPolicy
如果执行器没有关闭,工作队列头部的任务会被丢弃,然后重试执行(可能会再次失败,导致重复执行)。
工作流程
当一个新的任务提交给线程池时,线程池的处理步骤:
-  
首先判断核心线程数是否已满,如果没满,则调用一个线程处理Task任务,如果已满,则执行步骤(2)。
 -  
这时会判断阻塞队列是否已满,如果阻塞队列没满,就将Task任务加入到阻塞队列中等待执行,如果阻塞队列已满,则执行步骤(3)。
 -  
判断是否大于最大线程数,如果小于最大线程数,则创建线程执行Task任务,如果大于最大线程数,则执行步骤(4)。
 -  
这时会使用淘汰策略来处理无法执行的Task任务。
 
正题:ThreadPoolTaskExecutor
那现在进入正题,ThreadPoolTaskExecutor 这个类是Spring-Context支持的一个,专门用于Spring环境的线程池。其底层是在ThreadPoolExecutor的基础上包装一层,使得与Spring的整合更加方便。
继承关系
这是根据Idea生成的一个继承关系:
 
成员变/常量
内部的成员变量有:
 
 可以看到,确实依赖的是ThreadPoolExecutor。
初始化方法
其中有一个初始化用的方法:
public void initialize() {
	if (logger.isDebugEnabled()) {
		logger.debug("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
	}
	if (!this.threadNamePrefixSet && this.beanName != null) {
		setThreadNamePrefix(this.beanName + "-");
	}
	this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
 
可以看到这里调用的是initializeExecutor(this.threadFactory, this.rejectedExecutionHandler),那么我们再来看看这个方法做了什么?其实是初始化了一个ThreadPoolExecutor!
protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
		ThreadPoolExecutor executor;
		if (this.taskDecorator != null) {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler) {
				@Override
				public void execute(Runnable command) {
					Runnable decorated = taskDecorator.decorate(command);
					if (decorated != command) {
						decoratedTaskMap.put(decorated, command);
					}
					super.execute(decorated);
				}
			};
		}
		else {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler);
		}
		if (this.allowCoreThreadTimeOut) {
			executor.allowCoreThreadTimeOut(true);
		}
		this.threadPoolExecutor = executor;
		return executor;
	}
 
所以,它俩的关系,你明白了吗?就是这么暧昧!
高潮:SpringBoot中使用ThreadPoolTaskExecutor
首先确定你的java版本是1.8及其以上!
创建SpringBoot项目
然后创建一个最简单的SpringBoot项目:
 依赖只需要:
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
    </dependencies>
 
文件列表
启动类并未使用,直接用测试类测试即可达到效果!
 
配置属性文件
thread-pool.config.corePoolSize = 10
thread-pool.config.maxPoolSize = 100
thread-pool.config.queueCapacity = 200
thread-pool.config.threadNamePrefix = MyThread-
#  CallerRunsPolicy
thread-pool.config.rejectedExecutionHandler=
 
配置类
package org.feng.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * SpringBoot 装配线程池
 * @author FengJinSong
 */
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig implements AsyncConfigurer {
    private static final String EXECUTOR_NAME = "asyncExecutor";
    @Value("${thread-pool.config.corePoolSize:10}")
    private Integer corePoolSize;
    @Value("${thread-pool.config.maxPoolSize:100}")
    private Integer maxPoolSize;
    @Value("${thread-pool.config.queueCapacity:200}")
    private Integer queueCapacity;
    @Value("${thread-pool.config.threadNamePrefix:AsyncThread-}")
    private String threadNamePrefix;
    @Value("${thread-pool.config.rejectedExecutionHandler:CallerRunsPolicy}")
    private String rejectedExecutionHandler;
    @Bean(name = EXECUTOR_NAME)
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        // 核心线程数
        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
        // 最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        // 阻塞队列容量
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
        // 待任务在关机时完成--表明等待所有线程执行完
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        // 线程名称前缀
        threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix);
        // 设置拒绝策略
        threadPoolTaskExecutor.setRejectedExecutionHandler(getRejectedExecutionHandler(rejectedExecutionHandler));
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (throwable, method, obj) -> {
            log.error("[ThreadPool Exception]:Message [{}], Method [{}]", throwable.getMessage(), method.getName());
            for (Object param : obj) {
                log.error("Parameter value [{}] ", param);
            }
        };
    }
    /**
     * 根据传入的参数获取拒绝策略
     * @param rejectedName 拒绝策略名,比如 CallerRunsPolicy
     * @return RejectedExecutionHandler 实例对象,没有匹配的策略时,默认取 CallerRunsPolicy 实例
     */
    public RejectedExecutionHandler getRejectedExecutionHandler(String rejectedName){
        Map<String, RejectedExecutionHandler> rejectedExecutionHandlerMap = new HashMap<>(16);
        rejectedExecutionHandlerMap.put("CallerRunsPolicy", new ThreadPoolExecutor.CallerRunsPolicy());
        rejectedExecutionHandlerMap.put("AbortPolicy", new ThreadPoolExecutor.AbortPolicy());
        rejectedExecutionHandlerMap.put("DiscardPolicy", new ThreadPoolExecutor.DiscardPolicy());
        rejectedExecutionHandlerMap.put("DiscardOldestPolicy", new ThreadPoolExecutor.DiscardOldestPolicy());
        return rejectedExecutionHandlerMap.getOrDefault(rejectedName, new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
 
测试
package org.feng;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;
@SpringBootTest
class SpringbootDemoApplicationTests {
    @Resource
    ThreadPoolTaskExecutor asyncExecutor;
    @Test
    void contextLoads() {
        // 控制台输出:MyThread-1-666
        CompletableFuture.runAsync(() -> {
            System.out.println(String.join("-", Thread.currentThread().getName(), "666"));
        }, asyncExecutor);
    }
}
 
结语:个人骚话(可看可不看鸭)
关于这些东西,都不能仅限制在能看懂代码,要尝试思考,灵活运用,及其核心的思想是如何的。
 最后,各位,如果本文对你多少有点 学到了 的样子,烦请高抬贵手点个赞吧,求求了。