RxJS操作符——数据流的结合
有时候我们需要的数据可能来自不同的数据流,但从多条数据流里分别取数据又比较难操作,所以RxJS为我们提供了一类将不同数据流结合到一条数据流的方法。
低阶数据流结合
startWith/concat
该操作符来数据流开头/末尾插入指定的项,接受可变数量的参数,参数类型可以为普通数据或者数据流,新的数据流会在结果中被展开。结果会返回一条在源数据流的开头/末尾插入新数据的数据流。
let source = Rx.Observable.of(1, 2, 3);
let target1 = source.startWith(4, 5, 6);
let target2 = source.concat(Rx.Observable.of(4, 5, 6));
// 1-2-3-|
// startWith concat
// 4-5-6-1-2-3-| 1-2-3-4-5-6-|
需要注意的是,订阅插入逻辑的时机是在前一个数据流complete之后,如果不会完结,则永远订阅不到下一个数据流中的数据。
merge
该操作符的使用方法和上例类似,区别在于不讲究源数据流的先后顺序,而是将数据流内各个数据按时间顺序融合到一起。参数为可变数量的数据流。
let source1 = Rx.Observable.of(1, 2, 3).flatMap((d, i) => Rx.Observable.of(d).delay(i * 1000));
let source2 = Rx.Observable.of(4, 5, 6).flatMap((d, i) => Rx.Observable.of(d).delay(i * 1200));
let target = source1.merge(source2);
// ---1------2---------3-|
// -----4-------5--------6-|
// merge
// ---1-4----2---5------3-6--|
需要注意,merge有一个可选参数concurrent指定同时合并的数据流条数,当需要合并3条数据流而该参数设置为2时,第3条数据流中的数据则不会被下游订阅到,除非前两条数据流其中之一完结。
race
就像赛跑一样,结合多条数据流时,订阅最先发出数据的数据流,其他数据流则会被退订抛弃。
let source1 = Rx.Observable.of(1);
let source2 = Rx.Observable.of(2).delay(1000);
let target = source1.race(source2);
// 1-|
// ----2--|
// race
// 1-|
combineLatest
该方法用于将多条数据流的最新值按照自己定义的规则结合到一起。参数可以是数据流数组,在任一数据流发出数据时,和源数据流的最新值共同传入一个映射函数形成新的数据流。映射函数为最后一个参数。没提供映射函数时则仅发出最新值数组。
let source1 = Rx.Observable.of(1, 2, 3).flatMap((d, i) => Rx.Observable.of(d).delay(i * 1000));
let source2 = Rx.Observable.of(4, 5, 6).flatMap((d, i) => Rx.Observable.of(d).delay(i * 1500));
let cf = (di, d2) => d1 + '~' + d2;
let target = source1.combineLatest(source2, cf);
// ---1------2---------3-|
// --------4-------5--------6-|
// combineLstest
// --------1~4-2~4-2~5--3~5-3~6-|
withLatestFrom
该方法和combineLatest原理完全相同,区别在于数据流的合并时机是区分主次的,结合数据流仅在源数据流输出元素时才生成新元素,而不是每条数据流输出时都生成。
let source1 = Rx.Observable.of(1, 2, 3).flatMap((d, i) => Rx.Observable.of(d).delay(i * 1000));
let source2 = Rx.Observable.of(4, 5, 6).flatMap((d, i) => Rx.Observable.of(d).delay(i * 1500));
let cf = (di, d2) => d1 + '~' + d2;
let target = source1.withLatestFrom(source2, cf);
// ---1------2---------3-|
// --------4-------5--------6-|
// withLastestFrom
// --------- 2~4-----3~5-|
zip
原理仍然和前两者相同,区别在于结合数据流的新数据是参数数据流严格一对一生成的,就像拉链一样,即是当所有原数据流发射出数据后才结合。新数据流完结之时在于上游数据流其中之一完结,并且完结数据流的最后一个数据完成配对。
let source1 = Rx.Observable.of(1, 2, 3).flatMap((d, i) => Rx.Observable.of(d).delay(i * 1000));
let source2 = Rx.Observable.of(4, 5, 6).flatMap((d, i) => Rx.Observable.of(d).delay(i * 1500));
let cf = (di, d2) => d1 + '-' + d2;
let target = source1.zip(source2, cf);
// ---1------2---------3-|
// --------4-------5--------6-|
// zip
// ------ 1-4-----2-5-----3-6-|
可以注意到,如果上游数据流流速不同步并且有巨大差异,那么流速较快的数据流就有必要保存住未配对数据并且持续流出数据,极端情况下会积压大量未配对数据产生内存问题。这是使用该操作符需要考虑的问题。
forkJoin
该操作符同样是结合多条数据流,不过它只产生一个数据,那就是所有数据流完结之后所输出的最后一个数据。
let source1 = Rx.Observable.of(1, 2, 3);
let source2 = Rx.Observable.range(0, 10);
let target = Rx.Obserbable.forkJoin(source1, source2);
// 1-2-3-|
// 0-1-2-3-4-5-6-7-8-9-|
// forkJoin
// -------------------[3, 9]-|
高阶数据流结合
前文介绍了各种结合操作符,基本上已经涵盖了日常业务使用的方方面面,接下来这部分介绍的操作符结合逻辑和前文基本相同,但区别在于其是应用在高阶数据流之上的。
高阶数据流的概念可以类比高阶函数,就是产生数据流的数据流。以下这些操作符的关键就在于——展开。将高阶数据流中的内部数据流展开后再应用结合逻辑。
concat => concatAll
将源高阶数据流内部的每条数据流展开生成新的数据流。只有当先展开的数据流展开完成时才订阅下一条数据流展开,不忽略任何数据。结合逻辑和concat完全相同。
let source = Rx.Observable.interval(1000).take(2).map(x => Rx.Observable.interval(1500).map(y => x + y).take(2));
let target = source.concatAll();
// ---0---1-|
// | |
// -------00-----01-|
// |
// -----10-----11-|
// concatAll
// -------00-----01-----10-----11-|
merge => mergeAll
和concatAll原理相同,类比于merge,将源数据流内部的数据流由展开订阅变为展开融合,不忽略任何数据。
let source = Rx.Observable.interval(1000).take(2).map(x => Rx.Observable.interval(1500).map(y => x + y).take(2));
let target = source.mergeAll();
// ---0---1-|
// | |
// -------00-----01-|
// |
// -----10-----11-|
// mergeAll
// -------00---10-01--11-|
switch
该方法同样用于将数据流中的数据融合,和concatAll的区别在于当两条内部数据流重叠流动时会抛弃旧的数据流而订阅新的数据流中的数据。顾名思义,切换就是使用新的抛弃旧的。
let source = Rx.Observable.interval(1000).take(2).map(x => Rx.Observable.interval(1500).map(y => x + y).take(2));
let target = source.switch();
// ---0---1-|
// | |
// -------00-----01-|
// |
// -----10-----11-|
// switch
// ------------10-----11-|
exhaust
该方法和swtich相对,同样顾名思义,耗尽,就是旧的数据流完结之前不订阅新的,即是当两条数据流重叠时不抛弃旧数据流而是抛弃新数据流。
let source = Rx.Observable.interval(1000).take(2).map(x => Rx.Observable.interval(1500).map(y => x + y).take(2));
let target = source.exhaust();
// ---0---1-|
// | |
// -------00-----01-|
// |
// -----10-----11-|
// exhaust
// -------00-----01-|
switchMap
语法糖操作符,将数据流中的每个数据都通过map映射为数据流之后再通过switch方法展开。
exhaustMap
语法糖操作符,将数据流中的每个数据都通过map映射为数据流之后再通过exhaust方法展开。
这里可以对比一下4种展开高阶数据流方法的区别:
- mergeAll: 不忽略任何一条高阶数据流,展开后按数据流出时间顺序融合数据,可能存在数据乱序。
- concatAll: 不忽略任何一条高阶数据流,展开后按高阶数据流的完成顺序融合数据,数据严格排序。
- switch: 同一时刻只能订阅一条高阶数据流,当两条高阶数据流流出数据时机重叠时,抛弃旧的数据流。
- exhaust: 同一时刻只能订阅一条高阶数据流,当两条高阶数据流流出数据时机重叠时,抛弃新的数据流。
-- EOF --