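Node Source Code Analysis: cluster
Node has been known since its inception for non-blocking I/O and its event-driven model. As a consequence of that design, the JavaScript code of an ordinary Node application runs on a single thread. A single thread has its advantages, but on today's multi-core CPUs it cannot make full use of the hardware. Fault tolerance is also poor: one uncaught exception crashes the whole application. This is the background against which Node's built-in cluster module, the subject of this article, was created.
Background
The cluster module provides multi-process management for Node. Node uses the Master-Worker architecture, commonly called the master-slave model. The master process manages the worker processes and acts as a service process: it typically dispatches client requests to a suitable worker and monitors whether the workers are running properly. The worker processes focus on business logic.
A basic example of creating multiple processes is shown below.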
const express = require('express');
const app = express();
const cluster = require('cluster'); // Load the cluster module
const numCPUs = require('os').cpus().length; // Number of CPU cores
if (cluster.isMaster) {
  console.log(`Master process ${process.pid} is running`);
  // Fork one worker per CPU core.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker process ${worker.process.pid} has exited`);
  });
} else {
  app.get('/', (req, res) => {
    res.send(`I am worker process ${process.pid}.`);
  });
  app.listen(3000);
  console.log(`Worker process ${process.pid} has started`);
}
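This raises the following questions:
- How does the cluster module tell the master process apart from the worker processes?
- The master has to receive client requests and schedule them, yet no server is created in the master's code. How does it fulfil the server role?
- How can multiple child processes listen on the same port?
- How do the master and worker processes communicate?
With these questions in mind, let's move on to the source code.
Source Code
master
Opening lib/cluster, the entry point of the cluster module, answers the first question right away.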
const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrMaster}`);
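The NODE_UNIQUE_ID environment variable decides whether the process is a master or a worker. The next step is to find out where this variable is set. The natural place to look is the fork function in /lib/internal/cluster/master, since cluster.fork is the API that starts process creation.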
cluster.fork = function(env) {
  cluster.setupMaster();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  // ...
  cluster.workers[worker.id] = worker;
  return worker;
};
Next, look inside createWorkerProcess.
function createWorkerProcess(id, env) {
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  // ...
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    // ...
  });
}
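NODE_UNIQUE_ID is an auto-incrementing id that the master passes in when it creates a child process through cluster.fork, which is why its presence distinguishes the two roles. We can also see that the master uses the same id to identify and store each child process.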
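The role detection described above can be sketched in isolation. This is a minimal illustration, not the module's code: buildWorkerEnv and resolveRole are hypothetical names, and only the NODE_UNIQUE_ID check and the spread order are taken from the excerpts.

```javascript
// Hypothetical helpers; only the NODE_UNIQUE_ID logic mirrors the excerpts.
let nextId = 0;

// Mimics what the master does before forking: copy the environment and
// stamp it with a fresh, auto-incrementing id (stored as a string).
// NODE_UNIQUE_ID is spread last, so a caller-supplied value is overwritten.
function buildWorkerEnv(parentEnv, extraEnv = {}) {
  const id = ++nextId;
  return { ...parentEnv, ...extraEnv, NODE_UNIQUE_ID: `${id}` };
}

// Mimics the entry-point check: the presence of the key decides the role.
function resolveRole(env) {
  return 'NODE_UNIQUE_ID' in env ? 'child' : 'master';
}

const masterEnv = { PATH: '/usr/bin' };
const firstWorkerEnv = buildWorkerEnv(masterEnv);
const secondWorkerEnv = buildWorkerEnv(masterEnv, { ROLE_HINT: 'api' });

console.log(resolveRole(masterEnv));         // master
console.log(resolveRole(firstWorkerEnv));    // child
console.log(secondWorkerEnv.NODE_UNIQUE_ID); // 2
```

Because the check only tests for the key's presence, any process started with NODE_UNIQUE_ID in its environment would load the child implementation.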
With the first question answered, step back and look at the file as a whole. It exposes the following three interfaces:
- setupMaster
- fork
- disconnect
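setupMaster initializes the master's settings and is the first step of fork. disconnect disconnects all stored worker processes. The core logic is therefore in fork.
We have already seen how fork creates the worker process and sets and stores its id. Now fill in the middle part and look again.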
cluster.fork = function(env) {
  cluster.setupMaster();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  worker.on('message', function(message, handle) {
    cluster.emit('message', this, message, handle);
  });
  worker.process.once('exit', (exitCode, signalCode) => {
    /*
     * Remove the worker from the workers list only
     * if it has disconnected, otherwise we might
     * still want to access it.
     */
    if (!worker.isConnected()) {
      removeHandlesForWorker(worker);
      removeWorker(worker);
    }
    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
    worker.state = 'dead';
    worker.emit('exit', exitCode, signalCode);
    cluster.emit('exit', worker, exitCode, signalCode);
  });
  worker.process.once('disconnect', () => {
    /*
     * Now is a good time to remove the handles
     * associated with this worker because it is
     * not connected to the master anymore.
     */
    removeHandlesForWorker(worker);
    /*
     * Remove the worker from the workers list only
     * if its process has exited. Otherwise, we might
     * still want to access it.
     */
    if (worker.isDead())
      removeWorker(worker);
    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
    worker.state = 'disconnected';
    worker.emit('disconnect');
    cluster.emit('disconnect', worker);
  });
  worker.process.on('internalMessage', internal(worker, onmessage));
  process.nextTick(emitForkNT, worker);
  cluster.workers[worker.id] = worker;
  return worker;
};
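The whole worker creation flow is now clear:
- setupMaster initializes the master's settings
- Create the worker and assign its id; besides distinguishing master from worker, the id is the key under which the master keeps its reference to the child process
- Attach event listeners to the worker
- Return the worker instance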
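One detail in the listeners is easy to miss: a worker is dropped from the master's list only after it has both exited and disconnected, in whichever order the two events arrive. The sketch below isolates that rule. The registry and function names are hypothetical, and the real code tracks more state than the two flags used here.

```javascript
// Hypothetical, simplified registry; the real logic lives in cluster.fork.
const workers = new Map();

function addWorker(id) {
  const worker = { id, connected: true, dead: false };
  workers.set(id, worker);
  return worker;
}

// Called when the child process exits: forget the worker only if its
// IPC channel is already closed.
function onExit(worker) {
  worker.dead = true;
  if (!worker.connected) workers.delete(worker.id);
}

// Called when the IPC channel closes: forget the worker only if its
// process has already exited.
function onDisconnect(worker) {
  worker.connected = false;
  if (worker.dead) workers.delete(worker.id);
}

const a = addWorker(1);
onExit(a);
console.log(workers.has(1)); // true: exited, but the channel is still open
onDisconnect(a);
console.log(workers.has(1)); // false

const b = addWorker(2);
onDisconnect(b);
console.log(workers.has(2)); // true: disconnected, but still running
onExit(b);
console.log(workers.has(2)); // false
```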
Now for the details of setupMaster:
cluster.setupMaster = function(options) {
  const settings = {
    args: process.argv.slice(2),
    exec: process.argv[1],
    execArgv: process.execArgv,
    silent: false,
    ...cluster.settings,
    ...options
  };
  // Tell V8 to write profile data for each process to a separate file.
  // Without --logfile=v8-%p.log, everything ends up in a single, unusable
  // file. (Unusable because what V8 logs are memory addresses and each
  // process has its own memory mappings.)
  if (settings.execArgv.some((s) => s.startsWith('--prof')) &&
      !settings.execArgv.some((s) => s.startsWith('--logfile='))) {
    settings.execArgv = settings.execArgv.concat(['--logfile=v8-%p.log']);
  }
  cluster.settings = settings;
  if (initialized === true)
    return process.nextTick(setupSettingsNT, settings);
  initialized = true;
  schedulingPolicy = cluster.schedulingPolicy;  // Freeze policy.
  assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
         `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);
  process.nextTick(setupSettingsNT, settings);
  process.on('internalMessage', (message) => {
    if (message.cmd !== 'NODE_DEBUG_ENABLED')
      return;
    for (const worker of ObjectValues(cluster.workers)) {
      if (worker.state === 'online' || worker.state === 'listening') {
        process._debugProcess(worker.process.pid);
      } else {
        worker.once('online', function() {
          process._debugProcess(this.process.pid);
        });
      }
    }
  });
};
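Apart from merging the settings and emitting the setup event, the most important thing here is the internalMessage listener, which handles debugging of the child processes. Messages whose cmd is not NODE_DEBUG_ENABLED are ignored. When such a message does arrive, the master walks over every worker: a worker already in the online or listening state has debugging enabled immediately, and any other worker has it enabled once it emits online. Debugging is activated through process._debugProcess, which is implemented differently on Windows and Linux.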
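The settings merge at the top of setupMaster is worth a closer look, because the spread order decides which value wins. Below is a minimal standalone sketch of that merge. buildSettings is a hypothetical name, and argv and execArgv are passed in explicitly rather than read from process, so the example can run anywhere.

```javascript
// Hypothetical standalone version of the settings merge in setupMaster.
function buildSettings(current, options, argv, execArgv) {
  const settings = {
    args: argv.slice(2),
    exec: argv[1],
    execArgv,
    silent: false,
    ...current, // values from earlier calls override the defaults
    ...options, // values from this call override everything else
  };
  // With --prof but no --logfile, give every process its own V8 log file.
  const profiling = settings.execArgv.some((s) => s.startsWith('--prof'));
  const hasLogfile = settings.execArgv.some((s) => s.startsWith('--logfile='));
  if (profiling && !hasLogfile) {
    settings.execArgv = settings.execArgv.concat(['--logfile=v8-%p.log']);
  }
  return settings;
}

const argv = ['node', '/app/server.js', '--port', '3000'];
const first = buildSettings({}, { silent: true }, argv, ['--prof']);
const second = buildSettings(first, { exec: '/app/worker.js' }, argv, []);

console.log(first.args);     // [ '--port', '3000' ]
console.log(first.execArgv); // [ '--prof', '--logfile=v8-%p.log' ]
console.log(second.exec, second.silent); // /app/worker.js true
```

The second call shows why repeated setupMaster calls are cumulative: settings stored by the first call survive unless the new options replace them.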
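Once setupMaster has finished, createWorkerProcess is called to create the child process. Apart from some argument handling, the most important thing this function does is call fork from child_process. This also shows that cluster is built on top of child_process.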
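Before moving on to the child process code, one question about the master is still open: how does the master fulfil the server role? The place to look is the interface that implements that role, the listen method of the net module, in /lib/net. Ignoring the detailed checks, every successful path through listen ends up in listenInCluster. Here are the details of that function.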
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;
  if (cluster === undefined) cluster = require('cluster');
  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };
  // Get the master's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);
  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);
    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }
    // Reuse master's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}
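This function branches on the process type. In the master process (or when exclusive is set), the server's internal _listen2 method creates the listening handle directly. In a worker process, the cluster._getServer method first sends the server's parameters to the master and lets the master deal with them. We can now open the child file, lib/internal/cluster/child, for the details.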
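The branch in listenInCluster comes down to one condition. The sketch below reduces it to a pure function. chooseListenPath and its return labels are hypothetical and exist only for illustration; no socket is involved.

```javascript
// Hypothetical helper: returns a label instead of touching any socket.
function chooseListenPath({ isMaster, exclusive }) {
  // Same condition as `cluster.isMaster || exclusive` in listenInCluster.
  if (isMaster || Boolean(exclusive)) {
    return 'create-own-handle';
  }
  return 'ask-master-for-handle';
}

console.log(chooseListenPath({ isMaster: true, exclusive: false }));
// create-own-handle
console.log(chooseListenPath({ isMaster: false, exclusive: false }));
// ask-master-for-handle
console.log(chooseListenPath({ isMaster: false, exclusive: true }));
// create-own-handle
```

The third case is the one to remember: a worker that passes exclusive: true to server.listen(options) skips the master and binds its own handle.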
child
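The child file has just two core methods: _setupWorker and _getServer. The former performs initialization, including creating the Worker object, listening for the disconnect and internalMessage events, and reporting the online state back to the master. _getServer exchanges server information with the master. The code in detail: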
cluster._getServer = function(obj, options, cb) {
  let address = options.address;
  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === 'string' &&
      process.platform !== 'win32')
    address = path.resolve(address);
  const indexesKey = [address,
                      options.port,
                      options.addressType,
                      options.fd ].join(':');
  let index = indexes.get(indexesKey);
  if (index === undefined)
    index = 0;
  else
    index++;
  indexes.set(indexesKey, index);
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };
  message.address = address;
  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);
    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });
  obj.once('listening', () => {
    cluster.worker.state = 'listening';
    const address = obj.address();
    message.act = 'listening';
    message.port = (address && address.port) || options.port;
    send(message);
  });
};
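The index bookkeeping at the top of _getServer can be tried out on its own. In the sketch below, nextIndex is a hypothetical wrapper around the same Map logic; it shows that a worker listening on the same address and port more than once gets a different index each time.

```javascript
// Hypothetical stand-in for the worker-side bookkeeping in _getServer.
const indexes = new Map();

function nextIndex({ address, port, addressType, fd }) {
  // null and undefined become empty strings when joined.
  const indexesKey = [address, port, addressType, fd].join(':');
  const previous = indexes.get(indexesKey);
  const index = previous === undefined ? 0 : previous + 1;
  indexes.set(indexesKey, index);
  return { indexesKey, index };
}

const query = { address: null, port: 3000, addressType: 4, fd: undefined };
const firstListen = nextIndex(query);
const secondListen = nextIndex(query);
const otherPort = nextIndex({ ...query, port: 3001 });

console.log(firstListen.index, secondListen.index, otherPort.index); // 0 1 0
```

Since the index is part of the queryServer message, the master can tell repeated listen calls from one worker apart.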
As shown, the request is sent to the master as a queryServer action. Back in the master code, here is how it is handled.
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;
  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  let handle = handles.get(key);
  if (handle === undefined) {
    let address = message.address;
    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === 'string' &&
        process.platform !== 'win32') {
      address = path.relative(process.cwd(), address);
      if (message.address.length < address.length)
        address = message.address;
    }
    let constructor = RoundRobinHandle;
    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it's connectionless. There is nothing to send to
    // the workers except raw datagrams and that's pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === 'udp4' ||
        message.addressType === 'udp6') {
      constructor = SharedHandle;
    }
    handle = new constructor(key,
                             address,
                             message.port,
                             message.addressType,
                             message.fd,
                             message.flags);
    handles.set(key, handle);
  }
  if (!handle.data)
    handle.data = message.data;
  // Set custom server data
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);
    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.
    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}
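Two things happen in queryServer that are easy to separate out: the handle for a given key is created once and then reused, and the handle type depends on the scheduling policy and the address type. The sketch below models both with plain objects. The constant values are placeholders, and pickHandleKind and getOrCreateHandle are hypothetical names.

```javascript
// Placeholder values, standing in for the module's scheduling constants.
const SCHED_NONE = 1;
const SCHED_RR = 2;

// Same condition as in queryServer: UDP and non-RR policies share a handle.
function pickHandleKind(schedulingPolicy, addressType) {
  const isUdp = addressType === 'udp4' || addressType === 'udp6';
  return schedulingPolicy !== SCHED_RR || isUdp
    ? 'SharedHandle'
    : 'RoundRobinHandle';
}

const handles = new Map();

// The first worker asking for a key creates the handle; later ones reuse it.
function getOrCreateHandle(message, schedulingPolicy) {
  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  let handle = handles.get(key);
  if (handle === undefined) {
    handle = { key, kind: pickHandleKind(schedulingPolicy, message.addressType) };
    handles.set(key, handle);
  }
  return handle;
}

const tcpQuery = { address: null, port: 3000, addressType: 4, fd: undefined, index: 0 };
const fromWorker1 = getOrCreateHandle(tcpQuery, SCHED_RR);
const fromWorker2 = getOrCreateHandle(tcpQuery, SCHED_RR);

console.log(fromWorker1.kind);              // RoundRobinHandle
console.log(fromWorker1 === fromWorker2);   // true
console.log(pickHandleKind(SCHED_RR, 'udp4')); // SharedHandle
console.log(pickHandleKind(SCHED_NONE, 4));    // SharedHandle
```

This reuse is what lets every worker that listens on the same port end up attached to a single master-side handle.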
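In queryServer, the master normally picks the RoundRobinHandle constructor; SharedHandle is used instead when the scheduling policy is not SCHED_RR or the address is UDP. RoundRobinHandle uses the net module to create a server from the information the worker sent. It is defined in lib/internal/cluster/round_robin_handle.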
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
  this.key = key;
  this.all = new Map();
  this.free = [];
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail);
  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  } else
    this.server.listen(address);  // UNIX socket path.
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}
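This explains why the master can serve requests without calling listen itself: when a worker calls listen, it sends a request over IPC asking the master to create the server, and the master creates it implicitly.
That leaves a question, though: why can the servers created in all the child processes listen on the same port?
In fact, the server in a child process never creates a real socket handle. The key is in _getServer, a method that exists only in worker processes.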
send(message, (reply, handle) => {
  if (typeof obj._setServerData === 'function')
    obj._setServerData(reply.data);
  if (handle)
    shared(reply, handle, indexesKey, cb);  // Shared listen socket.
  else
    rr(reply, indexesKey, cb);              // Round-robin.
});
Under round-robin scheduling, the master's reply to queryServer carries no handle, so the worker takes the rr branch.
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);
  let key = message.key;
  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }
  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (key === undefined)
      return;
    send({ act: 'close', key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }
  function getsockname(out) {
    if (key)
      ObjectAssign(out, message.sockname);
    return 0;
  }
  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };
  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }
  assert(handles.has(key) === false);
  handles.set(key, handle);
  cb(0, handle);
}
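The faux handle idea in rr can be reduced to a few lines. The sketch below is a minimal illustration under stated assumptions: createFauxHandle is a hypothetical name, and sendToMaster stands in for the IPC send helper, which is replaced here by a function that records messages in an array.

```javascript
// Hypothetical factory; sendToMaster stands in for the IPC send helper.
function createFauxHandle(key, sendToMaster) {
  let currentKey = key;
  return {
    // Report success without binding anything: the master owns the socket.
    listen() {
      return 0;
    },
    // Tell the master once that this worker stopped serving the key.
    close() {
      if (currentKey === undefined) return; // already closed
      sendToMaster({ act: 'close', key: currentKey });
      currentKey = undefined;
    },
    ref() {},
    unref() {},
  };
}

const sent = [];
const fauxHandle = createFauxHandle('demo-key', (msg) => sent.push(msg));

console.log(fauxHandle.listen(128)); // 0
fauxHandle.close();
fauxHandle.close(); // the second call is a no-op
console.log(sent);  // [ { act: 'close', key: 'demo-key' } ]
```

Because listen always reports success, any number of workers can appear to listen on the same port without ever competing for it.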
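In rr, the handle's listen is stubbed to return 0, so it never actually listens on the port; the only process that really listens is the master. This leads to the next question: if the child processes create no underlying socket, how do the master and the children communicate to handle requests and balance the load? The answer has appeared several times in the source above: round-robin, a request scheduling policy. The core is again in the logic with which the master implicitly creates the server, in lib/internal/cluster/round_robin_handle.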
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
  // ...
  this.free = [];
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail);
  // ...
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}
RoundRobinHandle.prototype.distribute = function(err, handle) {
  this.handles.push(handle);
  const worker = this.free.shift();
  if (worker)
    this.handoff(worker);
};
RoundRobinHandle.prototype.handoff = function(worker) {
  if (this.all.has(worker.id) === false) {
    return;  // Worker is closing (or has closed) the server.
  }
  const handle = this.handles.shift();
  if (handle === undefined) {
    this.free.push(worker);  // Add to ready queue again.
    return;
  }
  const message = { act: 'newconn', key: this.key };
  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another.
    this.handoff(worker);
  });
};
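Once the implicitly created server is listening, the RoundRobinHandle takes over its _handle. When a client connects to this handle, the onconnection callback calls distribute, which pushes the client socket handle onto the pending queue, takes a worker off the free list, and passes it to handoff. handoff takes the next pending client socket and uses the sendHelper utility to send a newconn message together with the handle to that worker. If the worker does not accept the connection, distribute is called again to hand it to the next free worker. This completes the path by which the master dispatches requests to the workers.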
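The interplay of the two queues is easier to follow in a simulation. The sketch below is a simplified model, not the module's code: RoundRobinSketch is a hypothetical class, connections are plain strings, and every worker is assumed to accept immediately, whereas the real code waits for the reply over IPC.

```javascript
// Hypothetical, synchronous model of the distribute/handoff pair.
class RoundRobinSketch {
  constructor() {
    this.free = [];    // idle workers, in the order they became idle
    this.pending = []; // accepted connections waiting for a worker
    this.log = [];     // [workerId, connection] pairs, for inspection
  }

  // A newly registered worker is offered work right away.
  addWorker(worker) {
    this.handoff(worker);
  }

  // Called for every incoming connection.
  distribute(connection) {
    this.pending.push(connection);
    const worker = this.free.shift();
    if (worker) this.handoff(worker);
  }

  handoff(worker) {
    const connection = this.pending.shift();
    if (connection === undefined) {
      this.free.push(worker); // nothing to do: back of the idle queue
      return;
    }
    this.log.push([worker.id, connection]);
    // Assumption: the worker accepts at once, so it is offered the next
    // pending connection immediately.
    this.handoff(worker);
  }
}

const balancer = new RoundRobinSketch();
[1, 2, 3].forEach((id) => balancer.addWorker({ id }));
['c1', 'c2', 'c3', 'c4'].forEach((c) => balancer.distribute(c));

console.log(balancer.log);
// [ [ 1, 'c1' ], [ 2, 'c2' ], [ 3, 'c3' ], [ 1, 'c4' ] ]
```

The rotation falls out of the queue discipline: a worker that finishes a handoff goes to the back of the free list, so the next connection always goes to the worker that has been idle the longest.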
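Summary
This article gave a brief analysis of cluster, the module that implements multi-process support in Node. The points worth noting: it is built on child_process; the key parts are the communication between master and child and the round-robin scheduling policy used for load balancing. cluster is also closely tied to the net module's server and to how listening sockets are handled. A follow-up Node article could cover the source of the net module.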