青年IT男

个人从事金融行业,就职过易极付、思建科技等重庆一流技术团队,目前就职于某网约车平台负责整个支付系统建设。自身对金融行业有强烈的爱好。同时也实践大数据、数据存储、自动化集成和部署、分布式微服务、人工智能等领域。

CompletableFuture源码分析

CompletableFuture源码分析

简介

先说Future, 它用来描述一个异步计算的结果。isDone方法可以用来检查计算是否完成,get方法可以用来获取结果,直到完成前一直阻塞当前线程,cancel方法可以取消任务。而对于结果的获取,只能通过阻塞(get())或者轮询的方式[while(!isDone)]. 阻塞的方式违背了异步编程的理念,轮询的方式耗费无谓的CPU资源(CPU空转)。于是,CompletableFuture应运而生。

样例

后面介绍的源码都会以下面的用例为切入点,循着调用轨迹理解源码。如果任务很耗时,记得传Executor, 或者方法末尾加上future.get(); 因为CompletableFuture默认使用ForkJoinPool, 而ForkJoinPool里面的线程都是daemon线程,主线程跑完了,虚拟机也就over了。

    public void whenComplete() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        future.whenComplete((l, r) -> System.out.println(l));
    }

    public void thenApply() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        future.thenApply(i -> -i);
    }

    public void thenAccept() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        future.thenAccept(System.out::println);
    }

    public void thenRun() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        future.thenRun(() -> System.out.println("Done"));
    }

    public void thenAcceptBoth() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200);
        future.thenAcceptBoth(other, (x, y) -> System.out.println(x + y));
    }

    public void acceptEither() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200);
        future.acceptEither(other, System.out::println);

    }

    public void allOf() {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200);
        CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300);
        CompletableFuture.allOf(future, second, third);

    }

    public void anyOf() throws InterruptedException, ExecutionException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200);
        CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300);
        CompletableFuture.anyOf(future, second, third);
    }

源码分析

supplyAsync

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
         return asyncSupplyStage(asyncPool, supplier); // asyncPool, ForkJoinPool.commonPool()或者ThreadPerTaskExecutor(实现了Executor接口,里面的内容是{new Thread(r).start();})
     }

asyncSupplyStage

static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
        if (f == null)
            throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>(); // 构建一个新的CompletableFuture, 以此构建AsyncSupply作为Executor的执行参数
        e.execute(new AsyncSupply<U>(d, f)); // AsyncSupply继承了ForkJoinTask, 实现了Runnable, AsynchronousCompletionTask接口
        return d; // 返回d,立返
    }

AsyncSupply

// CompletableFuture的静态内部类,作为一个ForkJoinTask
    static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<T> dep; // AsyncSupply作为一个依赖Task,dep作为这个Task的Future
        Supplier<T> fn; // fn作为这个Task的具体执行逻辑,函数式编程

        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
            this.dep = dep;
            this.fn = fn;
        }

        public final Void getRawResult() {
            return null;
        }

        public final void setRawResult(Void v) {
        }

        public final boolean exec() {
            run();
            return true;
        }

        public void run() {
            CompletableFuture<T> d;
            Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) { // 非空判断
                dep = null;
                fn = null;
                if (d.result == null) { // 查看任务是否结束,如果已经结束(result != null),直接调用postComplete()方法
                    try {
                        d.completeValue(f.get()); // 等待任务结束,并设置结果
                    } catch (Throwable ex) {
                        d.completeThrowable(ex); // 异常
                    }
                }
                d.postComplete(); // 任务结束后,会执行所有依赖此任务的其他任务,这些任务以一个无锁并发栈的形式存在
            }
        }
    }

postComplete()

final void postComplete() {
        CompletableFuture<?> f = this; // 当前CompletableFuture
        Completion h; // 无锁并发栈,(Completion next), 保存的是依靠当前的CompletableFuture一串任务,完成即触发(回调)
        while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { // 当f的stack为空时,使f重新指向当前的CompletableFuture,继续后面的结点
            CompletableFuture<?> d;
            Completion t;
            if (f.casStack(h, t = h.next)) { // 从头遍历stack,并更新头元素
                if (t != null) {
                    if (f != this) { // 如果f不是当前CompletableFuture,则将它的头结点压入到当前CompletableFuture的stack中,使树形结构变成链表结构,避免递归层次过深
                        pushStack(h);
                        continue; // 继续下一个结点,批量压入到当前栈中
                    }
                    h.next = null; // 如果是当前CompletableFuture, 解除头节点与栈的联系
                }
                f = (d = h.tryFire(NESTED)) == null ? this : d; // 调用头节点的tryFire()方法,该方法可看作Completion的钩子方法,执行完逻辑后,会向后传播的
            }
        }
    }

示意图

每个CompletableFuture持有一个Completion栈stack, 每个Completion持有一个CompletableFuture -> dep, 如此递归循环下去,是层次很深的树形结构,所以想办法将其变成链表结构。

首先取出头结点,下图中灰色Completion结点,它会返回一个CompletableFuture, 同样也拥有一个stack,策略是遍历这个CompletableFuture的stack的每个结点,依次压入到当前CompletableFuture的stack中,关系如下箭头所示,灰色结点指的是处理过的结点。

第一个Completion结点返回的CompletableFuture, 将拥有的stack里面的所有结点都压入了当前CompletableFuture的stack里面

后续的Completion结点返回的CompletableFuture, 将拥有的stack里面的所有结点都压入了当前CompletableFuture的stack里面,重新构成了一个链表结构,后续也按照前面的逻辑操作,如此反复,便会遍历完所有的CompletableFuture, 这些CompletableFuture(叶子结点)的stack为空,也是结束条件。

postComplete()最后调用的是Completion#tryFire()方法,先看下Completion的数据结构

Completion

abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
        volatile Completion next; // 无锁并发栈

        /**
         * 钩子方法,有三种模式,postComplete()方法里面使用的是NESTED模式,避免过深的递归调用 SYNC, ASYNC, or NESTED
         */
        abstract CompletableFuture<?> tryFire(int mode); // run()和exec()都调用了这个钩子方法

        /** cleanStack()方法里有用到 */
        abstract boolean isLive();

        public final void run() {
            tryFire(ASYNC);
        }

        public final boolean exec() {
            tryFire(ASYNC);
            return true;
        }

        public final Void getRawResult() {
            return null;
        }

        public final void setRawResult(Void v) {
        }
    }
static final int SYNC = 0;       同步
static final int ASYNC = 1;    异步
static final int NESTED = -1; 嵌套

继承了ForkJoinTask, 实现了Runnable, AsynchronousCompletionTask接口,它有诸多子类,如下图

后面的方法都对应着不同的子类。

先看一个子类UniCompletion

abstract static class UniCompletion<T,V> extends Completion {
        Executor executor;                 // 执行器
        CompletableFuture<V> dep;          // 依赖的任务
        CompletableFuture<T> src;          // 被依赖的任务

        UniCompletion(Executor executor, CompletableFuture<V> dep,
                      CompletableFuture<T> src) {
            this.executor = executor; this.dep = dep; this.src = src;
        }

        final boolean claim() { // 如果当前任务可以被执行,返回true,否则,返回false; 保证任务只被执行一次
            Executor e = executor;
            if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
                if (e == null)
                    return true;
                executor = null; // 设置为不可用
                e.execute(this);
            }
            return false;
        }

        final boolean isLive() { return dep != null; }
    }

claim()方法保证任务只被执行一次。

whenComplete

whenComplete()/whenCompleteAsync()

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }

    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }

xxx和xxxAsync方法的区别是,有没有asyncPool作为入参,有的话,任务直接入参,不检查任务是否完成。uniWhenCompleteStage方法有说明。

uniWhenCompleteStage

private CompletableFuture<T> uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f) {
        if (f == null)
            throw new NullPointerException();
        CompletableFuture<T> d = new CompletableFuture<T>(); // 构建future
        if (e != null || !d.uniWhenComplete(this, f, null)) { // 如果线程池不为空,直接构建任务入栈,并调用tryFire()方法;否则,调用uniWhenComplete()方法,检查依赖的那个任务是否完成,没有完成返回false,
                                                                // 完成了返回true, 以及后续一些操作。
            UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); // UniWhenComplete继承了UniCompletion
            push(c);
            c.tryFire(SYNC); // 先调一下钩子方法,检查一下任务是否结束
        }
        return d;
    }

uniWhenComplete

final boolean uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c) {
        Object r;
        T t;
        Throwable x = null;
        if (a == null || (r = a.result) == null || f == null) // 被依赖的任务还未完成
            return false;
        if (result == null) { // 被依赖的任务完成了
            try {
                if (c != null && !c.claim()) // 判断任务是否能被执行
                    return false;
                if (r instanceof AltResult) { // 判断异常,AltResult类型很简单,里面只有一个属性Throwable ex; 
                    x = ((AltResult) r).ex;
                    t = null;
                } else {
                    @SuppressWarnings("unchecked")
                    T tr = (T) r; // 正常的结果
                    t = tr;
                }
                f.accept(t, x); // 执行任务
                if (x == null) {
                    internalComplete(r); // 任务的结果设置为被依赖任务的结果
                    return true;
                }
            } catch (Throwable ex) {
                if (x == null)
                    x = ex; // 记录异常
            }
            completeThrowable(x, r); // 设置异常和结果
        }
        return true;
    }

push()

    final void push(UniCompletion<?, ?> c) {
        if (c != null) {
            while (result == null && !tryPushStack(c))
                lazySetNext(c, null); // 失败重置c的next域
        }
    }
    final boolean tryPushStack(Completion c) {
        Completion h = stack;
        lazySetNext(c, h);
        return UNSAFE.compareAndSwapObject(this, STACK, h, c);
    }
    static void lazySetNext(Completion c, Completion next) {
        UNSAFE.putOrderedObject(c, NEXT, next);
    }

UniWhenComplete

static final class UniWhenComplete<T> extends UniCompletion<T, T> {
        BiConsumer<? super T, ? super Throwable> fn;
        UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src,
                BiConsumer<? super T, ? super Throwable> fn) {
            super(executor, dep, src);
            this.fn = fn;
        }

        final CompletableFuture<T> tryFire(int mode) { // 钩子方法
            CompletableFuture<T> d; // 依赖的任务
            CompletableFuture<T> a; // 被依赖的任务
            if ((d = dep) == null || !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) // 如果是异步模式(mode = 1),就不判断任务是否结束
                return null; // dep为空,说明已经调用过了
            dep = null;
            src = null;
            fn = null;
            return d.postFire(a, mode); // 钩子方法之后的处理
        }
    }

postFire()

final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
        if (a != null && a.stack != null) { // 被依赖的任务存在,且stack不为空,先处理它
            if (mode < 0 || a.result == null) // 如果是嵌套模式(mode = -1), 或者任务的结果为空,直接清空栈
                a.cleanStack();
            else
                a.postComplete(); // 否则,调用postComplete()方法
        }
        if (result != null && stack != null) { // 再处理当前任务
            if (mode < 0) // 嵌套模式,直接返回自身(树 -> 链表,避免过深的递归调用)
                return this;
            else
                postComplete(); // 调用postComplete()方法
        }
        return null;
    }

cleanStack()

final void cleanStack() { // 过滤掉已经死掉的结点(Not isLive)
        for (Completion p = null, q = stack; q != null;) { // q指针从头节点开始,向右移动,s一直执行q的下一个结点,p要么为空,要么指向遍历过的最后一个活着的结点,一旦发现q死掉了,就断开q, 连接p, s
            Completion s = q.next;
            if (q.isLive()) { // 还活着,p指向遍历过的最后一个结点,q向右移动
                p = q;
                q = s;
            } else if (p == null) { // 说明第一个结点就是死掉的,cas stack, q指向stack
                casStack(q, s);
                q = stack;
            } else { // 否则的话,连接p, s
                p.next = s;
                if (p.isLive()) // 再次判断p结点是否还或者(在这期间是否有别的线程改动了)
                    q = s; // 还活着,q继续向右移动
                else {
                    p = null; // 过期的值,从新开始
                    q = stack;
                }
            }
        }
    }

如下图

  1. 第1个结点是无效结点,更新stack,更新指针

  1. 第2个结点是有效结点,更新指针

  1. 第3个结点是无效结点,更新指针

  1. 第4个结点是有效结点,更新指针

thenApply

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }

    private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T, ? extends V> f) {
        if (f == null)
            throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.uniApply(this, f, null)) {
            UniApply<T, V> c = new UniApply<T, V>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

    final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S, ? extends T> f, UniApply<S, T> c) {
        Object r;
        Throwable x;
        if (a == null || (r = a.result) == null || f == null)
            return false;
        tryComplete: if (result == null) {
            if (r instanceof AltResult) {
                if ((x = ((AltResult) r).ex) != null) {
                    completeThrowable(x, r); // 有异常,直接跳出
                    break tryComplete;
                }
                r = null;
            }
            try {
                if (c != null && !c.claim())
                    return false;
                @SuppressWarnings("unchecked")
                S s = (S) r;
                completeValue(f.apply(s));
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }

    static final class UniApply<T, V> extends UniCompletion<T, V> {
        Function<? super T, ? extends V> fn;

        UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src,
                Function<? super T, ? extends V> fn) {
            super(executor, dep, src);
            this.fn = fn;
        }

        final CompletableFuture<V> tryFire(int mode) {
            CompletableFuture<V> d;
            CompletableFuture<T> a;
            if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this))
                return null;
            dep = null;
            src = null;
            fn = null;
            return d.postFire(a, mode);
        }
    }

一样的套路,thenApply/thenApplyAsync -> uniApplyStage -> uniApply -> tryFire -> postFire

thenAccept

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }

    private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) {
        if (f == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.uniAccept(this, f, null)) {
            UniAccept<T> c = new UniAccept<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

    final <S> boolean uniAccept(CompletableFuture<S> a, Consumer<? super S> f, UniAccept<S> c) {
        Object r;
        Throwable x;
        if (a == null || (r = a.result) == null || f == null)
            return false;
        tryComplete: if (result == null) {
            if (r instanceof AltResult) {
                if ((x = ((AltResult) r).ex) != null) {
                    completeThrowable(x, r); // 有异常直接跳出
                    break tryComplete;
                }
                r = null;
            }
            try {
                if (c != null && !c.claim())
                    return false;
                @SuppressWarnings("unchecked")
                S s = (S) r;
                f.accept(s);
                completeNull();
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }

    static final class UniAccept<T> extends UniCompletion<T, Void> {
        Consumer<? super T> fn;

        UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn) {
            super(executor, dep, src);
            this.fn = fn;
        }

        final CompletableFuture<Void> tryFire(int mode) {
            CompletableFuture<Void> d;
            CompletableFuture<T> a;
            if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this))
                return null;
            dep = null;
            src = null;
            fn = null;
            return d.postFire(a, mode);
        }
    }

thenAccept/thenAcceptAsync -> uniAcceptStage -> uniAccept -> tryFire -> postFire

thenRun

public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }

    private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
        if (f == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.uniRun(this, f, null)) {
            UniRun<T> c = new UniRun<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

    final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
        Object r;
        Throwable x;
        if (a == null || (r = a.result) == null || f == null)
            return false;
        if (result == null) {
            if (r instanceof AltResult && (x = ((AltResult) r).ex) != null)
                completeThrowable(x, r);
            else
                try {
                    if (c != null && !c.claim())
                        return false;
                    f.run();
                    completeNull();
                } catch (Throwable ex) {
                    completeThrowable(ex);
                }
        }
        return true;
    }

    static final class UniRun<T> extends UniCompletion<T, Void> {
        Runnable fn;

        UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn) {
            super(executor, dep, src);
            this.fn = fn;
        }

        final CompletableFuture<Void> tryFire(int mode) {
            CompletableFuture<Void> d;
            CompletableFuture<T> a;
            if ((d = dep) == null || !d.uniRun(a = src, fn, mode > 0 ? null : this))
                return null;
            dep = null;
            src = null;
            fn = null;
            return d.postFire(a, mode);
        }
    }

thenRun/thenRunAsync -> uniRunStage -> uniRun -> tryFire -> postFire

thenAcceptBoth

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
            BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(null, other, action);
    }

    public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
            BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(asyncPool, other, action);
    }

biAcceptStage

 private <U> CompletableFuture<Void> biAcceptStage(Executor e, CompletionStage<U> o,
            BiConsumer<? super T, ? super U> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.biAccept(this, b, f, null)) {
            BiAccept<T, U> c = new BiAccept<T, U>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

bipush

final void bipush(CompletableFuture<?> b, BiCompletion<?, ?, ?> c) {
        if (c != null) {
            Object r;
            while ((r = result) == null && !tryPushStack(c)) // a的result还没准备好,c压入栈
                lazySetNext(c, null); // 失败重置c的next域
            if (b != null && b != this && b.result == null) { // b的result也还没准备好
                Completion q = (r != null) ? c : new CoCompletion(c); // 根据a的result决定是否构建CoCompletion, 如果a未结束,则构建一个CoCompletion, CoCompletion最后调用的也是BiCompletion的tryFire
                while (b.result == null && !b.tryPushStack(q)) // 将q压入栈
                    lazySetNext(q, null); // 失败重置q的next域
            }
        }
    }

CoCompletion

static final class CoCompletion extends Completion {
        BiCompletion<?, ?, ?> base;

        CoCompletion(BiCompletion<?, ?, ?> base) {
            this.base = base;
        }

        final CompletableFuture<?> tryFire(int mode) {
            BiCompletion<?, ?, ?> c;
            CompletableFuture<?> d;
            if ((c = base) == null || (d = c.tryFire(mode)) == null) // 调用的还是BiCompletion的tryFire方法
                return null;
            base = null;
            return d;
        }

        final boolean isLive() {
            BiCompletion<?, ?, ?> c;
            return (c = base) != null && c.dep != null;
        }
    }

biAccept

final <R, S> boolean biAccept(CompletableFuture<R> a, CompletableFuture<S> b, BiConsumer<? super R, ? super S> f,
            BiAccept<R, S> c) {
        Object r, s;
        Throwable x;
        if (a == null || (r = a.result) == null || b == null || (s = b.result) == null || f == null)
            return false; // a和b都完成了,才会往下走
        tryComplete: if (result == null) {
            if (r instanceof AltResult) {
                if ((x = ((AltResult) r).ex) != null) { // a的异常检查
                    completeThrowable(x, r);
                    break tryComplete;
                }
                r = null;
            }
            if (s instanceof AltResult) {
                if ((x = ((AltResult) s).ex) != null) { // b的异常检查
                    completeThrowable(x, s);
                    break tryComplete;
                }
                s = null;
            }
            try {
                if (c != null && !c.claim())
                    return false;
                @SuppressWarnings("unchecked")
                R rr = (R) r;
                @SuppressWarnings("unchecked")
                S ss = (S) s;
                f.accept(rr, ss); // 执行任务
                completeNull();
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }

BiAccept

static final class BiAccept<T, U> extends BiCompletion<T, U, Void> {
        BiConsumer<? super T, ? super U> fn;

        BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd,
                BiConsumer<? super T, ? super U> fn) {
            super(executor, dep, src, snd);
            this.fn = fn;
        }

        final CompletableFuture<Void> tryFire(int mode) {
            CompletableFuture<Void> d;
            CompletableFuture<T> a;
            CompletableFuture<U> b;
            if ((d = dep) == null || !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this))
                return null;
            dep = null;
            src = null;
            snd = null;
            fn = null;
            return d.postFire(a, b, mode);
        }
    }

    abstract static class BiCompletion<T, U, V> extends UniCompletion<T, V> {
        CompletableFuture<U> snd; // second source for action

        BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd) {
            super(executor, dep, src);
            this.snd = snd;
        }
    }

thenAcceptBoth/thenAcceptBothAsync -> biAcceptStage -> biAccept -> tryFire -> postFire

acceptEither

public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(null, other, action);
    }

    public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(asyncPool, other, action);
    }

    private <U extends T> CompletableFuture<Void> orAcceptStage(Executor e, CompletionStage<U> o,
            Consumer<? super T> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.orAccept(this, b, f, null)) {
            OrAccept<T, U> c = new OrAccept<T, U>(e, d, this, b, f);
            orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

    final <R, S extends R> boolean orAccept(CompletableFuture<R> a, CompletableFuture<S> b, Consumer<? super R> f,
            OrAccept<R, S> c) {
        Object r;
        Throwable x;
        if (a == null || b == null || ((r = a.result) == null && (r = b.result) == null) || f == null)
            return false; // a和b有一个完成了就往下走
        tryComplete: if (result == null) {
            try {
                if (c != null && !c.claim())
                    return false;
                if (r instanceof AltResult) { // 异常
                    if ((x = ((AltResult) r).ex) != null) {
                        completeThrowable(x, r);
                        break tryComplete;
                    }
                    r = null;
                }
                @SuppressWarnings("unchecked")
                R rr = (R) r;
                f.accept(rr); // 执行
                completeNull();
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }

    static final class OrAccept<T, U extends T> extends BiCompletion<T, U, Void> {
        Consumer<? super T> fn;

        OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd,
                Consumer<? super T> fn) {
            super(executor, dep, src, snd);
            this.fn = fn;
        }

        final CompletableFuture<Void> tryFire(int mode) {
            CompletableFuture<Void> d;
            CompletableFuture<T> a;
            CompletableFuture<U> b;
            if ((d = dep) == null || !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this))
                return null;
            dep = null;
            src = null;
            snd = null;
            fn = null;
            return d.postFire(a, b, mode);
        }
    }

    final void orpush(CompletableFuture<?> b, BiCompletion<?, ?, ?> c) {
        if (c != null) {
            while ((b == null || b.result == null) && result == null) { // a和b的result都没好,才会考虑入栈
                if (tryPushStack(c)) { // 先入a的栈
                    if (b != null && b != this && b.result == null) { // 入a的栈成功,b的result还没好
                        Completion q = new CoCompletion(c); // a还未结束,用c构建CoCompletion
                        while (result == null && b.result == null && !b.tryPushStack(q)) // 再次判断,a和b的result都没好,才会考虑入栈
                            lazySetNext(q, null); // 失败置空q的next域
                    }
                    break;
                }
                lazySetNext(c, null); // 失败置空c的next域
            }
        }
    }

acceptEither/acceptEitherAsync -> orAcceptStage -> orAccept -> tryFire -> postFire

allOf

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }

    static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, int lo, int hi) { // 将一个数组构建成一棵树,二叉树,动态规划
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (lo > hi) // empty
            d.result = NIL;
        else {
            CompletableFuture<?> a, b;
            int mid = (lo + hi) >>> 1;
            if ((a = (lo == mid ? cfs[lo] : andTree(cfs, lo, mid))) == null
                    || (b = (lo == hi ? a : (hi == mid + 1) ? cfs[hi] : andTree(cfs, mid + 1, hi))) == null)
                throw new NullPointerException();
            if (!d.biRelay(a, b)) {
                BiRelay<?, ?> c = new BiRelay<>(d, a, b);
                a.bipush(b, c); // both
                c.tryFire(SYNC);
            }
        }
        return d;
    }

    static final class BiRelay<T, U> extends BiCompletion<T, U, Void> { // for And
        BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd) {
            super(null, dep, src, snd);
        }

        final CompletableFuture<Void> tryFire(int mode) {
            CompletableFuture<Void> d;
            CompletableFuture<T> a;
            CompletableFuture<U> b;
            if ((d = dep) == null || !d.biRelay(a = src, b = snd))
                return null;
            src = null;
            snd = null;
            dep = null;
            return d.postFire(a, b, mode);
        }
    }

    boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
        Object r, s;
        Throwable x;
        if (a == null || (r = a.result) == null || b == null || (s = b.result) == null)
            return false; // a和b都结束了才往下执行
        if (result == null) {
            if (r instanceof AltResult && (x = ((AltResult) r).ex) != null)
                completeThrowable(x, r);
            else if (s instanceof AltResult && (x = ((AltResult) s).ex) != null)
                completeThrowable(x, s);
            else
                completeNull(); // 辅助结点,什么都不做
        }
        return true;
    }

allOf -> andTree -> biRelay -> tryFire -> postFire

anyOf

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        return orTree(cfs, 0, cfs.length - 1);
    }

    static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, int lo, int hi) { // 将一个数组构建成一棵树,二叉树,动态规划
        CompletableFuture<Object> d = new CompletableFuture<Object>();
        if (lo <= hi) {
            CompletableFuture<?> a, b;
            int mid = (lo + hi) >>> 1;
            if ((a = (lo == mid ? cfs[lo] : orTree(cfs, lo, mid))) == null
                    || (b = (lo == hi ? a : (hi == mid + 1) ? cfs[hi] : orTree(cfs, mid + 1, hi))) == null)
                throw new NullPointerException();
            if (!d.orRelay(a, b)) {
                OrRelay<?, ?> c = new OrRelay<>(d, a, b);
                a.orpush(b, c);
                c.tryFire(SYNC);
            }
        }
        return d;
    }

    static final class OrRelay<T, U> extends BiCompletion<T, U, Object> { // for Or
        OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, CompletableFuture<U> snd) {
            super(null, dep, src, snd);
        }

        final CompletableFuture<Object> tryFire(int mode) {
            CompletableFuture<Object> d;
            CompletableFuture<T> a;
            CompletableFuture<U> b;
            if ((d = dep) == null || !d.orRelay(a = src, b = snd))
                return null;
            src = null;
            snd = null;
            dep = null;
            return d.postFire(a, b, mode);
        }
    }

    final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
        Object r;
        if (a == null || b == null || ((r = a.result) == null && (r = b.result) == null))
            return false; // a和b有一个结束就往下进行
        if (result == null)
            completeRelay(r);
        return true;
    }

anyOf -> orTree -> orRelay -> tryFire -> postFire

数组构建树

allOf和anyOf都用到了数组构建成树的策略。

假设有一个任务Z(虚拟的,什么都不做),依赖一组任务[A, B, C, D, E, F, G, H]

对于allOf, 当这组任务都完成时,才会执行Z;对于anyOf, 当这组任务中有任何一个完成,就执行任务Z。

如果这组任务是数组结构或者链表结构,我们该如何解决呢?遍历数组或者是链表,当任务都完成或者有一个完成时,就执行Z,需要不停地遍历,这是轮询的方法,不合适。

整个基调是回调,是指,当一个任务完成时,会接着执行所有依赖于它的任务。

作为一个数组或者链表,该如何应用回调呢?谁在先,谁在后呢?因为不知道哪个任务会先完成,所以没法确定次序。而且这组任务之间也不应该相互依赖,它们只不过都是被Z依赖。

如果这组任务只有一个的话,那就演变成了X.thenXXX(Z), 如果这组任务有两个的话,allOf -> Both,anyOf -> Either

如果Z依赖Z1,Z2两个个任务,Z1和Z2依赖Z11,Z12和Z21,Z22四个任务,依次类推,当虚拟的任务的个数达到真实任务的个数的一半时,就让虚拟任务监听真实的任务,动态规划加二叉树,时间复杂度也只是logn级别的。
static String array2Tree(String[] cfs, int lo, int hi) {
        String d = new String(cfs[lo] + cfs[hi]);
        if (lo <= hi) {
            String a, b;
            int mid = (lo + hi) >>> 1; // 二分
            if (lo == mid) { // a作为左半部分的的结果
                a = cfs[lo]; // 当只有不超过两个元素时,a直接取第一个值
            } else {
                a = array2Tree(cfs, lo, mid);
            }
            if (lo == hi) { // 当只有一个元素的时候,b取a的值
                b = a;
            } else {
                if (hi == mid + 1) { // 右半部分只有两个元素时,b取第二个元素的值
                    b = cfs[hi];
                } else {
                    b = array2Tree(cfs, mid + 1, hi);
                }
            }
            if (a == null || b == null) {
                throw new NullPointerException();
            }
            System.out.println("[" + a + "][" + b + "]->[" + d + "]");
        }
        return d;
    }

Console

[A][B]->[AB]
[C][D]->[CD]
[AB][CD]->[AD]
[E][F]->[EF]
[G][H]->[GH]
[EF][GH]->[EH]
[AD][EH]->[AH]

如下图

对于allOf, Z只要保证Z1和Z2都完成了就行,Z1和Z2分别保证Z11,Z12 和 Z21,Z22都完成了就像,而Z11,Z12,Z21,Z22则分别保证了A-H任务都完成。

对应anyOf, Z 只要保证Z1和Z2有一个完成了就像,Z1和Z2联合保证了Z11,Z12,Z21,Z22这4个任务只要有一个完成了就行,同理,Z11,Z12,Z21,Z22则联合保证了A-H中有一个任务完成了就行。

然后,Z就可以执行了,其实Z什么也没做,只是从这组任务里得出一个结果。

文章转载出处:http://www.cnblogs.com/aniao/p/aniao_cf.html

0
1028826685@qq.com