CompletableFuture 的统一异常处理及超时处理
背景
项目中需要调用多个服务,如果使用 open-Feign 串行化调用,响应时间可能会比较长,后续使用了 CompletableFuture 和线程池进行异步调用,但是在调用时,可能存在调用的服务报错,导致整个查询方法直接抛出异常,无法继续后面的操作,注意其他异步调用的任务仍会处理完成。现在我想捕获调用时的异常,且不影响后续的执行。
@Slf4j(topic = "c.AsyncUtils")
public class AsyncUtils {
/**
* 带超时时间的异步任务
* @param task 异步任务
* @param serviceName 调用的服务名
* @param executor 线程池
* @param defaultValue 异步结果的默认值
* @param timeout 超时时间
* @param unit 超时时间单位
* @return 返回异步执行结果
* @param <T>
*/
public static <T>CompletableFuture<T> asyncCallWithTimeout(
Supplier<T> task,
String serviceName,
Executor executor,
T defaultValue,
long timeout,
TimeUnit unit
) {
// 开始原始异步任务
CompletableFuture<T> originFuture = CompletableFuture.supplyAsync(task, executor)
.exceptionally((ex) -> {
printLog(serviceName, ex);
return defaultValue;
});
// 执行定时任务, 检查异步任务是否超时
return AsyncTimeoutHandler.asyncWithTimeout(originFuture, timeout, unit, defaultValue);
}
/**
* 不带超时时间的异步任务
* @param task 异步任务
* @param serviceName 调用的服务名
* @param executor 线程池
* @param defaultValue 异步结果的默认值
* @return 返回异步执行结果
* @param <T>
*/
public static <T>CompletableFuture<T> asyncCall(
Supplier<T> task,
String serviceName,
Executor executor,
T defaultValue
) {
return CompletableFuture.supplyAsync(task, executor)
.exceptionally((ex)->{
printLog(serviceName, ex);
return defaultValue;
});
}
public static void printLog(String serviceName, Throwable ex) {
log.error("{} 服务调用失败:{}", serviceName, ex.getCause().getMessage());
}
}
@Slf4j(topic = "c.AsyncTimeoutHandler")
public class AsyncTimeoutHandler {
/**
* 创建定时任务线程池
*/
private static final ScheduledExecutorService SCHEDULED = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors());
/**
* 指定时间内异步任务为执行完成,返回任务执行超时的警告,并返回默认值
* @param originalTask 异步调用的原始任务
* @param timeout 超时时间
* @param unit 超时时间单位
* @param defaultValue 默认值
* @return 超时任务和异步任务其中的一个(最快执行的)
* @param <T>
*/
public static <T>CompletableFuture<T> asyncWithTimeout(
CompletableFuture<T> originalTask,
long timeout,
TimeUnit unit,
T defaultValue
) {
// 1. 创建超时返回任务
CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
SCHEDULED.schedule(() -> {
// 触发超时任务,如果原任务未执行完成
if (!originalTask.isDone()) {
timeoutFuture.complete(defaultValue);
// 强制取消原始任务
originalTask.cancel(true);
log.warn("任务执行超时, 返回默认值 {}", defaultValue);
}
}, timeout, unit);
// 2. 当 task 或 timeoutFuture 任一任务完成时返回结果, Function.identity() 表示不做任何处理
return originalTask.applyToEither(timeoutFuture, Function.identity())
.exceptionally(ex->{
log.error("任务异常:{}", ex.getCause().getMessage());
return defaultValue;
});
}
}
/**
* 测试异常捕获,不带超时时间
*/
@Test
public void testEx(){
Person person = new Person();
CompletableFuture<Integer> ageF = AsyncUtils.asyncCall(() -> {
Integer age = 21;
int i = 1 / 0;
System.out.println("调用成功年龄为: " + age);
return age;
}, "ageService", threadPoolExecutor, null);
CompletableFuture<String> nameF = AsyncUtils.asyncCall(() -> {
String name = "zhangsan";
int i = 1 / 0;
System.out.println("调用成功名字为: " + name);
return name;
}, "nameService", threadPoolExecutor, null);
CompletableFuture<LocalDate> birthdayF = AsyncUtils.asyncCall(() -> {
LocalDate birthday = LocalDate.parse("2003-10-29");
System.out.println("调用成功生日为: " + birthday);
return birthday;
}, "birthdayService", threadPoolExecutor, null);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(ageF, nameF, birthdayF);
CompletableFuture<Person> resultFuture = allFutures.thenApply(v -> {
person.setAge(ageF.join());
person.setName(nameF.join());
person.setBirthday(birthdayF.join());
return person;
});
System.out.println(resultFuture.join());
System.out.println("执行结束");
}
总结
针对于异步调用其他服务时内部产生的异常,直接抽象工具类使用 CompletableFuture 内置的 exceptionally,返回指定的默认值。
针对于异步调用时间过长导致的问题,用户调用工具类时指定最大可忍耐的超时时间,在工具类中创建定义任务,如果到达超时时间后,检查 originalTask 是否被执行成功,如果未执行成功,设置打断标识,打断 originalTask,创建新的超时 timeoutFuture 设置默认值返回 timeoutFuture。如果未达到超时时间,则直接返回 originalTask。
代码地址:https://github.com/HuiLong-Zhou/my-daily/tree/master/src