CompletableFuture源码分析

简介

先说Future, 它用来描述一个异步计算的结果。isDone方法可以用来检查计算是否完成,get方法可以用来获取结果,直到完成前一直阻塞当前线程,cancel方法可以取消任务。而对于结果的获取,只能通过阻塞(get())或者轮询的方式[while(!isDone)]. 阻塞的方式违背了异步编程的理念,轮询的方式耗费无谓的CPU资源(CPU空转)。于是,CompletableFuture应运而生。

样例

后面介绍的源码都会以下面的用例为切入点,循着调用轨迹理解源码。如果任务很耗时,记得传Executor, 或者方法末尾加上future.get(); 因为CompletableFuture默认使用ForkJoinPool, 而ForkJoinPool里面的线程都是daemon线程,主线程跑完了,虚拟机也就over了。

    public void whenComplete() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        future.whenComplete((l, r) -> System.out.println(l));
    }

    public void thenApply() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        future.thenApply(i -> -i);
    }

    public void thenAccept() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        future.thenAccept(System.out::println);
    }

    public void thenRun() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        future.thenRun(() -> System.out.println("Done"));
    }

    public void thenAcceptBoth() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200);
        future.thenAcceptBoth(other, (x, y) -> System.out.println(x + y));
    }

    public void acceptEither() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200);
        future.acceptEither(other, System.out::println);

    }

    public void allOf() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200);
        CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300);
        CompletableFuture.allOf(future, second, third);

    }

    public void anyOf() throws InterruptedException, ExecutionException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200);
        CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300);
        CompletableFuture.anyOf(future, second, third);
    }

源码分析

supplyAsync

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
         return asyncSupplyStage(asyncPool, supplier); // asyncPool, ForkJoinPool.commonPool()或者ThreadPerTaskExecutor(实现了Executor接口,里面的内容是{new Thread(r).start();})
     }

asyncSupplyStage

static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
        if (f == null)
            throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>(); // 构建一个新的CompletableFuture, 以此构建AsyncSupply作为Executor的执行参数
        e.execute(new AsyncSupply<U>(d, f)); // AsyncSupply继承了ForkJoinTask, 实现了Runnable, AsynchronousCompletionTask接口
        return d; // 返回d,立返
    }

AsyncSupply

// CompletableFuture的静态内部类,作为一个ForkJoinTask
    static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<T> dep; // AsyncSupply作为一个依赖Task,dep作为这个Task的Future
        Supplier<T> fn; // fn作为这个Task的具体执行逻辑,函数式编程

        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
            this.dep = dep;
            this.fn = fn;
        }

        public final Void getRawResult() {
            return null;
        }

        public final void setRawResult(Void v) {
        }

        public final boolean exec() {
            run();
            return true;
        }

        public void run() {
            CompletableFuture<T> d;
            Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) { // 非空判断
                dep = null;
                fn = null;
                if (d.result == null) { // 查看任务是否结束,如果已经结束(result != null),直接调用postComplete()方法
                    try {
                        d.completeValue(f.get()); // 等待任务结束,并设置结果
                    } catch (Throwable ex) {
                        d.completeThrowable(ex); // 异常
                    }
                }
                d.postComplete(); // 任务结束后,会执行所有依赖此任务的其他任务,这些任务以一个无锁并发栈的形式存在
            }
        }
    }

postComplete()

final void postComplete() {
        CompletableFuture<?> f = this; // 当前CompletableFuture
        Completion h; // 无锁并发栈,(Completion next), 保存的是依靠当前的CompletableFuture一串任务,完成即触发(回调)
        while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { // 当f的stack为空时,使f重新指向当前的CompletableFuture,继续后面的结点
            CompletableFuture<?> d;
            Completion t;
            if (f.casStack(h, t = h.next)) { // 从头遍历stack,并更新头元素
                if (t != null) {
                    if (f != this) { // 如果f不是当前CompletableFuture,则将它的头结点压入到当前CompletableFuture的stack中,使树形结构变成链表结构,避免递归层次过深
                        pushStack(h);
                        continue; // 继续下一个结点,批量压入到当前栈中
                    }
                    h.next = null; // 如果是当前CompletableFuture, 解除头节点与栈的联系
                }
                f = (d = h.tryFire(NESTED)) == null ? this : d; // 调用头节点的tryFire()方法,该方法可看作Completion的钩子方法,执行完逻辑后,会向后传播的
            }
        }
    }

示意图

每个CompletableFuture持有一个Completion栈stack, 每个Completion持有一个CompletableFuture -> dep, 如此递归循环下去,是层次很深的树形结构,所以想办法将其变成链表结构。

首先取出头结点,下图中灰色Completion结点,它会返回一个CompletableFuture, 同样也拥有一个stack,策略是遍历这个CompletableFuture的stack的每个结点,依次压入到当前CompletableFuture的stack中,关系如下箭头所示,灰色结点指的是处理过的结点。

第一个Completion结点返回的CompletableFuture, 将拥有的stack里面的所有结点都压入了当前CompletableFuture的stack里面

后续的Completion结点返回的CompletableFuture, 将拥有的stack里面的所有结点都压入了当前CompletableFuture的stack里面,重新构成了一个链表结构,后续也按照前面的逻辑操作,如此反复,便会遍历完所有的CompletableFuture, 这些CompletableFuture(叶子结点)的stack为空,也是结束条件。

postComplete()最后调用的是Completion#tryFire()方法,先看下Completion的数据结构

Completion

abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
        volatile Completion next; // 无锁并发栈

        /**
         * 钩子方法,有三种模式,postComplete()方法里面使用的是NESTED模式,避免过深的递归调用 SYNC, ASYNC, or NESTED
         */
        abstract CompletableFuture<?> tryFire(int mode); // run()和exec()都调用了这个钩子方法

        /** cleanStack()方法里有用到 */
        abstract boolean isLive();

        public final void run() {
            tryFire(ASYNC);
        }

        public final boolean exec() {
            tryFire(ASYNC);
            return true;
        }

        public final Void getRawResult() {
            return null;
        }

        public final void setRawResult(Void v) {
        }
    }
static final int SYNC = 0;       同步
static final int ASYNC = 1;    异步
static final int NESTED = -1; 嵌套

继承了ForkJoinTask, 实现了Runnable, AsynchronousCompletionTask接口,它有诸多子类,如下图

后面的方法都对应着不同的子类。

先看一个子类UniCompletion

abstract static class UniCompletion<T,V> extends Completion {
        Executor executor;                 // 执行器
        CompletableFuture<V> dep;          // 依赖的任务
        CompletableFuture<T> src;          // 被依赖的任务

        UniCompletion(Executor executor, CompletableFuture<V> dep,
                      CompletableFuture<T> src) {
            this.executor = executor; this.dep = dep; this.src = src;
        }

        final boolean claim() { // 如果当前任务可以被执行,返回true,否则,返回false; 保证任务只被执行一次
            Executor e = executor;
            if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
                if (e == null)
                    return true;
                executor = null; // 设置为不可用
                e.execute(this);
            }
            return false;
        }

        final boolean isLive() { return dep != null; }
    }

claim()方法保证任务只被执行一次。

whenComplete

whenComplete()/whenCompleteAsync()

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }

    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }

xxx和xxxAsync方法的区别是,有没有asyncPool作为入参,有的话,任务直接入参,不检查任务是否完成。uniWhenCompleteStage方法有说明。

uniWhenCompleteStage

private CompletableFuture<T> uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f) {
        if (f == null)
            throw new NullPointerException();
        CompletableFuture<T> d = new CompletableFuture<T>(); // 构建future
        if (e != null || !d.uniWhenComplete(this, f, null)) { // 如果线程池不为空,直接构建任务入栈,并调用tryFire()方法;否则,调用uniWhenComplete()方法,检查依赖的那个任务是否完成,没有完成返回false,
                                                                // 完成了返回true, 以及后续一些操作。
            UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); // UniWhenComplete继承了UniCompletion
            push(c);
            c.tryFire(SYNC); // 先调一下钩子方法,检查一下任务是否结束
        }
        return d;
    }

uniWhenComplete

final boolean uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c) {
        Object r;
        T t;
        Throwable x = null;
        if (a == null || (r = a.result) == null || f == null) // 被依赖的任务还未完成
            return false;
        if (result == null) { // 被依赖的任务完成了
            try {
                if (c != null && !c.claim()) // 判断任务是否能被执行
                    return false;
                if (r instanceof AltResult) { // 判断异常,AltResult类型很简单,里面只有一个属性Throwable ex; 
                    x = ((AltResult) r).ex;
                    t = null;
                } else {
                    @SuppressWarnings("unchecked")
                    T tr = (T) r; // 正常的结果
                    t = tr;
                }
                f.accept(t, x); // 执行任务
                if (x == null) {
                    internalComplete(r); // 任务的结果设置为被依赖任务的结果
                    return true;
                }
            } catch (Throwable ex) {
                if (x == null)
                    x = ex; // 记录异常
            }
            completeThrowable(x, r); // 设置异常和结果
        }
        return true;
 
剩余70%内容付费后可查看

发表评论

This site is protected by wp-copyrightpro.com