StructuredTaskScope

StructuredTaskScope是为了解决主线程和子线程之间的可见性(observability),消除因取消和关闭而产生的常见风险,例如线程泄漏和取消延迟而引入的一个新组件。

先来看一个简单的例子:
我们有一个handle()方法,该方法需要分别调用另外两个方法findUser()fetchOrder(),将它们的结果组合起来返回给上游,这两个方法中都有I/O操作相互之间没有关联性,为了提高效率我们使用了一个线程池,并发的执行这两个方法,代码如下:

Response handle() throws ExecutionException, InterruptedException {
    Future<String>  user  = esvc.submit(() -> findUser());
    Future<Integer> order = esvc.submit(() -> fetchOrder());
    String theUser  = user.get();   // Join findUser
    int    theOrder = order.get();  // Join fetchOrder
    return new Response(theUser, theOrder);
}

这段代码有以下几个问题

  • 如果findUser()报错,主线程也会返回异常,但是另一个方法由于在不同的线程中,感知不到异常会继续执行下去,这种称为线程泄露
  • 如果handle()被中断,findUser()fetchOrder()会继续执行下去,也会造成线程泄露
  • 如果findUser()需要执行很长时间,而fetchOrder()一开始就抛出了异常,主线程会一直block在user.get()上直到该结果返回,白白浪费了时间。
    除此之外,主线程和子线程之间并没有关联,异常调用栈也不在一起,发生错误之后也难以排查和调试。

StructuredTaskScope就是为解决这些问题而创建的,我们看一个例子:

Response handle() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Supplier<String>  user  = scope.fork(() -> findUser());
        Supplier<Integer> order = scope.fork(() -> fetchOrder());

        scope.join()            // Join both subtasks
             .throwIfFailed();  // ... and propagate errors

        // Here, both subtasks have succeeded, so compose their results
        return new Response(user.get(), order.get());
    }
}
  • findUser()fetchOrder()中的任意一个报错,另一个任务都会被取消
  • 如果主线程在调用join()方法的之前或者过程中被中断,子任务会自动取消
  • 结构更加清晰:设置子任务,然后等待子任务结束或取消,最后返回结果
  • 可见性:在thread dump中可以清晰的展示出任务架构

需要注意的是StructuredTaskScope与ForkJoinPool不同,ForkJoinPool是为计算密集型任务设置的,StructuredTaskScope默认使用虚线程,主要面向I/O密集型。

参考:https://openjdk.org/jeps/453