JAVA多线程编程之异步

日常开发中我们在一个接口中需要处理多个任务,通常都是串行的,这样导致接口的响应时间是每个任务的执行时间的总和。为了缩短响应时间,通常会使用异步处理多任务。

需求举例:查询书籍基本信息,书籍详细信息,作者信息并将结果数据返回。
假设查询书籍基本信息花费500毫秒,查询书籍详细信息花费500毫秒,查询作者信息花费500毫秒,共计1500毫秒,使用异步处理时间一般都是远小于1500毫秒的。

下面使用异步调用方式优化接口

1、异步任务类

实现 Callable 接口,用来处理带返回结果的任务。taskId 用来区别返回结果集数据

package com.example.demo.task;

import java.util.concurrent.Callable;

/**
 * 异步任务
 * @param <T>
 */
public class AsynTaskCallable<T> implements Callable<T>{

    private String taskId;

    private Callable<T> task;

    public AsynTaskCallable(String taskId, Callable<T> task) {
        this.taskId = taskId;
        this.task = task;
    }

    @Override
    public T call() throws Exception {
        T callResult = task.call();
        TaskResult result = new TaskResult();
        result.setTaskId(taskId);
        result.setData(callResult);
        return (T) result;
    }

}

2、异步任务调用类

用来调用异步任务辅助类,completionService 用来指定线程池执行异步任务,tasks 为带返回结果的任务,可以实现多场景复用,减少重复编写相似的代码。

package com.example.demo.task;

import com.sun.istack.internal.NotNull;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 异步任务调用
 */
public class AsynTaskHelper<T> {

    /**
     * 使用指定线程池执行异步任务
     */
    private CompletionService<TaskResult<T>> completionService = null;

    /**
     * 任务集合
     */
    private List<Callable> tasks = null;

    /**
     * 设置线程池
     * @param executorService 线程池
     * @return
     */
    public AsynTaskHelper setExecutorService(ExecutorService executorService){
        completionService = new ExecutorCompletionService(executorService);
        return this;
    }

    /**
     * 添加任务,返回结果
     * @param taskId
     * @param task
     * @return
     */
    public AsynTaskHelper addTask(String taskId, Callable<T> task) {
        AsynTaskCallable callProxy = new AsynTaskCallable(taskId, task);
        if(null == tasks || tasks.isEmpty()){
            tasks = new ArrayList<>();
        }
        tasks.add(callProxy);
        return this;
    }

    /**
     * 提交任务
     * @return
     */
    public AsynTaskHelper submit(){
        if(null != tasks && !tasks.isEmpty()){
            for (Callable callResult : tasks) {
                completionService.submit(callResult);
            }
        }
        return this;
    }

    /**
     * 获取返回结果
     * @return Map<K, V> K为任务Id
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public Map<String, T> getResult() throws ExecutionException, InterruptedException {
        return getResult(2, TimeUnit.SECONDS);
    }

    /**
     * 获取返回结果
     * @param timeout
     * @param unit
     * @return Map<K, V> K为任务Id
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public Map<String, T> getResult(long timeout,@NotNull TimeUnit unit) throws InterruptedException, ExecutionException {
        Map<String, T> result = new HashMap<>();
        if(null == tasks){
            return result;
        }
        for (int i = 0; i < tasks.size(); i++) {
            Future<TaskResult<T>> poll = completionService.poll(timeout, unit);
            if(null != poll){
                TaskResult<T> task = poll.get();
                if(null != poll && null != task){
                    result.put(task.getTaskId(), task.getData());
                }
            }
        }
        return result;
    }

}

3、任务结果类

用来接收异步任务返回结果数据

package com.example.demo.task;

/**
 * 任务结果数据
 * @param <T>
 */
public class TaskResult<T> {

    /**
     * 任务Id
     */
    private String taskId;

    /**
     * 返回数据
     */
    private T data;

    public String getTaskId() {
        return taskId;
    }

    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "TaskResult{" +
                "taskId='" + taskId + '\'' +
                ", data=" + data +
                '}';
    }
}

4、异步调用

指定线程池执行任务

ExecutorService executor = Executors.newFixedThreadPool(500);

正常业务操作

//查询Book信息
Callable<Book> bookCall = () -> bookService.get(bookId);
//查询BookDetail信息
Callable<BookDetail> bookDetailCall = () -> bookDetailService.get(bookId);
//查询Author信息
Callable<Author> auhtorCall = () -> authorService.get(bookId);

创建异步任务

//创建异步任务
AsynTaskHelper taskCallors = new AsynTaskHelper()
         .setExecutorService(executor)
         .addTask("book", bookCall)
         .addTask("bookDetail", bookDetailCall)
         .addTask("author", auhtorCall)
         .submit();

获取结果,因为任务是异步的,可能第一时间拿不到结果,这里使用自旋的方式获取结果,如果3秒后还是没有结果则直接返回。

do{
   Map map = taskCallors.getResult();
    book = (Book) map.get("book");
    bookDetail = (BookDetail) map.get("bookDetail");
    author = (Author) map.get("author");
    runTime = System.currentTimeMillis() - beginTime;
} while ((null == book || null == bookDetail || null == author) && runTime < 3000);

完整示例调用代码

package com.example.demo.controller;

import com.example.demo.domain.Author;
import com.example.demo.domain.Book;
import com.example.demo.domain.BookDetail;
import com.example.demo.service.AuthorService;
import com.example.demo.service.BookDetailService;
import com.example.demo.service.BookService;
import com.example.demo.task.AsynTaskHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

@RestController
public class BookController {

    @Autowired
    private BookService bookService;

    @Autowired
    private BookDetailService bookDetailService;

    @Autowired
    private AuthorService authorService;

    private ExecutorService executor = Executors.newFixedThreadPool(500);

    @GetMapping("books5/{bookId}")
    public Map find5(@PathVariable String bookId) throws ExecutionException, InterruptedException {
        Map<String, Object> result = new HashMap<>();

        Long beginTime = System.currentTimeMillis();
        System.out.println("开始并行查询,开始时间:" + beginTime);
        //查询Book信息
        Callable<Book> bookCall = () -> bookService.get(bookId);
        //查询BookDetail信息
        Callable<BookDetail> bookDetailCall = () -> bookDetailService.get(bookId);
        //查询Author信息
        Callable<Author> auhtorCall = () -> authorService.get(bookId);

        //创建异步任务
        AsynTaskHelper taskCallors = new AsynTaskHelper()
                .setExecutorService(executor)
                .addTask("book", bookCall)
                .addTask("bookDetail", bookDetailCall)
                .addTask("author", auhtorCall)
                .submit();

        Book book = null;
        BookDetail bookDetail = null;
        Author author = null;
        
        long runTime;
        do{
            Map map = taskCallors.getResult();
            book = (Book) map.get("book");
            bookDetail = (BookDetail) map.get("bookDetail");
            author = (Author) map.get("author");
            runTime = System.currentTimeMillis() - beginTime;
        } while ((null == book || null == bookDetail || null == author) && runTime < 3000);

        System.out.println("结束并行查询,总耗时:" + (System.currentTimeMillis() - beginTime));
        result.put("book", book);
        result.put("detail", bookDetail);
        result.put("author", author);

        return result;
    }

}

通过 AsynTaskHelper 调用异步任务能缩短接口响应时间,进而提升系统并发能力,后续有类似的使用场景也支持复用,减少重复编码工作。