One Last You Jen Bird
  1. 1 One Last You Jen Bird
  2. 2 Flower Of Life 发热巫女
  3. 3 Libertus Chen-U
  4. 4 Life Will Change Lyn
  5. 5 Last Surprise Lyn
  6. 6 Warcry mpi
  7. 7 The Night We Stood Lyn
  8. 8 Time Bomb Veela
  9. 9 かかってこいよ NakamuraEmi
  10. 10 Quiet Storm Lyn
  11. 11 Hypocrite Nush
2020-01-31 14:33:42

Node源码解析——net

上篇文章涉及到了net这个在node中构建异步网络的模块,也是node里很重要的基础模块,这篇文章就来把net搞清楚。

前置知识

因为net涉及到计算机网络的一些知识,考虑到前端对这一块不一定很熟悉,有必要先把相关概念简单解释一下,对细节有兴趣的可以再查阅相关资料。

TCP

官网文档上提到,net是用于创建基于流的TCP或IPC服务器与客户端,自然就先从这两个概念入手。TCP属于OSI七层模型的运输层,提供面向连接的可靠字节流服务。它属于一种协议,设定了一系列规则保证网络上的客户端和服务器交换数据的稳定。

Socket

不过从TCP到应用层的通信中,TCP需要为不同类型的应用层提供并发服务,而应用层本身多种多样,也需要一种统一的约束来保证TCP和不同应用的交流。就像插座统一了各种电器的接入方式一样,Socket应运而生。它对TCP(并不局限于TCP这一种具体协议)进行了抽象,形成最基本的函数交流接口,比如create、listen、accept、connect等。

文件描述符

Unix的哲学就是一切皆文件,Socket自然也不例外。虽然Socket是一个抽象的概念,但在Unix上它表现为一个可操作的文件。文件描述符可以看作该文件的索引:程序通过类型为int的文件描述符访问它,内核也据此定位到具体的文件进行实际操作。

IPC

进程间通信(inter-process communication,IPC)就是指不同进程间的消息传递,主要用于需要多个进程共同完成某个任务的程序,保证多个进程在独立运行的同时又能相互交流信息。Socket本身是为网络通信而设计的,但剥离了网络协议栈处理之后的Socket也能作为IPC的实现方案之一,这种方案学名称作UNIX Domain Socket,也是net模块在Unix上的IPC服务器实现(在Windows上则使用命名管道)。下面两张图表现了两种Socket的工作区别。

源码

先看一个最基础的使用案例,用net创建一个服务器和客户端并相互通信的步骤。

先是服务端:创建后监听本机(127.0.0.1)的8888端口,每当有客户端连接时立即向它发送一条问候消息,并在收到客户端数据时把内容打印出来。

// server.js — a minimal TCP server bound to 127.0.0.1:8888.
// Every new client is greeted immediately; anything a client sends is logged.
const net = require('net');

function handleConnection(socket) {
  socket.on('data', (chunk) => {
    console.log('client send message: ', chunk.toString());
  });
  socket.write('hello client!');
}

const server = net.createServer(handleConnection);

server.listen(8888, '127.0.0.1', () => {
  // Print the address the server actually bound to.
  console.log(server.address());
});

客户端要在服务器启动之后运行:它连接到对应端口并向服务器发送一条消息,同时监听服务器发回的数据以及连接断开(end)事件,收到第一条回复后便主动结束连接。

// client.js — connects to the server on 127.0.0.1:8888, sends one line,
// and closes its side of the connection after the first reply.
const net = require('net');

const client = net.connect({ port: 8888, host: '127.0.0.1' }, () => {
  client.write('hello server!\r\n');
});

function onServerData(data) {
  console.log('server send message: ', data.toString());
  client.end();
}

client.on('data', onServerData);
client.on('end', () => console.log('disconnected from server'));

接下来就沿着这个通用的工作流程一步步分析net模块中的内容。

TCP服务器

首先自然从一切的开端createServer这个方法开始。打开lib/net.js这个文件找到这个方法,做的事很简单,只是实例化了一个Server对象并返回。进入到这个构造函数中。

/**
 * net.Server constructor (also callable without `new`).
 *
 * Accepts either `(connectionListener)` or `(options[, connectionListener])`.
 * A listener, when supplied, is subscribed to the 'connection' event.
 * No socket is created here: `_handle` stays null until listen() sets it.
 *
 * @param {object|Function|null} [options] - server options, or the listener itself
 * @param {Function} [connectionListener] - handler for 'connection' events
 * @throws {ERR_INVALID_ARG_TYPE} if `options` is not a function, an object,
 *   null or undefined
 */
function Server(options, connectionListener) {
  if (!(this instanceof Server)) {
    return new Server(options, connectionListener);
  }

  EventEmitter.call(this);

  let opts;
  if (typeof options === 'function') {
    // Server(listener): the only argument is the listener; no options.
    opts = {};
    this.on('connection', options);
  } else if (options === undefined || options === null ||
             typeof options === 'object') {
    // Work on a shallow copy rather than the caller's object.
    opts = { ...options };
    if (typeof connectionListener === 'function') {
      this.on('connection', connectionListener);
    }
  } else {
    throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
  }

  this._connections = 0;

  // Deprecated public view of the connection counter (DEP0020).
  ObjectDefineProperty(this, 'connections', {
    get: deprecate(
      () => (this._usingWorkers ? null : this._connections),
      'Server.connections property is deprecated. ' +
        'Use Server.getConnections method instead.',
      'DEP0020'
    ),
    set: deprecate(
      (val) => (this._connections = val),
      'Server.connections property is deprecated.',
      'DEP0020'
    ),
    configurable: true,
    enumerable: false,
  });

  // Listening state; populated later (e.g. `_handle` by listen()).
  this[async_id_symbol] = -1;
  this._handle = null;
  this._usingWorkers = false;
  this._workers = [];
  this._unref = false;

  this.allowHalfOpen = opts.allowHalfOpen || false;
  this.pauseOnConnect = Boolean(opts.pauseOnConnect);
}

这里主要初始化了一些内部参数,可以看到有cluster模块中提到的_workers和_handle。这个_handle就是用来保存服务器socket句柄的。但注意到构造函数中并没有创建socket,而createServer的回调(即connection事件的监听函数)第一个参数就是socket,那么显然关键就在listen中了。来看设置在Server原型上的listen方法。

/**
 * Start listening for connections.
 *
 * Dispatches on the (normalized) arguments:
 *   - an existing TCP handle (or an object carrying one in `_handle`/`handle`);
 *   - an object with a non-negative `fd`;
 *   - a port (number or string), optionally with a host;
 *   - a UNIX domain socket path or Windows pipe name (`path`).
 * Each case ends in listenInCluster(), except a port given together with a
 * host, which goes through lookupAndListen() instead.
 *
 * @param {...*} args  ([port][, host][, backlog][, cb]), (path[, backlog][, cb]),
 *                     (handle[, backlog][, cb]) or (options[, cb])
 * @returns {Server} this server, for chaining
 * @throws ERR_SERVER_ALREADY_LISTEN if this server already has a handle;
 *   ERR_SOCKET_BAD_PORT for an illegal port; ERR_INVALID_ARG_VALUE when the
 *   options have neither `port` nor `path`; ERR_INVALID_OPT_VALUE when they
 *   have one but it is unusable; for pipes, an errnoException if the
 *   fchmod permission change fails.
 */
Server.prototype.listen = function(...args) {
  const normalized = normalizeArgs(args);
  let options = normalized[0];
  const cb = normalized[1];

  if (this._handle) {
    throw new ERR_SERVER_ALREADY_LISTEN();
  }

  // A supplied callback becomes a one-time 'listening' listener
  // (presumably normalizeArgs yields null when no callback was passed).
  if (cb !== null) {
    this.once('listening', cb);
  }
  const backlogFromArgs =
    // (handle, backlog) or (path, backlog) or (port, backlog)
    toNumber(args.length > 1 && args[1]) ||
    toNumber(args.length > 2 && args[2]);  // (port, host, backlog)

  // Unwrap `{ _handle }` / `{ handle }` wrappers so the checks below see the
  // underlying handle object when one is present.
  options = options._handle || options.handle || options;
  const flags = getFlags(options.ipv6Only);
  // (handle[, backlog][, cb]) where handle is an object with a handle
  if (options instanceof TCP) {
    // Adopt the existing native handle; nothing new is created or bound.
    this._handle = options;
    this[async_id_symbol] = this._handle.getAsyncId();
    listenInCluster(this, null, -1, -1, backlogFromArgs);
    return this;
  }
  // (handle[, backlog][, cb]) where handle is an object with a fd
  if (typeof options.fd === 'number' && options.fd >= 0) {
    listenInCluster(this, null, null, null, backlogFromArgs, options.fd);
    return this;
  }

  // ([port][, host][, backlog][, cb]) where port is omitted,
  // that is, listen(), listen(null), listen(cb), or listen(null, cb)
  // or (options[, cb]) where options.port is explicitly set as undefined or
  // null, bind to an arbitrary unused port
  if (args.length === 0 || typeof args[0] === 'function' ||
      (typeof options.port === 'undefined' && 'port' in options) ||
      options.port === null) {
    options.port = 0;
  }
  // ([port][, host][, backlog][, cb]) where port is specified
  // or (options[, cb]) where options.port is specified
  // or if options.port is normalized as 0 before
  let backlog;
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    if (!isLegalPort(options.port)) {
      throw new ERR_SOCKET_BAD_PORT(options.port);
    }
    // An explicit options.backlog wins over a positional backlog.
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    // (`| 0` coerces the validated port, possibly a string, to an integer.)
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }

  // (path[, backlog][, cb]) or (options[, cb])
  // where path or options.path is a UNIX domain socket or Windows pipe
  if (options.path && isPipeName(options.path)) {
    const pipeName = this._pipeName = options.path;
    backlog = options.backlog || backlogFromArgs;
    // port/addressType of -1 select the pipe code path downstream.
    listenInCluster(this, pipeName, -1, -1,
                    backlog, undefined, options.exclusive);

    if (!this._handle) {
      // Failed and an error shall be emitted in the next tick.
      // Therefore, we directly return.
      return this;
    }

    // Optionally widen the pipe's permissions; on failure release the
    // handle and throw synchronously.
    let mode = 0;
    if (options.readableAll === true)
      mode |= PipeConstants.UV_READABLE;
    if (options.writableAll === true)
      mode |= PipeConstants.UV_WRITABLE;
    if (mode !== 0) {
      const err = this._handle.fchmod(mode);
      if (err) {
        this._handle.close();
        this._handle = null;
        throw errnoException(err, 'uv_pipe_chmod');
      }
    }
    return this;
  }

  if (!(('port' in options) || ('path' in options))) {
    throw new ERR_INVALID_ARG_VALUE('options', options,
                                    'must have the property "port" or "path"');
  }

  // A port or path was given, but none of the branches above could use it.
  throw new ERR_INVALID_OPT_VALUE('options', inspect(options));
}

这个函数在cluster模块也分析过,做的事就是根据参数类型调用listenInCluster启动监听。现在就具体看一下有哪些情况,实际上注释也写的很清楚:

  • TCP对象
  • 文件描述符
  • UNIX domain socket
  • Windows pipe
  • IP地址和端口

总之最终都会定位到一个Socket。然后进入listenInCluster函数,涉及到主从进程的部分就不重复分析了。确定好由哪个进程创建这个服务器后,就会调用Server实例的_listen2内部方法正式开始监听,而_listen2只是setupListenHandle的遗留别名(legacy alias)。

/**
 * Ensure the server has a native listening handle, then start listening on it.
 * Runs with `this` bound to the Server.
 *
 * If `this._handle` is already set (e.g. a server handle received over IPC),
 * it is used as-is. Otherwise one is created via createServerHandle(). With no
 * address and no fd, the IPv6 unspecified address is tried first, and IPv4 is
 * used when that attempt returns an error code.
 *
 * Errors are never thrown here. They are delivered asynchronously through
 * emitErrorNT on the next tick. On success, emitListeningNT is scheduled on
 * the next tick.
 *
 * @param {string|null} address   bind address, or falsy for "any"
 * @param {number} port
 * @param {number} addressType    4, 6, or -1 for a pipe
 * @param {number} [backlog]      listen backlog; 511 is used when falsy
 * @param {number} [fd]           existing descriptor to listen on
 * @param {number} [flags]        bind flags forwarded to createServerHandle
 */
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug('setupListenHandle', address, port, addressType, backlog, fd);

  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
    debug('setupListenHandle: create a handle');

    let rval = null;

    // Try to bind to the unspecified IPv6 address, see if IPv6 is available
    if (!address && typeof fd !== 'number') {
      rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags);

      // createServerHandle returns a number (error code) on failure.
      if (typeof rval === 'number') {
        rval = null;
        address = DEFAULT_IPV4_ADDR;
        addressType = 4;
      } else {
        address = DEFAULT_IPV6_ADDR;
        addressType = 6;
      }
    }

    if (rval === null)
      rval = createServerHandle(address, port, addressType, fd, flags);

    if (typeof rval === 'number') {
      const error = uvExceptionWithHostPort(rval, 'listen', address, port);
      process.nextTick(emitErrorNT, this, error);
      return;
    }
    this._handle = rval;
  }

  this[async_id_symbol] = getNewAsyncId(this._handle);
  // The native handle calls onconnection for each accepted client; the
  // owner_symbol back-reference lets it find this Server again.
  this._handle.onconnection = onconnection;
  this._handle[owner_symbol] = this;

  // Use a backlog of 512 entries. We pass 511 to the listen() call because
  // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
  // which will thus give us a backlog of 512 entries.
  const err = this._handle.listen(backlog || 511);

  if (err) {
    const ex = uvExceptionWithHostPort(err, 'listen', address, port);
    this._handle.close();
    this._handle = null;
    defaultTriggerAsyncIdScope(this[async_id_symbol],
                               process.nextTick,
                               emitErrorNT,
                               this,
                               ex);
    return;
  }

  // Generate connection key, this should be unique to the connection
  this._connectionKey = addressType + ':' + address + ':' + port;

  // Unref the handle if the server was unref'ed prior to listening
  if (this._unref)
    this.unref();

  defaultTriggerAsyncIdScope(this[async_id_symbol],
                             process.nextTick,
                             emitListeningNT,
                             this);
}

可以看到,如果_handle已经存在(例如通过IPC传过来的服务器句柄),就不会再创建新的句柄,而是直接在已有的_handle上调用listen,这也是cluster的worker进程能够hack掉_handle的依据。如果_handle不存在,才会进入createServerHandle创建服务器句柄。这里也会根据参数区分IPv4和IPv6:未指定地址时会先尝试绑定IPv6的任意地址,失败后再回退到IPv4。继续看这个函数。

/**
 * Create a native server handle and bind it when appropriate.
 *
 * The handle kind is chosen from the arguments:
 *   - a non-negative `fd`: wrap and open that descriptor (address/port must be empty);
 *   - `port === -1 && addressType === -1`: a Pipe (UNIX domain socket / Windows pipe);
 *   - anything else: a TCP handle.
 * Binding happens when an address or port is given, or always for TCP. With no
 * address, the IPv6 unspecified address is tried first and IPv4 is used if
 * that bind fails.
 *
 * @returns {object|number} the handle on success, otherwise a numeric error
 *   code (e.g. UV_EINVAL for an fd that cannot be listened on)
 */
function createServerHandle(address, port, addressType, fd, flags) {
  let handle;
  let isTCP = false;

  if (typeof fd === 'number' && fd >= 0) {
    try {
      handle = createHandle(fd, true);
    } catch (e) {
      // Not a fd we can listen on.  This will trigger an error.
      debug('listen invalid fd=%d:', fd, e.message);
      return UV_EINVAL;
    }

    const openErr = handle.open(fd);
    if (openErr) {
      return openErr;
    }

    assert(!address && !port);
  } else if (port === -1 && addressType === -1) {
    handle = new Pipe(PipeConstants.SERVER);
    if (process.platform === 'win32') {
      const instances = parseInt(process.env.NODE_PENDING_PIPE_INSTANCES);
      if (!NumberIsNaN(instances)) {
        handle.setPendingInstances(instances);
      }
    }
  } else {
    handle = new TCP(TCPConstants.SERVER);
    isTCP = true;
  }

  // Nothing to bind (e.g. a pipe/fd without address or port): done.
  if (!address && !port && !isTCP) {
    return handle;
  }

  debug('bind to', address || 'any');

  let bindErr;
  if (!address) {
    // Prefer IPv6; on failure release this handle and retry with IPv4.
    bindErr = handle.bind6(DEFAULT_IPV6_ADDR, port, flags);
    if (bindErr) {
      handle.close();
      return createServerHandle(DEFAULT_IPV4_ADDR, port);
    }
  } else if (addressType === 6) {
    bindErr = handle.bind6(address, port, flags);
  } else {
    bindErr = handle.bind(address, port);
  }

  if (bindErr) {
    handle.close();
    return bindErr;
  }
  return handle;
}

根据参数的不同完成服务器句柄的创建和端口的绑定,TCP和Pipe这两个对象都由C/C++模块提供。到这一步已经完成socket的创建和绑定了。再回到setupListenHandle中,最后会调用_handle的listen方法,由C++模块启动socket的监听。到这一步服务器已经准备就绪,接下来就等客户端连接并调用onconnection响应了。onconnection的定义也在net.js这个文件中。

/**
 * Native callback fired on the server handle for each incoming connection.
 * `this` is the server handle; the owning Server is found via owner_symbol.
 *
 * On an accept error, emits 'error' on the Server. When maxConnections is set
 * and already reached, the client handle is closed and dropped. Otherwise the
 * client handle is wrapped in a readable/writable Socket, the connection
 * counter is incremented, and 'connection' is emitted with that Socket.
 *
 * @param {number} err - non-zero on accept failure
 * @param {object} clientHandle - native handle of the accepted client
 */
function onconnection(err, clientHandle) {
  const server = this[owner_symbol];

  debug('onconnection');

  if (err) {
    server.emit('error', errnoException(err, 'accept'));
    return;
  }

  const atCapacity = server.maxConnections &&
    server._connections >= server.maxConnections;
  if (atCapacity) {
    clientHandle.close();
    return;
  }

  const socket = new Socket({
    handle: clientHandle,
    allowHalfOpen: server.allowHalfOpen,
    pauseOnCreate: server.pauseOnConnect,
    readable: true,
    writable: true,
  });

  server._connections++;
  socket.server = server;
  socket._server = server;

  DTRACE_NET_SERVER_CONNECTION(socket);
  server.emit('connection', socket);
}

新建了Socket对象并通过connection事件抛出,也就是createServer时的回调能拿到的Socket对象了。

TCP客户端

客户端的入口在net.connect,进入具体实现:

/**
 * net.connect / net.createConnection: create a Socket and start connecting it.
 *
 * The arguments are normalized once and the normalized array is handed
 * straight to Socket#connect, so they are not normalized a second time.
 *
 * @param {...*} args - (options[, cb]), (port[, host][, cb]) or (path[, cb])
 * @returns {Socket} the result of socket.connect()
 */
function connect(...args) {
  const normalized = normalizeArgs(args);
  const [options] = normalized;
  debug('createConnection', normalized);

  const socket = new Socket(options);
  if (options.timeout) socket.setTimeout(options.timeout);

  return socket.connect(normalized);
}

同样是实例化了一个Socket对象,并调用其connect方法。到这里就把客户端和服务器打通了,本质上是两个Socket对象的交流。那么自然的,下一步就应该看看Socket构造函数的实现。

/**
 * net.Socket constructor (also callable without `new`): a Duplex stream
 * backed by a native handle.
 *
 * @param {number|object} [options] - a file descriptor (legacy form) or an
 *   options object. Fields read here include: handle, fd, onread
 *   ({ buffer, callback }), readable, writable, allowHalfOpen,
 *   pauseOnCreate, manualStart. The (copied) options object is also passed
 *   to stream.Duplex.
 * @throws ERR_INVALID_FD_TYPE (from createHandle) for an unusable fd, or an
 *   errnoException when opening the fd or making it blocking fails.
 */
function Socket(options) {
  if (!(this instanceof Socket)) return new Socket(options);

  this.connecting = false;
  // Problem with this is that users can supply their own handle, that may not
  // have _handle.getAsyncId(). In this case an[async_id_symbol] should
  // probably be supplied by async_hooks.
  this[async_id_symbol] = -1;
  this._hadError = false;
  this[kHandle] = null;
  this._parent = null;
  this._host = null;
  this[kLastWriteQueueSize] = 0;
  this[kTimeout] = null;
  this[kBuffer] = null;
  this[kBufferCb] = null;
  this[kBufferGen] = null;

  if (typeof options === 'number')
    options = { fd: options }; // Legacy interface.
  else
    options = { ...options };

  // readable/writable default to false unless explicitly enabled.
  options.readable = options.readable || false;
  options.writable = options.writable || false;
  // Keep the caller's value; Duplex itself is given allowHalfOpen: true below.
  const { allowHalfOpen } = options;

  // Prevent the "no-half-open enforcer" from being inherited from `Duplex`.
  options.allowHalfOpen = true;
  // For backwards compat do not emit close on destroy.
  options.emitClose = false;
  options.autoDestroy = false;
  // Handle strings directly.
  options.decodeStrings = false;
  stream.Duplex.call(this, options);

  // Default to *not* allowing half open sockets.
  this.allowHalfOpen = Boolean(allowHalfOpen);

  if (options.handle) {
    // Wrap an already-existing native handle (e.g. an accepted connection).
    this._handle = options.handle; // private
    this[async_id_symbol] = getNewAsyncId(this._handle);
  } else {
    // Optional caller-supplied read buffer: either a Uint8Array or a function
    // that generates one, together with a callback (stored as kBufferCb).
    const onread = options.onread;
    if (onread !== null && typeof onread === 'object' &&
        (isUint8Array(onread.buffer) || typeof onread.buffer === 'function') &&
        typeof onread.callback === 'function') {
      if (typeof onread.buffer === 'function') {
        this[kBuffer] = true;
        this[kBufferGen] = onread.buffer;
      } else {
        this[kBuffer] = onread.buffer;
      }
      this[kBufferCb] = onread.callback;
    }
    if (options.fd !== undefined) {
      const { fd } = options;
      let err;

      // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
      // a valid `PIPE` or `TCP` descriptor
      this._handle = createHandle(fd, false);

      err = this._handle.open(fd);

      // While difficult to fabricate, in some architectures
      // `open` may return an error code for valid file descriptors
      // which cannot be opened. This is difficult to test as most
      // un-openable fds will throw on `createHandle`
      if (err)
        throw errnoException(err, 'open');

      this[async_id_symbol] = this._handle.getAsyncId();

      if ((fd === 1 || fd === 2) &&
          (this._handle instanceof Pipe) &&
          process.platform === 'win32') {
        // Make stdout and stderr blocking on Windows
        err = this._handle.setBlocking(true);
        if (err)
          throw errnoException(err, 'setBlocking');

        this._writev = null;
        this._write = makeSyncWrite(fd);
        // makeSyncWrite adjusts this value like the original handle would, so
        // we need to let it do that by turning it into a writable, own
        // property.
        ObjectDefineProperty(this._handle, 'bytesWritten', {
          value: 0, writable: true
        });
      }
    }
  }

  // Shut down the socket when we're finished with it.
  this.on('end', onReadableStreamEnd);

  initSocketHandle(this);

  this._pendingData = null;
  this._pendingEncoding = '';

  // If we have a handle, then start the flow of data into the
  // buffer.  if not, then this will happen when we connect
  if (this._handle && options.readable !== false) {
    if (options.pauseOnCreate) {
      // Stop the handle from reading and pause the stream
      this._handle.reading = false;
      this._handle.readStop();
      this.readableFlowing = false;
    } else if (!options.manualStart) {
      this.read(0);
    }
  }

  // Reserve properties
  this.server = null;
  this._server = null;

  // Used after `.destroy()`
  this[kBytesRead] = 0;
  this[kBytesWritten] = 0;
}
// Socket instances inherit Duplex methods; the constructor inherits Duplex statics.
ObjectSetPrototypeOf(Socket.prototype, stream.Duplex.prototype);
ObjectSetPrototypeOf(Socket, stream.Duplex);

Socket里除了初始化一些属性外,最重要的一点是继承了stream.Duplex的读写能力,同时调用了initSocketHandle初始化_handle。

再看Socket实例的connect方法:

/**
 * Connect this socket.
 *
 * Accepts the same argument forms as net.connect(), or an argument array that
 * normalizeArgs has already produced (tagged with normalizedArgsSymbol), so
 * the arguments are not normalized twice.
 *
 * If the socket has no handle, one is created: a Pipe when `options.path` is
 * set, otherwise a TCP handle. `cb` is registered as a one-time 'connect'
 * listener. A pipe path is then connected via internalConnect; otherwise the
 * options are handed to lookupAndConnect().
 *
 * @param {...*} args - (options[, cb]), (port[, host][, cb]), (path[, cb]),
 *   or a pre-normalized argument array
 * @returns {Socket} this socket; net.connect() returns this value to callers
 * @throws from validateString when `options.path` is not a string
 */
Socket.prototype.connect = function(...args) {
  let normalized;
  // If passed an array, it's treated as an array of arguments that have
  // already been normalized (so we don't normalize more than once). This has
  // been solved before in https://github.com/nodejs/node/pull/12342, but was
  // reverted as it had unintended side effects.
  if (ArrayIsArray(args[0]) && args[0][normalizedArgsSymbol]) {
    normalized = args[0];
  } else {
    normalized = normalizeArgs(args);
  }
  const options = normalized[0];
  const cb = normalized[1];

  // Restore the prototype's write if it was replaced on this instance.
  if (this.write !== Socket.prototype.write)
    this.write = Socket.prototype.write;

  // A destroyed socket is being connected again: drop the stale handle and
  // cached local/peer addresses.
  if (this.destroyed) {
    this._handle = null;
    this._peername = null;
    this._sockname = null;
  }

  const { path } = options;
  const pipe = !!path;
  debug('pipe', pipe, path);

  if (!this._handle) {
    this._handle = pipe ?
      new Pipe(PipeConstants.SOCKET) :
      new TCP(TCPConstants.SOCKET);
    initSocketHandle(this);
  }

  if (cb !== null) {
    this.once('connect', cb);
  }

  this._unrefTimer();

  this.connecting = true;
  this.writable = true;

  if (pipe) {
    validateString(path, 'options.path');
    defaultTriggerAsyncIdScope(
      this[async_id_symbol], internalConnect, this, path
    );
  } else {
    lookupAndConnect(this, options);
  }
  return this;
};

除了参数设置外,值得注意的是:如果实例上的write被覆盖过,这里会先把它恢复为Socket原型上的write方法。之后分两种情况:如果传入的是管道路径(path),就在defaultTriggerAsyncIdScope这个异步钩子作用域内直接调用internalConnect;否则调用lookupAndConnect,借助dns模块解析主机地址,得到地址后再调用internalConnect发起连接。

/**
 * Issue the native connect request for `self`, optionally binding to a local
 * address/port first.
 *
 * For addressType 4 or 6 a TCPConnectWrap is used (connect / connect6). Any
 * other addressType, including undefined, is treated as a pipe path and uses
 * PipeConnectWrap. Completion is reported to afterConnect through
 * `req.oncomplete`. Synchronous bind or connect failures destroy the socket
 * with an error carrying host/port details instead of throwing.
 *
 * @param {Socket} self           socket whose `connecting` flag must be set
 * @param {string} address        remote IP address, or the pipe path
 * @param {number} [port]
 * @param {number} [addressType]  4, 6, or anything else for a pipe
 * @param {string} [localAddress] local address to bind before connecting
 * @param {number} [localPort]    local port to bind before connecting
 * @param {number} [flags]        bind flags used for IPv6 local binds
 */
function internalConnect(
  self, address, port, addressType, localAddress, localPort, flags) {
  // TODO return promise from Socket.prototype.connect which
  // wraps _connectReq.

  assert(self.connecting);

  let err;

  // Optional local bind; a missing localAddress defaults to the unspecified
  // address of the matching family.
  if (localAddress || localPort) {
    if (addressType === 4) {
      localAddress = localAddress || DEFAULT_IPV4_ADDR;
      err = self._handle.bind(localAddress, localPort);
    } else { // addressType === 6
      localAddress = localAddress || DEFAULT_IPV6_ADDR;
      err = self._handle.bind6(localAddress, localPort, flags);
    }
    debug('binding to localAddress: %s and localPort: %d (addressType: %d)',
          localAddress, localPort, addressType);

    err = checkBindError(err, localPort, self._handle);
    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', localAddress, localPort);
      self.destroy(ex);
      return;
    }
  }

  if (addressType === 6 || addressType === 4) {
    // TCP: the request carries address details that afterConnect uses to
    // build error messages.
    const req = new TCPConnectWrap();
    req.oncomplete = afterConnect;
    req.address = address;
    req.port = port;
    req.localAddress = localAddress;
    req.localPort = localPort;

    if (addressType === 4)
      err = self._handle.connect(req, address, port);
    else
      err = self._handle.connect6(req, address, port);
  } else {
    // Pipe: `address` is the socket path.
    const req = new PipeConnectWrap();
    req.address = address;
    req.oncomplete = afterConnect;

    err = self._handle.connect(req, address, afterConnect);
  }

  // The connect call failed synchronously: destroy with a descriptive error,
  // including the local socket name when it is available.
  if (err) {
    const sockname = self._getsockname();
    let details;

    if (sockname) {
      details = sockname.address + ':' + sockname.port;
    }

    const ex = exceptionWithHostPort(err, 'connect', address, port, details);
    self.destroy(ex);
  }
}

这里会经过一系列连接类型的判断构造参数,调用_handle上的connect启用c++模块建立连接,连接完成后会有一个afterConnect的回调。

/**
 * Completion callback for a native connect request (set as req.oncomplete).
 *
 * Does nothing if the socket was destroyed while the request was pending.
 * Otherwise it clears `connecting` and the cached sockname, then either:
 *  - on failure (status !== 0): destroys the socket with an error describing
 *    the remote address and, when known, the local address/port; or
 *  - on success: applies the readable/writable flags (writable only if the
 *    writable side has not already ended), emits 'connect' then 'ready', and
 *    starts a zero-length read unless the stream is paused.
 *
 * @param {number} status - 0 on success, otherwise an error code
 * @param {object} handle - native handle; its owner_symbol is the Socket
 * @param {object} req - the connect request (address/port details)
 * @param {boolean} readable
 * @param {boolean} writable
 */
function afterConnect(status, handle, req, readable, writable) {
  const socket = handle[owner_symbol];

  // Callback may come after call to destroy
  if (socket.destroyed) return;

  debug('afterConnect');

  assert(socket.connecting);
  socket.connecting = false;
  socket._sockname = null;

  if (status !== 0) {
    const hasLocal = req.localAddress && req.localPort;
    const details = hasLocal ?
      `${req.localAddress}:${req.localPort}` :
      undefined;
    const ex = exceptionWithHostPort(
      status, 'connect', req.address, req.port, details
    );
    if (details) {
      ex.localAddress = req.localAddress;
      ex.localPort = req.localPort;
    }
    socket.destroy(ex);
    return;
  }

  socket.readable = readable;
  if (!socket._writableState.ended) socket.writable = writable;
  socket._unrefTimer();

  socket.emit('connect');
  socket.emit('ready');

  // Start the first read, or get an immediate EOF.
  // this doesn't actually consume any bytes, because len=0.
  if (readable && !socket.isPaused()) socket.read(0);
}

在这里触发了connect和ready事件,到这里就完成了服务器和客户端的连接;如果连接失败,则会构造带有地址信息的错误并销毁这个socket。

小结

到这里关于net工作流程的简单介绍就结束了。可能有人会疑惑怎么只谈到了服务器和客户端的连接,数据交互呢?实际上也讲到了:Socket的数据读写能力继承自stream模块(Duplex),这部分应该再起一篇文章单独谈。同时net还用到了dns这个模块,当然Node中的网络通信除了net外还有dns、dgram(UDP)、http等模块,同样需要再来一篇文章。这两类属于接下来的计划,这篇文章就先到这里。

-- EOF --

添加在分类「 前端开发 」下,并被添加 「Node.js」 标签。