Last Surprise Lyn
  1. 1 Last Surprise Lyn
  2. 2 Time Bomb Veela
  3. 3 Hypocrite Nush
  4. 4 Warcry mpi
  5. 5 かかってこいよ NakamuraEmi
  6. 6 Libertus Chen-U
  7. 7 One Last You Jen Bird
  8. 8 Flower Of Life 发热巫女
  9. 9 BREAK IN TO BREAK OUT Lyn
  10. 10 Life Will Change Lyn
2020-01-27 15:19:04

Node源码解析——cluster

Node自诞生起就以其非阻塞式IO和事件驱动的特性闻名,基于此的实现下,常规node应用的代码执行都是单线程的。虽然单线程的好处不少,但在目前普遍都是多核CPU的环境下,单线程无法充分利用其性能。同时,单线程模式下的容错率过低,一旦异常未捕获应用就会崩溃。以此为背景,就诞生了今天要谈到的node内置模块——cluster。

背景

通过cluster模块可以实现Node的多进程管理。Node采用的多进程架构为Master-Worker模式,也就是常说的主从模式,主进程(master进程)负责管理从进程(worker进程),属于一个服务性质的进程,通常用来处理客户端请求分发到合适的worker进程上,同时也需要监控worker进程的运行状态是否良好。worker进程则专注于业务处理。

一段基本的多进程创建代码如下。

var express = require('express');
var app = express();
const cluster = require('cluster');  // 实例化cluster
const numCPUs = require('os').cpus().length;  // 获取CPU核数

if (cluster.isMaster) {
  console.log(`主进程 ${process.pid} 正在运行`);
  // 根据CPU核数fork子进程。
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  cluster.on('exit', (worker, code, signal) => {
    console.log(`子进程 ${worker.process.pid} 已退出`);
  });
} else {
  app.get('/', (req, res)=>{
    res.send(`我是子进程${process.pid}.`);
  })
  app.listen(3000);
  console.log(`子进程 ${process.pid} 已启动`);
}

那么就引申出如下几个问题:

  • cluster模块如何区分master进程和worker进程?
  • master进程需要处理客户端请求并实现调度,但在master进程中并没有创建服务器,是如何实现服务器的职责?
  • 多个子进程如何实现共同监听同一个端口?
  • 主从进程间如何通信?

带着这些问题,就进入到下面的源码分析吧。

源码

master

打开cluster模块的源码入口lib/cluster第一个问题就迎刃而解。

const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrMaster}`);

通过NODE_UNIQUE_ID这和环境变量决定主从进程类型。进一步的,需要搞清楚这个环境变量是如何生成的。到/lib/internal/cluster/master的fork函数中看,很自然的,因为cluster是通过这个api开启进程创建。


cluster.fork = function(env) {
  cluster.setupMaster();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  // ...
  cluster.workers[worker.id] = worker;
  return worker
}

再进入createWorkerProcess中看。


function createWorkerProcess(id, env) {
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
    // ...

  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
       // ...
  });
}

NODE_UNIQUE_ID是主进程调用cluster.fork创建子进程时传入的一个自增id,因此可以区分。同时我们也可以知道在主进程中通过这个id来标识并存储子进程。

解决了第一个问题之后再回看这个文件代码的全貌,暴露出如下3个接口:

  • setupMaster
  • fork
  • disconnect

setupMaster主要用来初始化主进程参数,是fork执行的第一步。disconnect则用来断开所有存储的worker进程,那么核心内容还是看fork。

之前已经了解了fork中worker进程的创建,id设置与存储,现在把中间内容补齐再看。

cluster.fork = function(env) {
  cluster.setupMaster();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });

  worker.on('message', function(message, handle) {
    cluster.emit('message', this, message, handle);
  });

  worker.process.once('exit', (exitCode, signalCode) => {
    /*
     * Remove the worker from the workers list only
     * if it has disconnected, otherwise we might
     * still want to access it.
     */
    if (!worker.isConnected()) {
      removeHandlesForWorker(worker);
      removeWorker(worker);
    }

    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
    worker.state = 'dead';
    worker.emit('exit', exitCode, signalCode);
    cluster.emit('exit', worker, exitCode, signalCode);
  });

  worker.process.once('disconnect', () => {
    /*
     * Now is a good time to remove the handles
     * associated with this worker because it is
     * not connected to the master anymore.
     */
    removeHandlesForWorker(worker);

    /*
     * Remove the worker from the workers list only
     * if its process has exited. Otherwise, we might
     * still want to access it.
     */
    if (worker.isDead())
      removeWorker(worker);

    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
    worker.state = 'disconnected';
    worker.emit('disconnect');
    cluster.emit('disconnect', worker);
  });

  worker.process.on('internalMessage', internal(worker, onmessage));
  process.nextTick(emitForkNT, worker);
  cluster.workers[worker.id] = worker;
  return worker;
};

那么整个worker进程创建的流程就清楚了:

  1. setupMaster初始化主进程参数
  2. 新建worker并设置id,该id除区分主从进程外,还作为主进程保存子进程的引用
  3. 在worker上添加一些事件监听
  4. 返回子进程实例

进入到setupMaster中看细节:

cluster.setupMaster = function(options) {
  const settings = {
    args: process.argv.slice(2),
    exec: process.argv[1],
    execArgv: process.execArgv,
    silent: false,
    ...cluster.settings,
    ...options
  };

  // Tell V8 to write profile data for each process to a separate file.
  // Without --logfile=v8-%p.log, everything ends up in a single, unusable
  // file. (Unusable because what V8 logs are memory addresses and each
  // process has its own memory mappings.)
  if (settings.execArgv.some((s) => s.startsWith('--prof')) &&
      !settings.execArgv.some((s) => s.startsWith('--logfile='))) {
    settings.execArgv = settings.execArgv.concat(['--logfile=v8-%p.log']);
  }

  cluster.settings = settings;

  if (initialized === true)
    return process.nextTick(setupSettingsNT, settings);

  initialized = true;
  schedulingPolicy = cluster.schedulingPolicy;  // Freeze policy.
  assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
         `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);

  process.nextTick(setupSettingsNT, settings);

  process.on('internalMessage', (message) => {
    if (message.cmd !== 'NODE_DEBUG_ENABLED')
      return;

    for (const worker of ObjectValues(cluster.workers)) {
      if (worker.state === 'online' || worker.state === 'listening') {
        process._debugProcess(worker.process.pid);
      } else {
        worker.once('online', function() {
          process._debugProcess(this.process.pid);
        });
      }
    }
  });
};

注意到除了参数处理以及setup事件触发外,最重要的是监听了internalMessage事件,在这里处理子进程的调试功能。当NODE_DEBUG_ENABLED关闭时,关闭调试,否则再判断子进程是否处于onlinelistening状态决定是否开启调试。调试的激活靠process._debugProcess方法,这个方法在windows和linux下有不同的实现。

setupMaster执行完成后,就调用createWorkerProcess创建子进程了,该函数除了一些参数处理外,最重要的就是调用了child_process中的fork函数创建子进程。这里也可以看出cluster的底层就是child_process

那么在进入子进程的具体代码之前,我们还要解决一个主进程中未解决的问题,那就是主进程如何实现服务器的职责?不用多想,直接看实现服务器职责的接口——net模块的listen源码/lib/net 。无视细节判断,可以看到在listen的实现中,最终成功的执行都会进入到listenInCluster中。看这个函数细节。

function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;

  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Get the master's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    // Reuse master's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

这里会根据进程类型来分别处理。当是主进程时,直接通过_listen2这个服务器内部方法创建服务器需要的数据。当是worker进程时,会先通过worker进程的_getServer方法给主进程发送服务器需要的数据,由主进程去处理。现在就可以进入child文件lib/internal/cluster/child查看细节了。

child

子进程文件的核心方法也就两个——_setupWorker和_getServer。前者是初始化时的一些操作,包括创建Worker对象,监听disconnect和internalMessage事件以及向主进程回传online状态等。_getServer就是和主进程进行服务器信息通信了。看细节代码:

cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === 'string' &&
      process.platform !== 'win32')
    address = path.resolve(address);

  const indexesKey = [address,
                      options.port,
                      options.addressType,
                      options.fd ].join(':');

  let index = indexes.get(indexesKey);

  if (index === undefined)
    index = 0;
  else
    index++;

  indexes.set(indexesKey, index);

  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

  obj.once('listening', () => {
    cluster.worker.state = 'listening';
    const address = obj.address();
    message.act = 'listening';
    message.port = (address && address.port) || options.port;
    send(message);
  });
};

可以看到是通过queryServer这样一个动作发送到主进程处理。再回到master代码查看其处理逻辑。

function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  let handle = handles.get(key);

  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === 'string' &&
        process.platform !== 'win32') {

      address = path.relative(process.cwd(), address);

      if (message.address.length < address.length)
        address = message.address;
    }

    let constructor = RoundRobinHandle;
    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it's connectionless. There is nothing to send to
    // the workers except raw datagrams and that's pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === 'udp4' ||
        message.addressType === 'udp6') {
      constructor = SharedHandle;
    }

    handle = new constructor(key,
                             address,
                             message.port,
                             message.addressType,
                             message.fd,
                             message.flags);
    handles.set(key, handle);
  }

  if (!handle.data)
    handle.data = message.data;

  // Set custom server data
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}

这里通过RoundRobinHandle这样一个构造函数根据worker进程传递的服务器信息,使用net模块创建服务器。该函数位于lib/internal/cluster/round_robin_handle

function RoundRobinHandle(key, address, port, addressType, fd, flags) {
  this.key = key;
  this.all = new Map();
  this.free = [];
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail);

  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  } else
    this.server.listen(address);  // UNIX socket path.

  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

至此就明白了为什么master进程不调用listen也可以处理服务器请求,因为fork出子进程时,子进程通过IPC将创建服务器的请求发送到主进程隐式创建了。

但这样有个问题,为什么所有的子进程创建的服务器都可以监听同一端口?

实际上子进程中的服务器并没有构建真正的socket句柄,关键就在worker进程才会有的_getServer方法中。

 send(message, (reply, handle) => {
   if (typeof obj._setServerData === 'function')
     obj._setServerData(reply.data);

   if (handle)
     shared(reply, handle, indexesKey, cb);  // Shared listen socket.
   else
     rr(reply, indexesKey, cb);              // Round-robin.
 });

这里worker进程创建成功通知主进程时,主进程会返回一个空handle,worker进程进入rr函数。

function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  let key = message.key;

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (key === undefined)
      return;

    send({ act: 'close', key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  function getsockname(out) {
    if (key)
      ObjectAssign(out, message.sockname);

    return 0;
  }

  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  handles.set(key, handle);
  cb(0, handle);
}

可以看到在rr中handle的listen被hack为return 0,并不会真正的监听端口,实际上真正监听端口的只有主进程。那么下一个问题又来了,既然子进程没有创建底层socket,主进程和子进程又是如何通信处理请求并实现负载均衡呢?实际上这个答案在上面的源代码中已经出现很多次了——RoundRobin,一种请求调度策略。核心同样在主进程隐式创建服务器的逻辑中。lib/internal/cluster/round_robin_handle

function RoundRobinHandle(key, address, port, addressType, fd, flags) {
    // ...
  this.free = [];
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail);

  // ...
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

RoundRobinHandle.prototype.distribute = function(err, handle) {
  this.handles.push(handle);
  const worker = this.free.shift();

  if (worker)
    this.handoff(worker);
};

RoundRobinHandle.prototype.handoff = function(worker) {
  if (this.all.has(worker.id) === false) {
    return;  // Worker is closing (or has closed) the server.
  }

  const handle = this.handles.shift();

  if (handle === undefined) {
    this.free.push(worker);  // Add to ready queue again.
    return;
  }

  const message = { act: 'newconn', key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
};

首先实例化时会将主进程的_handle句柄挂载到自身。当这个句柄被连接时,回调会调用distribute来分发连接上来的客户端socket句柄,把这个句柄加入到待处理队列,然后从空闲的worker中取出一个传到handoff中执行。handoff取出下一个待处理的客户端socket,通过sendHelper这样一个通信工具函数发送newconn消息和句柄到对应worker处理。如果worker处理未成功,则再次调用distribute给下一个空闲worker。至此就完成了主进程分发请求到worker进程的通信。

小结

这篇文章简单分析了node中cluster这个实现多进程的模块。值得关注的点有这些,首先底层用到了child_process,关键内容在于master和child之间的通信,以及负载均衡采用的RoundRobin调度策略。cluster的运作和net模块的server关联也非常紧密,涉及到socket相关的监听处理。下篇node文章可以写写net模块的源码。

-- EOF --

添加在分类「 前端开发 」下,并被添加 「Node.js」 标签。