Node源码解析——net
上篇文章涉及到了net这个在node中构建异步网络的模块,也是node里很重要的基础模块,这篇文章就来把net搞清楚。
前置知识
因为net涉及到计算机网络的一些知识,考虑到前端对这一块不一定很熟悉,有必要先把相关概念简单解释一下,对细节有兴趣的可以再查阅相关资料。
TCP
官网文档上提到,net是用于创建基于流的TCP或IPC服务器与客户端,自然就先从这两个概念入手。TCP属于OSI七层模型的运输层,提供面向连接的可靠字节流服务。它属于一种协议,设定了一系列规则保证网络上的客户端和服务器交换数据的稳定。
Socket
不过从TCP到应用层的通信中,TCP需要为不同类型的应用层提供并发服务,而应用层本身多种多样,也需要一种统一的约束来保证TCP和不同应用的交流。就像插座和不同电器间的关系,就有了Socket。它对TCP(不局限于TCP这一种具体协议进行了抽象,形成最基本的函数交流接口,比如create,listen,accept,connect等。
文件描述符
Unix的哲学就是一切皆文件,自然Socket也不例外。虽然Socket是一个抽象的概念,但在Unix上它表现为一个可操作的文件,文件描述符可以看做该文件的索引,通过类型为int的文件描述符供程序访问并提供给内核定位进行实际操作。
IPC
进程间通信(inter process communication),就是指不同进程间的消息传递,主要用来支持程序中需要多个进程共同完成某个任务,保证多个进程独立运行同时又能相互之间交流信息的方案。Scoket本身是为网络通信而设计的,但在剥离了网络协议栈的处理之后的Socket也能作为IPC的实现方案之一,这种方案学名就称作UNIX Domain Socket,也是net模块的IPC服务器实现。下面两张图表现了两种Socket的工作区别。
源码
先看一个最基础的使用案例,用net创建一个服务器和客户端并相互通信的步骤。
先是服务端,创建后监听本机的8888端口,并当收到数据后打印出信息并回应信息给客户端。
//server.js
const net = require('net');
const server = net.createServer((socket)=>{
socket.on('data', (data) => {
console.log('client send message: ' , data.toString());
});
socket.write('hello client!');
});
server.listen(8888,'127.0.0.1' , ()=>{
console.log(server.address());
});
客户端后于服务器创建,连接到对应端口并监听TCP的消息变动与断开。
//client.js
const net = require('net');
const client = net.connect({port: 8888, host: '127.0.0.1'}, function() {
client.write('hello server!\r\n');
});
client.on('data', (data) => {
console.log('server send message: ' ,data.toString());
client.end();
});
client.on('end', () => {
console.log('disconnected from server');
});
接下来就沿着这个通用的工作流程一步步分析net模块中的内容。
TCP服务器
首先自然从一切的开端createServer
这个方法开始。打开lib/net.js这个文件找到这个方法,做的事很简单,只是实例化了一个Server
对象并返回。进入到这个构造函数中。
function Server(options, connectionListener) {
if (!(this instanceof Server))
return new Server(options, connectionListener);
EventEmitter.call(this);
if (typeof options === 'function') {
connectionListener = options;
options = {};
this.on('connection', connectionListener);
} else if (options == null || typeof options === 'object') {
options = { ...options };
if (typeof connectionListener === 'function') {
this.on('connection', connectionListener);
}
} else {
throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
}
this._connections = 0;
ObjectDefineProperty(this, 'connections', {
get: deprecate(() => {
if (this._usingWorkers) {
return null;
}
return this._connections;
}, 'Server.connections property is deprecated. ' +
'Use Server.getConnections method instead.', 'DEP0020'),
set: deprecate((val) => (this._connections = val),
'Server.connections property is deprecated.',
'DEP0020'),
configurable: true, enumerable: false
});
this[async_id_symbol] = -1;
this._handle = null;
this._usingWorkers = false;
this._workers = [];
this._unref = false;
this.allowHalfOpen = options.allowHalfOpen || false;
this.pauseOnConnect = !!options.pauseOnConnect;
}
这里主要初始化了一些内部参数,可以看到有cluster模块提到的_workers和_handle。这个_handle就是用来保存服务器的socket。但注意到在构造函数中并没有创建socket,而listen的回调第一个参数就是socket,那么显然关键就在listen中了。看设置到Server
原型链上的listen
方法。
Server.prototype.listen = function(...args) {
const normalized = normalizeArgs(args);
let options = normalized[0];
const cb = normalized[1];
if (this._handle) {
throw new ERR_SERVER_ALREADY_LISTEN();
}
if (cb !== null) {
this.once('listening', cb);
}
const backlogFromArgs =
// (handle, backlog) or (path, backlog) or (port, backlog)
toNumber(args.length > 1 && args[1]) ||
toNumber(args.length > 2 && args[2]); // (port, host, backlog)
options = options._handle || options.handle || options;
const flags = getFlags(options.ipv6Only);
// (handle[, backlog][, cb]) where handle is an object with a handle
if (options instanceof TCP) {
this._handle = options;
this[async_id_symbol] = this._handle.getAsyncId();
listenInCluster(this, null, -1, -1, backlogFromArgs);
return this;
}
// (handle[, backlog][, cb]) where handle is an object with a fd
if (typeof options.fd === 'number' && options.fd >= 0) {
listenInCluster(this, null, null, null, backlogFromArgs, options.fd);
return this;
}
// ([port][, host][, backlog][, cb]) where port is omitted,
// that is, listen(), listen(null), listen(cb), or listen(null, cb)
// or (options[, cb]) where options.port is explicitly set as undefined or
// null, bind to an arbitrary unused port
if (args.length === 0 || typeof args[0] === 'function' ||
(typeof options.port === 'undefined' && 'port' in options) ||
options.port === null) {
options.port = 0;
}
// ([port][, host][, backlog][, cb]) where port is specified
// or (options[, cb]) where options.port is specified
// or if options.port is normalized as 0 before
let backlog;
if (typeof options.port === 'number' || typeof options.port === 'string') {
if (!isLegalPort(options.port)) {
throw new ERR_SOCKET_BAD_PORT(options.port);
}
backlog = options.backlog || backlogFromArgs;
// start TCP server listening on host:port
if (options.host) {
lookupAndListen(this, options.port | 0, options.host, backlog,
options.exclusive, flags);
} else { // Undefined host, listens on unspecified address
// Default addressType 4 will be used to search for master server
listenInCluster(this, null, options.port | 0, 4,
backlog, undefined, options.exclusive);
}
return this;
}
// (path[, backlog][, cb]) or (options[, cb])
// where path or options.path is a UNIX domain socket or Windows pipe
if (options.path && isPipeName(options.path)) {
const pipeName = this._pipeName = options.path;
backlog = options.backlog || backlogFromArgs;
listenInCluster(this, pipeName, -1, -1,
backlog, undefined, options.exclusive);
if (!this._handle) {
// Failed and an error shall be emitted in the next tick.
// Therefore, we directly return.
return this;
}
let mode = 0;
if (options.readableAll === true)
mode |= PipeConstants.UV_READABLE;
if (options.writableAll === true)
mode |= PipeConstants.UV_WRITABLE;
if (mode !== 0) {
const err = this._handle.fchmod(mode);
if (err) {
this._handle.close();
this._handle = null;
throw errnoException(err, 'uv_pipe_chmod');
}
}
return this;
}
if (!(('port' in options) || ('path' in options))) {
throw new ERR_INVALID_ARG_VALUE('options', options,
'must have the property "port" or "path"');
}
throw new ERR_INVALID_OPT_VALUE('options', inspect(options));
}
这个函数在cluster模块也分析过,做的事就是根据参数类型调用listenInCluster
启动监听。现在就具体看一下有哪些情况,实际上注释也写的很清楚:
- TCP对象
- 文件描述符
- UNIX domain socket
- Windows pipe
- IP地址和端口
总之最终都会定位到一个Socket。然后进入listenInCluster
函数,涉及到主从进程的部分就不重复分析了,确定好用哪条进程创建这个服务器后,就会调用Server
实例的这个_listen2内部方法正式开始监听, _listen2通过legacy alias到setupListenHandle。
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
debug('setupListenHandle', address, port, addressType, backlog, fd);
// If there is not yet a handle, we need to create one and bind.
// In the case of a server sent via IPC, we don't need to do this.
if (this._handle) {
debug('setupListenHandle: have a handle already');
} else {
debug('setupListenHandle: create a handle');
let rval = null;
// Try to bind to the unspecified IPv6 address, see if IPv6 is available
if (!address && typeof fd !== 'number') {
rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags);
if (typeof rval === 'number') {
rval = null;
address = DEFAULT_IPV4_ADDR;
addressType = 4;
} else {
address = DEFAULT_IPV6_ADDR;
addressType = 6;
}
}
if (rval === null)
rval = createServerHandle(address, port, addressType, fd, flags);
if (typeof rval === 'number') {
const error = uvExceptionWithHostPort(rval, 'listen', address, port);
process.nextTick(emitErrorNT, this, error);
return;
}
this._handle = rval;
}
this[async_id_symbol] = getNewAsyncId(this._handle);
this._handle.onconnection = onconnection;
this._handle[owner_symbol] = this;
// Use a backlog of 512 entries. We pass 511 to the listen() call because
// the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
// which will thus give us a backlog of 512 entries.
const err = this._handle.listen(backlog || 511);
if (err) {
const ex = uvExceptionWithHostPort(err, 'listen', address, port);
this._handle.close();
this._handle = null;
defaultTriggerAsyncIdScope(this[async_id_symbol],
process.nextTick,
emitErrorNT,
this,
ex);
return;
}
// Generate connection key, this should be unique to the connection
this._connectionKey = addressType + ':' + address + ':' + port;
// Unref the handle if the server was unref'ed prior to listening
if (this._unref)
this.unref();
defaultTriggerAsyncIdScope(this[async_id_symbol],
process.nextTick,
emitListeningNT,
this);
}
可以看到这个如果没有_handle的话是不会真正的监听的,这也是cluster的worker进程hack掉_handle的依据。如果_handle存在,进入createServerHandle
创建服务器句柄,这里也会根据参数区分ipv4和ipv6。继续看这个函数。
function createServerHandle(address, port, addressType, fd, flags) {
let err = 0;
// Assign handle in listen, and clean up if bind or listen fails
let handle;
let isTCP = false;
if (typeof fd === 'number' && fd >= 0) {
try {
handle = createHandle(fd, true);
} catch (e) {
// Not a fd we can listen on. This will trigger an error.
debug('listen invalid fd=%d:', fd, e.message);
return UV_EINVAL;
}
err = handle.open(fd);
if (err)
return err;
assert(!address && !port);
} else if (port === -1 && addressType === -1) {
handle = new Pipe(PipeConstants.SERVER);
if (process.platform === 'win32') {
const instances = parseInt(process.env.NODE_PENDING_PIPE_INSTANCES);
if (!NumberIsNaN(instances)) {
handle.setPendingInstances(instances);
}
}
} else {
handle = new TCP(TCPConstants.SERVER);
isTCP = true;
}
if (address || port || isTCP) {
debug('bind to', address || 'any');
if (!address) {
// Try binding to ipv6 first
err = handle.bind6(DEFAULT_IPV6_ADDR, port, flags);
if (err) {
handle.close();
// Fallback to ipv4
return createServerHandle(DEFAULT_IPV4_ADDR, port);
}
} else if (addressType === 6) {
err = handle.bind6(address, port, flags);
} else {
err = handle.bind(address, port);
}
}
if (err) {
handle.close();
return err;
}
return handle;
}
根据参数不同完成服务器的创建和端口的绑定,TCP
和Pipe
这两个对象都由c/c++模块提供。到这一步已经完成socket的创建和绑定了。再回到setupListenHandle
中,最后会调用_handle的_listen方法执行c++模块启动socket的listen。到这一步服务器已经准备就绪,接下来就等客户端连接并调用onconnection
响应了。onconnection
的设置也在net.js这个文件中。
function onconnection(err, clientHandle) {
const handle = this;
const self = handle[owner_symbol];
debug('onconnection');
if (err) {
self.emit('error', errnoException(err, 'accept'));
return;
}
if (self.maxConnections && self._connections >= self.maxConnections) {
clientHandle.close();
return;
}
const socket = new Socket({
handle: clientHandle,
allowHalfOpen: self.allowHalfOpen,
pauseOnCreate: self.pauseOnConnect,
readable: true,
writable: true
});
self._connections++;
socket.server = self;
socket._server = self;
DTRACE_NET_SERVER_CONNECTION(socket);
self.emit('connection', socket);
}
新建了Socket对象并通过connection
事件抛出,也就是createServer
时的回调能拿到的Socket对象了。
TCP客户端
客户端的入口在net.connect,进入具体实现:
function connect(...args) {
const normalized = normalizeArgs(args);
const options = normalized[0];
debug('createConnection', normalized);
const socket = new Socket(options);
if (options.timeout) {
socket.setTimeout(options.timeout);
}
return socket.connect(normalized);
}
同样是实例化了一个Socket对象,并调用其connect方法。到这里就把客户端和服务器打通了,本质上是两个Socket对象的交流。那么自然的,下一步就应该看看Socket构造函数的实现。
function Socket(options) {
if (!(this instanceof Socket)) return new Socket(options);
this.connecting = false;
// Problem with this is that users can supply their own handle, that may not
// have _handle.getAsyncId(). In this case an[async_id_symbol] should
// probably be supplied by async_hooks.
this[async_id_symbol] = -1;
this._hadError = false;
this[kHandle] = null;
this._parent = null;
this._host = null;
this[kLastWriteQueueSize] = 0;
this[kTimeout] = null;
this[kBuffer] = null;
this[kBufferCb] = null;
this[kBufferGen] = null;
if (typeof options === 'number')
options = { fd: options }; // Legacy interface.
else
options = { ...options };
options.readable = options.readable || false;
options.writable = options.writable || false;
const { allowHalfOpen } = options;
// Prevent the "no-half-open enforcer" from being inherited from `Duplex`.
options.allowHalfOpen = true;
// For backwards compat do not emit close on destroy.
options.emitClose = false;
options.autoDestroy = false;
// Handle strings directly.
options.decodeStrings = false;
stream.Duplex.call(this, options);
// Default to *not* allowing half open sockets.
this.allowHalfOpen = Boolean(allowHalfOpen);
if (options.handle) {
this._handle = options.handle; // private
this[async_id_symbol] = getNewAsyncId(this._handle);
} else {
const onread = options.onread;
if (onread !== null && typeof onread === 'object' &&
(isUint8Array(onread.buffer) || typeof onread.buffer === 'function') &&
typeof onread.callback === 'function') {
if (typeof onread.buffer === 'function') {
this[kBuffer] = true;
this[kBufferGen] = onread.buffer;
} else {
this[kBuffer] = onread.buffer;
}
this[kBufferCb] = onread.callback;
}
if (options.fd !== undefined) {
const { fd } = options;
let err;
// createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
// a valid `PIPE` or `TCP` descriptor
this._handle = createHandle(fd, false);
err = this._handle.open(fd);
// While difficult to fabricate, in some architectures
// `open` may return an error code for valid file descriptors
// which cannot be opened. This is difficult to test as most
// un-openable fds will throw on `createHandle`
if (err)
throw errnoException(err, 'open');
this[async_id_symbol] = this._handle.getAsyncId();
if ((fd === 1 || fd === 2) &&
(this._handle instanceof Pipe) &&
process.platform === 'win32') {
// Make stdout and stderr blocking on Windows
err = this._handle.setBlocking(true);
if (err)
throw errnoException(err, 'setBlocking');
this._writev = null;
this._write = makeSyncWrite(fd);
// makeSyncWrite adjusts this value like the original handle would, so
// we need to let it do that by turning it into a writable, own
// property.
ObjectDefineProperty(this._handle, 'bytesWritten', {
value: 0, writable: true
});
}
}
}
// Shut down the socket when we're finished with it.
this.on('end', onReadableStreamEnd);
initSocketHandle(this);
this._pendingData = null;
this._pendingEncoding = '';
// If we have a handle, then start the flow of data into the
// buffer. if not, then this will happen when we connect
if (this._handle && options.readable !== false) {
if (options.pauseOnCreate) {
// Stop the handle from reading and pause the stream
this._handle.reading = false;
this._handle.readStop();
this.readableFlowing = false;
} else if (!options.manualStart) {
this.read(0);
}
}
// Reserve properties
this.server = null;
this._server = null;
// Used after `.destroy()`
this[kBytesRead] = 0;
this[kBytesWritten] = 0;
}
ObjectSetPrototypeOf(Socket.prototype, stream.Duplex.prototype);
ObjectSetPrototypeOf(Socket, stream.Duplex);
Socket里除了初始化一些属性外,最重要的一点是继承了stream.Duplex的读写能力,同时调用了initSocketHandle
初始化_handle。
再看Socket实例的connect方法:
Socket.prototype.connect = function(...args) {
let normalized;
// If passed an array, it's treated as an array of arguments that have
// already been normalized (so we don't normalize more than once). This has
// been solved before in https://github.com/nodejs/node/pull/12342, but was
// reverted as it had unintended side effects.
if (ArrayIsArray(args[0]) && args[0][normalizedArgsSymbol]) {
normalized = args[0];
} else {
normalized = normalizeArgs(args);
}
const options = normalized[0];
const cb = normalized[1];
if (this.write !== Socket.prototype.write)
this.write = Socket.prototype.write;
if (this.destroyed) {
this._handle = null;
this._peername = null;
this._sockname = null;
}
const { path } = options;
const pipe = !!path;
debug('pipe', pipe, path);
if (!this._handle) {
this._handle = pipe ?
new Pipe(PipeConstants.SOCKET) :
new TCP(TCPConstants.SOCKET);
initSocketHandle(this);
}
if (cb !== null) {
this.once('connect', cb);
}
this._unrefTimer();
this.connecting = true;
this.writable = true;
if (pipe) {
validateString(path, 'options.path');
defaultTriggerAsyncIdScope(
this[async_id_symbol], internalConnect, this, path
);
} else {
lookupAndConnect(this, options);
}
return
除了参数设置外,值得注意的是用了自身原型链上的write覆盖了原write方法,然后调用lookupAndConnect
借用自己的dns模块查询服务器并连接,这里涉及到defaultTriggerAsyncIdScope
这个异步钩子方法,连接成功的回调会调用internalConnect
。
function internalConnect(
self, address, port, addressType, localAddress, localPort, flags) {
// TODO return promise from Socket.prototype.connect which
// wraps _connectReq.
assert(self.connecting);
let err;
if (localAddress || localPort) {
if (addressType === 4) {
localAddress = localAddress || DEFAULT_IPV4_ADDR;
err = self._handle.bind(localAddress, localPort);
} else { // addressType === 6
localAddress = localAddress || DEFAULT_IPV6_ADDR;
err = self._handle.bind6(localAddress, localPort, flags);
}
debug('binding to localAddress: %s and localPort: %d (addressType: %d)',
localAddress, localPort, addressType);
err = checkBindError(err, localPort, self._handle);
if (err) {
const ex = exceptionWithHostPort(err, 'bind', localAddress, localPort);
self.destroy(ex);
return;
}
}
if (addressType === 6 || addressType === 4) {
const req = new TCPConnectWrap();
req.oncomplete = afterConnect;
req.address = address;
req.port = port;
req.localAddress = localAddress;
req.localPort = localPort;
if (addressType === 4)
err = self._handle.connect(req, address, port);
else
err = self._handle.connect6(req, address, port);
} else {
const req = new PipeConnectWrap();
req.address = address;
req.oncomplete = afterConnect;
err = self._handle.connect(req, address, afterConnect);
}
if (err) {
const sockname = self._getsockname();
let details;
if (sockname) {
details = sockname.address + ':' + sockname.port;
}
const ex = exceptionWithHostPort(err, 'connect', address, port, details);
self.destroy(ex);
}
}
这里会经过一系列连接类型的判断构造参数,调用_handle上的connect启用c++模块建立连接,连接完成后会有一个afterConnect
的回调。
function afterConnect(status, handle, req, readable, writable) {
const self = handle[owner_symbol];
// Callback may come after call to destroy
if (self.destroyed) {
return;
}
debug('afterConnect');
assert(self.connecting);
self.connecting = false;
self._sockname = null;
if (status === 0) {
self.readable = readable;
if (!self._writableState.ended)
self.writable = writable;
self._unrefTimer();
self.emit('connect');
self.emit('ready');
// Start the first read, or get an immediate EOF.
// this doesn't actually consume any bytes, because len=0.
if (readable && !self.isPaused())
self.read(0);
} else {
self.connecting = false;
let details;
if (req.localAddress && req.localPort) {
details = req.localAddress + ':' + req.localPort;
}
const ex = exceptionWithHostPort(status,
'connect',
req.address,
req.port,
details);
if (details) {
ex.localAddress = req.localAddress;
ex.localPort = req.localPort;
}
self.destroy(ex);
}
}
在这里触发了connect
和ready
事件,到这里就完成了服务器和客户端的连接。
小结
到这里关于net
工作流程的简单介绍就结束了,可能有人会疑惑怎么只谈到了服务器和客户端的连接,数据交互呢?实际上也讲到了,数据交互的关键继承自Stream
这个模块,应该再起一篇文章单独谈。同时net
还用到了dns
这个模块,当然Node中的网络通信除了net
外还有dns
,udp
,http
等等,同样需要再来一篇文章。这两类属于接下来的计划,这篇文章就先到这里。
-- EOF --