RxJS核心概念解析——Subject
现在我们已经了解Observable和Observer了,数据来自于Observable,流入Observer对数据处理,这样单点对单点的处理方式很简洁,但似乎有些过于单一。有时候我们希望同一份数据能在多个地方被用到,这就是数据流的桥梁——Subject发挥作用的地方了。Subject既可以用作Observer,也可以用作Observable。
多路推送
普通的数据流不具备多路推送数据的能力,因为每一个数据流都是独立的执行环境。多次订阅时会两条数据流会分先后输出数据。例子如下:
let ob = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.complete();
});
ob.subscribe({
next: (v) => console.log('A:', v)
});
ob.subscribe({
next: (v) => console.log('B:', v)
});
// 输出结果依次为:'A: 1' 'A: 2' 'B: 1' 'B: 2'
而Subject作为数据流时共享同一个执行环境,数据流时会同时流向订阅者。
let subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('A:', v)
});
subject.subscribe({
next: (v) => console.log('B:', v)
});
subject.next(1);
subject.next(2);
// 输出结果依次为:'A: 1' 'B: 1' 'A: 2' 'B: 2'
同时Subject也可以作为观察者,来接受普通数据流推送的数据,并再次通过自身作为特殊数据流的能力将单路推送转化为多路推送,这也就是Subject的核心桥梁作用了。例子如下:
let subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('A:', v)
});
subject.subscribe({
next: (v) => console.log('B:', v)
});
let observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject);
// 输出结果依次为:'A: 1' 'B: 1' 'A: 2' 'B: 2' 'A: 3' 'B: 3'
Subject的衍生类
BehaviorSubject
这种Subject会向订阅它的观察者从最新的值开始发起。针对不同的订阅,最新的值即为最近向观察者发送过的值。如果是第一次订阅,则发送初始该Subject时传入的值。例子如下:
let subject = new Rx.BehaviorSubject(0);
subject.subscribe({
next: (v) => console.log('A:', v)
});
subject.next(1);
subject.subscribe({
next: (v) => console.log('B:', v)
})
subject.next(2);
// 输出结果依次为:'A: 0' 'A: 1' 'B: 1' 'A: 2' 'B: 2'
ReplaySubject
这种Subject可以向新的订阅者立刻推送曾经发送过的数值,参数决定发送数量。例子如下:
let subject = new Rx.ReplaySubject(2);
subject.subscribe({
next: (v) => console.log('A:', v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.subscribe({
next: (v) => console.log('B:', v)
});
subject.next(4);
// 输出如下:'A: 1' 'A: 2' 'A: 3' 'B: 2' 'B: 3' 'A: 4' 'B: 4'
参数除了指定回放数量外,也可以指定为回放时间,单位为毫秒,在该时间段内的所有发送过的数值都会先推送给新的观察者。
AsyncSubject
这种Subject只会在数据流完成后向所有观察者推送最后一个值。例子如下:
let subject = new Rx.AsyncSubject();
subject.subscribe({
next: (v) => console.log('A:', v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('B:', v)
});
subject.next(3);
subject.complete();
//输出结果依次为:'A: 3' 'B: 3'
-- EOF --