StructuredTaskScope
StructuredTaskScope
是为了解决主线程和子线程之间的可见性(observability),消除因取消和关闭而产生的常见风险,例如线程泄漏和取消延迟而引入的一个新组件。
先来看一个简单的例子:
我们有一个handle()
方法,该方法需要分别调用另外两个方法findUser()
和fetchOrder()
,将它们的结果组合起来返回给上游,这两个方法中都有I/O操作相互之间没有关联性,为了提高效率我们使用了一个线程池,并发的执行这两个方法,代码如下:
Response handle() throws ExecutionException, InterruptedException {
Future<String> user = esvc.submit(() -> findUser());
Future<Integer> order = esvc.submit(() -> fetchOrder());
String theUser = user.get(); // Join findUser
int theOrder = order.get(); // Join fetchOrder
return new Response(theUser, theOrder);
}
这段代码有以下几个问题
- 如果
findUser()
报错,主线程也会返回异常,但是另一个方法由于在不同的线程中,感知不到异常会继续执行下去,这种称为线程泄露 - 如果
handle()
被中断,findUser()
和fetchOrder()
会继续执行下去,也会造成线程泄露 - 如果
findUser()
需要执行很长时间,而fetchOrder()
一开始就抛出了异常,主线程会一直block在user.get()上直到该结果返回,白白浪费了时间。
除此之外,主线程和子线程之间并没有关联,异常调用栈也不在一起,发生错误之后也难以排查和调试。
StructuredTaskScope就是为解决这些问题而创建的,我们看一个例子:
Response handle() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Supplier<String> user = scope.fork(() -> findUser());
Supplier<Integer> order = scope.fork(() -> fetchOrder());
scope.join() // Join both subtasks
.throwIfFailed(); // ... and propagate errors
// Here, both subtasks have succeeded, so compose their results
return new Response(user.get(), order.get());
}
}
findUser()
和fetchOrder()
中的任意一个报错,另一个任务都会被取消- 如果主线程在调用
join()
方法的之前或者过程中被中断,子任务会自动取消 - 结构更加清晰:设置子任务,然后等待子任务结束或取消,最后返回结果
- 可见性:在thread dump中可以清晰的展示出任务架构
需要注意的是StructuredTaskScope与ForkJoinPool不同,ForkJoinPool是为计算密集型任务设置的,StructuredTaskScope默认使用虚线程,主要面向I/O密集型。
参考:https://openjdk.org/jeps/453