Node源码解析——ReadableStream
上篇文章了解了可写流,这篇继续看另一半可读流,可读比可写复杂很多,长文预警。
Readable概览
毕竟可写和可读是相对的,也能想到两者的实现结构肯定类似。那么还是按照之前读可写流的步骤,从fs的createReadStream开始。
// fs.createReadStream(): make sure the stream classes are loaded, then
// delegate construction to ReadStream.
function createReadStream(path, options) {
  lazyLoadStreams();
  const stream = new ReadStream(path, options);
  return stream;
}
// Load the fs stream classes from internal/fs/streams on first use;
// subsequent calls are no-ops once ReadStream is populated.
function lazyLoadStreams() {
  if (ReadStream)
    return;
  ({ ReadStream, WriteStream } = require('internal/fs/streams'));
  FileReadStream = ReadStream;
  FileWriteStream = WriteStream;
}
// internal/fs/streams
// fs.ReadStream constructor: normalizes options, validates the injected fs
// implementation, initializes fd/position bookkeeping, and defers common
// stream setup to Readable.
function ReadStream(path, options) {
  // Support calling without `new`.
  if (!(this instanceof ReadStream))
    return new ReadStream(path, options);

  // A little bit bigger buffer and water marks by default
  options = copyObject(getOptions(options, {}));
  if (options.highWaterMark === undefined)
    options.highWaterMark = 64 * 1024;
  if (options.autoDestroy === undefined) {
    options.autoDestroy = false;
  }

  // An alternative fs implementation may be injected via options.fs;
  // it must provide at least open/read/close.
  this[kFs] = options.fs || fs;
  if (typeof this[kFs].open !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
                                   this[kFs].open);
  }
  if (typeof this[kFs].read !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function',
                                   this[kFs].read);
  }
  if (typeof this[kFs].close !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
                                   this[kFs].close);
  }

  Readable.call(this, options);

  // Path will be ignored when fd is specified, so it can be falsy
  this.path = toPathIfFileURL(path);
  this.fd = options.fd === undefined ? null : options.fd;
  this.flags = options.flags === undefined ? 'r' : options.flags;
  this.mode = options.mode === undefined ? 0o666 : options.mode;

  this.start = options.start;
  this.end = options.end;
  this.autoClose = options.autoClose === undefined ? true : options.autoClose;
  this.pos = undefined;       // next file offset to read from
  this.bytesRead = 0;         // running total handed to the stream
  this.closed = false;
  // Set while an fs read is in flight; _destroy waits on it before
  // closing the fd (see the kIoDone emission in _read's callback).
  this[kIsPerformingIO] = false;

  if (this.start !== undefined) {
    checkPosition(this.start, 'start');
    // Reading begins at `start` when provided.
    this.pos = this.start;
  }

  if (this.end === undefined) {
    this.end = Infinity;
  } else if (this.end !== Infinity) {
    checkPosition(this.end, 'end');
    // A range where start is past end is invalid.
    if (this.start !== undefined && this.start > this.end) {
      throw new ERR_OUT_OF_RANGE(
        'start',
        `<= "end" (here: ${this.end})`,
        this.start
      );
    }
  }

  // Open the file ourselves unless the caller supplied an fd.
  if (typeof this.fd !== 'number')
    _openReadFs(this);

  this.on('end', function() {
    if (this.autoClose) {
      this.destroy();
    }
  });
}
// Wire up inheritance: instances get Readable's methods via the prototype
// chain, and the constructor itself inherits Readable's static members.
ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
ObjectSetPrototypeOf(ReadStream, Readable);
做的事也是相似的,初始化一些选项后继承readable。继续到stream中看readable构造函数实现。
// Readable base constructor: builds the per-stream state bag, optionally
// wires user-supplied read/destroy implementations, then runs Stream setup.
function Readable(options) {
  // Support calling without `new`.
  if (!(this instanceof Readable))
    return new Readable(options);

  // Checking for a Stream.Duplex instance is faster here instead of inside
  // the ReadableState constructor, at least with V8 6.5
  const isDuplex = this instanceof Stream.Duplex;

  this._readableState = new ReadableState(options, this, isDuplex);

  // legacy
  this.readable = true;

  if (options) {
    const { read, destroy } = options;
    if (typeof read === 'function')
      this._read = read;
    if (typeof destroy === 'function')
      this._destroy = destroy;
  }

  Stream.call(this, options);
}
可以看到比WritableStream那边简单一些,只有stream的继承和readableState的创建,少了原型判断的hack。由此可以推测duplex的实现确实是原型继承自readable,再寄生构造writable。不妨到_stream_duplex.js
中验证一下。打开就能看到这么一段注释。
// a duplex stream is just a stream that is both readable and writable.
// Since JS doesn't have multiple prototypal inheritance, this class
// prototypally inherits from Readable, and then parasitically from
// Writable.
得到验证,继续看readableState。
// Per-stream state container; one instance lives on stream._readableState
// and the flags below drive the whole read/flow machinery.
function ReadableState(options, stream, isDuplex) {
  // Duplex streams are both readable and writable, but share
  // the same options object.
  // However, some cases require setting options to different
  // values for the readable and the writable sides of the duplex stream.
  // These options can be provided separately as readableXXX and writableXXX.
  if (typeof isDuplex !== 'boolean')
    isDuplex = stream instanceof Stream.Duplex;

  // Object stream flag. Used to make read(n) ignore n and to
  // make all the buffer merging and length checks go away
  this.objectMode = !!(options && options.objectMode);

  if (isDuplex)
    this.objectMode = this.objectMode ||
      !!(options && options.readableObjectMode);

  // The point at which it stops calling _read() to fill the buffer
  // Note: 0 is a valid value, means "don't call _read preemptively ever"
  this.highWaterMark = options ?
    getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) :
    getDefaultHighWaterMark(false);

  // A linked list is used to store data chunks instead of an array because the
  // linked list can remove elements from the beginning faster than
  // array.shift()
  this.buffer = new BufferList();
  this.length = 0;          // bytes (or objects, in objectMode) buffered
  this.pipes = [];          // destinations attached via pipe()
  this.flowing = null;      // null = initial, true = flowing, false = paused
  this.ended = false;       // push(null) has been received
  this.endEmitted = false;  // 'end' has already been emitted
  this.reading = false;     // a _read() call is currently outstanding

  // A flag to be able to tell if the event 'readable'/'data' is emitted
  // immediately, or on a later tick. We set this to true at first, because
  // any actions that shouldn't happen until "later" should generally also
  // not happen before the first read call.
  this.sync = true;

  // Whenever we return null, then we set a flag to say
  // that we're awaiting a 'readable' event emission.
  this.needReadable = false;
  this.emittedReadable = false;
  this.readableListening = false;  // true while 'readable' has listeners
  this.resumeScheduled = false;    // a resume_() is queued on the next tick
  this[kPaused] = null;

  // True if the error was already emitted and should not be thrown again
  this.errorEmitted = false;

  // Should close be emitted on destroy. Defaults to true.
  this.emitClose = !options || options.emitClose !== false;

  // Should .destroy() be called after 'end' (and potentially 'finish')
  this.autoDestroy = !options || options.autoDestroy !== false;

  // Has it been destroyed
  this.destroyed = false;

  // Indicates whether the stream has errored.
  this.errored = false;

  // Crypto is kind of old and crusty. Historically, its default string
  // encoding is 'binary' so we have to make this configurable.
  // Everything else in the universe uses 'utf8', though.
  this.defaultEncoding = (options && options.defaultEncoding) || 'utf8';

  // Ref the piped dest which we need a drain event on it
  // type: null | Writable | Set<Writable>
  this.awaitDrainWriters = null;
  this.multiAwaitDrain = false;

  // If true, a maybeReadMore has been scheduled
  this.readingMore = false;

  // Decoder/encoding are only set up when an encoding option is given.
  this.decoder = null;
  this.encoding = null;
  if (options && options.encoding) {
    if (!StringDecoder)
      StringDecoder = require('string_decoder').StringDecoder;
    this.decoder = new StringDecoder(options.encoding);
    this.encoding = options.encoding;
  }
}
这里有很多注释标注了需要注意的内容。
- 因为双工流共享了可读和可写流的选项,所以两种流中,命名相同的选项通过前缀readable和writable的形式区分。
- 同样有水位控制和_read这个内部读方法关联,当达到水位时继续调用_read应该停止向缓冲区写入数据。思想和write是一致的。
- 存储缓冲区的数据结构是链表而不是数组,为了更高效的移除数据。
- 设置了一系列标志位应付各种场景。
接下来就从使用上看流程实现。可读流有流动和暂停两种模式,先从基础的暂停模式开始看起。
暂停模式
可读流创建后默认为暂停模式,要显式的调用read方法获取数据,并通过push方法向可读流中填充供消费的数据。
那么这就有个问题了,自定义可读流的时候似乎并没有提供一个独立的数据填充接口。实际上通用做法是在_read这个接口中由使用者自己实现取数逻辑,再通过push完成数据填充。这里的设计我感觉还是有点奇怪,一个接口要承担两种职责,不知道是历史原因还是有意设计如此?总之事实就是这样,还是可以看看fs的相关具体实现。在internal/fs/streams.js
找到。
// the actual read.
this[kIsPerformingIO] = true;
this[kFs].read(
this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) return this.emit(kIoDone, er);
if (er) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
} else {
let b = null;
// Now that we know how much data we have actually read, re-wind the
// 'used' field if we can, and otherwise allow the remainder of our
// reservation to be used as a new pool later.
if (start + toRead === thisPool.used && thisPool === pool) {
const newUsed = thisPool.used + bytesRead - toRead;
thisPool.used = roundUpToMultipleOf8(newUsed);
} else {
// Round down to the next lowest multiple of 8 to ensure the new pool
// fragment start and end positions are aligned to an 8 byte boundary.
const alignedEnd = (start + toRead) & ~7;
const alignedStart = roundUpToMultipleOf8(start + bytesRead);
if (alignedEnd - alignedStart >= kMinPoolSpace) {
poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
}
}
if (bytesRead > 0) {
this.bytesRead += bytesRead;
b = thisPool.slice(start, start + bytesRead);
}
this.push(b);
}
});
通过fs的read方法从fd中获取数据,在回调中再将数据通过push方法填充到缓冲区供后续消费。暂停模式下的数据流动模式如下图。
那么接下来就看整个流程的内部实现。首先找到push实现。
// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
  // push() always appends; unshift() is the add-to-front variant.
  const addToFront = false;
  return readableAddChunk(this, chunk, encoding, addToFront);
};
注释也说明的很清楚,职能同write类似,都是将数据推入缓冲区,并通过返回值判断是否超过水位。继续调用readableAddChunk方法。
// Shared implementation behind push() (addToFront=false) and unshift()
// (addToFront=true): normalizes the chunk, then routes it into the buffer,
// or signals EOF / errors. Returns true while more data may be pushed.
function readableAddChunk(stream, chunk, encoding, addToFront) {
  debug('readableAddChunk', chunk);
  const state = stream._readableState;

  let err;
  if (!state.objectMode) {
    // Byte mode accepts only string/Buffer/Uint8Array; strings may need
    // re-encoding to match the stream's configured encoding.
    if (typeof chunk === 'string') {
      encoding = encoding || state.defaultEncoding;
      if (addToFront && state.encoding && state.encoding !== encoding) {
        // When unshifting, if state.encoding is set, we have to save
        // the string in the BufferList with the state encoding
        chunk = Buffer.from(chunk, encoding).toString(state.encoding);
      } else if (encoding !== state.encoding) {
        chunk = Buffer.from(chunk, encoding);
        encoding = '';
      }
    } else if (chunk instanceof Buffer) {
      encoding = '';
    } else if (Stream._isUint8Array(chunk)) {
      chunk = Stream._uint8ArrayToBuffer(chunk);
      encoding = '';
    } else if (chunk != null) {
      err = new ERR_INVALID_ARG_TYPE(
        'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
    }
  }

  if (err) {
    errorOrDestroy(stream, err);
  } else if (chunk === null) {
    // push(null) signals end-of-stream.
    state.reading = false;
    onEofChunk(stream, state);
  } else if (state.objectMode || (chunk && chunk.length > 0)) {
    if (addToFront) {
      // unshift() after 'end' was emitted is an error.
      if (state.endEmitted)
        errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
      else
        addChunk(stream, state, chunk, true);
    } else if (state.ended) {
      errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
    } else if (state.destroyed) {
      return false;
    } else {
      state.reading = false;
      if (state.decoder && !encoding) {
        // Run raw bytes through the string decoder; it may buffer an
        // incomplete multi-byte character and yield an empty string.
        chunk = state.decoder.write(chunk);
        if (state.objectMode || chunk.length !== 0)
          addChunk(stream, state, chunk, false);
        else
          maybeReadMore(stream, state);
      } else {
        addChunk(stream, state, chunk, false);
      }
    }
  } else if (!addToFront) {
    // Empty non-object push: nothing to buffer, but keep the read loop going.
    state.reading = false;
    maybeReadMore(stream, state);
  }

  // We can push more data if we are below the highWaterMark.
  // Also, if we have no data yet, we can stand some more bytes.
  // This is to work around cases where hwm=0, such as the repl.
  return !state.ended &&
    (state.length < state.highWaterMark || state.length === 0);
}
首先判断是否是对象模式,非对象模式要走一次encoding的过程。再通过addChunk这个函数进行真正的数据推入。在这一步里会将reading状态置为false,并注意返回值会根据state内部数据长度和水位来决定,来表示是否可以继续推入数据。继续看addChunk。
// Place a normalized chunk into the stream: either emit it straight to
// 'data' listeners (flowing, empty buffer, async context) or append it to
// the internal buffer, then consider topping the buffer up.
function addChunk(stream, state, chunk, addToFront) {
  const emitDirectly =
    state.flowing && state.length === 0 && !state.sync;

  if (emitDirectly) {
    // Fast path: no buffering needed, hand the chunk to listeners now.
    // Use the guard to avoid creating `Set()` repeatedly
    // when we have multiple pipes.
    if (state.multiAwaitDrain) {
      state.awaitDrainWriters.clear();
    } else {
      state.awaitDrainWriters = null;
    }
    stream.emit('data', chunk);
  } else {
    // Buffer the chunk and update accounting.
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront) {
      state.buffer.unshift(chunk);
    } else {
      state.buffer.push(chunk);
    }
    if (state.needReadable)
      emitReadable(stream);
  }

  maybeReadMore(stream, state);
}
因为是暂停模式,直接走else分支。更新了数据长度和缓冲区。这里有两个注意点。
首先是needReadable状态的判断,表示需要触发可读事件,也就是目前处于数据从数据源流到readable中的状态节点。那么什么时候应该将其置为true?不妨想想,我们的push是在哪里调用的,实际上是限定在read这个接口内部的,那么可以想到needReadable的状态变化应该是置前到read了。全局搜一下needReadable,果然在read实现里找到这么一段。
// If the length is currently zero, then we *need* a readable event.
if (state.length === 0)
state.needReadable = true;
也就是说第一次调用read并push数据后就会触发readable事件了。继续看触发这个事件的函数内部。
// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow. This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
function emitReadable(stream) {
  const state = stream._readableState;
  debug('emitReadable', state.needReadable, state.emittedReadable);
  state.needReadable = false;
  // Only schedule one emission at a time.
  if (state.emittedReadable)
    return;
  debug('emitReadable', state.flowing);
  state.emittedReadable = true;
  process.nextTick(emitReadable_, stream);
}
// Next-tick half of emitReadable(): actually emits 'readable', then
// recomputes needReadable and gives flow() a chance to pull data.
function emitReadable_(stream) {
  const state = stream._readableState;
  debug('emitReadable_', state.destroyed, state.length, state.ended);
  // Only emit when there is something to announce: buffered data or EOF.
  if (!state.destroyed && (state.length || state.ended)) {
    stream.emit('readable');
    state.emittedReadable = false;
  }

  // The stream needs another readable event if
  // 1. It is not flowing, as the flow mechanism will take
  //    care of it.
  // 2. It is not ended.
  // 3. It is below the highWaterMark, so we can schedule
  //    another readable later.
  state.needReadable =
    !state.flowing &&
    !state.ended &&
    state.length <= state.highWaterMark;
  flow(stream);
}
需要异步的触发readable事件,因为监听该事件可能继续调用read循环爆栈。触发该事件后还要继续判断是否需要另外的readable事件。条件在注释中也写的比较清楚,要满足以下三点。
- 不是流动模式
- 未结束
- 数据长度低于水位
之后调用的flow比较有趣,判断流动状态的一个函数,当处于流动状态时会主动调用read消费数据。暂停模式下不是重点,后面流动模式有大作用。
// Drive flowing mode: keep calling read() until it reports null (no more
// data available) or the stream drops out of flowing mode.
function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing) {
    if (stream.read() === null)
      break;
  }
}
emitReadable看完后就是第二个注意点maybeReadMore了。找到实现有一大段注释解释其功能。
// At this point, the consumer has presumably seen the 'readable' event and
// called read() to drain some data, which may have left the buffer below
// the high water mark. Schedule (at most one) next-tick attempt to top the
// buffer back up preemptively.
function maybeReadMore(stream, state) {
  if (state.readingMore)
    return;
  state.readingMore = true;
  process.nextTick(maybeReadMore_, stream, state);
}
function maybeReadMore_(stream, state) {
  // Attempt to read more data if we should.
  //
  // The conditions for reading more data are (one of):
  // - Not enough data buffered (state.length < state.highWaterMark). The loop
  //   is responsible for filling the buffer with enough data if such data
  //   is available. If highWaterMark is 0 and we are not in the flowing mode
  //   we should _not_ attempt to buffer any extra data. We'll get more data
  //   when the stream consumer calls read() instead.
  // - No data in the buffer, and the stream is in flowing mode. In this mode
  //   the loop below is responsible for ensuring read() is called. Failing to
  //   call read here would abort the flow and there's no other mechanism for
  //   continuing the flow if the stream consumer has just subscribed to the
  //   'data' event.
  //
  // In addition to the above conditions to keep reading data, the following
  // conditions prevent the data from being read:
  // - The stream has ended (state.ended).
  // - There is already a pending 'read' operation (state.reading). This is a
  //   case where the the stream has called the implementation defined _read()
  //   method, but they are processing the call asynchronously and have _not_
  //   called push() with new data. In this case we skip performing more
  //   read()s. The execution ends in this method again after the _read() ends
  //   up calling push() with more data.
  while (!state.reading && !state.ended &&
         (state.length < state.highWaterMark ||
          (state.flowing && state.length === 0))) {
    const len = state.length;
    debug('maybeReadMore read 0');
    // read(0) triggers _read()/push() without handing any data to a caller.
    stream.read(0);
    if (len === state.length)
      // Didn't get any data, stop spinning.
      break;
  }
  // Allow the next maybeReadMore() call to schedule another pass.
  state.readingMore = false;
}
就是说有两种情况需要readingMore状态。
- 缓冲区未达到水位线
- 没有数据在缓冲区但处于流动模式
首先要知道一点,数据有一个从数据源到缓冲区到用户的过程,水位线是需要维护的,用户读多了就要从数据源补上,不读的时候并且达到水位线就停止从数据源取数据。这些情况固然可以自己根据返回值写逻辑处理,但为了方便node内部就通过readingMore自动维护了。也就是上面这两种情况下,通过readingMore的循环将内部缓冲区填到水位线。因为read兼顾数据推入和读取功能,调用read时传入0也就是不真正读取数据,只推入数据。
现在通过push有了一些填充数据,再看核心的read方法是怎么消费的。当然还是要明白,分析虽然是按照常规获取再消费的思路,但落实到代码上最先调用的还是read,再在其内部push。read的实现比较长还恰好都是很核心的部分,我们就分步慢慢看。
Readable.prototype.read = function(n) {
debug('read', n);
// Same as parseInt(undefined, 10), however V8 7.3 performance regressed
// in this scenario, so we are doing it manually.
if (n === undefined) {
n = NaN;
} else if (!NumberIsInteger(n)) {
n = parseInt(n, 10);
}
const state = this._readableState;
const nOrig = n;
// ...
首先是要读的长度统一格式化为number类型,并存一份原始值到nOrig,应该就是nOrigin的缩写。接下来就会根据n的大小来决定不同的处理手段了。
// If we're asking for more than the current hwm, then raise the hwm.
if (n > state.highWaterMark)
state.highWaterMark = computeNewHighWaterMark(n);
if (n !== 0)
state.emittedReadable = false;
首先就是和最高水位线的比较,如果大于最高水位线,就会抬升最高水位线。这里有点反直觉了,一般来说水位线应该是静止的参考线,为什么要抬升呢?谈一点我的粗浅理解。因为可读流的模型是双向的,有数据源的流入和对用户的输出,可读流本身类似一个中转的蓄水池。这里的水位线就像一个状态的转换标记,超过时就表示用户可以安全的接收流出。还未达到时,就表示数据源可以安全的流入,也会开启循环流入机制注水。当n大于这个标记时,也就是需求的水量大于整个蓄水池,为了让用户能安全接收n这个数量的数据,就必须把水位线抬到n以上了扩充蓄水池的体积了。computeNewHighWaterMark肯定就是按一定的规则去抬升高度。进去看下实现。
// Don't raise the hwm > 1GB
const MAX_HWM = 0x40000000;
// Round `n` up to the next power of two (capped at MAX_HWM) so the high
// water mark grows in chunky steps instead of tiny increments.
function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    // TODO(ronag): Throw ERR_VALUE_OUT_OF_RANGE.
    return MAX_HWM;
  }
  // Classic next-power-of-two bit trick (cf. Java HashMap's tableSizeFor):
  // smear the highest set bit into every lower position, then add one.
  let v = n - 1;
  for (const shift of [1, 2, 4, 8, 16])
    v |= v >>> shift;
  return v + 1;
}
这个实现很有意思,我搜了一下在Java的HashMap中的tableSizeFor也有同样实现。原理就不细说了,展开又是一篇长文,总之这里只要记住其目的——得到大于等于n的最小的2的幂次方数值。这保证了水位抬升的幅度既不会太小,也不会太大。可以说是非常有用的一个工具函数了。
if (n !== 0)
state.emittedReadable = false;
// If we're doing read(0) to trigger a readable event, but we
// already have a bunch of data in the buffer, then just trigger
// the 'readable' event and move on.
if (n === 0 &&
state.needReadable &&
((state.highWaterMark !== 0 ?
state.length >= state.highWaterMark :
state.length > 0) ||
state.ended)) {
debug('read: emitReadable', state.length, state.ended);
if (state.length === 0 && state.ended)
endReadable(this);
else
emitReadable(this);
return null;
}
// ...
n不等于0的话,就说明已经进入读模式了,相当于蓄水池已经在放水了,此时重置emittedReadable标记,允许后续再次触发readable事件。
n等于0并且needReadable为真并且蓄水池高于最高水位,那就是待放水的状态,触发readable事件供用户接住数据。同时注意到当可读流位于结束状态时会调用endReadable。看一下实现。
// Mark the stream as ended and schedule the actual 'end' emission
// (endReadableNT) for the next tick, guarding against doing it twice.
function endReadable(stream) {
  const state = stream._readableState;
  debug('endReadable', state.endEmitted);
  if (state.endEmitted)
    return;
  state.ended = true;
  process.nextTick(endReadableNT, state, stream);
}
// Next-tick half of endReadable(): emit 'end' exactly once, then possibly
// auto-destroy (coordinating with the writable side for duplex streams).
function endReadableNT(state, stream) {
  debug('endReadableNT', state.endEmitted, state.length);

  // Check that we didn't get one last unshift.
  if (!state.errorEmitted && !state.endEmitted && state.length === 0) {
    state.endEmitted = true;
    stream.readable = false;
    stream.emit('end');

    if (state.autoDestroy) {
      // In case of duplex streams we need a way to detect
      // if the writable side is ready for autoDestroy as well
      const wState = stream._writableState;
      if (!wState || (wState.autoDestroy && wState.finished)) {
        stream.destroy();
      }
    }
  }
}
抛出end事件,并和可写流类似的有个针对双工流同步销毁的逻辑。继续往下看,如果不是待放水的状态,那么就是放水的状态了。
n = howMuchToRead(n, state);
// If we've ended, and we're now clear, then finish it up.
if (n === 0 && state.ended) {
if (state.length === 0)
endReadable(this);
return null;
}
// ...
// Decide how many bytes (or objects) read() may hand out for a request of
// size `n`, given the current buffer contents and ended state.
function howMuchToRead(n, state) {
  // Nothing can be returned for a non-positive request or a drained,
  // ended stream.
  if (n <= 0 || (state.length === 0 && state.ended))
    return 0;
  // Object mode ignores n: exactly one object per read.
  if (state.objectMode)
    return 1;
  if (NumberIsNaN(n)) {
    // read() was called without a size: flowing mode hands out one
    // buffered chunk at a time, paused mode returns everything buffered.
    if (state.flowing && state.length)
      return state.buffer.first().length;
    return state.length;
  }
  // A concrete size: satisfy it from the buffer if possible; once ended,
  // drain whatever is left, otherwise signal "wait for more data".
  if (n <= state.length)
    return n;
  return state.ended ? state.length : 0;
}
计算要放多少水。主要还是处理各种情况,如果在放水状态下结束,依然走抛end事件的逻辑。正常情况下要放n这个量级的水。这里就要注意了,现在我们是不知道水池里的水够不够的,上面只是调整了最高水位线保证超过n。所以下面的逻辑在放水前,会判断要不要从数据源拿数据。
// if we need a readable event, then we need to do some reading.
var doRead = state.needReadable;
debug('need readable', doRead);
// If we currently have less than the highWaterMark, then also read some
if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true;
debug('length less than watermark', doRead);
}
needReadable又来了。当水池低于水位时,将其置为true表示需要从数据源取数据。
// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, and if we're destroyed, then it's
// not allowed.
if (state.ended || state.reading || state.destroyed) {
doRead = false;
debug('reading or ended', doRead);
} else if (doRead) {
debug('do read');
state.reading = true;
state.sync = true;
// If the length is currently zero, then we *need* a readable event.
if (state.length === 0)
state.needReadable = true;
// Call internal read method
this._read(state.highWaterMark);
state.sync = false;
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
if (!state.reading)
n = howMuchToRead(nOrig, state);
}
如果已经结束了或者销毁了,再取就没意义。或者已经处于读的状态就不需要再开启。接下来根据doRead进行具体的执行。
当需要取数据时,标记reading状态表示正在取数据中,调用内部read方法拿数据,在这个方法中就会通过上面的push实现真正的存储数据到蓄水池。因为内部read的实现异步同步都可以,当是同步时可能改变state,需要用之前保存的n原始值重新计算可读长度。
var ret;
if (n > 0)
ret = fromList(n, state);
else
ret = null;
if (ret === null) {
state.needReadable = state.length <= state.highWaterMark;
n = 0;
} else {
state.length -= n;
if (state.multiAwaitDrain) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
}
}
if (state.length === 0) {
// If we have nothing in the buffer, then we want to know
// as soon as we *do* get something into the buffer.
if (!state.ended)
state.needReadable = true;
// If we tried to read() past the EOF, then emit end on the next tick.
if (nOrig !== n && state.ended)
endReadable(this);
}
if (ret !== null)
this.emit('data', ret);
return ret;
确认可读长度后就从水池里取数据到用户了。水池数据量不够时,ret会得到null,依然将needReadable置为true,继续注水。当水池数据量满足用户需求时,就返回这部分数据,维护state长度。如果是流动模式就通过data事件抛出。至此就完成了一次流出过程。
对暂停模式下的数据流动小结一下,就是从read开始,进入到内部的_read方法通过push注水,在push内部通过maybeReadMore将水池注到水位线以上,再通过read的其他分支逻辑出水。流程闭环都连接在read这个方法上。以我的理解来看,够聚合,但难免有些职责划分混乱,同步异步,流入流出,各种状态标记全部搅和在一起,不仔细读还真难看懂,或许还有优化空间。
流动模式
暂停模式的闭环看完了,流动模式其实也就简单了,本质是一样的,只是用户消费数据的形式由主动转为被动。通常来说有三种方案转化到流动模式。不过在看流动模式转化方案的细节之前,我们必须先搞清楚一点,就是流动模式和暂停模式的本质区别在哪里?虽然说从下面的源码可以分析出来,不过这里我先点出来让读者有个对核心思路梳理的准备——数据的主动输出,更准确的说,一旦蓄水池中有水时,就通过触发data事件将其输出,而不管是否有接收者。
下面就看不同的转化方法是如何实现这个逻辑的吧。
resume
// pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
Readable.prototype.resume = function() {
  const state = this._readableState;

  if (state.flowing) {
    // Already flowing: just clear the explicit-pause marker.
    state[kPaused] = false;
    return this;
  }

  debug('resume');
  // We flow only if there is no one listening
  // for readable, but we still have to call
  // resume()
  state.flowing = !state.readableListening;
  resume(this, state);
  state[kPaused] = false;
  return this;
};
注释提到resume和pause是遗留api,按道理不推荐使用。不过看看思路还是必要的。这里首先会判断是否处于流动模式,如果不是流动模式并且没有监听readable事件的话就转为流动模式。为什么需要和readable事件互斥呢?因为历史原因stream提供了太多方法消费数据,node本身不推荐各种消费方案混用,不过还是设计了readable事件对流的控制权在data事件之上。
// Schedule at most one resume_() run on the next tick.
function resume(stream, state) {
  if (state.resumeScheduled)
    return;
  state.resumeScheduled = true;
  process.nextTick(resume_, stream, state);
}
// Runs on the tick after resume(): trigger a buffering read if none is in
// progress, announce 'resume', then let flow() start pulling data out.
function resume_(stream, state) {
  debug('resume', state.reading);
  if (!state.reading) {
    // read(0) fills the buffer without handing data to the caller.
    stream.read(0);
  }

  state.resumeScheduled = false;
  stream.emit('resume');
  flow(stream);
  // flow() may have stopped while a read was pending; if we are still
  // flowing and idle, kick off another buffering read.
  if (state.flowing && !state.reading)
    stream.read(0);
}
异步调用resume_,在过程中标记一些状态,再调用了read(0)。之前也谈过,read(0)的目的在暂停模式下就是蓄水不放水,保证水池一直有安全流出的水位。resume的数据主动流出在flow方法中。前面也谈到flow方法会在流动模式下主动调用read,就是从这里开启真正的数据流动。
// While the stream stays in flowing mode, keep calling read(); read()
// returns null once no more data can be handed out, ending the loop.
function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}
再回顾一下read方法和flow有关的部分。
Readable.prototype.read = function(n) {
debug('read', n);
// Same as parseInt(undefined, 10), however V8 7.3 performance regressed
// in this scenario, so we are doing it manually.
if (n === undefined) {
n = NaN;
}
// ...
// howMuchToRead
if (NumberIsNaN(n)) {
// Only flow one buffer at a time
if (state.flowing && state.length)
return state.buffer.first().length;
else
return state.length;
}
// read
if (ret !== null)
this.emit('data', ret);
当不传n时会通过howMuchToRead计算要读多少数据,流动模式下就读第一个buffer长度,并通过data事件抛出。这里如果我们不监听data,数据其实也就主动流失了。因为flow中是一个while判断。
while (state.flowing && stream.read() !== null);
所以当水池里有数据时都会源源不断的流出。那么数据流空了呢?
if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true;
debug('length less than watermark', doRead);
}
在read中切换needReadable,再调用_read,从中调push从数据源取数据。走到addChunk。
function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
// Use the guard to avoid creating `Set()` repeatedly
// when we have multiple pipes.
if (state.multiAwaitDrain) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
}
stream.emit('data', chunk);
}
// ...
直接通过data事件把要取的数据流出,形成闭环。
监听data事件
原理本质上还是调用resume。直接看源码。
// Readable overrides on() to react to subscriptions: a 'data' listener may
// switch the stream into flowing mode, while a 'readable' listener forces
// paused mode and takes priority over 'data'.
Readable.prototype.on = function(ev, fn) {
  const res = Stream.prototype.on.call(this, ev, fn);
  const state = this._readableState;

  if (ev === 'data') {
    // Update readableListening so that resume() may be a no-op
    // a few lines down. This is needed to support once('readable').
    state.readableListening = this.listenerCount('readable') > 0;

    // Try start flowing on next tick if stream isn't explicitly paused
    if (state.flowing !== false)
      this.resume();
  } else if (ev === 'readable') {
    if (!state.endEmitted && !state.readableListening) {
      // First 'readable' listener: switch to (and pin) paused mode.
      state.readableListening = state.needReadable = true;
      state.flowing = false;
      state.emittedReadable = false;
      debug('on readable', state.length, state.reading);
      if (state.length) {
        // Data is already buffered: announce it.
        emitReadable(this);
      } else if (!state.reading) {
        // Nothing buffered and no read outstanding: start one next tick.
        process.nextTick(nReadingNextTick, this);
      }
    }
  }

  return res;
};
pipe
内容比较长,直接看核心部分。
Readable.prototype.pipe = function(dest, pipeOpts) {
const src = this;
// ...
state.pipes.push(dest);
// ...
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
const ret = dest.write(chunk);
debug('dest.write', ret);
if (ret === false) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug('false write response, pause', 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug('false write response, pause', state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
}
src.pause();
}
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
ondrain = pipeOnDrain(src, dest);
dest.on('drain', ondrain);
}
}
}
// ...
// Tell the dest that it's being piped to
dest.emit('pipe', src);
// Start the flow if it hasn't been started already.
if (!state.flowing) {
debug('pipe resume');
src.resume();
}
return dest;
};
首先监听readable的data事件,再调用readable的resume转为流动模式。在data事件的回调上,将流出的数据写到可写流。注意到当write不可写时会有一个awaitDrain的计数存储处理,当drain事件触发时再写入。
综合看以上三种方法,实现其实是一致的,resume开启流动数据和data事件接住流动数据。
小结
还有一些衍生的东西没讲,比如双工流和转换流,但核心内容都在这里了,无非就是在可读和可写之上附加一些小trick,就不多谈了,感兴趣的可以自己去读一下。再回顾一下整个实现,核心思路还是在于一个蓄水池对接数据源和用户两端,根据水位的高低决定蓄水池行为。水的流动依赖于各种事件,内部有两套处理流动的机制——readable和data,分别对应主动和被动,但不得不说职责划分得并不是很清晰,实现全部都混杂在了read这个方法中,也没有明确隔离两种事件机制的应用场景,混着用会不可避免的出现问题,期待在未来的版本中有所改善。
-- EOF --