Node源码解析——WritableStream
终于到这个核心模块了。包括之前写过的网络模块,在Node中大量模块的底层都用到了stream处理数据。这篇文章就来看stream的可写流内部实现。
Writable
stream的使用一般有两种形式,一是集成到其他模块的API,对用户不可见。二是通过stream模块提供的自定义接口。当然也应该猜到,其他模块底层也是用了自定义接口的实现。不妨就从最直观的文件系统API入手查看。fs模块中有一个创建可写流的接口fs.createWriteStream,在lib/fs.js
找到其实现。
// Public fs API: build a WriteStream for `path`, loading the stream
// classes on demand the first time this is called.
function createWriteStream(path, options) {
  // The stream classes are required lazily; make sure they exist first.
  lazyLoadStreams();
  const stream = new WriteStream(path, options);
  return stream;
}
// Require 'internal/fs/streams' on first use and cache the constructors
// in the module-level ReadStream/WriteStream (and File* alias) bindings.
function lazyLoadStreams() {
  // Already loaded on an earlier call — nothing to do.
  if (ReadStream) {
    return;
  }
  ({ ReadStream, WriteStream } = require('internal/fs/streams'));
  // Keep the File* aliases pointing at the same constructors.
  FileReadStream = ReadStream;
  FileWriteStream = WriteStream;
}
继续从internal/fs/streams
中取到可读和可写流的构造函数。先看可写流。
const { Readable, Writable } = require('stream');
// Writable stream over a file. `options.fs` may inject a custom backend
// implementing open/write/writev/close; otherwise the built-in fs is used.
function WriteStream(path, options) {
// Support calling without `new`.
if (!(this instanceof WriteStream))
return new WriteStream(path, options);
options = copyObject(getOptions(options, {}));
// Only buffers are supported.
options.decodeStrings = true;
// Default autoDestroy to off unless the caller opted in.
if (options.autoDestroy === undefined) {
options.autoDestroy = false;
}
// The fs-like backend used for all file operations.
this[kFs] = options.fs || fs;
if (typeof this[kFs].open !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
this[kFs].open);
}
// At least one of write/writev must be provided by the backend...
if (!this[kFs].write && !this[kFs].writev) {
throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
this[kFs].write);
}
// ...and whichever is present must actually be a function.
if (this[kFs].write && typeof this[kFs].write !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
this[kFs].write);
}
if (this[kFs].writev && typeof this[kFs].writev !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function',
this[kFs].writev);
}
if (typeof this[kFs].close !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
this[kFs].close);
}
// It's enough to override either, in which case only one will be used.
// Drop the implementation that has no backing method so only the
// provided one is used.
if (!this[kFs].write) {
this._write = null;
}
if (!this[kFs].writev) {
this._writev = null;
}
// Initialize the base Writable state with the normalized options.
Writable.call(this, options);
// Path will be ignored when fd is specified, so it can be falsy
this.path = toPathIfFileURL(path);
this.fd = options.fd === undefined ? null : options.fd;
this.flags = options.flags === undefined ? 'w' : options.flags;
this.mode = options.mode === undefined ? 0o666 : options.mode;
this.start = options.start;
this.autoClose = options.autoClose === undefined ? true : !!options.autoClose;
this.pos = undefined;
this.bytesWritten = 0;
this.closed = false;
// True while an fs write is in flight (see _write and the kIoDone emit).
this[kIsPerformingIO] = false;
// A validated `start` offset becomes the initial write position.
if (this.start !== undefined) {
checkPosition(this.start, 'start');
this.pos = this.start;
}
if (options.encoding)
this.setDefaultEncoding(options.encoding);
// No caller-supplied fd: open the file asynchronously.
if (typeof this.fd !== 'number')
_openWriteFs(this);
}
// Inherit instance methods from Writable.prototype and static members
// from Writable itself.
ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype);
ObjectSetPrototypeOf(WriteStream, Writable);
果然就是继承自stream模块的Writable了。追根溯源,无论stream在多少模块中用到,只要搞清楚stream模块本身的实现即可。就进入到Writable的实现,入口在lib/stream.js
。
// Public `stream` entry point: the legacy base Stream, plus each concrete
// stream flavor implemented in its own module.
const Stream = module.exports = require('internal/streams/legacy');
Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');
注意到,stream和http类似,实现按流的种类被划分为几个文件。这一小节看可写流就进入到lib/_stream_writable.js
找到构造函数实现。
// Base constructor for writable streams; also applied to Duplex instances.
function Writable(options) {
// Writable ctor is applied to Duplexes, too.
// `realHasInstance` is necessary because using plain `instanceof`
// would return false, as no `_writableState` property is attached.
// Trying to use the custom `instanceof` for Writable here will also break the
// Node.js LazyTransform implementation, which has a non-trivial getter for
// `_writableState` that would lead to infinite recursion.
// Checking for a Stream.Duplex instance is faster here instead of inside
// the WritableState constructor, at least with V8 6.5
const isDuplex = (this instanceof Stream.Duplex);
// Support calling without `new` (Duplex initialization skips this path).
if (!isDuplex && !realHasInstance.call(Writable, this))
return new Writable(options);
// All mutable per-stream bookkeeping lives on _writableState.
this._writableState = new WritableState(options, this, isDuplex);
// legacy.
this.writable = true;
// The write/destroy/final hooks may be supplied via options instead of
// by subclassing and overriding the methods.
if (options) {
if (typeof options.write === 'function')
this._write = options.write;
if (typeof options.writev === 'function')
this._writev = options.writev;
if (typeof options.destroy === 'function')
this._destroy = options.destroy;
if (typeof options.final === 'function')
this._final = options.final;
}
// Shared Stream (event machinery) initialization.
Stream.call(this, options);
}
这个构造函数中需要关注的点有以下3个。
- realHasInstance的目的。因为Duplex流同时实现了可读和可写流,但并不是多重继承,其原型链只指向Readable,所以常规的instanceof Writable会返回false,需要hack掉这里的instanceof判断。而instanceof的行为由Function.prototype[Symbol.hasInstance](代码中的FunctionPrototype[SymbolHasInstance])这个符号属性决定,就有了下面这段hack代码。
// Test _writableState for inheritance to account for Duplex streams,
// whose prototype chain only points to Readable.
var realHasInstance;
if (typeof Symbol === 'function' && SymbolHasInstance) {
// Keep the original Function.prototype[Symbol.hasInstance] so the
// override below can fall back to the normal instanceof semantics.
realHasInstance = FunctionPrototype[SymbolHasInstance];
ObjectDefineProperty(Writable, SymbolHasInstance, {
value: function(object) {
if (realHasInstance.call(this, object))
return true;
// Only widen the check for Writable itself, not for its subclasses.
if (this !== Writable)
return false;
// Duck-type: anything carrying a WritableState counts as a Writable.
return object && object._writableState instanceof WritableState;
}
});
} else {
// No Symbol.hasInstance support: plain instanceof is the best we can do.
realHasInstance = function(object) {
return object instanceof this;
};
}
realHasInstance就是保存的原来的instanceof。
- WritableState的实例化以及通过options参数自定义写入方法。WritableState上统一管理着可写流的状态数据。
- Stream的继承。我们知道,有一些实现是所有类型的流都通用的,比如事件机制的实现。Stream的继承就是用来做这件事。
接下来就直接看关键方法write的实现。
// Public entry point: normalize arguments, reject impossible writes, then
// either write `chunk` directly or buffer it. Returns false when the caller
// should wait for 'drain' before writing more.
Writable.prototype.write = function(chunk, encoding, cb) {
const state = this._writableState;
var ret = false;
const isBuf = !state.objectMode && Stream._isUint8Array(chunk);
// Do not use Object.getPrototypeOf as it is slower since V8 7.3.
if (isBuf && !(chunk instanceof Buffer)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
}
// write(chunk, cb) form: shift the callback into place.
if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
}
if (isBuf)
encoding = 'buffer';
else if (!encoding)
encoding = state.defaultEncoding;
if (typeof cb !== 'function')
cb = nop;
// Validate the stream state and the chunk; only a valid write reaches
// writeOrBuffer and increments the pending-callback count.
let err;
if (state.ending) {
err = new ERR_STREAM_WRITE_AFTER_END();
} else if (state.destroyed) {
err = new ERR_STREAM_DESTROYED('write');
} else if (chunk === null) {
err = new ERR_STREAM_NULL_VALUES();
} else if (!isBuf && typeof chunk !== 'string' && !state.objectMode) {
err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
} else {
state.pendingcb++;
ret = writeOrBuffer(this, state, chunk, encoding, cb);
}
// Errors are delivered asynchronously via nextTick; write() itself
// does not throw here.
if (err) {
process.nextTick(cb, err);
errorOrDestroy(this, err, true);
}
return ret;
};
主要是处理参数和错误,如果流程正常,会计数pendingcb并继续调用writeOrBuffer。
// If we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, cb) {
// Decode strings into Buffers up front unless explicitly disabled.
if (!state.objectMode &&
state.decodeStrings !== false &&
encoding !== 'buffer' &&
typeof chunk === 'string') {
chunk = Buffer.from(chunk, encoding);
encoding = 'buffer';
}
// In object mode every chunk counts as 1 regardless of its byte size.
const len = state.objectMode ? 1 : chunk.length;
state.length += len;
// Below the high-water mark the caller may keep writing.
const ret = state.length < state.highWaterMark;
// We must ensure that previous needDrain will not be reset to false.
if (!ret)
state.needDrain = true;
// Busy, corked or errored: append to the singly-linked request list
// instead of writing immediately.
if (state.writing || state.corked || state.errored) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = {
chunk,
encoding,
callback: cb,
next: null
};
if (last) {
last.next = state.lastBufferedRequest;
} else {
// Empty list: this request is also the head.
state.bufferedRequest = state.lastBufferedRequest;
}
state.bufferedRequestCount += 1;
} else {
doWrite(stream, state, false, len, chunk, encoding, cb);
}
// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && !state.destroyed;
}
这里比较关键的点首先是length的维护和后续水位的判断,直接关系到drain状态。之后如果处于writing、corked或errored状态的话,会先缓存到lastBufferedRequest并通过next指针和state上的bufferedRequestCount维护缓存队列,等待上一次的写入结束。如果正常可写入的话,就会调用doWrite,应该要做真正的写入操作了。
// Dispatch one in-flight write to the stream's _write/_writev hook,
// recording its size and callback on `state` so onwrite() can settle it.
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  // Bookkeeping for the write that is about to start.
  state.writelen = len;
  state.writecb = cb;
  state.writing = true;
  state.sync = true;

  if (state.destroyed) {
    // The stream died before the write could be issued.
    state.onwrite(new ERR_STREAM_DESTROYED('write'));
  } else if (writev) {
    stream._writev(chunk, state.onwrite);
  } else {
    stream._write(chunk, encoding, state.onwrite);
  }

  // If onwrite already ran above it saw sync === true; anything that
  // completes from here on is asynchronous.
  state.sync = false;
}
将writing置为true继续调用_writev或者_write,两者的区别文档也解释了,分别针对多数据块和单数据块。直接看_write。
// Default _write: delegate the single chunk to _writev when a subclass
// provided one; otherwise report (asynchronously) that neither write
// hook was implemented.
Writable.prototype._write = function(chunk, encoding, cb) {
  if (!this._writev) {
    process.nextTick(cb, new ERR_METHOD_NOT_IMPLEMENTED('_write()'));
    return;
  }
  this._writev([{ chunk, encoding }], cb);
};
小朋友你是否有很多问号,默认实现要么把数据包装成单元素数组转交给_writev,要么在两者都没有实现时直接抛出未实现的错误。不过仔细一想也明白,writable本身就是个基类,具体的写入实现应该针对不同场景下对数据的处理方案。文档中也给了自己实现可写流的例子,确实要自己实现_write方法。那么既然文章以fs模块开始,就顺便看看fs模块对这个方法的实现有什么值得注意之处好了。来到lib/internal/fs/streams.js
找到实现。
// Perform one write against the backing fd via the injected fs backend.
WriteStream.prototype._write = function(data, encoding, cb) {
// The fd may still be opening asynchronously; retry once 'open' fires.
if (typeof this.fd !== 'number') {
return this.once('open', function() {
this._write(data, encoding, cb);
});
}
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
// Mark IO as in flight so the fd is not closed underneath the write.
this[kIsPerformingIO] = true;
this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
cb(er);
return this.emit(kIoDone, er);
}
if (er) {
// Optionally tear the stream down on write errors.
if (this.autoClose) {
this.destroy();
}
return cb(er);
}
this.bytesWritten += bytes;
cb();
});
// For positional writes, advance the offset synchronously after issuing
// the write rather than waiting for its callback.
if (this.pos !== undefined)
this.pos += data.length;
};
似乎是没什么特别的流程,拿到data,encoding和cb也就是onwrite作为参数,调用fs本身的write方法实现写入data逻辑,之后在回调中注意调用cb即可。那么也就可以明白了,writable本身的核心在于在真正的写入之前和之后提供的一些控制能力。比如writing和corked标志位的控制,决定是真正的调用写入方法还是通过lastBufferedRequest缓存。
这里也注意到,当你调用write后,无论是否超过水位,除了影响drain状态外,writable本身是不会阻止你的写入的,需要你自己写逻辑根据write返回的标志位控制写入。
继续看写入完成之后的onwrite,也可以猜到,除了调用我们传入的回调外,主要集中于写入后状态的处理,比如水位控制等。
// Completion callback for _write/_writev: settles the in-flight write's
// bookkeeping, reports errors, flushes buffered requests, and schedules
// the user callback (directly or batched via nextTick).
function onwrite(stream, er) {
const state = stream._writableState;
const sync = state.sync;
const cb = state.writecb;
// A repeated invocation finds writecb already cleared below.
if (typeof cb !== 'function') {
errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
return;
}
// The in-flight write is done; release its bookkeeping.
state.writing = false;
state.writecb = null;
state.length -= state.writelen;
state.writelen = 0;
if (er) {
state.errored = true;
// When the write completed synchronously, defer the error handling so
// it is always delivered asynchronously relative to the write() call.
if (sync) {
process.nextTick(onwriteError, stream, state, er, cb);
} else {
onwriteError(stream, state, er, cb);
}
} else {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state) || stream.destroyed;
// Flush any queued requests now that the stream is idle again.
if (!finished &&
!state.corked &&
!state.bufferProcessing &&
state.bufferedRequest) {
clearBuffer(stream, state);
}
if (sync) {
// It is a common case that the callback passed to .write() is always
// the same. In that case, we do not schedule a new nextTick(), but rather
// just increase a counter, to improve performance and avoid memory
// allocations.
if (state.afterWriteTickInfo !== null &&
state.afterWriteTickInfo.cb === cb) {
state.afterWriteTickInfo.count++;
} else {
state.afterWriteTickInfo = { count: 1, cb, stream, state };
process.nextTick(afterWriteTick, state.afterWriteTickInfo);
}
} else {
afterWrite(stream, state, 1, cb);
}
}
}
除了writing和length控制外,还有两个值得注意的点。一个就是在满足条件的情况下调用clearBuffer一次性写入缓存数据,也就是api里cork的相关实现了。另外一个就是针对同步写入时的callback调用性能优化,因为大部分情况下,同步写入的回调是一样的,如此就会用计数代替重复的nextTick调度,提升性能并避免额外的内存分配。这些处理完毕后,就会调用afterWrite。
// Post-write housekeeping: emit 'drain' when appropriate, run the user
// callback(s), error out anything buffered on a destroyed stream, and
// check whether the stream can now finish.
function afterWrite(stream, state, count, cb) {
  // Fire 'drain' only once the buffer has fully emptied, a writer is
  // waiting for it, and the stream is still alive and not ending.
  const shouldDrain = state.needDrain && state.length === 0 &&
    !state.ending && !stream.destroyed;
  if (shouldDrain) {
    state.needDrain = false;
    stream.emit('drain');
  }

  // Invoke the user callback once per coalesced write (see onwrite's
  // afterWriteTickInfo batching).
  for (let i = 0; i < count; i++) {
    state.pendingcb--;
    cb();
  }

  // A destroy that happened meanwhile invalidates anything still buffered.
  if (state.destroyed) {
    errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
  }

  finishMaybe(stream, state);
}
集中于drain事件的处理和我们传入的回调的调用,继续调用finishMaybe处理finish事件。
// Emit 'finish' if every precondition holds; returns whether the stream
// is ready to finish.
function finishMaybe(stream, state, sync) {
  const shouldFinish = needFinish(state);
  if (!shouldFinish) {
    return shouldFinish;
  }
  prefinish(stream, state);
  if (state.pendingcb === 0) {
    // Reserve a pendingcb slot for the finish() call itself.
    state.pendingcb++;
    if (sync) {
      process.nextTick(finish, stream, state);
    } else {
      finish(stream, state);
    }
  }
  return shouldFinish;
}
当满足finish条件时,先调用prefinish;若此时pendingcb恰好归零,会先为finish这个回调本身把pendingcb加1,然后同步或在nextTick中调用finish。
// Final step of the writable lifecycle: mark the stream finished, emit
// 'finish', and — when autoDestroy is on — destroy the stream once the
// readable side (for Duplex) has also completed.
function finish(stream, state) {
  // Release the pendingcb slot reserved by finishMaybe().
  state.pendingcb--;
  state.finished = true;
  stream.emit('finish');

  if (state.autoDestroy) {
    // In case of duplex streams we need a way to detect
    // if the readable side is ready for autoDestroy as well
    const rState = stream._readableState;
    const readableDone = !rState || (rState.autoDestroy && rState.endEmitted);
    if (readableDone) {
      stream.destroy();
    }
  }
}
这里触发finish事件并处理duplex的可读可写同步销毁的问题,至此writable的整体流程就结束了。
小结
writable的结构其实和之前读过的tcp还比较像,核心还是在数据写入前设计一层缓冲区处理。下一篇文章继续看readable。
-- EOF --