Time Bomb Veela
  1. 1 Time Bomb Veela
  2. 2 Libertus Chen-U
  3. 3 Life Will Change Lyn
  4. 4 Last Surprise Lyn
  5. 5 One Last You Jen Bird
  6. 6 Warcry mpi
  7. 7 かかってこいよ NakamuraEmi
  8. 8 Hypocrite Nush
  9. 9 Flower Of Life 发热巫女
  10. 10 Quiet Storm Lyn
  11. 11 The Night We Stood Lyn
2020-03-26 22:08:42

Node源码解析——WritableStream

终于到这个核心模块了。包括之前写过的网络模块,在Node中大量模块的底层都用到了stream处理数据。这篇文章就来看stream的可写流内部实现。

Writable

stream的使用一般有两种形式,一是集成到其他模块的API,对用户不可见。二是通过stream模块提供的自定义接口。当然也应该猜到,其他模块底层也是用了自定义接口的实现。不妨就从最直观的文件系统API入手查看。fs模块中有一个创建可写流的接口fs.createWriteStream,在lib/fs.js找到其实现。

function createWriteStream(path, options) {
  lazyLoadStreams();
  return new WriteStream(path, options);
}

function lazyLoadStreams() {
  if (!ReadStream) {
    ({ ReadStream, WriteStream } = require('internal/fs/streams'));
    [ FileReadStream, FileWriteStream ] = [ ReadStream, WriteStream ];
  }
}

继续从internal/fs/streams中取到可读和可写流的构造函数。先看可写流。

const { Readable, Writable } = require('stream');

function WriteStream(path, options) {
  if (!(this instanceof WriteStream))
    return new WriteStream(path, options);

  options = copyObject(getOptions(options, {}));

  // Only buffers are supported.
  options.decodeStrings = true;

  if (options.autoDestroy === undefined) {
    options.autoDestroy = false;
  }

  this[kFs] = options.fs || fs;
  if (typeof this[kFs].open !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
                                   this[kFs].open);
  }

  if (!this[kFs].write && !this[kFs].writev) {
    throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
                                   this[kFs].write);
  }

  if (this[kFs].write && typeof this[kFs].write !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
                                   this[kFs].write);
  }

  if (this[kFs].writev && typeof this[kFs].writev !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function',
                                   this[kFs].writev);
  }

  if (typeof this[kFs].close !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
                                   this[kFs].close);
  }

  // It's enough to override either, in which case only one will be used.
  if (!this[kFs].write) {
    this._write = null;
  }
  if (!this[kFs].writev) {
    this._writev = null;
  }

  Writable.call(this, options);

  // Path will be ignored when fd is specified, so it can be falsy
  this.path = toPathIfFileURL(path);
  this.fd = options.fd === undefined ? null : options.fd;
  this.flags = options.flags === undefined ? 'w' : options.flags;
  this.mode = options.mode === undefined ? 0o666 : options.mode;

  this.start = options.start;
  this.autoClose = options.autoClose === undefined ? true : !!options.autoClose;
  this.pos = undefined;
  this.bytesWritten = 0;
  this.closed = false;
  this[kIsPerformingIO] = false;

  if (this.start !== undefined) {
    checkPosition(this.start, 'start');

    this.pos = this.start;
  }

  if (options.encoding)
    this.setDefaultEncoding(options.encoding);

  if (typeof this.fd !== 'number')
    _openWriteFs(this);
}
ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype);
ObjectSetPrototypeOf(WriteStream, Writable);

果然就是继承自stream模块的Writeable了。追根溯源,无论stream在多少模块中用到,只要搞清楚stream模块本身的实现即可。就进入到WriteStream的实现,入口在lib/stream.js

const Stream = module.exports = require('internal/streams/legacy');

Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');

注意到,stream和http类似,实现按流的种类被划分为几个文件。这一小节看可写流就进入到lib/_stream_writable.js找到构造函数实现。


function Writable(options) {
  // Writable ctor is applied to Duplexes, too.
  // `realHasInstance` is necessary because using plain `instanceof`
  // would return false, as no `_writableState` property is attached.

  // Trying to use the custom `instanceof` for Writable here will also break the
  // Node.js LazyTransform implementation, which has a non-trivial getter for
  // `_writableState` that would lead to infinite recursion.

  // Checking for a Stream.Duplex instance is faster here instead of inside
  // the WritableState constructor, at least with V8 6.5
  const isDuplex = (this instanceof Stream.Duplex);

  if (!isDuplex && !realHasInstance.call(Writable, this))
    return new Writable(options);

  this._writableState = new WritableState(options, this, isDuplex);

  // legacy.
  this.writable = true;

  if (options) {
    if (typeof options.write === 'function')
      this._write = options.write;

    if (typeof options.writev === 'function')
      this._writev = options.writev;

    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;

    if (typeof options.final === 'function')
      this._final = options.final;
  }

  Stream.call(this, options);
}

这个构造函数中需要关注的点有以下3个。

  1. RealHasInstance的目的。因为Duplex流实现了可读和可写流,但并不是多重继承,所以常规的instanceof Writable会返回false,所以需要hack掉这里的instanceof判断。而instanceof的实现在FunctionPrototype[SymbolHasInstance]这个符号属性上,就有了下面这段hack代码。
// Test _writableState for inheritance to account for Duplex streams,
// whose prototype chain only points to Readable.
var realHasInstance;
if (typeof Symbol === 'function' && SymbolHasInstance) {
  realHasInstance = FunctionPrototype[SymbolHasInstance];
  ObjectDefineProperty(Writable, SymbolHasInstance, {
    value: function(object) {
      if (realHasInstance.call(this, object))
        return true;
      if (this !== Writable)
        return false;

      return object && object._writableState instanceof WritableState;
    }
  });
} else {
  realHasInstance = function(object) {
    return object instanceof this;
  };
}

realHasInstance就是保存的原来的instanceof。

  1. WritableState的实例化以及通过options参数自定义写入方法。WritableState上统一管理着可写流的状态数据。
  2. Stream的继承。我们知道,有一些实现是所有类型的流都通用的,比如事件机制的实现。Stream的继承就是用来做这件事。

接下来就直接看关键方法write的实现。

Writable.prototype.write = function(chunk, encoding, cb) {
  const state = this._writableState;
  var ret = false;
  const isBuf = !state.objectMode && Stream._isUint8Array(chunk);

  // Do not use Object.getPrototypeOf as it is slower since V8 7.3.
  if (isBuf && !(chunk instanceof Buffer)) {
    chunk = Stream._uint8ArrayToBuffer(chunk);
  }

  if (typeof encoding === 'function') {
    cb = encoding;
    encoding = null;
  }

  if (isBuf)
    encoding = 'buffer';
  else if (!encoding)
    encoding = state.defaultEncoding;

  if (typeof cb !== 'function')
    cb = nop;

  let err;
  if (state.ending) {
    err = new ERR_STREAM_WRITE_AFTER_END();
  } else if (state.destroyed) {
    err = new ERR_STREAM_DESTROYED('write');
  } else if (chunk === null) {
    err = new ERR_STREAM_NULL_VALUES();
  } else if (!isBuf && typeof chunk !== 'string' && !state.objectMode) {
    err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
  } else {
    state.pendingcb++;
    ret = writeOrBuffer(this, state, chunk, encoding, cb);
  }

  if (err) {
    process.nextTick(cb, err);
    errorOrDestroy(this, err, true);
  }

  return ret;
};

主要是处理参数和错误,如果流程正常,会计数pendingcb并继续调用writeOrBuffer。

// If we're already writing something, then just put this
// in the queue, and wait our turn.  Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, cb) {
  if (!state.objectMode &&
      state.decodeStrings !== false &&
      encoding !== 'buffer' &&
      typeof chunk === 'string') {
    chunk = Buffer.from(chunk, encoding);
    encoding = 'buffer';
  }
  const len = state.objectMode ? 1 : chunk.length;

  state.length += len;

  const ret = state.length < state.highWaterMark;
  // We must ensure that previous needDrain will not be reset to false.
  if (!ret)
    state.needDrain = true;

  if (state.writing || state.corked || state.errored) {
    var last = state.lastBufferedRequest;
    state.lastBufferedRequest = {
      chunk,
      encoding,
      callback: cb,
      next: null
    };
    if (last) {
      last.next = state.lastBufferedRequest;
    } else {
      state.bufferedRequest = state.lastBufferedRequest;
    }
    state.bufferedRequestCount += 1;
  } else {
    doWrite(stream, state, false, len, chunk, encoding, cb);
  }

  // Return false if errored or destroyed in order to break
  // any synchronous while(stream.write(data)) loops.
  return ret && !state.errored && !state.destroyed;
}

这里比较关键的点首先是length的维护和后续水位的判断,直接关系到drain状态。之后如果处于writing状态的话会先缓存到lastBufferedRequest并通过next和state上的bufferedRequestCount维护数量,等待上一次的写入结束。如果正常可写入的话,就会调用doWrite,应该要做真正的写入操作了。

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  state.writelen = len;
  state.writecb = cb;
  state.writing = true;
  state.sync = true;
  if (state.destroyed)
    state.onwrite(new ERR_STREAM_DESTROYED('write'));
  else if (writev)
    stream._writev(chunk, state.onwrite);
  else
    stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}

将writing置为true继续调用_writev或者_write,两者的区别文档也解释了,分别针对多数据块和单数据块。直接看_write。

Writable.prototype._write = function(chunk, encoding, cb) {
  if (this._writev) {
    this._writev([{ chunk, encoding }], cb);
  } else {
    process.nextTick(cb, new ERR_METHOD_NOT_IMPLEMENTED('_write()'));
  }
};

小朋友你是否有很多问号,直接是个抛错未实现的方法。不过仔细一想也明白,writable本身就是个基类,具体的写入实现应该针对不同场景下对数据的处理方案。文档中也给了自己实现可写流的例子,确实要自己实现_write方法。那么既然文章以fs模块开始,就顺便看看fs模块对这个方法的实现有什么值得注意之处好了。来到lib/internal/fs/streams.js找到实现。

WriteStream.prototype._write = function(data, encoding, cb) {
  if (typeof this.fd !== 'number') {
    return this.once('open', function() {
      this._write(data, encoding, cb);
    });
  }

  if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));

  this[kIsPerformingIO] = true;
  this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
    this[kIsPerformingIO] = false;
    // Tell ._destroy() that it's safe to close the fd now.
    if (this.destroyed) {
      cb(er);
      return this.emit(kIoDone, er);
    }

    if (er) {
      if (this.autoClose) {
        this.destroy();
      }
      return cb(er);
    }
    this.bytesWritten += bytes;
    cb();
  });

  if (this.pos !== undefined)
    this.pos += data.length;
};

似乎是没什么特别的流程,拿到data,encoding和cb也就是onwrite作为参数,调用fs本身的write方法实现写入data逻辑,之后在回调中注意调用cb即可。那么也就可以明白了,writable本身的核心在于在真正的写入之前和之后提供的一些控制能力。比如writing和corked标志位的控制,决定是真正的调用写入方法还是通过lastBufferedRequest缓存。

这里也注意到,当你调用write后,无论是否超过水位,除了影响drain状态外,writable本身是不会阻止你的写入的,需要你自己写逻辑根据write返回的标志位控制写入。

继续看写入完成之后的onwrite,也可以猜到,除了调用我们传入的回调外,主要集中于写入后状态的处理,比如水位控制等。


function onwrite(stream, er) {
  const state = stream._writableState;
  const sync = state.sync;
  const cb = state.writecb;

  if (typeof cb !== 'function') {
    errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
    return;
  }

  state.writing = false;
  state.writecb = null;
  state.length -= state.writelen;
  state.writelen = 0;

  if (er) {
    state.errored = true;
    if (sync) {
      process.nextTick(onwriteError, stream, state, er, cb);
    } else {
      onwriteError(stream, state, er, cb);
    }
  } else {
    // Check if we're actually ready to finish, but don't emit yet
    var finished = needFinish(state) || stream.destroyed;

    if (!finished &&
        !state.corked &&
        !state.bufferProcessing &&
        state.bufferedRequest) {
      clearBuffer(stream, state);
    }

    if (sync) {
      // It is a common case that the callback passed to .write() is always
      // the same. In that case, we do not schedule a new nextTick(), but rather
      // just increase a counter, to improve performance and avoid memory
      // allocations.
      if (state.afterWriteTickInfo !== null &&
          state.afterWriteTickInfo.cb === cb) {
        state.afterWriteTickInfo.count++;
      } else {
        state.afterWriteTickInfo = { count: 1, cb, stream, state };
        process.nextTick(afterWriteTick, state.afterWriteTickInfo);
      }
    } else {
      afterWrite(stream, state, 1, cb);
    }
  }
}

除了writing和length控制外,还有两个值得注意的一点。一个就是在满足条件的情况下调用clearBuffer一次写入缓存数据,也就是api里cork的相关实现了。另外一个就是针对同步写入时的callback调用性能优化,因为大部分情况下,同步写入的回调是一样的,如此的话就会用计数代替异步调用,避免内存泄漏。这些处理完毕后,就会调用afterWrite。

function afterWrite(stream, state, count, cb) {
  const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
    state.needDrain;
  if (needDrain) {
    state.needDrain = false;
    stream.emit('drain');
  }

  while (count-- > 0) {
    state.pendingcb--;
    cb();
  }

  if (state.destroyed) {
    errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
  }

  finishMaybe(stream, state);
}

集中于drain事件的处理和我们传入的回调的调用,继续调用finishMaybe处理finish事件。

function finishMaybe(stream, state, sync) {
  const need = needFinish(state);
  if (need) {
    prefinish(stream, state);
    if (state.pendingcb === 0) {
      state.pendingcb++;
      if (sync) {
        process.nextTick(finish, stream, state);
      } else {
        finish(stream, state);
      }
    }
  }
  return need;
}

当满足finish条件时,pendingcb加1也就是finish这个回调,进而调用这个回调。

function finish(stream, state) {
  state.pendingcb--;
  state.finished = true;
  stream.emit('finish');

  if (state.autoDestroy) {
    // In case of duplex streams we need a way to detect
    // if the readable side is ready for autoDestroy as well
    const rState = stream._readableState;
    if (!rState || (rState.autoDestroy && rState.endEmitted)) {
      stream.destroy();
    }
  }
}

这里触发finish事件并处理duplex的可读可写同步销毁的问题,至此writable的整体流程就结束了。

小结

writable的结构其实和之前读过的tcp还比较像,核心还是在数据写入前设计一层缓冲区处理。下一篇文章继续看readable。

-- EOF --

添加在分类「 前端开发 」下,并被添加 「Node.js」 标签。