RxJS核心解析——Observable
最近需要在工作项目中使用RxJS,稍微查了查资料感觉还是个很有意思的东西,于是从今天开始集中主要精力学学RxJS啦。由于其核心思想和常规的JS写法有较大不同,在逐步分析其概念之前先谈谈相关背景知识。
数据通信机制
绝大部分工程其实就是对数据和视图的处理。目前处理数据的方法可以分为拉取和推送两种,与其对应的数据处理对象即为消费者和生产者。
拉取很简单,就是一种消费者主动索取所需数据的方式,生产者只有当消费者发出需求信号时才会提供数据。函数调用就是一种简单的拉取实例。
推送则是由生产者决定何时向消费者传送数据,消费者无法自己单独决定获取数据的时机。JS中的事件监听和Promise就是很典型的数据推送系统。
RxJS就是一种更为强大的数据推送系统,在其机制下,Observable(可观察对象)就是数据的生产者,可以更形象的理解为Stream(数据流),在数据流动的过程中可以通过多种映射操作符的处理,由Observer(观察者)通过(Subscribe)订阅的方式被动接受各种形式的数据。以下为几类常见的拉取和推送模型:
- Function: 单次拉取模型,调用时会同步的返回单一值。
- Generator: 多次拉取模型,调用时会同步的返回0到无限多个值。
- Promise: 单次推送模型,由生产者决定时机返回单一值。
- Observable: 多次推送模型,由生产者决定时机返回0到无限多个值。 这篇文章就从RxJS的Observable说起。
Observable的生命周期
创建
最常用的方法是使用Observable构造函数创建,Rx.Observable.create是其构造函数的别名,使用方法如下:
let observable = Rx.Observable.create(function subscribe(observer) {
const id = setInterval(() => {
observer.next('hi');
}, 1000);
});
该方法接受一个订阅函数,当创建的可观察对象被订阅时即可执行其中的逻辑流出数据源。
除了使用构造函数创建,我们还可以使用创建操作符来创建各种各样的Observable。简明列举如下:
- just() 将传入不同类型数据(可复数)转化为发射这些数据的Observable
- from() 将Iterable,Array转化为Observable
- repeat() 创建一个重复发射指定数据或数据序列的Observable
- repeatWhen() 和repeat相同,但是发射依赖另一个Observable的终止通知
- defer() 延迟创建型Observable,只有当被订阅时才为每个订阅创建一个新的Observable
- range() 创建一个发射指定范围内整数序列的Observable
- interval() 创建一个按照给定时间间隔发射无限递增整数序列的Observable
- timer() 创建一个在给定延时之后发射单个数据的Observable
- empty() 创建一个立刻完成的Observable
- error() 创建一个立刻错误的Observable
- never() 创建一个不发射任何数据的Observable
订阅
当完成Observable的创建后,就可以通过订阅的方式被动获取其推送的数据了。这篇文章不详细解说订阅,先了解其基本概念即可。通过以下方法可以订阅上文中创建的Observable。为了理解更方便以下都将Observable解释为数据流。
observable.subscribe(x => console.log(x));
可以看到,订阅方法名和创建数据流时的订阅函数名相同,我们可以将订阅方法的调用看作是对该订阅函数的唤起,此时内部的逻辑就准备执行了。如果对数据流多次订阅,订阅函数每次执行都是相互独立的。
执行
数据流创建后不会立刻开始流动,只有在被订阅之后数据流才会在特定时间段里同步或异步的产生特定数据,数据流动的逻辑在订阅函数中进行描述。
数据流可以推送三种类型的值:
- Next: 实际产生所需的数据,包括各种数据类型
- Error: 流动异常时抛出错误
- Complete:数据流传输完成
需要注意的是Error或Complete类型的值被流出时即代表着这个数据流的传输完成,不会再继续产生数据。实例如下:
let observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.complete();
observer.next(3); // 该数据不会流出
});
完成
数据流的流出可能是无限的,需要我们通过特定的API去终止数据流,这里不详细解释终止相关API,只需要知道终止时数据流的逻辑我们也可以作为返回值定义在订阅函数中。例子如下:
let observable = Rx.Observable.create(function subscribe(observer) {
let id = setInterval(() => {
observer.next('hi');
}, 1000);
return function unsubscribe() {
clearInterval(id);
};
});
理解Observable
综上我们可以看出,数据流比较像是一个更为强大的EventEmitter,当订阅它时与监听函数比较,可以更加自由的获取想要的数据。需要注意的是,订阅这个过程和监听不同,它是同步进行的。但其中的数据流逻辑可以决定数据是同步还是异步的流出。同步流出的例子如下:
let foo = Rx.Observable.create(function (observer) {
console.log('hello');
observer.next('1');
});
console.log('before');
foo.subscribe(function(x) {
console.log(x);
});
console.log('after');
// 输出结果依次为: before,hello,1,after
异步流出的例子如下:
let foo = Rx.Observable.create(function(observer) {
console.log('hello');
setTimeout(() => {
observer.next('1');
}, 1000);
});
console.log('before');
foo.subscribe(function(x) {
console.log(x);
});
console.log('after');
//输出结果依次为:before,hello,after,1
这里就可以初步感受到RxJS的强大了,这就像是Event机制,Promise机制,生成器和迭代器机制的融合,给予我们自由操作数据流动的权利。
-- EOF --