背景

问题:当查询接口较复杂时候,数据的获取都需要[远程调用],必然需要花费更多的时间。 如下场景所示:

1、查询博客置顶文章
2、查询博客点击排行榜
3、查询博客文章标签
4、查询博客友情链接

上面的描述本博客页面每次访问都需要查询的东西

那么,用户需要4s后才能统计的数据。很显然是不能接受的。 如果有多个线程同时完成这4步操作,也许只需要1s左右即可完成响应。

CompletableFuture介绍

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。 CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。 CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。

具体使用说明

单个任务

runAsync:无返回值

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
    System.out.println("当前线程: " + Thread.currentThread().getName());
    int i = 10 / 2;
    System.out.println("运算结果:" + i);
}, executor);
future1.get();
// 结果:
当前线程: pool-1-thread-1
运算结果:5

supplyAsync:有返回值

whenComplete:能感知异常,能感知结果,但没办法给返回值
exceptionally:能感知异常,不能感知结果,能给返回值。相当于,如果出现异常就返回这个值

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程: " + Thread.currentThread().getName());
    int i = 10 / 0;
    System.out.println("运行结果:" + i);
    return i;
}, executor).whenComplete((res, excption) -> {
    // whenComplete虽然能得到异常信息,但是没办法修改返回值
    System.out.println("异步任务成功完成...结果是:" + res + ";异常是:" + excption);
}).exceptionally(throwable -> {
    // exceptionally能感知异常,而且能返回一个默认值,相当于,如果出现异常就返回这个值
    return 10;
});
System.out.println("返回值:" + future2.get());
// 结果:
当前线程: pool-1-thread-1
异步任务成功完成...结果是:null;异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
返回值:10

supplyAsync:有返回值

handle能拿到返回结果,也能得到异常信息,也能修改返回值

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程: " + Thread.currentThread().getName());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    return i;
}, executor).handle((res,exception)->{
    if (exception != null) {
        return 0;
    }else{
        return res * 2;
    }
});
System.out.println("返回值:" + future3.get());
// 结果:
当前线程: pool-1-thread-1
运行结果:5
返回值:10

两个任务编排

一个任务执行完了再执行下一个,按照顺序一个一个的执行

thenRunAsync:不能接收上一次的执行结果,也没返回值

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Void> future4 = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程: " + Thread.currentThread().getName());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    return i;
}, executor).thenRunAsync(() -> { // thenRunAsync:不能接收上一次的执行结果,也没返回值
    System.out.println("任务2启动了.....");
}, executor);
future4.get();
// 结果:
当前线程: pool-1-thread-1
运行结果:5
任务2启动了.....

thenAcceptAsync:能接收上一次的执行结果,但没返回值

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Void> future5 = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程: " + Thread.currentThread().getName());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    return i;
}, executor).thenAcceptAsync(res -> { // thenAcceptAsync:能接收上一次的执行结果,但没返回值
    System.out.println("任务2启动了, 获取任务1的结果:" + res);
}, executor);
future5.get();
// 结果:
当前线程: pool-1-thread-1
运行结果:5
任务2启动了, 获取任务1的结果:5

thenApplyAsync:能接收上一次的执行结果,还有返回值

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Integer> future6 = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程: " + Thread.currentThread().getName());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    return i;
}, executor).thenApplyAsync(res -> { // thenApplyAsync:能接收上一次的执行结果,还有返回值
    System.out.println("任务2启动了, 获取任务1的结果:" + res);
    return res * 4;// 最终返回这个值
}, executor);
System.out.println("返回值:" + future6.get());
// 结果:
当前线程: pool-1-thread-1
运行结果:5
任务2启动了, 获取任务1的结果:5
返回值:20

三任务编排

首先准备两个任务:

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程: " + Thread.currentThread().getName());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    return i;
}, executor);
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程: " + Thread.currentThread().getName());
    int i = 10 / 2;// 这里到时候可以换成 10 / 0,试试
    System.out.println("运行结果:" + i);
    return "hello";
}, executor);

runAfterBothAsync

任务01 任务02都完成了,再开始执行任务03,不感知任务01、02的结果的,也没返回值

CompletableFuture<Void> future03 = future01.runAfterBothAsync(future02, () -> {
    System.out.println("任务3开始");
}, executor);
// 结果:
当前线程: pool-1-thread-1
运行结果:5
当前线程: pool-1-thread-2
任务3开始

thenAcceptBothAsync

任务01 任务02都完成了,再开始执行任务03,能感知到任务01、02的结果,但没返回值

CompletableFuture<Void> future03 = future01.thenAcceptBothAsync(future02, (f1,f2) -> {
    System.out.println("任务3开始...得到之前的结果:01的结果:" + f1 + ", 02的结果:" + f2);
}, executor);
// 结果:
当前线程: pool-1-thread-1
运行结果:5
当前线程: pool-1-thread-2
运行结果:5
任务3开始...得到之前的结果:01的结果:5, 02的结果:hello

thenCombineAsync

任务01 任务02都完成了,再开始执行任务03,能感知到任务01、02的结果,而且自己可以带返回值

CompletableFuture<String> future03 = future01.thenCombineAsync(future02, (f1,f2) -> {
    System.out.println("任务3开始...得到之前的结果:01的结果:" + f1 + ", 02的结果:" + f2);
    return "任务3结束";
}, executor);
System.out.println("返回值:" + future03.get());
// 结果:
当前线程: pool-1-thread-1
运行结果:5
当前线程: pool-1-thread-2
运行结果:5
任务3开始...得到之前的结果:01的结果:5, 02的结果:hello
返回值:任务3结束

runAfterEitherAsync

三任务组合,只要01完成,就执行任务03,不感知结果,自己没返回值

CompletableFuture<Void> future03 = future01.runAfterEitherAsync(future02, () -> {
    System.out.println("任务3开始......");
}, executor);
// 结果:
当前线程: pool-1-thread-1
当前线程: pool-1-thread-2
运行结果:5
运行结果:5
任务3开始......

这里需要注意的是,如果01出现了异常,那么就不会执行03,但还会运行01的,下面方法的也是一样

acceptEitherAsync

三任务组合,只要01完成,就执行任务03,感知结果,自己没返回值

CompletableFuture<Void> future03 = future01.acceptEitherAsync(future02, (res) -> {
    System.out.println("任务3开始...得01的结果:" + res);
}, executor);
// 结果:
当前线程: pool-1-thread-1
当前线程: pool-1-thread-2
运行结果:5
运行结果:5
任务3开始...得01的结果:5

这里是获取的01任务的结果,而不是02任务的。

applyToEitherAsync

三任务组合,只要01完成,就执行任务03,感知结果,有返回值

CompletableFuture<String> future03 = future01.applyToEitherAsync(future02, (res) -> {
    System.out.println("任务3开始...得到01的结果:" + res);
    return "任务3结束";
}, executor);
System.out.println("返回值:" + future03.get());
// 结果:
当前线程: pool-1-thread-1
当前线程: pool-1-thread-2
运行结果:5
运行结果:5
任务3开始...得到之前的结果:5
返回值:任务3结束

多任务的编排

准备三个任务:

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
    System.out.println("查询轮播图列表...");
    return "轮播图List";
}, executor);

CompletableFuture<String> futureNotice = CompletableFuture.supplyAsync(() -> {
    System.out.println("查询公告列表...");
    return "公告List";
}, executor);

CompletableFuture<String> futureMsg = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
        System.out.println("查询消息列表...");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "消息List";
}, executor);

allOf:所有任务都执行完

CompletableFuture<Void> futureAllOf = CompletableFuture.allOf(futureImg, futureNotice, futureMsg);
futureAllOf.get();// 等待所有结果执行完成

anyOf:其中有一个任务执行完就可以

CompletableFuture<Object> futureAnyOf = CompletableFuture.anyOf(futureImg, futureNotice, futureMsg);
futureAnyOf.get();

CompletableFuture实战

上面所使用的的创建线程方式并不推荐,实际工作中还是使用 ThreadPoolExecutor 来创建线程池。

创建线程池

@Bean(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // 1: 创建核心线程数
    executor.setCorePoolSize(50);
    // 2:线程池维护线程的最大数量,只有在缓存队列满了之后才会申请超过核心线程数的线程
    executor.setMaxPoolSize(100);
    // 3:缓存队列 可以写大一点无非就浪费一点内存空间
    executor.setQueueCapacity(100);
    // 4:线程的空闲事件,当超过了核心线程数之外的线程在达到指定的空闲时间会被销毁
    executor.setKeepAliveSeconds(60);
    // 5:线程池中的线程的名称前缀
    executor.setThreadNamePrefix("test");
    /* 当线程的任务缓存队列已满并且线程池中的线程数量已经达到了最大连接数,如果还有任务来就会采取拒绝策略,
     * 通常有四种策略:
     * AbortPolicy:当线程池无法接受新任务时,会抛出 RejectedExecutionException异常,这就意味着新任务不会被加入队列中,也不会被执行。
     * DiscardPolicy:丢弃任务,但是不抛出异常
     * DiscardOldestPolicy: 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
     * CallerRunsPolicy:重试添加当前的任务,自动重复调用execute()方法,直到成功。
     *                   扩展重试3次,如果3次都不成功在移除。
     * */
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return executor;
}

创建线程池

@Autowired
@Qualifier("threadPoolTaskExecutor")
private ThreadPoolTaskExecutor executor;

@RequestMapping("/index")
public String index(Model model,HttpServletRequest req) {
    CompletableFuture<String> futureImg = CompletableFuture.runAsync(() -> {
        System.out.println("查询轮播图列表...");
        model.addAttribute("futureImg", "轮播图List");
    }, executor);
    CompletableFuture<String> futureNotice = CompletableFuture.runAsync(() -> {
        System.out.println("查询公告列表...");
        model.addAttribute("futureNotice", "公告List");
    }, executor);
    CompletableFuture<String> futureMsg = CompletableFuture.runAsync(() -> {
        System.out.println("查询消息列表...");
        model.addAttribute("futureMsg", "消息List");
    }, executor);
    CompletableFuture<Void> futureAllOf = CompletableFuture.allOf(futureImg, futureNotice, futureMsg);
    // 等待所有结果执行完成
    futureAllOf.get();
    return "xxx/index";
}