CompletableFuture 的统一异常处理及超时处理

作者: zhl 分类: JavaSE,JUC,微服务 发布时间: 2025-03-17 21:07

背景

项目中需要调用多个服务,如果使用 open-Feign 串行化调用,响应时间可能会比较长,后续使用了 CompletableFuture 和线程池进行异步调用,但是在调用时,可能存在调用的服务报错,导致整个查询方法直接抛出异常,无法继续后面的操作,注意其他异步调用的任务仍会处理完成。现在我想捕获调用时的异常,且不影响后续的执行。

@Slf4j(topic = "c.AsyncUtils")
public class AsyncUtils {
    /**
     * 带超时时间的异步任务
     * @param task  异步任务
     * @param serviceName   调用的服务名
     * @param executor  线程池
     * @param defaultValue  异步结果的默认值
     * @param timeout   超时时间
     * @param unit  超时时间单位
     * @return  返回异步执行结果
     * @param <T>
     */
    public static <T>CompletableFuture<T> asyncCallWithTimeout(
        Supplier<T> task,
        String serviceName,
        Executor executor,
        T defaultValue,
        long timeout,
        TimeUnit unit
    ) {
        // 开始原始异步任务
        CompletableFuture<T> originFuture = CompletableFuture.supplyAsync(task, executor)
            .exceptionally((ex) -> {
                printLog(serviceName, ex);
                return defaultValue;
            });
        // 执行定时任务, 检查异步任务是否超时
        return AsyncTimeoutHandler.asyncWithTimeout(originFuture, timeout, unit, defaultValue);
    }
    /**
     * 不带超时时间的异步任务
     * @param task  异步任务
     * @param serviceName   调用的服务名
     * @param executor  线程池
     * @param defaultValue  异步结果的默认值
     * @return  返回异步执行结果
     * @param <T>
     */
    public static <T>CompletableFuture<T> asyncCall(
        Supplier<T> task,
        String serviceName,
        Executor executor,
        T defaultValue
    ) {
        return CompletableFuture.supplyAsync(task, executor)
            .exceptionally((ex)->{
                printLog(serviceName, ex);
                return defaultValue;
            });
    }

    public static void printLog(String serviceName, Throwable ex) {
        log.error("{} 服务调用失败:{}", serviceName, ex.getCause().getMessage());
    }

}
@Slf4j(topic = "c.AsyncTimeoutHandler")
public class AsyncTimeoutHandler {

    /**
     *  创建定时任务线程池
     */
    private static final ScheduledExecutorService SCHEDULED = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors());

    /**
     *  指定时间内异步任务为执行完成,返回任务执行超时的警告,并返回默认值
     * @param originalTask  异步调用的原始任务
     * @param timeout   超时时间
     * @param unit  超时时间单位
     * @param defaultValue  默认值
     * @return  超时任务和异步任务其中的一个(最快执行的)
     * @param <T>
     */
    public static <T>CompletableFuture<T> asyncWithTimeout(
        CompletableFuture<T> originalTask,
        long timeout,
        TimeUnit unit,
        T defaultValue
    ) {
        // 1. 创建超时返回任务
        CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
        SCHEDULED.schedule(() -> {
            // 触发超时任务,如果原任务未执行完成
            if (!originalTask.isDone()) {
                timeoutFuture.complete(defaultValue);
                // 强制取消原始任务
                originalTask.cancel(true);
                log.warn("任务执行超时, 返回默认值 {}", defaultValue);
            }
        }, timeout, unit);

        // 2. 当 task 或 timeoutFuture 任一任务完成时返回结果, Function.identity() 表示不做任何处理
        return originalTask.applyToEither(timeoutFuture, Function.identity())
            .exceptionally(ex->{
                log.error("任务异常:{}", ex.getCause().getMessage());
                return defaultValue;
            });
    }
}
    /**
     * 测试异常捕获,不带超时时间
     */
    @Test
    public void testEx(){
        Person person = new Person();
        CompletableFuture<Integer> ageF = AsyncUtils.asyncCall(() -> {
            Integer age = 21;
            int i = 1 / 0;
            System.out.println("调用成功年龄为: " + age);
            return age;
        }, "ageService", threadPoolExecutor, null);

        CompletableFuture<String> nameF = AsyncUtils.asyncCall(() -> {
            String name = "zhangsan";
            int i = 1 / 0;
            System.out.println("调用成功名字为: " + name);
            return name;
        }, "nameService", threadPoolExecutor, null);

        CompletableFuture<LocalDate> birthdayF = AsyncUtils.asyncCall(() -> {
            LocalDate birthday = LocalDate.parse("2003-10-29");
            System.out.println("调用成功生日为: " + birthday);
            return birthday;
        }, "birthdayService", threadPoolExecutor, null);

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(ageF, nameF, birthdayF);
        CompletableFuture<Person> resultFuture = allFutures.thenApply(v -> {
            person.setAge(ageF.join());
            person.setName(nameF.join());
            person.setBirthday(birthdayF.join());
            return person;
        });

        System.out.println(resultFuture.join());
        System.out.println("执行结束");

    }

总结

针对于异步调用其他服务时内部产生的异常,直接抽象工具类使用 CompletableFuture 内置的 exceptionally,返回指定的默认值。
针对于异步调用时间过长导致的问题,用户调用工具类时指定最大可忍耐的超时时间,在工具类中创建定义任务,如果到达超时时间后,检查 originalTask 是否被执行成功,如果未执行成功,设置打断标识,打断 originalTask,创建新的超时 timeoutFuture 设置默认值返回 timeoutFuture。如果未达到超时时间,则直接返回 originalTask。

代码地址:https://github.com/HuiLong-Zhou/my-daily/tree/master/src

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注