
Node Source Code Analysis: ReadableStream

The previous article covered writable streams; this one continues with the other half, readable streams. Readable is considerably more complex than writable, so consider this a long-read warning.

Readable Overview

Since writable and readable are two sides of the same coin, it's reasonable to expect their implementations to be structured similarly. So, as before, let's trace the readable stream starting from fs's createReadStream.

function createReadStream(path, options) {
  lazyLoadStreams();
  return new ReadStream(path, options);
}

function lazyLoadStreams() {
  if (!ReadStream) {
    ({ ReadStream, WriteStream } = require('internal/fs/streams'));
    [ FileReadStream, FileWriteStream ] = [ ReadStream, WriteStream ];
  }
}

// internal/fs/streams
function ReadStream(path, options) {
  if (!(this instanceof ReadStream))
    return new ReadStream(path, options);

  // A little bit bigger buffer and water marks by default
  options = copyObject(getOptions(options, {}));
  if (options.highWaterMark === undefined)
    options.highWaterMark = 64 * 1024;

  if (options.autoDestroy === undefined) {
    options.autoDestroy = false;
  }

  this[kFs] = options.fs || fs;

  if (typeof this[kFs].open !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
                                   this[kFs].open);
  }

  if (typeof this[kFs].read !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function',
                                   this[kFs].read);
  }

  if (typeof this[kFs].close !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
                                   this[kFs].close);
  }

  Readable.call(this, options);

  // Path will be ignored when fd is specified, so it can be falsy
  this.path = toPathIfFileURL(path);
  this.fd = options.fd === undefined ? null : options.fd;
  this.flags = options.flags === undefined ? 'r' : options.flags;
  this.mode = options.mode === undefined ? 0o666 : options.mode;

  this.start = options.start;
  this.end = options.end;
  this.autoClose = options.autoClose === undefined ? true : options.autoClose;
  this.pos = undefined;
  this.bytesRead = 0;
  this.closed = false;
  this[kIsPerformingIO] = false;

  if (this.start !== undefined) {
    checkPosition(this.start, 'start');

    this.pos = this.start;
  }

  if (this.end === undefined) {
    this.end = Infinity;
  } else if (this.end !== Infinity) {
    checkPosition(this.end, 'end');

    if (this.start !== undefined && this.start > this.end) {
      throw new ERR_OUT_OF_RANGE(
        'start',
        `<= "end" (here: ${this.end})`,
        this.start
      );
    }
  }

  if (typeof this.fd !== 'number')
    _openReadFs(this);

  this.on('end', function() {
    if (this.autoClose) {
      this.destroy();
    }
  });
}
ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
ObjectSetPrototypeOf(ReadStream, Readable);

It does much the same thing as the writable side: initialize some options, then inherit from Readable. Let's move on to the stream module and look at the Readable constructor.

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  // Checking for a Stream.Duplex instance is faster here instead of inside
  // the ReadableState constructor, at least with V8 6.5
  const isDuplex = this instanceof Stream.Duplex;

  this._readableState = new ReadableState(options, this, isDuplex);

  // legacy
  this.readable = true;

  if (options) {
    if (typeof options.read === 'function')
      this._read = options.read;

    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;
  }

  Stream.call(this, options);
}

This is a bit simpler than the WritableStream side: just the Stream inheritance and the creation of ReadableState, without the prototype-check hack. That suggests Duplex really does inherit from Readable and only parasitically borrows from Writable. Let's verify in _stream_duplex.js; opening it, we find this comment right away.

// a duplex stream is just a stream that is both readable and writable.
// Since JS doesn't have multiple prototypal inheritance, this class
// prototypally inherits from Readable, and then parasitically from
// Writable.
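To make the pattern concrete before moving on, here is a toy sketch of prototypal-plus-parasitic inheritance. The Fake* constructors are mine for illustration, not Node's actual code.

function FakeReadable() { this.readable = true; }
FakeReadable.prototype.read = function() { /* ... */ };

function FakeWritable() { this.writable = true; }
FakeWritable.prototype.write = function(chunk) { /* ... */ };

function FakeDuplex() {
  FakeReadable.call(this); // prototypal parent: init the readable side
  FakeWritable.call(this); // parasitic parent: init the writable side
}
Object.setPrototypeOf(FakeDuplex.prototype, FakeReadable.prototype);

// The "parasitic" graft: copy over Writable's methods Duplex lacks.
for (const method of Object.keys(FakeWritable.prototype)) {
  if (!FakeDuplex.prototype[method])
    FakeDuplex.prototype[method] = FakeWritable.prototype[method];
}

const d = new FakeDuplex();
console.log(typeof d.read, typeof d.write);                 // function function
console.log(d instanceof FakeReadable, d instanceof FakeWritable); // true false

That instanceof asymmetry at the end is presumably exactly why the writable side needs the prototype-check hack the previous article ran into.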

Pattern confirmed; on to ReadableState.

function ReadableState(options, stream, isDuplex) {
  // Duplex streams are both readable and writable, but share
  // the same options object.
  // However, some cases require setting options to different
  // values for the readable and the writable sides of the duplex stream.
  // These options can be provided separately as readableXXX and writableXXX.
  if (typeof isDuplex !== 'boolean')
    isDuplex = stream instanceof Stream.Duplex;

  // Object stream flag. Used to make read(n) ignore n and to
  // make all the buffer merging and length checks go away
  this.objectMode = !!(options && options.objectMode);

  if (isDuplex)
    this.objectMode = this.objectMode ||
      !!(options && options.readableObjectMode);

  // The point at which it stops calling _read() to fill the buffer
  // Note: 0 is a valid value, means "don't call _read preemptively ever"
  this.highWaterMark = options ?
    getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) :
    getDefaultHighWaterMark(false);

  // A linked list is used to store data chunks instead of an array because the
  // linked list can remove elements from the beginning faster than
  // array.shift()
  this.buffer = new BufferList();
  this.length = 0;
  this.pipes = [];
  this.flowing = null;
  this.ended = false;
  this.endEmitted = false;
  this.reading = false;

  // A flag to be able to tell if the event 'readable'/'data' is emitted
  // immediately, or on a later tick.  We set this to true at first, because
  // any actions that shouldn't happen until "later" should generally also
  // not happen before the first read call.
  this.sync = true;

  // Whenever we return null, then we set a flag to say
  // that we're awaiting a 'readable' event emission.
  this.needReadable = false;
  this.emittedReadable = false;
  this.readableListening = false;
  this.resumeScheduled = false;
  this[kPaused] = null;

  // True if the error was already emitted and should not be thrown again
  this.errorEmitted = false;

  // Should close be emitted on destroy. Defaults to true.
  this.emitClose = !options || options.emitClose !== false;

  // Should .destroy() be called after 'end' (and potentially 'finish')
  this.autoDestroy = !options || options.autoDestroy !== false;

  // Has it been destroyed
  this.destroyed = false;

  // Indicates whether the stream has errored.
  this.errored = false;

  // Crypto is kind of old and crusty.  Historically, its default string
  // encoding is 'binary' so we have to make this configurable.
  // Everything else in the universe uses 'utf8', though.
  this.defaultEncoding = (options && options.defaultEncoding) || 'utf8';

  // Ref the piped dest which we need a drain event on it
  // type: null | Writable | Set<Writable>
  this.awaitDrainWriters = null;
  this.multiAwaitDrain = false;

  // If true, a maybeReadMore has been scheduled
  this.readingMore = false;

  this.decoder = null;
  this.encoding = null;
  if (options && options.encoding) {
    if (!StringDecoder)
      StringDecoder = require('string_decoder').StringDecoder;
    this.decoder = new StringDecoder(options.encoding);
    this.encoding = options.encoding;
  }
}

The comments here flag quite a few things worth noting.

  1. Because a duplex stream shares one options object between its readable and writable sides, options present on both sides are disambiguated with readable- and writable- prefixes (readableXXX / writableXXX).
  2. As on the writable side, water-mark control is tied to the internal _read method: once the high water mark is reached, _read should stop being called to fill the buffer. The idea mirrors write.
  3. The buffer is stored as a linked list rather than an array, so removing data from the front is cheap (a toy sketch follows this list).
  4. A battery of flags is set up to cover the various scenarios.
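On point 3, here is a stripped-down imitation of the idea behind BufferList (not Node's real implementation, which lives in internal/streams/buffer_list.js):

class TinyBufferList {
  constructor() {
    this.head = null;
    this.tail = null;
    this.length = 0;
  }

  // Append to the tail: O(1).
  push(data) {
    const entry = { data, next: null };
    if (this.length > 0)
      this.tail.next = entry;
    else
      this.head = entry;
    this.tail = entry;
    ++this.length;
  }

  // Remove from the head: O(1), unlike Array.prototype.shift(),
  // which has to move every remaining element forward.
  shift() {
    if (this.length === 0)
      return;
    const ret = this.head.data;
    if (--this.length === 0)
      this.head = this.tail = null;
    else
      this.head = this.head.next;
    return ret;
  }
}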

Next, let's look at the flow of the implementation from the usage side. A readable stream has two modes, flowing and paused; we'll start with the more basic paused mode.

Paused Mode

A readable stream starts out in paused mode: you must explicitly call read to get data, while push is what fills the stream with data to be consumed.

This raises a question: when implementing a custom readable stream, there doesn't seem to be a standalone interface for supplying data. In practice, the convention is to put your own fill logic inside _read and call push from there. This design feels a little odd to me; one interface carries two responsibilities. Whether that's historical baggage or deliberate, that's how it is.
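To make that double duty concrete, here is a minimal paused-mode sketch; the array-backed source and all names are my own example, not Node code.

const { Readable } = require('stream');

const source = ['a', 'b', 'c']; // stand-in data source

const r = new Readable({
  // _read is both the refill hook and the fill interface: the stream
  // calls it when the internal buffer wants data, and we answer by
  // push()ing; pushing null signals the end of the data.
  read() {
    const chunk = source.shift();
    this.push(chunk === undefined ? null : chunk);
  }
});

// Paused mode: nothing reaches us until we call read() ourselves.
r.on('readable', () => {
  let chunk;
  while ((chunk = r.read()) !== null)
    console.log('consumed:', chunk.toString());
});

With that shape in mind, let's look at fs's concrete implementation, found in internal/fs/streams.js.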

  // the actual read.
  this[kIsPerformingIO] = true;
  this[kFs].read(
    this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
      this[kIsPerformingIO] = false;
      // Tell ._destroy() that it's safe to close the fd now.
      if (this.destroyed) return this.emit(kIoDone, er);

      if (er) {
        if (this.autoClose) {
          this.destroy();
        }
        this.emit('error', er);
      } else {
        let b = null;
        // Now that we know how much data we have actually read, re-wind the
        // 'used' field if we can, and otherwise allow the remainder of our
        // reservation to be used as a new pool later.
        if (start + toRead === thisPool.used && thisPool === pool) {
          const newUsed = thisPool.used + bytesRead - toRead;
          thisPool.used = roundUpToMultipleOf8(newUsed);
        } else {
          // Round down to the next lowest multiple of 8 to ensure the new pool
          // fragment start and end positions are aligned to an 8 byte boundary.
          const alignedEnd = (start + toRead) & ~7;
          const alignedStart = roundUpToMultipleOf8(start + bytesRead);
          if (alignedEnd - alignedStart >= kMinPoolSpace) {
            poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
          }
        }

        if (bytesRead > 0) {
          this.bytesRead += bytesRead;
          b = thisPool.slice(start, start + bytesRead);
        }

        this.push(b);
      }
    });

Data is fetched from the fd via fs's read, and in the callback push fills the buffer for later consumption. The paused-mode data flow, in a nutshell: data source → _read/push → internal buffer → read() → consumer.

Now let's look at the internal implementation of the whole flow, starting with push.

// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};

The comment spells it out: like write, it pushes data into the buffer, and the return value tells you whether the water mark has been exceeded. It delegates to readableAddChunk.

function readableAddChunk(stream, chunk, encoding, addToFront) {
  debug('readableAddChunk', chunk);
  const state = stream._readableState;

  let err;
  if (!state.objectMode) {
    if (typeof chunk === 'string') {
      encoding = encoding || state.defaultEncoding;
      if (addToFront && state.encoding && state.encoding !== encoding) {
        // When unshifting, if state.encoding is set, we have to save
        // the string in the BufferList with the state encoding
        chunk = Buffer.from(chunk, encoding).toString(state.encoding);
      } else if (encoding !== state.encoding) {
        chunk = Buffer.from(chunk, encoding);
        encoding = '';
      }
    } else if (chunk instanceof Buffer) {
      encoding = '';
    } else if (Stream._isUint8Array(chunk)) {
      chunk = Stream._uint8ArrayToBuffer(chunk);
      encoding = '';
    } else if (chunk != null) {
      err = new ERR_INVALID_ARG_TYPE(
        'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
    }
  }

  if (err) {
    errorOrDestroy(stream, err);
  } else if (chunk === null) {
    state.reading = false;
    onEofChunk(stream, state);
  } else if (state.objectMode || (chunk && chunk.length > 0)) {
    if (addToFront) {
      if (state.endEmitted)
        errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
      else
        addChunk(stream, state, chunk, true);
    } else if (state.ended) {
      errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
    } else if (state.destroyed) {
      return false;
    } else {
      state.reading = false;
      if (state.decoder && !encoding) {
        chunk = state.decoder.write(chunk);
        if (state.objectMode || chunk.length !== 0)
          addChunk(stream, state, chunk, false);
        else
          maybeReadMore(stream, state);
      } else {
        addChunk(stream, state, chunk, false);
      }
    }
  } else if (!addToFront) {
    state.reading = false;
    maybeReadMore(stream, state);
  }

  // We can push more data if we are below the highWaterMark.
  // Also, if we have no data yet, we can stand some more bytes.
  // This is to work around cases where hwm=0, such as the repl.
  return !state.ended &&
    (state.length < state.highWaterMark || state.length === 0);
}

First it checks for object mode; in non-object mode the chunk goes through an encoding pass. The actual insertion is then done by addChunk. Along the way the reading flag is set to false, and note that the return value is computed from the state's buffered length versus the high water mark, indicating whether more data may be pushed. On to addChunk.

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync) {
    // Use the guard to avoid creating `Set()` repeatedly
    // when we have multiple pipes.
    if (state.multiAwaitDrain) {
      state.awaitDrainWriters.clear();
    } else {
      state.awaitDrainWriters = null;
    }
    stream.emit('data', chunk);
  } else {
    // Update the buffer info.
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront)
      state.buffer.unshift(chunk);
    else
      state.buffer.push(chunk);

    if (state.needReadable)
      emitReadable(stream);
  }
  maybeReadMore(stream, state);
}

Since we're in paused mode, execution goes straight to the else branch, updating the buffered length and the buffer itself. Two things deserve attention here.

First, the needReadable check: it means a 'readable' event needs to be emitted, i.e. we're at the point where data is moving from the source into the readable stream. So when should it be set to true? Consider where our push is actually called: effectively inside the read interface. That suggests the needReadable state change is set up earlier, in read itself. A global search for needReadable indeed turns up this snippet inside the read implementation.

    // If the length is currently zero, then we *need* a readable event.
    if (state.length === 0)
      state.needReadable = true;

In other words, the first read call that pushes data is enough to trigger the 'readable' event. Let's look inside the function that emits it.

// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow.  This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
function emitReadable(stream) {
  const state = stream._readableState;
  debug('emitReadable', state.needReadable, state.emittedReadable);
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug('emitReadable', state.flowing);
    state.emittedReadable = true;
    process.nextTick(emitReadable_, stream);
  }
}

function emitReadable_(stream) {
  const state = stream._readableState;
  debug('emitReadable_', state.destroyed, state.length, state.ended);
  if (!state.destroyed && (state.length || state.ended)) {
    stream.emit('readable');
    state.emittedReadable = false;
  }

  // The stream needs another readable event if
  // 1. It is not flowing, as the flow mechanism will take
  //    care of it.
  // 2. It is not ended.
  // 3. It is below the highWaterMark, so we can schedule
  //    another readable later.
  state.needReadable =
    !state.flowing &&
    !state.ended &&
    state.length <= state.highWaterMark;
  flow(stream);
}

'readable' has to be emitted asynchronously, because a listener might call read again and recurse until the stack overflows. After emitting, the code decides whether yet another 'readable' event is needed. The conditions are spelled out in the comment; all three must hold:

  1. The stream is not flowing (the flow mechanism would otherwise take care of it).
  2. It has not ended.
  3. The buffered length is at or below the high water mark, so another 'readable' can be scheduled later.

The flow call that follows is interesting: it's a function that checks the flowing state and, in flowing mode, actively calls read to consume data. It's not central to paused mode, but it does the heavy lifting in flowing mode later.


function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}

With emitReadable covered, the second point of interest is maybeReadMore. Its implementation carries a long comment explaining what it does.

// At this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data.  that may have triggered
// in turn another _read(n) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    state.readingMore = true;
    process.nextTick(maybeReadMore_, stream, state);
  }
}

function maybeReadMore_(stream, state) {
  // Attempt to read more data if we should.
  //
  // The conditions for reading more data are (one of):
  // - Not enough data buffered (state.length < state.highWaterMark). The loop
  //   is responsible for filling the buffer with enough data if such data
  //   is available. If highWaterMark is 0 and we are not in the flowing mode
  //   we should _not_ attempt to buffer any extra data. We'll get more data
  //   when the stream consumer calls read() instead.
  // - No data in the buffer, and the stream is in flowing mode. In this mode
  //   the loop below is responsible for ensuring read() is called. Failing to
  //   call read here would abort the flow and there's no other mechanism for
  //   continuing the flow if the stream consumer has just subscribed to the
  //   'data' event.
  //
  // In addition to the above conditions to keep reading data, the following
  // conditions prevent the data from being read:
  // - The stream has ended (state.ended).
  // - There is already a pending 'read' operation (state.reading). This is a
  //   case where the the stream has called the implementation defined _read()
  //   method, but they are processing the call asynchronously and have _not_
  //   called push() with new data. In this case we skip performing more
  //   read()s. The execution ends in this method again after the _read() ends
  //   up calling push() with more data.
  while (!state.reading && !state.ended &&
         (state.length < state.highWaterMark ||
          (state.flowing && state.length === 0))) {
    const len = state.length;
    debug('maybeReadMore read 0');
    stream.read(0);
    if (len === state.length)
      // Didn't get any data, stop spinning.
      break;
  }
  state.readingMore = false;
}

In short, there are two situations that call for the readingMore state:

  1. The buffer has not yet reached the high water mark.
  2. The buffer is empty but the stream is in flowing mode.

Keep one thing in mind: data travels from the source to the buffer to the user, and the water mark has to be maintained along the way. When the user reads a lot, the source must top the buffer back up; when nobody is reading and the buffer sits at the water mark, fetching from the source stops. You could write this logic yourself based on the return values, but for convenience Node maintains it internally through readingMore: in the two situations above, the readingMore loop fills the internal buffer up to the water mark. And since read handles both filling and reading, calling read(0) means "don't actually read any data, just pull some in."
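The push-only behavior of read(0) is easy to observe from userland. A small sketch of mine, peeking at the internal _readableState purely for illustration:

const { Readable } = require('stream');

const r = new Readable({
  highWaterMark: 4,
  read() { this.push('x'); } // hand over one byte per _read() call
});

r.read(0); // push-only: returns no data, just kicks off the refill

setImmediate(() => {
  // By now the maybeReadMore loop has topped the buffer up to the mark.
  console.log(r._readableState.length); // 4
});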

Now that push has supplied some data, let's see how the core read method consumes it. Bear in mind that although this analysis follows the usual "fill, then consume" order, in the code it's read that gets called first, with push happening inside it. The read implementation is long and nearly all of it matters, so we'll walk through it step by step.

Readable.prototype.read = function(n) {
  debug('read', n);
  // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
  // in this scenario, so we are doing it manually.
  if (n === undefined) {
    n = NaN;
  } else if (!NumberIsInteger(n)) {
    n = parseInt(n, 10);
  }
  const state = this._readableState;
  const nOrig = n;
    // ...

First, the requested length is normalized to a number, with the original value saved in nOrig (presumably short for "n original"). What happens next depends on the size of n.

  // If we're asking for more than the current hwm, then raise the hwm.
  if (n > state.highWaterMark)
    state.highWaterMark = computeNewHighWaterMark(n);

  if (n !== 0)
    state.emittedReadable = false;

First comes the comparison against the high water mark: if n exceeds it, the mark is raised. This is a bit counter-intuitive; a water mark usually sounds like a fixed reference line, so why lift it? Here's my rough take. A readable stream's model is two-sided, with inflow from the source and outflow to the user, so the stream itself acts like a reservoir in the middle. The water mark works as a state-transition marker: above it, the user can safely draw data out; below it, the source can safely pour data in, and the refill loop kicks in. When n exceeds the mark, the user wants more water than the whole reservoir holds, so to serve n bytes safely the mark has to be lifted above n, enlarging the reservoir. computeNewHighWaterMark presumably raises it by some rule; let's look inside.

// Don't raise the hwm > 1GB
const MAX_HWM = 0x40000000;
function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    // TODO(ronag): Throw ERR_VALUE_OUT_OF_RANGE.
    n = MAX_HWM;
  } else {
    // Get the next highest power of 2 to prevent increasing hwm excessively in
    // tiny amounts
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}

This implementation is a neat one; a quick search shows the same trick in Java's HashMap as tableSizeFor. Explaining the bit manipulation in full would take another long article, so just remember the goal: produce the smallest power of two that is greater than or equal to n. That guarantees the raised water mark is neither too small nor wastefully large. A genuinely handy utility function.
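For example, assuming the function above is in scope, the rounding behaves like this:

console.log(computeNewHighWaterMark(100));   // 128
console.log(computeNewHighWaterMark(128));   // 128 (already a power of two)
console.log(computeNewHighWaterMark(16385)); // 32768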

  if (n !== 0)
    state.emittedReadable = false;

  // If we're doing read(0) to trigger a readable event, but we
  // already have a bunch of data in the buffer, then just trigger
  // the 'readable' event and move on.
  if (n === 0 &&
      state.needReadable &&
      ((state.highWaterMark !== 0 ?
        state.length >= state.highWaterMark :
        state.length > 0) ||
       state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
// ...

If n is non-zero, we're properly in read mode, i.e. the reservoir is already discharging, so the emittedReadable flag is reset.

If n is 0, needReadable is true, and the pool is at or above the high water mark (or the stream has ended), we're in the "ready to discharge" state: emit 'readable' so the user can catch the data. Note also that endReadable is called when the stream has ended with nothing left. Here's its implementation.

function endReadable(stream) {
  const state = stream._readableState;

  debug('endReadable', state.endEmitted);
  if (!state.endEmitted) {
    state.ended = true;
    process.nextTick(endReadableNT, state, stream);
  }
}

function endReadableNT(state, stream) {
  debug('endReadableNT', state.endEmitted, state.length);

  // Check that we didn't get one last unshift.
  if (!state.errorEmitted && !state.endEmitted && state.length === 0) {
    state.endEmitted = true;
    stream.readable = false;
    stream.emit('end');

    if (state.autoDestroy) {
      // In case of duplex streams we need a way to detect
      // if the writable side is ready for autoDestroy as well
      const wState = stream._writableState;
      if (!wState || (wState.autoDestroy && wState.finished)) {
        stream.destroy();
      }
    }
  }
}

It emits the 'end' event and, much like the writable side, contains logic to destroy a duplex stream only once both halves are done. Continuing on: if we're not merely waiting to discharge, then we're actually discharging.

  n = howMuchToRead(n, state);

  // If we've ended, and we're now clear, then finish it up.
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }
  // ...

function howMuchToRead(n, state) {
  if (n <= 0 || (state.length === 0 && state.ended))
    return 0;
  if (state.objectMode)
    return 1;
  if (NumberIsNaN(n)) {
    // Only flow one buffer at a time
    if (state.flowing && state.length)
      return state.buffer.first().length;
    else
      return state.length;
  }
  if (n <= state.length)
    return n;
  return state.ended ? state.length : 0;
}

This computes how much water to release, mostly juggling edge cases: if the stream ends while discharging, the 'end' event logic still runs; in the normal case we release n's worth. Note that at this point we don't know whether the pool actually holds enough; so far we've only raised the high water mark above n. So before discharging, the logic below decides whether to fetch from the source.

  // if we need a readable event, then we need to do some reading.
  var doRead = state.needReadable;
  debug('need readable', doRead);

  // If we currently have less than the highWaterMark, then also read some
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }

needReadable shows up again, as the initial value of doRead; and when the pool is below the water mark, doRead is forced to true, meaning data should be fetched from the source.

  // However, if we've ended, then there's no point, if we're already
  // reading, then it's unnecessary, and if we're destroyed, then it's
  // not allowed.
  if (state.ended || state.reading || state.destroyed) {
    doRead = false;
    debug('reading or ended', doRead);
  } else if (doRead) {
    debug('do read');
    state.reading = true;
    state.sync = true;
    // If the length is currently zero, then we *need* a readable event.
    if (state.length === 0)
      state.needReadable = true;
    // Call internal read method
    this._read(state.highWaterMark);
    state.sync = false;
    // If _read pushed data synchronously, then `reading` will be false,
    // and we need to re-evaluate how much data we can return to the user.
    if (!state.reading)
      n = howMuchToRead(nOrig, state);
  }

If the stream has ended or been destroyed, fetching again is pointless; if a read is already in progress, there's no need to start another. Then doRead determines what actually happens.

When data is needed, the reading flag marks a fetch in progress and the internal _read method is invoked; inside it, the push machinery above actually stores data into the reservoir. Because _read may be implemented either synchronously or asynchronously, a synchronous implementation can mutate state on the spot, so the saved original value of n is used to recompute how much can be read.
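The synchronous case is easy to reproduce. In this sketch of mine, _read pushes in the same tick, so reading is false again by the time _read returns, and the very first read() call hands data back synchronously:

const { Readable } = require('stream');

const r = new Readable({
  read() {
    this.push('hi'); // synchronous push: `reading` resets immediately
    this.push(null);
  }
});

// read() re-runs howMuchToRead(nOrig, state) after the sync push,
// so data comes back on the first call instead of a later tick.
console.log(r.read()); // <Buffer 68 69>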

  var ret;
  if (n > 0)
    ret = fromList(n, state);
  else
    ret = null;

  if (ret === null) {
    state.needReadable = state.length <= state.highWaterMark;
    n = 0;
  } else {
    state.length -= n;
    if (state.multiAwaitDrain) {
      state.awaitDrainWriters.clear();
    } else {
      state.awaitDrainWriters = null;
    }
  }

  if (state.length === 0) {
    // If we have nothing in the buffer, then we want to know
    // as soon as we *do* get something into the buffer.
    if (!state.ended)
      state.needReadable = true;

    // If we tried to read() past the EOF, then emit end on the next tick.
    if (nOrig !== n && state.ended)
      endReadable(this);
  }

  if (ret !== null)
    this.emit('data', ret);

  return ret;

Once the readable length is settled, data is taken from the pool and handed to the user. If the pool doesn't hold enough, ret comes back null, needReadable is set to true again, and refilling continues. If the pool can satisfy the request, that slice is returned and the state's length is updated; the chunk is also emitted via the 'data' event, which is what flowing mode relies on. That completes one outflow cycle.

To sum up data flow in paused mode: it starts from read, which enters the internal _read method where push pours data in; inside push, maybeReadMore tops the pool up above the water mark, and read's other branches let the water back out. The whole loop closes over the single read method. In my view it's cohesive, but the division of responsibilities is muddled: sync and async, inflow and outflow, and all manner of state flags are tangled together, genuinely hard to follow without a careful read. There may well be room for improvement.
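The whole cycle can be watched from userland with the fs stream this article opened with; the tiny highWaterMark here is my choice, just to make the refill loop visible:

const fs = require('fs');

const rs = fs.createReadStream(__filename, { highWaterMark: 16 });

// Each 'readable' means the pool was refilled (or the source ended),
// and the read() loop drains it again.
rs.on('readable', () => {
  let chunk;
  while ((chunk = rs.read()) !== null)
    console.log('drained', chunk.length, 'bytes');
});

rs.on('end', () => console.log('source empty'));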

Flowing Mode

With the paused-mode loop covered, flowing mode turns out to be simple: the essence is the same, only the user's consumption switches from active to passive. There are generally three ways to switch into flowing mode. Before diving into their details, though, we should pin down the essential difference between flowing and paused mode. It can be deduced from the source below, but I'll state it up front so the core idea is clear: data is pushed out proactively. More precisely, as soon as there is water in the reservoir, it is emitted through the 'data' event, whether or not anyone is there to receive it.

Let's see how each switching method implements this logic.

resume

// pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
Readable.prototype.resume = function() {
  const state = this._readableState;
  if (!state.flowing) {
    debug('resume');
    // We flow only if there is no one listening
    // for readable, but we still have to call
    // resume()
    state.flowing = !state.readableListening;
    resume(this, state);
  }
  state[kPaused] = false;
  return this;
};

The comment notes that resume and pause are remnants of the legacy API and in principle not recommended, but the approach is still worth studying. It first checks whether the stream is flowing; if not, and nothing is listening for 'readable', it switches to flowing mode. Why the mutual exclusion with the 'readable' event? For historical reasons, streams offer too many ways to consume data, and Node discourages mixing them; still, by design, the 'readable' event takes precedence over 'data' in controlling the stream.

function resume(stream, state) {
  if (!state.resumeScheduled) {
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state);
  }
}

function resume_(stream, state) {
  debug('resume', state.reading);
  if (!state.reading) {
    stream.read(0);
  }

  state.resumeScheduled = false;
  stream.emit('resume');
  flow(stream);
  if (state.flowing && !state.reading)
    stream.read(0);
}

resume_ is invoked asynchronously; it flips a few flags and calls read(0). As discussed earlier, in paused mode read(0) stores water without releasing any, keeping the pool at a level that is safe to drain. resume's proactive outflow happens in flow. As mentioned before, flow actively calls read in flowing mode; this is where real data movement begins.

function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}

Let's revisit the parts of read that flow touches.

Readable.prototype.read = function(n) {
  debug('read', n);
  // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
  // in this scenario, so we are doing it manually.
  if (n === undefined) {
    n = NaN;
  }

  // ...

  // howMuchToRead
  if (NumberIsNaN(n)) {
    // Only flow one buffer at a time
    if (state.flowing && state.length)
      return state.buffer.first().length;
    else
      return state.length;
  }

  // read
  if (ret !== null)
    this.emit('data', ret);

When n is omitted, howMuchToRead decides how much to read; in flowing mode it reads exactly the first buffer's length and emits it via 'data'. If we don't listen for 'data' here, the data simply drains away, because flow is a while loop:

while (state.flowing && stream.read() !== null);

So as long as the pool holds data, it pours out continuously. And what happens when it runs dry?

  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }

Inside read, needReadable flips, _read is called, and push pulls fresh data from the source, landing us back in addChunk.

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync) {
    // Use the guard to avoid creating `Set()` repeatedly
    // when we have multiple pipes.
    if (state.multiAwaitDrain) {
      state.awaitDrainWriters.clear();
    } else {
      state.awaitDrainWriters = null;
    }
    stream.emit('data', chunk);
  }
  // ...

This time the chunk flows straight out through the 'data' event, closing the loop.
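That "whether or not anyone is receiving" point is easy to demonstrate. In this sketch of mine, every chunk is emitted into the void and only 'end' is ever observed:

const { Readable } = require('stream');

const source = ['a', 'b', 'c'];

const r = new Readable({
  read() {
    const chunk = source.shift();
    this.push(chunk === undefined ? null : chunk);
  }
});

r.resume(); // flowing mode, but no 'data' listener is attached

// The three chunks went out as 'data' events that nobody caught.
r.on('end', () => console.log('ended; all chunks drained away'));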

Listening for the 'data' event

Under the hood this still boils down to calling resume. Straight to the source.

Readable.prototype.on = function(ev, fn) {
  const res = Stream.prototype.on.call(this, ev, fn);
  const state = this._readableState;

  if (ev === 'data') {
    // Update readableListening so that resume() may be a no-op
    // a few lines down. This is needed to support once('readable').
    state.readableListening = this.listenerCount('readable') > 0;

    // Try start flowing on next tick if stream isn't explicitly paused
    if (state.flowing !== false)
      this.resume();
  } else if (ev === 'readable') {
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.flowing = false;
      state.emittedReadable = false;
      debug('on readable', state.length, state.reading);
      if (state.length) {
        emitReadable(this);
      } else if (!state.reading) {
        process.nextTick(nReadingNextTick, this);
      }
    }
  }

  return res;
};
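Two details are visible here: attaching a 'data' listener auto-resumes the stream, and readableListening is what gives 'readable' the upper hand. A quick sketch of mine, inspecting the internal flowing flag only to illustrate:

const { Readable } = require('stream');

const r1 = new Readable({ read() {} });
r1.on('data', () => {});                 // on its own, this starts the flow
console.log(r1._readableState.flowing); // true

const r2 = new Readable({ read() {} });
r2.on('readable', () => {});             // sets flowing = false first
r2.on('data', () => {});                 // resume() is skipped entirely
console.log(r2._readableState.flowing); // false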

pipe

It's fairly long, so let's go straight to the core.

Readable.prototype.pipe = function(dest, pipeOpts) {
  const src = this;
  // ...
  state.pipes.push(dest);
    // ...
  src.on('data', ondata);
  function ondata(chunk) {
    debug('ondata');
    const ret = dest.write(chunk);
    debug('dest.write', ret);
    if (ret === false) {
      // If the user unpiped during `dest.write()`, it is possible
      // to get stuck in a permanently paused state if that write
      // also returned false.
      // => Check whether `dest` is still a piping destination.
      if (!cleanedUp) {
        if (state.pipes.length === 1 && state.pipes[0] === dest) {
          debug('false write response, pause', 0);
          state.awaitDrainWriters = dest;
          state.multiAwaitDrain = false;
        } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
          debug('false write response, pause', state.awaitDrainWriters.size);
          state.awaitDrainWriters.add(dest);
        }
        src.pause();
      }
      if (!ondrain) {
        // When the dest drains, it reduces the awaitDrain counter
        // on the source.  This would be more elegant with a .once()
        // handler in flow(), but adding and removing repeatedly is
        // too slow.
        ondrain = pipeOnDrain(src, dest);
        dest.on('drain', ondrain);
      }
    }
  }
  // ...

  // Tell the dest that it's being piped to
  dest.emit('pipe', src);

  // Start the flow if it hasn't been started already.
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }

  return dest;
};

It first subscribes to the readable's 'data' event, then calls resume on the readable to enter flowing mode. In the 'data' callback, outflowing chunks are written to the writable. Note that when write signals backpressure, the destination is recorded in awaitDrainWriters and the source is paused; writing resumes once the 'drain' event fires.
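Stripped of teardown, error handling, and the multi-destination bookkeeping, the handshake pipe performs reduces to something like this hand-rolled sketch (miniPipe is my name, not a Node API):

// Pause the source on backpressure, resume it when the writer drains.
function miniPipe(src, dest) {
  src.on('data', (chunk) => {
    if (dest.write(chunk) === false) {
      src.pause();
      dest.once('drain', () => src.resume());
    }
  });
  src.on('end', () => dest.end());
  return dest;
}

The real implementation keeps one persistent 'drain' handler instead of a once() per stall; as the source comment above notes, repeatedly adding and removing handlers would be too slow.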

Taken together, the three methods share one implementation: resume starts the data flowing, and the 'data' event catches it.

Summary

A few offshoots remain untouched, such as duplex and transform streams, but the core is all here; they are just small tricks layered on top of readable and writable, so I won't dwell on them. If you're interested, go read them yourself. Looking back at the implementation as a whole, the central idea is a reservoir bridging the data source on one side and the user on the other, with the water level dictating the reservoir's behavior. The movement of water rides on a pile of events, with two internal delivery mechanisms, 'readable' and 'data', corresponding to active and passive consumption. Frankly, though, the responsibilities are not cleanly divided: everything is crammed into the read method, and the use cases of the two event mechanisms aren't clearly isolated, so mixing them inevitably invites problems. Here's hoping future versions improve on this.

-- EOF --
