Observable的冷热模式
Observable并不生产数据,其本质是数据生产者和消费者之间的桥梁,数据生产者可以是向observer.next(value)传递的任意值。冷模式与热模式的区别关键就在于生产者的激活时期。
冷Observable
冷Observable的生产者的创建和激活发生在订阅函数内部,这也是大部分常用Observable的模式。举例如下:
let obs = Rx.Observable.create(observer => observer.next(Date.now()));
obs.subscribe(v => console.log('1:' + v));
obs.subscribe(v => console.log('2:' + v));
// 1: 1454594668213
// 2: 1454594668214
可以看到两次订阅获取的到的数据并不相同,其关键就在于每个冷Observable的数据生产者在被订阅时才会即时创建(Date.now()函数的调用),和订阅者之间形成点对点的单播。每一个订阅都会走如下流程:
- 订阅者订阅Observable
- 创建生产者
- 激活生产者产生数据
- 订阅者监听Observable获得数据
- 数据单播完成
热Observable
热Observable的生产者的创建和激活发生在订阅函数外部,我们对上面的例子稍微做个改动。
const data = Date.now();
let obs = Rx.Observable.create(observer => observer.next(data));
obs.subscribe(v => console.log('1:' + v));
obs.subscribe(v => console.log('2:' + v));
// 1: 1454594668213
// 2: 1454594668213
这就是热Observable的雏形了。关键在于热Observable在订阅前就完成了创建并激活生产者的步骤,当不同订阅者订阅生产者时都会共享同一个生产者引用,其造成的直接结果就是数据不再是点对点的单播而是点对多的组播。热Observable的关键可总结为如下两点:
- 生产者创建时即时激活并产生数据流向Observable,无论是否有订阅者订阅Observable。
- 所有订阅者共享同一个生产者引用,数据流动为点对多的组播。
细心的读者可能已经发现了,上面的例子并不是严格意义的热Observable,其并没有满足第1点的数据即时激活并流向Observable,而是和冷模式相同,当有订阅时才开始流动。形象的说,其比冷模式热一些,比热模式冷一些,可以将该模式称为暖Observable。
那么让我们将这个暖Observable改造成热Observable,看看两者有什么区别。
let obs = Rx.Observable.create(observer => observer.next(Date.now())).publish();
obs.connect();
obs.subscribe(v => console.log('1:' + v));
obs.subscribe(v => console.log('2:' + v));
当运行如上代码时,可以看到什么都不会发生。因为经过publish和connect转换后的冷obs已经是完全的热obs了,在connect方法之后,两个订阅方法之前,数据就已经完成了流动,因此随后的订阅也就无法得到新的数据了。那么问题来了,publish和connect究竟做了什么转化了obs的模式呢?
冷Observable到热Observable的转化原理
在讨论转化原理之前,我们先回顾一下之前文章提到过的Subject的特性,它是转化实现的关键。
- Subject本身是一个Observable,其包含了Observable应有的所有特性。
- Subject同时也是一个Observer,当它订阅Observable时,会接收流出数据并多播给其订阅者。
- 可以使用next方法对自己传值并多播给其订阅者。
multicast
有了Subject的帮助我们就可以进行转化了。例子如下:
function multicast(cold) {
const subject = new Subject();
subject.connect = cold.subscribe(subject);
return new Observable((observer) => subject.subscribe(observer));
}
此即为multicast操作符的基本原理,该操作符接受一个subject参数并回传一个带有connect方法的Observable,对其的所有订阅都代理给内部创建的subject,而subject则在内部实现了由冷到热的转化。
实例如下:
let obs = Rx.Observable.create(observer => observer.next(Date.now())).multicast(new Rx.Subject());
obs.subscribe(v => console.log('1:' + v));
obs.subscribe(v => console.log('2:' + v));
obs.connect();
// 1:1523687368734
// 2:1523687368734
有一点需要注意,connect方法才是数据流动的关键,相当于是subject开启了冷Observable的流动并中转为热Observable再多播给订阅者。所以要退订的话,需要调用connect方法返回的subscription的退订方法。
publish
该方法即是对multicast(new Rx.Subject())做了一层很薄的封装,两者完全等价,和Subject的三种变形类似的,同样也有三种变形:publishReplay,publishBehavior,publishLast。
refCount
现在我们可以通过手动控制connect来决定数据流动时机,但大多数情况下,我们希望有订阅者时才激活数据流,无订阅者时即取消数据流。这也就是引用计数的原理了。
function multicastAndRefCount(cold) {
const subject = new Subject();
subject = cold.subscribe(subject);
let refs = 0;
return new Observable((observer) => {
refs++;
let sub = subject.subscribe(observer);
return () => {
refs--;
if (refs === 0) {
subject.unsubscribe();
}
sub.unsubscribe();
}
});
}
体现在RxJS中的操作符即为refCount()了,必须搭配multicast一起使用。
let obs = Rx.Observable.interval(1000).publish().refCount();
let sub1 = obs.subscribe(v => console.log('1:' + v));
let sub2 = obs.subscribe(v => console.log('2:' + v));
setTimeout(() => {
sub1.unsubscribe();
},3000);
setTimeout(() => {
sub2.unsubscribe();
},5000);
// 1:0
// 2:0
// 1:1
// 2:1
// 2:2
// 2:3
share
同样的,针对于publish().refCount()也有一层薄封装即为share()。
-- EOF --