springboot~CompletableFuture并行计算
在Spring中,CompletableFuture通常用于异步编程,可以方便地处理异步任务的执行和结果处理,CompletableFuture
是 Java 8 引入的一个类,用于支持异步编程和并发操作。它基于 Future 和 CompletionStage 接口,提供了丰富的方法来处理异步任务的执行和结果处理。
下面是 CompletableFuture
实现的一些关键原理:
-
线程池支持:
CompletableFuture
内部使用线程池来执行异步任务,可以通过指定不同的线程池来控制任务的执行方式。默认情况下,CompletableFuture
使用ForkJoinPool.commonPool()
作为默认的线程池。 -
回调函数:
CompletableFuture
支持链式调用,可以通过thenApply()
,thenAccept()
,thenRun()
,thenCompose()
等方法添加回调函数,在异步任务完成后处理任务的结果或执行下一步操作。 -
异常处理:
CompletableFuture
提供了exceptionally()
,handle()
,whenComplete()
等方法来处理异步任务中可能抛出的异常,确保异常能够被捕获并处理。 -
组合操作:
CompletableFuture
支持多个 CompletableFuture 对象之间的组合操作,如thenCombine()
,thenCompose()
,allOf()
,anyOf()
等方法,实现并行执行、串行执行、等待所有任务完成等功能。 -
CompletableFuture 工厂方法:除了
supplyAsync()
方法外,CompletableFuture
还提供了一系列工厂方法来创建 CompletableFuture 对象,如runAsync()
,completedFuture()
,failedFuture()
等,方便快速创建并管理异步任务。
总的来说,CompletableFuture
的实现基于 Future 和 CompletionStage 接口,利用线程池、回调函数、异常处理、组合操作等机制,提供了强大而灵活的异步编程功能,使得开发人员能够更加方便地处理异步任务的执行和结果处理。
使用方法(一)链式
如果我们的业务方法已经写完了,这时可以直接通过supplyAsync方法来调用这些已知的方法,而不需要重新开发
CompletableFuture<String> a1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Hello World";
});
CompletableFuture<String> a2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Hello World";
});
CompletableFuture<String> a3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Hello World";
});
// 这块最后是并行计算时间为3秒
CompletableFuture.allOf(a1, a2, a3).join();
String result = a1.get() + " | " + a2.get() + " | " + a3.get();
使用方法(二)独立方法
如果方法比较独立,并且之前没有开发过,那么你可以通过异步方法来将这些逻辑与调用代码解耦
@Service
@EnableAsync
public class ParallelTaskService {
@Async
public CompletableFuture<String> task1() {
// 模拟一个耗时操作
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return CompletableFuture.completedFuture("Task 1 completed");
}
@Async
public CompletableFuture<String> task2() {
// 模拟另一个耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return CompletableFuture.completedFuture("Task 2 completed");
}
}
// 并行计算时,响应时间是2和3秒之中最大值,即3秒
@GetMapping("/hello-world2")
public CompletableFuture<String> helloWorld2() {
CompletableFuture<String> task1Result = parallelTaskService.task1();
CompletableFuture<String> task2Result = parallelTaskService.task2();
// 等待所有任务都完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(task1Result, task2Result);
// 处理所有任务完成后的逻辑
return allOf.thenApply(voidResult -> {
String result = task1Result.join() + " | " + task2Result.join();
return result;
});
}