Time Bomb Veela
  1. 1 Time Bomb Veela
  2. 2 Libertus Chen-U
  3. 3 Hypocrite Nush
  4. 4 Warcry mpi
  5. 5 Flower Of Life 发热巫女
  6. 6 Last Surprise Lyn
  7. 7 One Last You Jen Bird
  8. 8 Life Will Change Lyn
  9. 9 かかってこいよ NakamuraEmi
  10. 10 Quiet Storm Lyn
  11. 11 The Night We Stood Lyn
2018-06-15 22:48:37

Observable的冷热模式

Observable并不生产数据,其本质是数据生产者和消费者之间的桥梁,数据生产者可以是向observer.next(value)传递的任意值。冷模式与热模式的区别关键就在于生产者的激活时期。

冷Observable

冷Observable的生产者的创建和激活发生在订阅函数内部,这也是大部分常用Observable的模式。举例如下:

let obs = Rx.Observable.create(observer => observer.next(Date.now()));
obs.subscribe(v => console.log('1:' + v));
obs.subscribe(v => console.log('2:' + v));

// 1: 1454594668213
// 2: 1454594668214

可以看到两次订阅获取的到的数据并不相同,其关键就在于每个冷Observable的数据生产者在被订阅时才会即时创建(Date.now()函数的调用),和订阅者之间形成点对点的单播。每一个订阅都会走如下流程:

  1. 订阅者订阅Observable
  2. 创建生产者
  3. 激活生产者产生数据
  4. 订阅者监听Observable获得数据
  5. 数据单播完成

热Observable

热Observable的生产者的创建和激活发生在订阅函数外部,我们对上面的例子稍微做个改动。

const data = Date.now();

let obs = Rx.Observable.create(observer => observer.next(data));
obs.subscribe(v => console.log('1:' + v));
obs.subscribe(v => console.log('2:' + v));

// 1: 1454594668213
// 2: 1454594668213

这就是热Observable的雏形了。关键在于热Observable在订阅前就完成了创建并激活生产者的步骤,当不同订阅者订阅生产者时都会共享同一个生产者引用,其造成的直接结果就是数据不再是点对点的单播而是点对多的组播。热Observable的关键可总结为如下两点:

  1. 生产者创建时即时激活并产生数据流向Observable,无论是否有订阅者订阅Observable。
  2. 所有订阅者共享同一个生产者引用,数据流动为点对多的组播。

细心的读者可能已经发现了,上面的例子并不是严格意义的热Observable,其并没有满足第1点的数据即时激活并流向Observable,而是和冷模式相同,当有订阅时才开始流动。形象的说,其比冷模式热一些,比热模式冷一些,可以将该模式称为暖Observable

那么让我们将这个暖Observable改造成热Observable,看看两者有什么区别。

let obs = Rx.Observable.create(observer => observer.next(Date.now())).publish();
obs.connect();

obs.subscribe(v => console.log('1:' + v));
obs.subscribe(v => console.log('2:' + v));

当运行如上代码时,可以看到什么都不会发生。因为经过publish和connect转换后的冷obs已经是完全的热obs了,在connect方法之后,两个订阅方法之前,数据就已经完成了流动,因此随后的订阅也就无法得到新的数据了。那么问题来了,publish和connect究竟做了什么转化了obs的模式呢?

冷Observable到热Observable的转化原理

在讨论转化原理之前,我们先回顾一下之前文章提到过的Subject的特性,它是转化实现的关键。

  1. Subject本身是一个Observable,其包含了Observable应有的所有特性。
  2. Subject同时也是一个Observer,当它订阅Observable时,会接收流出数据并多播给其订阅者。
  3. 可以使用next方法对自己传值并多播给其订阅者。

multicast

有了Subject的帮助我们就可以进行转化了。例子如下:

function multicast(cold) {
    const subject = new Subject();
      subject.connect = cold.subscribe(subject);
     return new Observable((observer) => subject.subscribe(observer));
} 

此即为multicast操作符的基本原理,该操作符接受一个subject参数并回传一个带有connect方法的Observable,对其的所有订阅都代理给内部创建的subject,而subject则在内部实现了由冷到热的转化。

实例如下:

let obs = Rx.Observable.create(observer => observer.next(Date.now())).multicast(new Rx.Subject());

obs.subscribe(v => console.log('1:' + v));
obs.subscribe(v => console.log('2:' + v));

obs.connect();

// 1:1523687368734
// 2:1523687368734

有一点需要注意,connect方法才是数据流动的关键,相当于是subject开启了冷Observable的流动并中转为热Observable再多播给订阅者。所以要退订的话,需要调用connect方法返回的subscription的退订方法。

publish

该方法即是对multicast(new Rx.Subject())做了一层很薄的封装,两者完全等价,和Subject的三种变形类似的,同样也有三种变形:publishReplay,publishBehavior,publishLast。

refCount

现在我们可以通过手动控制connect来决定数据流动时机,但大多数情况下,我们希望有订阅者时才激活数据流,无订阅者时即取消数据流。这也就是引用计数的原理了。

function multicastAndRefCount(cold) {
    const subject = new Subject();
      subject = cold.subscribe(subject);
      let refs = 0;
     return new Observable((observer) => {
      refs++;
      let sub = subject.subscribe(observer);
      return () => {
          refs--;
          if (refs === 0) {
              subject.unsubscribe();
          }
          sub.unsubscribe();     
      }
    });
} 

体现在RxJS中的操作符即为refCount()了,必须搭配multicast一起使用。

let obs = Rx.Observable.interval(1000).publish().refCount();
let sub1 = obs.subscribe(v => console.log('1:' + v));
let sub2 = obs.subscribe(v => console.log('2:' + v));
setTimeout(() => {
    sub1.unsubscribe();
},3000);

setTimeout(() => {
    sub2.unsubscribe();
},5000);

// 1:0
// 2:0
// 1:1
// 2:1
// 2:2
// 2:3

share

同样的,针对于publish().refCount()也有一层薄封装即为share()。

-- EOF --

添加在分类「 前端开发 」下,并被添加 「RxJS」 标签。