青年IT男

个人从事金融行业,就职过易极付、思建科技等重庆一流技术团队,目前就职于某网约车平台负责整个支付系统建设。自身对金融行业有强烈的爱好。同时也实践大数据、数据存储、自动化集成和部署、分布式微服务、响应式编程、人工智能等领域。

操作符熔合——响应式Spring的道法术器(20)

操作符熔合——响应式Spring的道法术器(20)

2.10 操作符熔合

操作符熔合是响应式编程领域比较前沿的研究话题,目的在于通过将多个操作符以某种方式熔合起来,以达到优化的效果,进而降低开销(比如执行时间,内存)。

以下部分内容参考了 Dávid Karnok 的 Operator-fusion(part1part2)。

2.10.1 分代的概念

首先介绍一下关于响应式编程库的分代的概念。直至现在,响应式编程库及相关概念仍然在不断的更新和升级。作者根据自己在响应式编程领域的研究经验,将响应式编程库分为四代。

第零代

起初的响应式编程工具主要包括类似java.util.Observable的基于订阅者模式的API,以及那些基于回调的API,如 Swing/AWT/Android 中的 addXXXListener。但是两者都有些共同的不足:不方便组合(就像我们前边的操作链那样)。

第一代

后来,Erik Meijer 和他在微软的团队解决了难以组合的问题,从而诞生了第一代响应式编程库:2010 年左右的 Rx.NET,2011 年的 Reactive4Java,以及 2013 年早期的 RxJava。

其他的语言也陆续基于 Rx.NET 的架构开发了类似的库,但很快大家发现这种架构存在的问题。第一个问题是,最初的 IObservable/IObserver 在纯单线程中实现后,如果使用类似 take() 的操作符,之后的序列无法取消。Rx.NET 通过在诸如 range() 的数据源进行异步,绕开了这个问题。

第二个问题是,当生产者与消费者之间存在一个异步边界(不在同一线程)时,如果消费者消费数据的速度不够快,也会导致问题。这时消费者的代码会非常繁琐,这就是我们前面多次提到的 backpressure 问题。

第二代

RxJava团队针对上边的两个问题设计了一套新的架构。

首先是引入了 Subscriber 类,它能通过 isUnsubscribed() 方法判断是否取消订阅,数据源或者操作符发在数据之前都会调用该方法进行检查。

然后,backpressure 的问题则通过双方协调的方式解决,利用 Producer 接口,Subscriber 告知上游自己能处理数据的量(request() 方法)。

第三个改进是 lift() 函数,使用它可以直接在 Subscriber 之间进行函数式的变换。几乎所有操作符的实现都被重写,改成了利用新的 Operator 接口和 lift() 函数。

第三代

后边的故事本系列文章的读者就熟悉了。响应式编程的兴起使得大家意识到互相之间要兼容。于是来自多个公司的工程师们聚在了一起,设计了一套响应式流(Reactive-Streams)规范,主要成果是 4 个接口,30 条关于这几个接口的规则,以及这几个接口里的 7 个方法。

Reactive-Streams 规范使得响应式编程实现库之间可以相互兼容,从而能够随意切换具体的实现库。

因此基于响应式流的实现属于第三代,它的实现包括 RxJava 2.x,Project Reactor 和 Akka-Streams等。

第四代

在响应式流之上实现一套可组合的库需要完全不同的内部架构,因此 RxJava 2.x 不得不完全重写。作者参与了重写的过程,并发现有些操作符可以通过某种方式进行合并优化,以节省各种开销,例如队列,并发原子操作,以及数据请求等。

经过积极的交流之后,作者及其同事包括其他相应时库的作者创建了一个reactive-streams-commons 库,设计了一套实现上述优化的组件,之后称之为操作符熔合。

第四代的响应式编程库和第三代从外部看起来没多大区别,但其实内部操作符的实现发生了很大的变化。

Reactor3已经是第四代响应式编程库了。

2.10.2 响应式流的生命周期

聊到响应式流的生命周期,就需要再次搬出本章第一节的那个图:

title

为了方便下边讨论操作符熔合的问题,我们将整个过程分为三个阶段:

  1. 装配期,也就是.subscribe方法调用以前,针对每一个操作符会创建一个FluxXxx对象,并通过类似装饰器模式的方式关联起来;
  2. 订阅期,这是在.subscribe方法调用后,由最后一下操作符向上游依次调用subscribe方法以及向下回调onSubscribe方法的期间,这时候元素还没有发出;
  3. 运行期,这是数据生成并发往下游,且以最多一个终止事件(onError或者onComplete)终止的阶段。

不同的阶段有不同的优化方案,下面具体介绍两种主要的优化方案:宏熔合和微熔合。

2.10.3 宏熔合(macro-fusion)

宏熔合主要发生在装配期。通过2.1节自己动手写了一个简单的操作符实现,我们知道对于每个操作符都会创建一个FluxXxx即响应的Subscription对象。因此如果把连续的多个操作符合并为单个操作符,就可以优化订阅时的开销。这就是宏熔合的主要目标,具体来说有如下几种方式:

1)操作符替换

比如,有几个数据源,在对它们进行concatmerge操作时,如果数据源只发出一个元素,那么就没必要执行操作了,直接发出这个元素就可以了;

在比如,我们使用range生成的数据源并应用subscribeOn()时,对于这种单线程生成的数据源,subscribeOnpublishOn几乎没太大区别,因此可以替换为publishOn,以便引入更多优化。

2)替换为自定义发布者

有些组合出现的操作符可以合并为一个单独的操作符。

比如,just().subscribeOn()just().flatMap()这样的组合,它们带来的开销(内部队列的创建,调度器 worker 的创建和销毁,多个原子变量的修改)相对于它们发出的数据过高了,尤其just()中只有一个元素的情况下,完全可以合并为一个发布者发出数据。

3)合并相同的操作符

比如如下例子:

    Observable.range(1, 10)
       .filter(v -> v % 3 == 0)
       .filter(v -> v % 2 == 0)
       .map(v -> v + 1)
       .map(v -> v * v)
       .subscribe(System.out::println);

filtermap分别调用了两次,这样代码会比较清晰,但是如果range比较大,那么优化其性能开销带来的收益就很明显了。策略就是对于同一类的操作符进行合并:

对于两个filter(),会把两个 lambda 表达式合并起来:

    Predicate<Integer> p1 = v -> v % 3 == 0;
    Predicate<Integer> p2 = v -> v % 2 == 0;

    Predicate<Integer> p3 = v -> p1.test(v) && p2.test(v);

map()也可以进行类似的合并:

    Function<Integer, Integer> f1 = v -> v + 1;
    Function<Integer, Integer> f2 = v -> v * v;

    Function<Integer, Integer> f3 = v -> f2.apply(f1.apply(v));

2.10.4 微熔合(micro-fusion)

微熔合发生在订阅期,通过多个操作符共用内部资源和数据结构以减少开销。

微熔合有以下几种形式:

1)Conditional Subscriber

阅读FluxFilterFluxDistinct的源码发现,当我们使用过滤操作符filter()或者distinct()时,如果被丢弃掉,那么会调用 request(1)。request(1) 会触发原子递增操作,或者是 CAS 循环,大量这样的操作很快就能积累出性能下降。

Conditional Subscriber 的思路是为 Subscriber 增加一个boolean tryOnNext(T v)方法(见Fusable.ConditionalSubscriber),它可以告知上游自己是否会真的消费这个数据。这样在数据被丢弃时,能够跳过原子递增,并继续发射数据,直到实际发出的数据量达到了请求数。

2)同步熔合

在响应式编程库内部,许多地方都需要用到队列以便进行数据缓存。比如,有些拥有输出队列的操作符,和那些需要输入队列的操作符可以共用同一个队列实例,这样就可以节省内存分配。

同步熔合就是采用这一方式进行优化的。对于那些操作符的上游必然是同步的情形,它们可以假装自己是一个队列。

有些用于生成数据源操作符,比如rangefromIterablefromArrayfromStreamfromCallable,它们都是同步的,而且都有队列的特性。因此它们内部的Subscription就可以实现Queue。而对于会使用队列的操作符比如observeOn()flatMap()publish()zip()来说,如果发现上游的Subscription实现了Queue接口,那就无需创建自己的队列了。

3)异步熔合

有些情况下,数据源也有自己的队列,会在下游发出请求时从中取出数据并发出。与上边的情况类似,这时数据源也可以实现 Queue 接口,然后让后续的操作符直接使用,而不用创建自己的队列。但如果这个操作符也支持同步熔合的话,就需要采用新的协议。

在Reactor中,这种优化通过一个新的接口Fusable.QueuedSubscription来定义。

2.10.5 最后

操作符熔合是降低响应式数据流开销的一个有效途径,可以在保持API不变的情况下把开销降低到接近于常规的 Java Stream 序列(Project Reactor 2.5 M1 降低了 50%+,RxJava 2.x 则降低了 200%+)的水平。

操作符熔合主要是针对相邻的操作符进行优化,如果针对每一对可能相邻使用的操作符都进行优化设计,那将是矩阵式的数量级,因此操作符的熔合更多是一种基于使用情况的考量,尤其主要关注于用户经常使用到的操作符和操作符组合。

也许可以为操作符以及数据序列以某种方式进行建模,在模型上通过图算法自动发现那些可以被熔合的操作符,而这又将是另一个可以研究的话题了。目前基于操作符熔合的优化仍然处于进行时,关于响应式编程库仍然有性能潜力可以挖掘。为这些大牛们点赞!

作者:享学IT
来源:CSDN
原文:https://blog.csdn.net/get_set/article/details/79455258
版权声明:本文为博主原创文章,转载请附上博文链接!

0
青年IT男

个人从事金融行业,就职过易极付、思建科技等重庆一流技术团队,目前就职于某网约车平台负责整个支付系统建设。自身对金融行业有强烈的爱好。同时也实践大数据、数据存储、自动化集成和部署、分布式微服务、响应式编程、人工智能等领域。

评论已关闭。

This site is protected by wp-copyrightpro.com