Node源码解析——http(上)
网络模块的最后一系列文章,就看看基于tcp之上的一些网络模块实现,因为已经有了阅读net模块的基础,看这个部分应该会好理解很多。
要看的第一个模块当然是最通用也是大家最熟悉的http了。打开源码文件,发现和之前的模块不一样,除了lib/http.js
这个总出口外,还有6个职责划分清晰的子文件。
先简单介绍下各自的职责:
- _http_agent.js: 官方文档有重点介绍,对应Agent类,负责管理http客户端的连接持久性和重用。
- _http_client.js: 对应http.ClientRequest类,负责正在进行中的请求实现。
- _http_server.js: 对应http.Server类和http.ServerResponse类,负责服务器和服务器响应实现。
- _http_incoming.js: 对应http.IncomingMessage类,两端回调的res继承该类。
- _http_outgoing.js: 对应http.OutgoingMessage类,两端回调的req继承该类。
- _http_common.js: 负责提供上面各个http子模块都会用到的通用方法,比如http解析。
好了,了解了基本结构,再跟着常规流程看源码。第一步当然还是创建服务器。
createServer
这是个直接从最顶层暴露的接口,那么看总入口lib/http.js
。
const {
_connectionListener,
STATUS_CODES,
Server,
ServerResponse
} = require('_http_server');
function createServer(opts, requestListener) {
return new Server(opts, requestListener);
}
很简单,直接实例化了一个来自_http_server的Server实例,如上文所说,这个子文件负责服务器部分,就先看这个构造函数的实现。
function Server(options, requestListener) {
if (!(this instanceof Server)) return new Server(options, requestListener);
if (typeof options === 'function') {
requestListener = options;
options = {};
} else if (options == null || typeof options === 'object') {
options = { ...options };
} else {
throw new ERR_INVALID_ARG_TYPE('options', 'object', options);
}
this[kIncomingMessage] = options.IncomingMessage || IncomingMessage;
this[kServerResponse] = options.ServerResponse || ServerResponse;
net.Server.call(this, { allowHalfOpen: true });
if (requestListener) {
this.on('request', requestListener);
}
// Similar option to this. Too lazy to write my own docs.
// http://www.squid-cache.org/Doc/config/half_closed_clients/
// http://wiki.squid-cache.org/SquidFaq/InnerWorkings#What_is_a_half-closed_filedescriptor.3F
this.httpAllowHalfOpen = false;
this.on('connection', connectionListener);
this.timeout = 0;
this.keepAliveTimeout = 5000;
this.maxHeadersCount = null;
this.headersTimeout = 40 * 1000; // 40 seconds
}
ObjectSetPrototypeOf(Server.prototype, net.Server.prototype);
ObjectSetPrototypeOf(Server, net.Server);
除了一些选项处理和属性初始化外,最值得关注的就是继承了net模块的实现,以及request和connection事件的监听,其中connection事件的监听回调和net由用户自定义不同,是由node本身实现。那么也就能预见到,在tcp连接之上的http,应该在connection时有一些特殊处理。
Server的启动,也就是listen实现完全继承自net就不再赘述,这一步会得到一个开启listen的socket等待连接,我们直接看http区别于net的响应连接部分connectionListener。
function connectionListener(socket) {
defaultTriggerAsyncIdScope(
getOrSetAsyncId(socket), connectionListenerInternal, this, socket
);
}
function connectionListenerInternal(server, socket) {
debug('SERVER new http connection');
// Ensure that the server property of the socket is correctly set.
// See https://github.com/nodejs/node/issues/13435
if (socket.server === null)
socket.server = server;
// If the user has added a listener to the server,
// request, or response, then it's their responsibility.
// otherwise, destroy on timeout by default
if (server.timeout && typeof socket.setTimeout === 'function')
socket.setTimeout(server.timeout);
socket.on('timeout', socketOnTimeout);
const parser = parsers.alloc();
// TODO(addaleax): This doesn't play well with the
// `async_hooks.currentResource()` proposal, see
// https://github.com/nodejs/node/pull/21313
parser.initialize(
HTTPParser.REQUEST,
new HTTPServerAsyncResource('HTTPINCOMINGMESSAGE', socket)
);
parser.socket = socket;
// We are starting to wait for our headers.
parser.parsingHeadersStart = nowDate();
socket.parser = parser;
// Propagate headers limit from server instance to parser
if (typeof server.maxHeadersCount === 'number') {
parser.maxHeaderPairs = server.maxHeadersCount << 1;
}
const state = {
onData: null,
onEnd: null,
onClose: null,
onDrain: null,
outgoing: [],
incoming: [],
// `outgoingData` is an approximate amount of bytes queued through all
// inactive responses. If more data than the high watermark is queued - we
// need to pause TCP socket/HTTP parser, and wait until the data will be
// sent to the client.
outgoingData: 0,
keepAliveTimeoutSet: false
};
state.onData = socketOnData.bind(undefined, server, socket, parser, state);
state.onEnd = socketOnEnd.bind(undefined, server, socket, parser, state);
state.onClose = socketOnClose.bind(undefined, socket, state);
state.onDrain = socketOnDrain.bind(undefined, socket, state);
socket.on('data', state.onData);
socket.on('error', socketOnError);
socket.on('end', state.onEnd);
socket.on('close', state.onClose);
socket.on('drain', state.onDrain);
parser.onIncoming = parserOnIncoming.bind(undefined, server, socket, state);
// We are consuming socket, so it won't get any actual data
socket.on('resume', onSocketResume);
socket.on('pause', onSocketPause);
// Overrides to unconsume on `data`, `readable` listeners
socket.on = generateSocketListenerWrapper('on');
socket.addListener = generateSocketListenerWrapper('addListener');
socket.prependListener = generateSocketListenerWrapper('prependListener');
// We only consume the socket if it has never been consumed before.
if (socket._handle && socket._handle.isStreamBase &&
!socket._handle._consumed) {
parser._consumed = true;
socket._handle._consumed = true;
parser.consume(socket._handle);
}
parser[kOnExecute] =
onParserExecute.bind(undefined, server, socket, parser, state);
socket._paused = false;
}
先是一个async_hooks记录,真正调用的是connectionListenerInternal,就是一个准备parser解析socket即将到来的请求数据的过程。这里有很多注释说明了这个过程中的细节,我们先关注整个流程。
parser
首先自然是parser是如何构建和分配的,可以看到是调用了来自_http_common_
的parsers的alloc方法。进去看细节:
const parsers = new FreeList('parsers', 1000, function parsersCb() {
const parser = new HTTPParser();
cleanParser(parser);
parser.onIncoming = null;
parser[kOnHeaders] = parserOnHeaders;
parser[kOnHeadersComplete] = parserOnHeadersComplete;
parser[kOnBody] = parserOnBody;
parser[kOnMessageComplete] = parserOnMessageComplete;
return parser;
});
这里构造了一种名为FreeList的数据结构来管理parser群,大概能明白有1000的上限,并且在其中的每个parser上设定有一系列解析钩子。再看FreeList源码究竟是什么结构:
class FreeList {
constructor(name, max, ctor) {
this.name = name;
this.ctor = ctor;
this.max = max;
this.list = [];
}
hasItems() {
return this.list.length > 0;
}
alloc() {
return this.list.length > 0 ?
this.list.pop() :
ReflectApply(this.ctor, this, arguments);
}
free(obj) {
if (this.list.length < this.max) {
this.list.push(obj);
return true;
}
return false;
}
}
就是一个数组结构,并在其中看到了connection调用的alloc方法,逻辑也很简单,就是预留了1000个 parsers,每次有连接建立时,都会从空闲数组中分配一个,全部分配完毕再新建。理所当然的能推断出,当连接断开时,就会调用free把parser回推到空闲数组再利用。总的来说,这样一个数据结构就是为了尽可能的实现parser的复用,减少实例化parser的开销。
同时注意到,parser的构造函数来自于c++模块的node_http_parser.cc
,进去先看注释
// This is a binding to llhttp (https://github.com/nodejs/llhttp)
// The goal is to decouple sockets from parsing for more javascript-level
// agility. A Buffer is read from a socket and passed to parser.execute().
// The parser then issues callbacks with slices of the data
// parser.onMessageBegin
// parser.onPath
// parser.onBody
// ...
用到了llhttp这个依赖,关于llhttp的原理这里就不多解释了不是重点,知乎有一篇文章写得很清楚感兴趣可以移步:llhttp是如何使Node.js性能翻倍的?,总之我们只需要知道node里用到了这个解析器,这个解析器也提供了一些解析各个阶段完成时的钩子函数给我们做事,node主要用到的阶段按时间可以分为如下阶段:
- parserOnHeaders: 解析请求头数据时
- parserOnHeadersComplete: 请求头解析完毕时
- parserOnBody:解析请求体数据时
- parserOnMessageComplete: 请求体解析完毕时
- parserOnIncoming: 请求解析完毕时
那么就回到设置这些回调的地方_http_common_
看看分别做了哪些处理。
// Only called in the slow case where slow means
// that the request headers were either fragmented
// across multiple TCP packets or too large to be
// processed in a single run. This method is also
// called to process trailing HTTP headers.
function parserOnHeaders(headers, url) {
// Once we exceeded headers limit - stop collecting them
if (this.maxHeaderPairs <= 0 ||
this._headers.length < this.maxHeaderPairs) {
this._headers = this._headers.concat(headers);
}
this._url += url;
}
注释说的很明白,这个回调主要是在慢速网络下调用,慢速网络下的请求头可能跨越多个tcp数据包,无法一次处理,就在这个回调中合并。
// `headers` and `url` are set only if .onHeaders() has not been called for
// this request.
// `url` is not set for response parsers but that's not applicable here since
// all our parsers are request parsers.
function parserOnHeadersComplete(versionMajor, versionMinor, headers, method,
url, statusCode, statusMessage, upgrade,
shouldKeepAlive) {
const parser = this;
const { socket } = parser;
if (headers === undefined) {
headers = parser._headers;
parser._headers = [];
}
if (url === undefined) {
url = parser._url;
parser._url = '';
}
// Parser is also used by http client
const ParserIncomingMessage = (socket && socket.server &&
socket.server[kIncomingMessage]) ||
IncomingMessage;
const incoming = parser.incoming = new ParserIncomingMessage(socket);
incoming.httpVersionMajor = versionMajor;
incoming.httpVersionMinor = versionMinor;
incoming.httpVersion = `${versionMajor}.${versionMinor}`;
incoming.url = url;
incoming.upgrade = upgrade;
let n = headers.length;
// If parser.maxHeaderPairs <= 0 assume that there's no limit.
if (parser.maxHeaderPairs > 0)
n = MathMin(n, parser.maxHeaderPairs);
incoming._addHeaderLines(headers, n);
if (typeof method === 'number') {
// server only
incoming.method = methods[method];
} else {
// client only
incoming.statusCode = statusCode;
incoming.statusMessage = statusMessage;
}
return parser.onIncoming(incoming, shouldKeepAlive);
}
这一步把请求头解析完毕后,就会给parser实例化一个IncomingMessage对象了,这个构造函数也就是来自于_http_incoming.js
这个子文件。接下来调用parser的onIncoming。这个回调不是在_http_common
中设置,而是在connectionListenerInternal中构建parser实例完成后。
parser.onIncoming = parserOnIncoming.bind(undefined, server, socket, state);
看实现。
function parserOnIncoming(server, socket, state, req, keepAlive) {
resetSocketTimeout(server, socket, state);
if (server.keepAliveTimeout > 0) {
req.on('end', resetHeadersTimeoutOnReqEnd);
}
// Set to zero to communicate that we have finished parsing.
socket.parser.parsingHeadersStart = 0;
if (req.upgrade) {
req.upgrade = req.method === 'CONNECT' ||
server.listenerCount('upgrade') > 0;
if (req.upgrade)
return 2;
}
state.incoming.push(req);
// If the writable end isn't consuming, then stop reading
// so that we don't become overwhelmed by a flood of
// pipelined requests that may never be resolved.
if (!socket._paused) {
const ws = socket._writableState;
if (ws.needDrain || state.outgoingData >= socket.writableHighWaterMark) {
socket._paused = true;
// We also need to pause the parser, but don't do that until after
// the call to execute, because we may still be processing the last
// chunk.
socket.pause();
}
}
const res = new server[kServerResponse](req);
res._onPendingData = updateOutgoingData.bind(undefined, socket, state);
res.shouldKeepAlive = keepAlive;
DTRACE_HTTP_SERVER_REQUEST(req, socket);
if (socket._httpMessage) {
// There are already pending outgoing res, append.
state.outgoing.push(res);
} else {
res.assignSocket(socket);
}
// When we're finished writing the response, check if this is the last
// response, if so destroy the socket.
res.on('finish',
resOnFinish.bind(undefined, req, res, socket, state, server));
if (req.headers.expect !== undefined &&
(req.httpVersionMajor === 1 && req.httpVersionMinor === 1)) {
if (continueExpression.test(req.headers.expect)) {
res._expect_continue = true;
if (server.listenerCount('checkContinue') > 0) {
server.emit('checkContinue', req, res);
} else {
res.writeContinue();
server.emit('request', req, res);
}
} else if (server.listenerCount('checkExpectation') > 0) {
server.emit('checkExpectation', req, res);
} else {
res.writeHead(417);
res.end();
}
} else {
server.emit('request', req, res);
}
return 0; // No special treatment.
}
这里传入的req也就是IncomingMessage对象了。这里有比较多的关于stream的知识,因为socket中的数据处理都是基于此的,先不细讲,主要看到有基于req构建res的代码,res的构造函数来自于server上挂载的kServerResponse符号,也就是和Incoming对应的继承于_http_outgoing.js
的ServerResponse,并监听其finish事件挂上finish回调。之后做的事就是res是否写入完毕,完成后就调用request事件,抛出req,res对象。
还记得request事件在哪里监听的吗?在实例化Server时,会监听request并挂上调用createServer传入的回调,至此就完成了服务器启动监听,接收请求,响应回调处理请求的过程。
看起来整个过程很流畅,但我还有一些问题故意没有说。
- req和res的内部结构,也就是涉及到
_http_incoming.js
和_http_outgoing.js
这两个子文件,以及两者的交互。 - parserOnBody和parserOnMessageComplete在请求解析完毕时都没有调用,调用时机?
小结
边读源码边写,感觉内容比我想象的多不少,还是分几篇来吧,这一篇就到server创建部分的准备为止。
-- EOF --