CompletableFuture       

CompletableFuture

每个CompletableFuture的方法都会新建CompletableFuture对象,并将当前方法需要执行的任务封装成Completion对象,之后将Completion压入到上个方法中创建的CompletableFuture对象的栈中,这样每个CompletableFuture对象的栈都保存了下一个待执行的任务,通过栈将每个任务串在一起,形成一个链条。当执行完当前CompletableFuture对象的任务后,接着查看栈中是否有任务,如果有则直接执行,如果没有就退出。后一个CompletableFuture对象也是一样,首先检查前一个任务是否执行完成,如果没有则将任务压入前一个对象的栈中,之后退出,如果前一个任务执行完成,则执行当前任务,然后查看栈中的任务。

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
	//asyncPool是一个线程池,
	//它根据配置或者CPU的个数来决定是使用ForkJoinPool还是ThreadPerTaskExecutor作为线程池的实现
    return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                 Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    //创建一个新的CompletableFuture对象,所以后面调用thenAccept()方法其实使用的是这个新建的对象
    CompletableFuture<U> d = new CompletableFuture<U>();
    //在线程池中执行AsyncSupply
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}

new一个CompletableFuture返回,结果也赋值在这个CompletableFuture上,下一个任务也放在CompletableFuture里的栈上

public void run() {
    CompletableFuture<T> d; Supplier<T> f;
    //dep是在asyncSupplyStage()方法中新建的那个CompletableFuture对象
    //fn是自定义的Supplier对象
    if ((d = dep) != null && (f = fn) != null) {
        dep = null; fn = null;
        //CompletableFuture的result属性表示Supplier对象的执行结果,
        //更准确的说,result属性是用来记录我们编写的lambda表达式的运算结果的
        if (d.result == null) {
            try {
            	//使用CAS将Supplier的执行结果放入到属性result中
                d.completeValue(f.get());
            } catch (Throwable ex) {
            	//如果抛出异常,将异常对象记录到属性result
                d.completeThrowable(ex);
            }
        }
        //下面详细介绍postComplete()方法
        d.postComplete();
    }
}
--------------------------------------------------------------------------
	//postComplete()方法的作用是检查是否有下一个任务需要执行,如果需要便会触发该任务的执行
	//每调用一个CompletableFuture实例方法,CompletableFuture都将该实例方法要执行的任务封装成一个Completion对象,
	//并将Completion对象压入到当前CompletableFuture对象的栈中(栈顶使用stack属性记录),
	//也就是将后一个CompletableFuture对象的任务压入到前一个对象的栈中,
	//每次执行完当前任务后,都会调用postComplete()方法检查栈顶是否有待执行任务
    final void postComplete() {
        CompletableFuture<?> f = this; Completion h;
        //循环检查栈顶是否有待执行任务
        while ((h = f.stack) != null ||
               (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture<?> d; Completion t;
            //casStack()将栈顶切换为下一个次栈顶元素
            if (f.casStack(h, t = h.next)) {
                if (t != null) {
                    if (f != this) {
                    	//当后一个CompletableFuture对象的栈中的任务需要嵌入到当前栈中执行时,
                    	//postComplete()获取到这些任务并调用pushStack()放入自己的栈中,
                    	//然后在本方法里面执行这些任务
                        pushStack(h);
                        continue;
                    }
                    h.next = null;    // detach
                }
                //如果栈中有待执行任务,调用tryFire()方法执行该任务,
                //如果tryFire()的入参为NESTED(值为-1),且返回的不是null,
                //说明后一个CompletableFuture对象的栈中的任务需要嵌入执行
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }


public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}
private CompletableFuture<Void> uniAcceptStage(Executor e,
                                               Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    //新建一个CompletableFuture对象d,本方法将该对象返回给调用方
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    //uniAccept()检查前一个任务是否执行完成,如果执行成功了,那么执行Consumer对象
    //注意:入参this指的是supplyAsync()方法里面创建的CompletableFuture对象
    //这里逻辑比较绕,一定分清楚现在调用的thenAccept()方法是属于是supplyAsync()方法里面创建的CompletableFuture对象的
    if (e != null || !d.uniAccept(this, f, null)) {
        UniAccept<T> c = new UniAccept<T>(e, d, this, f);
        //将UniAccept对象压入到CompletableFuture对象的栈中
        //这个栈是supplyAsync()方法里面创建的CompletableFuture对象的
        //push()方法使用CAS将对象c压入到栈
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}
final <S> boolean uniAccept(CompletableFuture<S> a,
                            Consumer<? super S> f, UniAccept<S> c) {
    Object r; Throwable x;
    //a.result表示上一个任务的执行结果,如果为null,表示任务还在执行中
    //上一个任务没有执行完,直接退出当前方法
    if (a == null || (r = a.result) == null || f == null)
        return false;
    //result = null表示当前任务还没执行
    //能够执行下面的逻辑,说明上一个任务执行完毕,可以执行当前任务了
    tryComplete: if (result == null) {
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        try {
            if (c != null && !c.claim())
                return false;
            @SuppressWarnings("unchecked") S s = (S) r;
            //执行Consumer任务
            f.accept(s);
            //向result属性设置结果
            completeNull();
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}
final CompletableFuture<Void> tryFire(int mode) {
    CompletableFuture<Void> d; CompletableFuture<T> a;
    //dep表示thenAccept()方法中新建的CompletableFuture对象d
    //src表示supplyAsync()方法中创建的CompletableFuture对象
    if ((d = dep) == null ||
    	//uniAccept()在上面已经介绍过了
    	//这里调用uniAccept()方法可以确保在多线程环境下,Consumer对象已经可以被执行
        !d.uniAccept(a = src, fn, mode > 0 ? null : this))
        return null;
    dep = null; src = null; fn = null;
    //postFire()做收尾工作,检查对象a和对象d的栈是否有待执行任务,如果有分别调用postComplete()方法,
    //在调用前还会检查mode的值,如果mode为NESTED(-1),则说明栈的任务由别的线程执行,不再执行
    return d.postFire(a, mode);
}
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
    if (a != null && a.stack != null) {
        if (mode < 0 || a.result == null)
            a.cleanStack();//清理栈,将执行过的任务从栈中清除
        else
            a.postComplete();//执行栈的任务,就本小节的例子来看,a栈的任务都已经执行过了
    }
    if (result != null && stack != null) {
        if (mode < 0)
            return this;//直接返回当前CompletableFuture对象,该对象的栈任务由其他线程执行
        else
            postComplete();//执行栈任务
    }
    return null;
}

image-20220406113716925

参考

https://blog.csdn.net/weixin_38308374/article/details/114681284