Node源码解析——http(中)
上一篇文章中介绍了Server的准备,这一篇继续看剩下的问题。
request
首先把另一半看一下,http当然不止响应,还有请求的能力。与createServer对应,同样是从顶层暴露出的request接口。
function request(url, options, cb) {
return new ClientRequest(url, options, cb);
}
实例化了来自_http_client.js
的ClientRequest构造函数。这个构造函数内做的事情比较多,省略掉非关键内容介绍核心流程。
function ClientRequest(input, options, cb) {
OutgoingMessage.call(this);
// ...
let agent = options.agent;
const defaultAgent = options._defaultAgent || Agent.globalAgent;
if (agent === false) {
agent = new defaultAgent.constructor();
} else if (agent === null || agent === undefined) {
if (typeof options.createConnection !== 'function') {
agent = defaultAgent;
}
// Explicitly pass through this statement as agent will not be used
// when createConnection is provided.
} else if (typeof agent.addRequest !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.agent',
['Agent-like Object', 'undefined', 'false'],
agent);
}
this.agent = agent;
// ...
if (this.agent) {
// If there is an agent we should default to Connection:keep-alive,
// but only if the Agent will actually reuse the connection!
// If it's not a keepAlive agent, and the maxSockets==Infinity, then
// there's never a case where this socket will actually be reused
if (!this.agent.keepAlive && !NumberIsFinite(this.agent.maxSockets)) {
this._last = true;
this.shouldKeepAlive = false;
} else {
this._last = false;
this.shouldKeepAlive = true;
}
}
// ...
const oncreate = (err, socket) => {
if (called)
return;
called = true;
if (err) {
process.nextTick(() => this.emit('error', err));
return;
}
this.onSocket(socket);
this._deferToConnect(null, null, () => this._flush());
};
// initiate connection
if (this.agent) {
this.agent.addRequest(this, options);
} else {
// No agent, default to Connection:close.
this._last = true;
this.shouldKeepAlive = false;
if (typeof options.createConnection === 'function') {
const newSocket = options.createConnection(options, oncreate);
if (newSocket && !called) {
called = true;
this.onSocket(newSocket);
} else {
return;
}
} else {
debug('CLIENT use net.createConnection', options);
this.onSocket(net.createConnection(options));
}
}
this._deferToConnect(null, null, () => this._flush());
}
ObjectSetPrototypeOf(ClientRequest.prototype, OutgoingMessage.prototype);
ObjectSetPrototypeOf(ClientRequest, OutgoingMessage);
在这个构造函数中首先要注意到的就是继承了OutgoingMessage,再就是agent的初始化。这里直接从官网复制过来一段关于agent的介绍。
An
Agent
is responsible for managing connection persistence and reuse for HTTP clients. It maintains a queue of pending requests for a given host and port, reusing a single socket connection for each until the queue is empty, at which time the socket is either destroyed or put into a pool where it is kept to be used again for requests to the same host and port. Whether it is destroyed or pooled depends on thekeepAlive
option.Pooled connections have TCP Keep-Alive enabled for them, but servers may still close idle connections, in which case they will be removed from the pool and a new connection will be made when a new HTTP request is made for that host and port. Servers may also refuse to allow multiple requests over the same connection, in which case the connection will have to be remade for every request and cannot be pooled. The
Agent
will still make the requests to that server, but each one will occur over a new connection.When a connection is closed by the client or the server, it is removed from the pool. Any unused sockets in the pool will be unrefed so as not to keep the Node.js process running when there are no outstanding requests.
因为http的每个请求都是基于tcp的,需要通过三次握手建立连接,比较消耗时间,在浏览器中有浏览器帮我们维护tcp复用,而node就需要我们自己维护了,所以给http请求提供了一层代理服务,用于管理连接的创建销毁以及复用。
在谈到agent实现之前,先说一下agent和maxSockets的关系,注释有提到,keepalive的默认maxSockets为无限,因为可以实现socket复用。但keepalive关闭的时候就需要通过maxSockets限制socket数量了,因为每个tcp连接创建的socket本质还是一个文件句柄,而linux限制了单一进程最大可打开文件数。
最后做了两个操作,一是通过agent的addRequest方法准备处理请求,二是通过_deferToConnect设置当socket被连接时调用来自OutgoingMessage的_flush方法刷新掉挂起的请求信息。关于_deferToConnect的作用注释也说明了。
// This function is for calls that need to happen once the socket is
// assigned to this request and writable. It's an important promisy
// thing for all the socket calls that happen either now
// (when a socket is assigned) or in the future (when a socket gets
// assigned out of the pool and is eventually writable).
既然要看agent的addRequest,就先看agent的实现思路,来到_http_agent.js
。
agent
先看构造函数。
function Agent(options) {
if (!(this instanceof Agent))
return new Agent(options);
EventEmitter.call(this);
this.defaultPort = 80;
this.protocol = 'http:';
this.options = { ...options };
// Don't confuse net and make it think that we're connecting to a pipe
this.options.path = null;
this.requests = {};
this.sockets = {};
this.freeSockets = {};
this.keepAliveMsecs = this.options.keepAliveMsecs || 1000;
this.keepAlive = this.options.keepAlive || false;
this.maxSockets = this.options.maxSockets || Agent.defaultMaxSockets;
this.maxFreeSockets = this.options.maxFreeSockets || 256;
this.on('free', (socket, options) => {
const name = this.getName(options);
debug('agent.on(free)', name);
if (socket.writable &&
this.requests[name] && this.requests[name].length) {
const req = this.requests[name].shift();
setRequestSocket(this, req, socket);
if (this.requests[name].length === 0) {
// don't leak
delete this.requests[name];
}
} else {
// If there are no pending requests, then put it in
// the freeSockets pool, but only if we're allowed to do so.
const req = socket._httpMessage;
if (req &&
req.shouldKeepAlive &&
socket.writable &&
this.keepAlive) {
let freeSockets = this.freeSockets[name];
const freeLen = freeSockets ? freeSockets.length : 0;
let count = freeLen;
if (this.sockets[name])
count += this.sockets[name].length;
if (count > this.maxSockets || freeLen >= this.maxFreeSockets) {
socket.destroy();
} else if (this.keepSocketAlive(socket)) {
freeSockets = freeSockets || [];
this.freeSockets[name] = freeSockets;
socket[async_id_symbol] = -1;
socket._httpMessage = null;
this.removeSocket(socket, options);
freeSockets.push(socket);
} else {
// Implementation doesn't want to keep socket alive
socket.destroy();
}
} else {
socket.destroy();
}
}
});
// Don't emit keylog events unless there is a listener for them.
this.on('newListener', maybeEnableKeylog);
}
ObjectSetPrototypeOf(Agent.prototype, EventEmitter.prototype);
ObjectSetPrototypeOf(Agent, EventEmitter);
首先继承了EventEmiiter,再除了初始化一些属性外,主要实现了free事件的监听。应该也能猜到free事件就是agent由工作中转空闲的一些处理,那么还是先看agent是怎么进入工作的。找到addRequest实现。
Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */,
localAddress/* legacy */) {
// Legacy API: addRequest(req, host, port, localAddress)
// ...
const name = this.getName(options);
if (!this.sockets[name]) {
this.sockets[name] = [];
}
const freeLen = this.freeSockets[name] ? this.freeSockets[name].length : 0;
const sockLen = freeLen + this.sockets[name].length;
if (freeLen) {
// We have a free socket, so use that.
const socket = this.freeSockets[name].shift();
// Guard against an uninitialized or user supplied Socket.
const handle = socket._handle;
if (handle && typeof handle.asyncReset === 'function') {
// Assign the handle a new asyncId and run any destroy()/init() hooks.
handle.asyncReset(new ReusedHandle(handle.getProviderType(), handle));
socket[async_id_symbol] = handle.getAsyncId();
}
// don't leak
if (!this.freeSockets[name].length)
delete this.freeSockets[name];
this.reuseSocket(socket, req);
setRequestSocket(this, req, socket);
this.sockets[name].push(socket);
} else if (sockLen < this.maxSockets) {
debug('call onSocket', sockLen, freeLen);
// If we are under maxSockets create a new one.
this.createSocket(req, options, handleSocketCreation(this, req, true));
} else {
debug('wait for socket');
// We are over limit so we'll add it to the queue.
if (!this.requests[name]) {
this.requests[name] = [];
}
this.requests[name].push(req);
}
};
这里的逻辑和注释都很清晰。当一个新的请求到来时,会基于这个请求的端口,ip地址等信息由getName方法得到一个标识name。由于每个请求都必然需要对应一个socket,所以在agent里维护有freeSocket和socket两个数组分别表示空闲和工作中的socket,同时维护一个requests队列,当无freeSocket时将请求入队等待。
当请求来到,有空闲socket时就会复用。无空闲socket就检测socket是否达到最大值,达到最大值请求就排队等待,否则新建socket处理请求。进入createSocket看新建逻辑。
Agent.prototype.createSocket = function createSocket(req, options, cb) {
options = { ...options, ...this.options };
if (options.socketPath)
options.path = options.socketPath;
if (!options.servername && options.servername !== '')
options.servername = calculateServerName(options, req);
const name = this.getName(options);
options._agentKey = name;
debug('createConnection', name, options);
options.encoding = null;
let called = false;
const oncreate = (err, s) => {
if (called)
return;
called = true;
if (err)
return cb(err);
if (!this.sockets[name]) {
this.sockets[name] = [];
}
this.sockets[name].push(s);
debug('sockets', name, this.sockets[name].length);
installListeners(this, s, options);
cb(null, s);
};
const newSocket = this.createConnection(options, oncreate);
if (newSocket)
oncreate(null, newSocket);
};
内部调用了createConnection进行真正的连接创建,同时插入了一个onCreate回调,这个回调做的事是当Socket创建成功时调用installListeners,之后调用一个通过参数传进来的cb回调,往回查能得到这个回调名为handleSocketCreation。那么一步步来,先看createConnection。
Agent.prototype.createConnection = net.createConnection;
直接调用了net模块的createConnection,这里应该隐约能猜到了,毕竟net本身就有connect方法实现socket的连接,http里复用这一层完全没问题。到lib/net.js
验证。
module.exports = {
// ...
connect,
createConnection: connect,
createServer,
// ...
};
果然如此,那请求的后续也就不用赘述了。直接看socket创建成功后的installListeners调用。
function installListeners(agent, s, options) {
function onFree() {
debug('CLIENT socket onFree');
agent.emit('free', s, options);
}
s.on('free', onFree);
function onClose(err) {
debug('CLIENT socket onClose');
// This is the only place where sockets get removed from the Agent.
// If you want to remove a socket from the pool, just close it.
// All socket errors end in a close event anyway.
agent.removeSocket(s, options);
}
s.on('close', onClose);
function onRemove() {
// We need this function for cases like HTTP 'upgrade'
// (defined by WebSockets) where we need to remove a socket from the
// pool because it'll be locked up indefinitely
debug('CLIENT socket onRemove');
agent.removeSocket(s, options);
s.removeListener('close', onClose);
s.removeListener('free', onFree);
s.removeListener('agentRemove', onRemove);
}
s.on('agentRemove', onRemove);
if (agent[kOnKeylog]) {
s.on('keylog', agent[kOnKeylog]);
}
}
socket上安装一系列监听函数。这里就和之前构造函数里的free事件监听串起来了。不过接下来就需要知道socket上的free事件又是怎么触发的。
继续看handleSocketCreation。
function handleSocketCreation(agent, request, informRequest) {
return function handleSocketCreation_Inner(err, socket) {
if (err) {
process.nextTick(emitErrorNT, request, err);
return;
}
if (informRequest)
setRequestSocket(agent, request, socket);
else
socket.emit('free');
};
}
这里会根据informRequest的值决定是调用setRequestSocket还是触发free事件。在createSocket里informRequest为true,那么可想而知,在removeSocket应该也会调用这个函数并传入false触发socket的释放。
继续看创建时调用的setRequestSocket。
function setRequestSocket(agent, req, socket) {
req.onSocket(socket);
const agentTimeout = agent.options.timeout || 0;
if (req.timeout === undefined || req.timeout === agentTimeout) {
return;
}
socket.setTimeout(req.timeout);
// Reset timeout after response end
req.once('response', (res) => {
res.once('end', () => {
if (socket.timeout !== agentTimeout) {
socket.setTimeout(agentTimeout);
}
});
});
}
调用req的onSocket方法。回到_http_client.js
找到该方法。
ClientRequest.prototype.onSocket = function onSocket(socket) {
process.nextTick(onSocketNT, this, socket);
};
function onSocketNT(req, socket) {
if (req.aborted) {
// If we were aborted while waiting for a socket, skip the whole thing.
if (!req.agent) {
socket.destroy();
} else {
req.emit('close');
socket.emit('free');
}
} else {
tickOnSocket(req, socket);
}
}
继续调用tickOnSocket,到这一步就要开始准备处理服务端socket返回的数据了。
function tickOnSocket(req, socket) {
const parser = parsers.alloc();
req.socket = socket;
parser.initialize(HTTPParser.RESPONSE,
new HTTPClientAsyncResource('HTTPINCOMINGMESSAGE', req),
req.maxHeaderSize || 0,
req.insecureHTTPParser === undefined ?
isLenient() : req.insecureHTTPParser);
parser.socket = socket;
parser.outgoing = req;
req.parser = parser;
socket.parser = parser;
socket._httpMessage = req;
// Propagate headers limit from request object to parser
if (typeof req.maxHeadersCount === 'number') {
parser.maxHeaderPairs = req.maxHeadersCount << 1;
}
parser.onIncoming = parserOnIncomingClient;
socket.removeListener('error', freeSocketErrorListener);
socket.on('error', socketErrorListener);
socket.on('data', socketOnData);
socket.on('end', socketOnEnd);
socket.on('close', socketCloseListener);
socket.on('drain', ondrain);
if (
req.timeout !== undefined ||
(req.agent && req.agent.options && req.agent.options.timeout)
) {
listenSocketTimeout(req);
}
req.emit('socket', socket);
}
和服务端处理请求数据的步骤同理,取得parser,初始化后监听incoming,通过parserOnIncomingClient解析客户端到来的数据。
function parserOnIncomingClient(res, shouldKeepAlive) {
const socket = this.socket;
const req = socket._httpMessage;
debug('AGENT incoming response!');
if (req.res) {
// We already have a response object, this means the server
// sent a double response.
socket.destroy();
return 0; // No special treatment.
}
req.res = res;
// Skip body and treat as Upgrade.
if (res.upgrade)
return 2;
// Responses to CONNECT request is handled as Upgrade.
const method = req.method;
if (method === 'CONNECT') {
res.upgrade = true;
return 2; // Skip body and treat as Upgrade.
}
if (statusIsInformational(res.statusCode)) {
// Restart the parser, as this is a 1xx informational message.
req.res = null; // Clear res so that we don't hit double-responses.
// Maintain compatibility by sending 100-specific events
if (res.statusCode === 100) {
req.emit('continue');
}
// Send information events to all 1xx responses except 101 Upgrade.
req.emit('information', {
statusCode: res.statusCode,
statusMessage: res.statusMessage,
httpVersion: res.httpVersion,
httpVersionMajor: res.httpVersionMajor,
httpVersionMinor: res.httpVersionMinor,
headers: res.headers,
rawHeaders: res.rawHeaders
});
return 1; // Skip body but don't treat as Upgrade.
}
if (req.shouldKeepAlive && !shouldKeepAlive && !req.upgradeOrConnect) {
// Server MUST respond with Connection:keep-alive for us to enable it.
// If we've been upgraded (via WebSockets) we also shouldn't try to
// keep the connection open.
req.shouldKeepAlive = false;
}
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
req.res = res;
res.req = req;
// Add our listener first, so that we guarantee socket cleanup
res.on('end', responseOnEnd);
req.on('prefinish', requestOnPrefinish);
// If the user did not listen for the 'response' event, then they
// can't possibly read the data, so we ._dump() it into the void
// so that the socket doesn't hang there in a paused state.
if (req.aborted || !req.emit('response', res))
res._dump();
if (method === 'HEAD')
return 1; // Skip body but don't treat as Upgrade.
return 0; // No special treatment.
}
注释都写的很清楚,根据服务器传送的内容决定客户端的行为。我们把整个流程打通,看数据传输结束后的行为,进到responseOnEnd。
function responseOnEnd() {
const req = this.req;
if (req.socket && req.timeoutCb) {
req.socket.removeListener('timeout', emitRequestTimeout);
}
req._ended = true;
if (!req.shouldKeepAlive) {
const socket = req.socket;
if (socket.writable) {
debug('AGENT socket.destroySoon()');
if (typeof socket.destroySoon === 'function')
socket.destroySoon();
else
socket.end();
}
assert(!socket.writable);
} else if (req.finished && !this.aborted) {
// We can assume `req.finished` means all data has been written since:
// - `'responseOnEnd'` means we have been assigned a socket.
// - when we have a socket we write directly to it without buffering.
// - `req.finished` means `end()` has been called and no further data.
// can be written
responseKeepAlive(req);
}
}
这里看到,根据keepalive的值,决定是销毁socket还是调用responseKeepAlive。我们还是看如何keepalive的,进入responseKeepAlive。
function responseKeepAlive(req) {
const socket = req.socket;
debug('AGENT socket keep-alive');
if (req.timeoutCb) {
socket.setTimeout(0, req.timeoutCb);
req.timeoutCb = null;
}
socket.removeListener('close', socketCloseListener);
socket.removeListener('error', socketErrorListener);
socket.removeListener('data', socketOnData);
socket.removeListener('end', socketOnEnd);
socket.once('error', freeSocketErrorListener);
// There are cases where _handle === null. Avoid those. Passing null to
// nextTick() will call getDefaultTriggerAsyncId() to retrieve the id.
const asyncId = socket._handle ? socket._handle.getAsyncId() : undefined;
// Mark this socket as available, AFTER user-added end
// handlers have a chance to run.
defaultTriggerAsyncIdScope(asyncId, process.nextTick, emitFreeNT, req);
}
清除socket的事件监听,调用emitFreeNT。
function emitFreeNT(req) {
req.emit('close');
if (req.res) {
req.res.emit('close');
}
if (req.socket) {
req.socket.emit('free');
}
}
两边socket触发close事件,客户端触发free事件,回到构造函数里看free回调实现。
this.on('free', (socket, options) => {
const name = this.getName(options);
debug('agent.on(free)', name);
if (socket.writable &&
this.requests[name] && this.requests[name].length) {
const req = this.requests[name].shift();
setRequestSocket(this, req, socket);
if (this.requests[name].length === 0) {
// don't leak
delete this.requests[name];
}
} else {
// If there are no pending requests, then put it in
// the freeSockets pool, but only if we're allowed to do so.
const req = socket._httpMessage;
if (req &&
req.shouldKeepAlive &&
socket.writable &&
this.keepAlive) {
let freeSockets = this.freeSockets[name];
const freeLen = freeSockets ? freeSockets.length : 0;
let count = freeLen;
if (this.sockets[name])
count += this.sockets[name].length;
if (count > this.maxSockets || freeLen >= this.maxFreeSockets) {
socket.destroy();
} else if (this.keepSocketAlive(socket)) {
freeSockets = freeSockets || [];
this.freeSockets[name] = freeSockets;
socket[async_id_symbol] = -1;
socket._httpMessage = null;
this.removeSocket(socket, options);
freeSockets.push(socket);
} else {
// Implementation doesn't want to keep socket alive
socket.destroy();
}
} else {
socket.destroy();
}
}
});
请求队列里还有请求的话继续用这个socket,没有请求并且keepalive开启的话,就把该socket推入freeSocket中等待下次请求到来后直接使用,避免了销毁后再创建的开销,这样就实现了socket复用。这里注意到,当freeSockets数量大于maxSockets时,仍然会销毁,不过keepalive开启后默认的maxSockets是无限大所以也没有问题。
小结
至此就搞清楚了http中双端交互的流程,简单概括,就是在底层利用了net建立基于tcp的socket通信,在上层使用agent管理socket,parser解析http数据,并通过基于stream的incoming和outgoing维护数据。
文章内容又有点多了,不得不再分一篇,还有最后的incoming和outgoing的内部实现没谈到。
-- EOF --