使用CompletableFuture进行异步任务编排
1. 为什么需要异步编程?
每次只做一件事,更符合人脑思考的逻辑,更易于理解。如计算 ((7 * 3) + (17 + 23)) * (9 + 17),假设人脑每次思考需要耗时1s。
使用同步的方式:
- T0:计算 7 * 3 = 21
- T1:计算 17 + 23 = 40
- T2:计算 21 + 40 = 61
- T3:计算 9 + 17 = 26
- T4:计算 61 * 26 = 1586
耗时:1s * 5 = 5s。
使用异步的方式:
- T0:计算 7 * 3 = 21;计算 17 + 23 = 40;计算 9 + 17 = 26
- T1:计算 21 + 40 = 61
- T2:计算 61 * 26 = 1586
耗时:1s * 3 = 3s。
从结果上看,3秒相比于5s的提升并不明显,但随着任务数的增多,异步的优点会逐渐展现出来。任务数越多,且任务间的依赖越少时,异步的优势越明显。不严谨地说,同步处理任务的耗时是线性增长的,而异步处理任务的耗时是常量级的(常量值取决于任务间的依赖关系)。
2. 任务编排抽象
以下内容为自己的理解,以脱离编程语言层面,进行设计抽象,仅供参考。
任务编排抽象是指对某一业务流程进行拆解,以得到组成该流程的若干任务,并分析任务间的依赖关系,最后编排成对应的数据结构的过程。
2.1. 什么是任务编排?
通俗地讲,每个业务流程都由若干个任务组成,每个任务可以依赖其他任务的执行结果,而各任务间不能出现循环依赖。如果把每个任务看作是一个节点(Node),把每个任务对其他任务的依赖看作是一个有向边,那便可以构建出一个无环有向图,如下图:
任务之间可能存在的依赖关系如下:
无依赖关系。
依赖一个或多个任务的执行结果。
- 依赖一个任务的结果可以称为
THEN
关系,记为a.then(c)
。 - 依赖多个任务的结果可以称为
AND
关系,记为a.and(b, c)
。
- 依赖一个任务的结果可以称为
依赖多个任务中任意一个任务的执行结果可以称为
OR
关系,记为a.or(b, c)
根据图例我们可以进行以下任务编排:B.and(A, D).and(C, E)
2.2. 如何进行任务编排?
先入为主地,我们先讨论入度最大为1的无环有向图(即每个任务的执行结果最多被一个任务依赖),等价于一颗N叉树。如下示例:
从图中可以看到:
- 任务F依赖任务G的执行结果,对应上文的
THEN
关系。 - 任务D依赖任务B,C,H的执行结果,对应上文的
AND
关系。 - 任务R依赖任务F或任务E的执行结果,对应上文的
OR
关系。
从出度为0的任务(即不依赖其他任务结果的任务)开始,也就是N叉树的叶子节点,我们可以通过链式的方式进行任务编排:
- 从A节点开始:
A.then(B).and(C.and(H), D).then(E).or(G.then(F), R)
- 从C节点开始:
C.and(A.then(B).and(H, D)).then(E).or(G.then(F), R)
- 从H节点开始:
H.and(A.then(B).and(C, D)).then(E).or(G.then(F), R)
- 从G节点开始:
G.then(F).or(C.and(A.then(B).and(H), D).then(E), R)
接下来讨论入度可以大于1的无环有向图,即一个任务的结果可以被多个任务依赖,如下图:
上文的链式任务编排方式不适用,但可以通过层序遍历的方式进行任务的执行,即:
- 第1层:E,D
- 第2层:A
- 第3层:B,C
- 第4层:R
同一层的任务是并行关系,即任务E和任务D可以同时执行,前一层执行完才能执行下一层。
3. CompletableFuture用法
基于JDK17中的
CompletableFuture
编写。
JDK中提供了工具类CompletableFuture
进行链式异步任务编排,其是Future
的实现类,实现了Future
的能力。
3.1. 创建一个CompletableFuture
CompletableFuture
提供了一些静态方法来创建CompletableFuture
实例:
runAysnc
:入参为Runnable
,更抽象地说是无入参且无出参的lamda表达式,即()->{}
supplyAsync
:入参为Supplier<U>
,更抽象地说是无入参且出参是T
的lamda表达式,即()->T
(输入类型) -> (输出类型) | void |
T |
---|---|---|
void |
runAsync ()->{} |
supplyAsync ()->T |
示例代码如下:
CompletableFuture<Void> futureOfRun = CompletableFuture
.runAsync(
// () -> {}
() -> {
// do something
});
CompletableFuture<Integer> futureOfSupply = CompletableFuture
.supplyAsync(
// () -> T
() -> {
return 1;
});
CompletableFuture
默认使用的线程池是ASYNC_POOL
。
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
runAsync
和supplyAsync
对应的重载方法支持传入Executor
参数,以修改提交任务的线程池。
3.2. 构建Then关系的方法
相比于A.then(B)
方法,CompletableFuture
提供了更多的方法以接受不同的参数。
thenRun[Async]
:入参为Runnable
,更抽象地说是无入参且出参是void
的lamda表达式,即()->{}
thenAccept[Async]
:入参为Consumer<T>
,更抽象地说是入参是T
且出参是void
的lamda表达式,即T->{}
thenApply[Async]
:入参为Function<T, U>
,更抽象地说是入参是T
且出参是U
的lamda表达式,即T->U
输入类型 \ 输出类型 | void |
U |
---|---|---|
void |
thenRun[Async] ()->{} |
\ |
T |
thenAccept[Async] T->{} |
thenApply[Async] T->U |
示例代码如下:
Void resultOfThenRun = CompletableFuture.supplyAsync(() -> 1)
.thenRun(
// () -> {}
() -> {
// do something
})
.join();
Void resultOfThenAccept = CompletableFuture.supplyAsync(() -> 1)
.thenAccept(
// T -> {}
t -> {
// do something
})
.join();
Long resultOfThenApply = CompletableFuture.supplyAsync(() -> 1)
.thenApply(
// T -> U
t -> {
return Long.valueOf(t);
})
.join();
3.3. 构建And关系的方法
相比于A.and(B, C)
方法,CompletableFuture
提供了更多的方法以接受不同的参数。
runAfterBoth[Async]
:入参为Runnable
,更抽象地说是无入参且出参是void
的lamda表达式,即()->{}
thenAcceptBoth[Async]
:入参为BiConsumer<T, U>
,更抽象地说是入参是T, U
且出参是void
的lamda表达式,即(T, U)->{}
thenCombine[Async]
:入参为BiFunction<T, U, V>
,更抽象地说是入参是T, U
且出参是V
的lamda表达式,即(T, U)->V
输入类型 \ 输出类型 | void |
V |
---|---|---|
void |
runAfterBoth[Async] ()->{} |
\ |
T, U |
thenAcceptBoth[Async] (T,U)->{} |
thenCombine (T,U)->V |
示例代码如下:
Void resultOfRunAfterBoth = CompletableFuture.supplyAsync(() -> 1)
.runAfterBoth(CompletableFuture.supplyAsync(() -> 2L),
// () -> {}
() -> {
// do something
})
.join();
Void resultOfThenAcceptBoth = CompletableFuture.supplyAsync(() -> 1)
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> 2L),
// (T, U) -> {}
(t, u) -> {
// do something
})
.join();
Long resultOfThenCombine = CompletableFuture.supplyAsync(() -> 1)
.thenCombine(CompletableFuture.supplyAsync(() -> true),
// (T, U) -> V
(t, u) -> {
return Long.valueOf(Boolean.TRUE.equals(u) ? t : -t);
})
.join();
3.4. 构建Or关系的方法
相比于A.or(B, C)
方法,CompletableFuture
提供了更多的方法以接受不同的参数。
runAfterEither[Async]
:入参为Runnable
,更抽象地说是无入参且出参是void
的lamda表达式,即()->{}
acceptEither[Async]
:入参为Consumer<T>
,更抽象地说是入参是T
且出参是void
的lamda表达式,即T->{}
applyToEither[Async]
:入参为Function<T, U>
,更抽象地说是入参是T
且出参是U
的lamda表达式,即T->U
输入类型 \ 输出类型 | void |
U |
---|---|---|
void |
runAfterEither[Async] ()->{} |
\ |
T |
acceptEither[Async] T->{} |
applyToEither[Async] T->U |
示例代码如下:
Void resultOfRunAfterEither = CompletableFuture.supplyAsync(() -> 1)
.runAfterEither(CompletableFuture.supplyAsync(() -> 2L),
// () -> {}
() -> {
// do something
})
.join();
Void resultOfAcceptEither = CompletableFuture.supplyAsync(() -> 1)
.acceptEither(CompletableFuture.supplyAsync(() -> 2),
// T -> {}
t -> {
// do something
})
.join();
Long resultOfApplyToEither = CompletableFuture.supplyAsync(() -> 1)
.applyToEither(CompletableFuture.supplyAsync(() -> 2),
// T -> U
t -> {
return Long.valueOf(t);
})
.join();
3.5. 静态方法
CompletableFuture
提供了allOf
和anyOf
两个静态方法。
allOf
:所有异步任务执行完后返回。anyOf
:任意一个异步任务执行完后返回。
CompletableFuture.allOf(futures).join();
CompletableFuture.anyOf(futures).join();
3.6. 异常处理
CompletableFuture
提供了exceptionally
和whenComplete
,handle
三个方法及其派生方法进行异常处理。
exceptionally
:发生异常时调用。whenComplete
或handle
:发生异常或产生结果时调用,区别是后者可以携带返回值。
CompletableFuture.supplyAsync(() -> 1)
.exceptionally(t -> {
// do something
return 0;
});
CompletableFuture.supplyAsync(() -> 1)
.whenComplete((v, t) -> {
// do something
});
CompletableFuture.supplyAsync(() -> 1)
.handle((v, t) -> {
return t == null ? v : 0L;
});
4. 回到示例
回到文章开头的示例,我们可以根据任务编排B.and(A, D).and(C, E)
,写出对应代码如下:
CompletableFuture.supplyAsync(() -> 17 + 23)
.thenCombine(CompletableFuture.supplyAsync(() -> 7 * 3), (a, b) -> a + b)
.thenCombine(CompletableFuture.supplyAsync(() -> 9 + 17), (c, d) -> c * d)
.join();