MobX原理解析——依赖收集与更新
当完成数据的响应化后,后续就要通过对该数据的订阅完成依赖收集及响应等相关操作。这部分内容核心入口就是数据被劫持的get和set方法。本文就重点分析这一部分内容。
前置概念
在进入到整套流程的梳理之前,需要知道mobx中相关的一些抽象概念以及特定术语,因为相关变量的命名是依托于这些概念之上的,先了解前置概念更有利于梳理mobx的思想。
- Observable: 前文提到的响应式数据都称作Observable。实际上了解过RxJS的同学对这个应该会非常熟悉了。主要功能就是用于提供给订阅者响应式数据。
- Transaction: 事务。同样的,熟悉React的同学也不会对这个概念感到陌生。其表示一组原子性的操作用于批量处理响应式变化,防止重复计算。事务可以层层嵌套,只有当最外层事务结束之后,才会重新执行Reaction。
- Derivation: 衍生。包括计算值和Reaction。也就是对Observable的依赖函数了,会随着响应式数据的变化而执行相关逻辑。
- Action: 虽然可以直接修改Observable触发set方法,但更合理的操作则是通过action触发。其关键在于action能同时触发事务封装数据的变化,确保即使是多次的set方法也只触发一次Reaction。
- Atom: 响应式数据的核心功能实现都继承自该类,可以看做是响应式数据的最小单位。mobx也将其暴露给开发者自定义拓展一些响应式数据类型的能力。
依赖收集
我们知道,mobx监听响应式数据的接口有autorun,computed,when和reaction,不过其实本质都是一样的,本文就以autorun为例分析整个流程,先看该函数源码。
export function autorun(arg1: any, arg2: any, arg3?: any) {
let name: string,
view: (r: IReactionPublic) => any,
scope: any;
if (typeof arg1 === "string") {
name = arg1;
view = arg2;
scope = arg3;
} else {
name = arg1.name || ("Autorun@" + getNextId());
view = arg1;
scope = arg2;
}
...
if (scope)
view = view.bind(scope);
const reaction = new Reaction(name, function () {
this.track(reactionRunner);
});
function reactionRunner() {
view(reaction);
}
reaction.schedule();
return reaction.getDisposer();
}
可以看到autorun做的主要工作就是实例化一个Reaction对象,并调用其相关方法。先看Reaction类的结构。
export class Reaction implements IDerivation, IReactionPublic {
observing: IObservable[] = [];
newObserving: IObservable[] = [];
dependenciesState = IDerivationState.NOT_TRACKING;
diffValue = 0;
runId = 0;
unboundDepsCount = 0;
__mapid = "#" + getNextId();
isDisposed = false;
_isScheduled = false;
_isTrackPending = false;
_isRunning = false;
errorHandler: (error: any, derivation: IDerivation) => void;
constructor(public name: string = "Reaction@" + getNextId(), private onInvalidate: () => void) { }
onBecomeStale() {
this.schedule();
}
schedule() {
if (!this._isScheduled) {
this._isScheduled = true;
globalState.pendingReactions.push(this);
runReactions();
}
}
isScheduled() {
return this._isScheduled;
}
runReaction() {
if (!this.isDisposed) {
startBatch();
this._isScheduled = false;
if (shouldCompute(this)) {
this._isTrackPending = true;
this.onInvalidate();
if (this._isTrackPending && isSpyEnabled()) {
spyReport({
object: this,
type: "scheduled-reaction"
});
}
}
endBatch();
}
}
track(fn: () => void) {
startBatch();
const notify = isSpyEnabled();
let startTime;
if (notify) {
startTime = Date.now();
spyReportStart({
object: this,
type: "reaction",
fn
});
}
this._isRunning = true;
const result = trackDerivedFunction(this, fn, undefined);
this._isRunning = false;
this._isTrackPending = false;
if (this.isDisposed) {
clearObserving(this);
}
if (isCaughtException(result))
this.reportExceptionInDerivation(result.cause);
if (notify) {
spyReportEnd({
time: Date.now() - startTime
});
}
endBatch();
}
reportExceptionInDerivation(error: any) {
if (this.errorHandler) {
this.errorHandler(error, this);
return;
}
const message = `[mobx] Encountered an uncaught exception that was thrown by a reaction or observer component, in: '${this}`;
const messageToUser = getMessage("m037");
console.error(message || messageToUser, error);
if (isSpyEnabled()) {
spyReport({
type: "error",
message,
error,
object: this
});
}
globalState.globalReactionErrorHandlers.forEach(f => f(error, this));
}
dispose() {
if (!this.isDisposed) {
this.isDisposed = true;
if (!this._isRunning) {
startBatch();
clearObserving(this);
endBatch();
}
}
}
getDisposer(): IReactionDisposer {
const r = this.dispose.bind(this);
r.$mobx = this;
r.onError = registerErrorHandler;
return r;
}
...
}
其实例化时只需要两个参数,name和onInvalidate函数,autorun的onInvalidate函数就是调用自身的track方法并传入响应函数。这里先不管其调用时机,先往下看,直接执行了schedule方法。
该方法做了两件事,一是向全局状态globalState.pendingReactions
中添加自身,二是调用runReactions
函数,该函数就是依赖收集和更新的核心启动方法。
let reactionScheduler: (fn: () => void) => void = f => f();
function runReactions() {
if (globalState.inBatch > 0 || globalState.isRunningReactions) return
reactionScheduler(runReactionsHelper)
}
当此时不在事务中并且没有正在执行的Reactions时就会执行runReactionsHelper
function runReactionsHelper() {
globalState.isRunningReactions = true;
const allReactions = globalState.pendingReactions;
let iterations = 0;
while (allReactions.length > 0) {
if (++iterations === MAX_REACTION_ITERATIONS) {
console.error(`Reaction doesn't converge to a stable state after ${MAX_REACTION_ITERATIONS} iterations.`
+ ` Probably there is a cycle in the reactive function: ${allReactions[0]}`);
allReactions.splice(0);
}
let remainingReactions = allReactions.splice(0);
for (let i = 0, l = remainingReactions.length; i < l; i++)
remainingReactions[i].runReaction();
}
globalState.isRunningReactions = false;
}
其作用就是遍历所有globalState.pendingReactions
中收集的Reaction
对象,并执行每个对象的runReaction
方法。
在上文实例中看到该方法的实现可知,其作用就是开启事务并执行实例化Reaction时传入的onInvalidate
方法,也就是实例本身的带着响应函数为参数的track方法了。track函数除了开启事务并做一些收集工作之外,核心任何是执行trackDerivedFunction
方法。
export function trackDerivedFunction<T>(derivation: IDerivation, f: () => T, context) {
changeDependenciesStateTo0(derivation);
derivation.newObserving = new Array(derivation.observing.length + 100);
derivation.unboundDepsCount = 0;
derivation.runId = ++globalState.runId;
const prevTracking = globalState.trackingDerivation;
globalState.trackingDerivation = derivation;
let result;
try {
result = f.call(context);
} catch (e) {
result = new CaughtException(e);
}
globalState.trackingDerivation = prevTracking;
bindDependencies(derivation);
return result;
}
该函数首先将globalState.trackingDerivation
设置为当前正在处理的Reaction
,再调用监听函数,初始化autorun
立即执行监听函数的逻辑就是这里。
到这一步,我们就完成了依赖收集的准备工作,下面就是执行监听函数,并触发相关响应式属性的get方法,完成依赖收集了。不妨回顾一下被改造的get方法。
public get(): T {
this.reportObserved();
return this.dehanceValue(this.value);
}
主要目的就是执行reportObserved
函数并返回被处理后的值。再进入到该函数。
export function reportObserved(observable: IObservable) {
const derivation = globalState.trackingDerivation;
if (derivation !== null) {
if (derivation.runId !== observable.lastAccessedBy) {
observable.lastAccessedBy = derivation.runId;
derivation.newObserving![derivation.unboundDepsCount++] = observable;
}
} else if (observable.observers.length === 0) {
queueForUnobservation(observable);
}
}
该方法中获取到了globalState.trackingDerivation
,也就是正在执行的Reaction了,然后将可观察属性对象添加到Reaction的newObserving
中。这一步也就完成了Reaction对响应式数据的最新收集。
再回到trackDerivedFunction
函数中,执行完监听函数后,最后调用了bindDependencies
函数。
function bindDependencies(derivation: IDerivation) {
const prevObserving = derivation.observing;
const observing = derivation.observing = derivation.newObserving!;
let lowestNewObservingDerivationState = IDerivationState.UP_TO_DATE;
derivation.newObserving = null;
let i0 = 0, l = derivation.unboundDepsCount;
for (let i = 0; i < l; i++) {
const dep = observing[i];
if (dep.diffValue === 0) {
dep.diffValue = 1;
if (i0 !== i) observing[i0] = dep;
i0++;
}
if ((dep as any as IDerivation).dependenciesState > lowestNewObservingDerivationState) {
lowestNewObservingDerivationState = (dep as any as IDerivation).dependenciesState;
}
}
observing.length = i0;
l = prevObserving.length;
while (l--) {
const dep = prevObserving[l];
if (dep.diffValue === 0) {
removeObserver(dep, derivation);
}
dep.diffValue = 0;
}
while (i0--) {
const dep = observing[i0];
if (dep.diffValue === 1) {
dep.diffValue = 0;
addObserver(dep, derivation);
}
}
if (lowestNewObservingDerivationState !== IDerivationState.UP_TO_DATE) {
derivation.dependenciesState = lowestNewObservingDerivationState;
derivation.onBecomeStale();
}
}
export function addObserver(observable: IObservable, node: IDerivation) {
const l = observable.observers.length;
if (l) { // because object assignment is relatively expensive, let's not store data about index 0.
observable.observersIndexes[node.__mapid] = l;
}
observable.observers[l] = node;
if (observable.lowestObserverState > node.dependenciesState) observable.lowestObserverState = node.dependenciesState;
}
首先将newObserving
属性赋值给observing
,这一步就完成了Reaction
对响应式数据的收集.再遍历每个收集到的响应式数据执行其addObserver
将Reaction
又添加到响应式数据的observers
属性中,这一步就完成了响应式数据对Reaction
的收集。至此双向的依赖收集完成,可以看到和Vue的依赖收集的思想核心是完全相同的。
依赖更新
依赖收集完成后,触发响应式属性的set方法就会执行依赖更新的流程。先回顾一下被改造的set实现。
export function setPropertyValue(instance, name: string, newValue) {
const adm = instance.$mobx;
const observable = adm.values[name];
if (hasInterceptors(adm)) {
const change = interceptChange<IObjectWillChange>(adm, {
type: "update",
object: instance,
name, newValue
});
if (!change)
return;
newValue = change.newValue;
}
newValue = observable.prepareNewValue(newValue);
if (newValue !== UNCHANGED) {
const notify = hasListeners(adm);
const notifySpy = isSpyEnabled();
const change = notify || notifySpy ? {
type: "update",
object: instance,
oldValue: (observable as any).value,
name, newValue
} : null;
if (notifySpy)
spyReportStart(change);
observable.setNewValue(newValue);
if (notify)
notifyListeners(adm, change);
if (notifySpy)
spyReportEnd();
}
}
首先获取到目标对象的代理adm对象,再判断新值是否和原值相等,不等则执行setNewValue
方法。
setNewValue(newValue: T) {
const oldValue = this.value;
this.value = newValue;
this.reportChanged();
if (hasListeners(this)) {
notifyListeners(this, {
type: "update",
object: this,
newValue, oldValue
});
}
}
该方法的主要作用还是调用reportChanged
方法,此方法开启一个事务并调用propagateChanged
方法。
export function propagateChanged(observable: IObservable) {
if (observable.lowestObserverState === IDerivationState.STALE) return;
observable.lowestObserverState = IDerivationState.STALE;
const observers = observable.observers;
let i = observers.length;
while (i--) {
const d = observers[i];
if (d.dependenciesState === IDerivationState.UP_TO_DATE)
d.onBecomeStale();
d.dependenciesState = IDerivationState.STALE;
}
}
这一步获取到之前依赖收集到的所有Reaction
对象,并遍历调用他们的onBecomeStale
方法。再回顾一下Reaction
对象中的该方法。
onBecomeStale() {
this.schedule();
}
可以看到和收集依赖时走的逻辑是相同的。当然上文中也有一些针对初次收集依赖和依赖更新分别优化的逻辑部分,由于不是核心内容就不多做讲解。
小结
整套流程下来,可以看到mobx的核心思想和Vue非常类似,Reaction对象就可以对标Vue中的Watcher对象去理解。React相比Vue虽然更加灵活,但其本身缺乏对数据变化的感知,需要手动实现shouldComponentUpdate也是比较繁琐的一件事。但有了mobx的帮助,相当于将Vue响应式数据那一套抽象出来移植到React上补全了React数据处理能力的弱势,相比于Redux的严格范式要求,给予React开发者更自由的发挥空间。
-- EOF --