Hypocrite Nush
  1. 1 Hypocrite Nush
  2. 2 Flower Of Life 发热巫女
  3. 3 かかってこいよ NakamuraEmi
  4. 4 BREAK IN TO BREAK OUT Lyn
  5. 5 Time Bomb Veela
  6. 6 Libertus Chen-U
  7. 7 Last Surprise Lyn
  8. 8 Life Will Change Lyn
  9. 9 One Last You Jen Bird
  10. 10 Warcry mpi
2020-02-23 17:33:36

Node源码解析——udp

源码阅读最好遵循一个从架构到细节的流程。现在我们已经了解了node的整体架构以及模块间的职责划分。接下来可以再深入一步,看看node内部模块间又有怎样的区别和联系。这篇文章就从之前写过的代表tcp的net的平行模块入手——udp相关模块。

我们知道net基于tcp,从tcp延伸开来又有一些常用网络模块,之上有http/https,平行有udp,udp之上又有dns。当然这些常用网络模块在node中也有实现,其架构如下图。

那么接下来就一一看其实现。这篇文章先看udp相关。

dgram

在node中基于tcp实现的网络模块为net,同样平行的自然有基于udp实现的网络模块,那就是dgram。相比于tcp,udp是无连接,不可靠的,流程也会简单很多,服务端和客户端只需要创建socket,绑定网络信息,然后发送和接受数据即可。我们一步一步看。先看创建入口,在lib/dgram.js的createSocket。

function createSocket(type, listener) {
  return new Socket(type, listener);
}

很简单,就是new一个udp的socket实例,这里传入两个参数,其一是该socket的相关选项信息,比如是udp4或upd6,是否仅支持ipv6等,其二就是该socket响应message事件的回调。看一下这个socket构造函数内容。

function Socket(type, listener) {
  EventEmitter.call(this);
  let lookup;
  let recvBufferSize;
  let sendBufferSize;

  let options;
  if (type !== null && typeof type === 'object') {
    options = type;
    type = options.type;
    lookup = options.lookup;
    recvBufferSize = options.recvBufferSize;
    sendBufferSize = options.sendBufferSize;
  }

  const handle = newHandle(type, lookup);
  handle[owner_symbol] = this;

  this[async_id_symbol] = handle.getAsyncId();
  this.type = type;

  if (typeof listener === 'function')
    this.on('message', listener);

  this[kStateSymbol] = {
    handle,
    receiving: false,
    bindState: BIND_STATE_UNBOUND,
    connectState: CONNECT_STATE_DISCONNECTED,
    queue: undefined,
    reuseAddr: options && options.reuseAddr, // Use UV_UDP_REUSEADDR if true.
    ipv6Only: options && options.ipv6Only,
    recvBufferSize,
    sendBufferSize
  };
}
ObjectSetPrototypeOf(Socket.prototype, EventEmitter.prototype);
ObjectSetPrototypeOf(Socket, EventEmitter);

可以注意到,这个socket没有继承stream,而是直接从EventEmitter继承而来。同时其handle创建来自于newHandle这个函数,追查到internal/dgram.js

function lookup4(lookup, address, callback) {
  return lookup(address || '127.0.0.1', 4, callback);
}


function lookup6(lookup, address, callback) {
  return lookup(address || '::1', 6, callback);
}

function newHandle(type, lookup) {
  if (lookup === undefined) {
    if (dns === undefined) {
      dns = require('dns');
    }

    lookup = dns.lookup;
  } else if (typeof lookup !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('lookup', 'Function', lookup);
  }

  if (type === 'udp4') {
    const handle = new UDP();

    handle.lookup = lookup4.bind(handle, lookup);
    return handle;
  }

  if (type === 'udp6') {
    const handle = new UDP();

    handle.lookup = lookup6.bind(handle, lookup);
    handle.bind = handle.bind6;
    handle.connect = handle.connect6;
    handle.send = handle.send6;
    return handle;
  }

  throw new ERR_SOCKET_BAD_TYPE();
}

这里会设定handle用来进行IP地址查询的函数,默认是dns查询。根据udp4或udp6会用UDP类创建实例并包装查询函数传入符合规范的参数。UDP则是来自C++编写的udp_wrap了,net模块中也用到了对应的tcp_wrap。到这里也应该明白了,网络通信的底层工作实际上都会聚焦到tcp_wrap,udp_wrap这样的C++模块完成,我们JS模块部分主要就是起一个连接和参数处理作用,职责划分非常明确。

看完了创建,就可以进入地址绑定流程了,看原型上的bind方法。

Socket.prototype.bind = function(port_, address_ /* , callback */) {
  // ...
  startListening(this);
  return this
}

函数很长,但都是些错误处理,参数处理以及worker进程端口共享之类的判断不属于流程重点,只需要知道最后会通过startListening启动监听。

function startListening(socket) {
  const state = socket[kStateSymbol];

  state.handle.onmessage = onMessage;
  // Todo: handle errors
  state.handle.recvStart();
  state.receiving = true;
  state.bindState = BIND_STATE_BOUND;

  if (state.recvBufferSize)
    bufferSize(socket, state.recvBufferSize, RECV_BUFFER);

  if (state.sendBufferSize)
    bufferSize(socket, state.sendBufferSize, SEND_BUFFER);

  socket.emit('listening');
}

在handle上设置onMessage接受数据,并调用handle的recvStart启动udp数据接收,再在JS抛出listening事件。这个recvStart也就是在udp_wrap.cc里的实现了。

void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) {
  UDPWrap* wrap;
  ASSIGN_OR_RETURN_UNWRAP(&wrap,
                          args.Holder(),
                          args.GetReturnValue().Set(UV_EBADF));
  int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);
  // UV_EALREADY means that the socket is already bound but that's okay
  if (err == UV_EALREADY)
    err = 0;
  args.GetReturnValue().Set(err);
}

这里也能看出来在C++模块中用到了libuv的网络通信实现异步。端口绑定完成,下一步就是通过send方法进行数据传输。

Socket.prototype.send = function(buffer,
                                 offset,
                                 length,
                                 port,
                                 address,
                                 callback) {

    // ...
     const afterDns = (ex, ip) => {
    defaultTriggerAsyncIdScope(
      this[async_id_symbol],
      doSend,
      ex, this, ip, list, address, port, callback
    );
  };

  if (!connected) {
    state.handle.lookup(address, afterDns);
  } else {
    afterDns(null, null);
  }
};

同样是一段很长的参数和错误处理代码,提取出最关键的部分,会使用handle的lookup函数解析地址并回调afterDns函数,这里会用node的async_hooks模块记录这个异步动作,并调用doSend方法。

function doSend(ex, self, ip, list, address, port, callback) {
  const state = self[kStateSymbol];

  if (ex) {
    if (typeof callback === 'function') {
      process.nextTick(callback, ex);
      return;
    }

    process.nextTick(() => self.emit('error', ex));
    return;
  } else if (!state.handle) {
    return;
  }

  const req = new SendWrap();
  req.list = list;  // Keep reference alive.
  req.address = address;
  req.port = port;
  if (callback) {
    req.callback = callback;
    req.oncomplete = afterSend;
  }

  let err;
  if (port)
    err = state.handle.send(req, list, list.length, port, ip, !!callback);
  else
    err = state.handle.send(req, list, list.length, !!callback);

  if (err >= 1) {
    // Synchronous finish. The return code is msg_length + 1 so that we can
    // distinguish between synchronous success and asynchronous success.
    if (callback)
      process.nextTick(callback, null, err - 1);
    return;
  }

  if (err && callback) {
    // Don't emit as error, dgram_legacy.js compatibility
    const ex = exceptionWithHostPort(err, 'send', address, port);
    process.nextTick(callback, ex);
  }
}

doSend中,会把发送的数据用udp_wrap的SendWrap包装一层统一请求操作规范,再通过handle的send方法调用libuv的对应模块发送。SendWrap的结构可以在req_wrap.h中找到,其继承自AsyncWrap和ReqWrapBase。

class ReqWrapBase {
 public:
  explicit inline ReqWrapBase(Environment* env);

  virtual ~ReqWrapBase() = default;

  virtual void Cancel() = 0;
  virtual AsyncWrap* GetAsyncWrap() = 0;

 private:
  friend int GenDebugSymbols();
  friend class Environment;

  ListNode<ReqWrapBase> req_wrap_queue_;
};

再回到udp_wrap看send方法。

void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
  Environment* env = Environment::GetCurrent(args);

  UDPWrap* wrap;
  ASSIGN_OR_RETURN_UNWRAP(&wrap,
                          args.Holder(),
                          args.GetReturnValue().Set(UV_EBADF));

  CHECK(args.Length() == 4 || args.Length() == 6);
  CHECK(args[0]->IsObject());
  CHECK(args[1]->IsArray());
  CHECK(args[2]->IsUint32());

  bool sendto = args.Length() == 6;
  if (sendto) {
    // send(req, list, list.length, port, address, hasCallback)
    CHECK(args[3]->IsUint32());
    CHECK(args[4]->IsString());
    CHECK(args[5]->IsBoolean());
  } else {
    // send(req, list, list.length, hasCallback)
    CHECK(args[3]->IsBoolean());
  }

  Local<Object> req_wrap_obj = args[0].As<Object>();
  Local<Array> chunks = args[1].As<Array>();
  // it is faster to fetch the length of the
  // array in js-land
  size_t count = args[2].As<Uint32>()->Value();
  const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();

  size_t msg_size = 0;

  MaybeStackBuffer<uv_buf_t, 16> bufs(count);

  // construct uv_buf_t array
  for (size_t i = 0; i < count; i++) {
    Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();

    size_t length = Buffer::Length(chunk);

    bufs[i] = uv_buf_init(Buffer::Data(chunk), length);
    msg_size += length;
  }

  int err = 0;
  struct sockaddr_storage addr_storage;
  sockaddr* addr = nullptr;
  if (sendto) {
    const unsigned short port = args[3].As<Uint32>()->Value();
    node::Utf8Value address(env->isolate(), args[4]);
    err = sockaddr_for_family(family, address.out(), port, &addr_storage);
    if (err == 0) {
      addr = reinterpret_cast<sockaddr*>(&addr_storage);
    }
  }

  uv_buf_t* bufs_ptr = *bufs;
  if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
    err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
    if (err == UV_ENOSYS || err == UV_EAGAIN) {
      err = 0;
    } else if (err >= 0) {
      size_t sent = err;
      while (count > 0 && bufs_ptr->len <= sent) {
        sent -= bufs_ptr->len;
        bufs_ptr++;
        count--;
      }
      if (count > 0) {
        CHECK_LT(sent, bufs_ptr->len);
        bufs_ptr->base += sent;
        bufs_ptr->len -= sent;
      } else {
        CHECK_EQ(static_cast<size_t>(err), msg_size);
        // + 1 so that the JS side can distinguish 0-length async sends from
        // 0-length sync sends.
        args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
        return;
      }
    }
  }

  if (err == 0) {
    AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
    SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
    req_wrap->msg_size = msg_size;

    err = req_wrap->Dispatch(uv_udp_send,
                             &wrap->handle_,
                             bufs_ptr,
                             count,
                             addr,
                             OnSend);
    if (err)
      delete req_wrap;
  }

  args.GetReturnValue().Set(err);
}

通过之前构造好的的SendWrap实例调用dispatch发送数据,而在SendWrap内部就是调用了libuv的uv_udp_send进行具体的数据操作了,当数据发送完成后,会调用onSend触发一个complete。

void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
  std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req->data)};
  if (req_wrap->have_callback()) {
    Environment* env = req_wrap->env();
    HandleScope handle_scope(env->isolate());
    Context::Scope context_scope(env->context());
    Local<Value> arg[] = {
      Integer::New(env->isolate(), status),
      Integer::New(env->isolate(), req_wrap->msg_size),
    };
    req_wrap->MakeCallback(env->oncomplete_string(), 2, arg);
  }

onComplete在上面的JS模块lib/dgram.js的doSend中赋值为afterSend。

function afterSend(err, sent) {
  if (err) {
    err = exceptionWithHostPort(err, 'send', this.address, this.port);
  } else {
    err = null;
  }

  this.callback(err, sent);
}

在这里完成send的回调。

send完成,就是另一端接受数据了,之前也提到startListening的时候会在handle上绑定onmessage,那么就去udp_wrap查这个onmessage的调用时机。可以预见到就在和onSend对应的onRecv中。

void UDPWrap::OnRecv(uv_udp_t* handle,
                     ssize_t nread,
                     const uv_buf_t* buf_,
                     const struct sockaddr* addr,
                     unsigned int flags) {
  UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
  Environment* env = wrap->env();

  AllocatedBuffer buf(env, *buf_);
  if (nread == 0 && addr == nullptr) {
    return;
  }

  HandleScope handle_scope(env->isolate());
  Context::Scope context_scope(env->context());

  Local<Object> wrap_obj = wrap->object();
  Local<Value> argv[] = {
    Integer::New(env->isolate(), nread),
    wrap_obj,
    Undefined(env->isolate()),
    Undefined(env->isolate())
  };

  if (nread < 0) {
    wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
    return;
  }

  buf.Resize(nread);
  argv[2] = buf.ToBuffer().ToLocalChecked();
  argv[3] = AddressToJS(env, addr);
  wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
}

果然,接受数据成功后会有一个onmessage的回调,对应到JS模块的onMessage

function onMessage(nread, handle, buf, rinfo) {
  const self = handle[owner_symbol];
  if (nread < 0) {
    return self.emit('error', errnoException(nread, 'recvmsg'));
  }
  rinfo.size = buf.length; // compatibility
  self.emit('message', buf, rinfo);
}

通过监听message事件即可接收到发送的数据,至此dgram的数据通信流程结束。

dns

好了,已经了解了udp的模块,趁热打铁,进一步分析dns模块。dns应该都很熟悉,就不详细介绍其功能,简单来说就是通过基于udp的dns请求将网址发送到dns服务器解析出其ip地址再返回。dns解析又分为本地解析和网络解析,本地解析会用到本机上的hosts以及各种缓存,当本地解析未取得结果时,就把请求转发到网络dns服务器进行网络解析。对应到的两个dns模块api也就是lookup和resolve了。

先看lookup,在lib/dns.js找到其实现。

function lookup(hostname, options, callback) {
  let hints = 0;
  let family = -1;
  let all = false;
  let verbatim = false;

  // Parse arguments
  if (hostname && typeof hostname !== 'string') {
    throw new ERR_INVALID_ARG_TYPE('hostname', 'string', hostname);
  } else if (typeof options === 'function') {
    callback = options;
    family = 0;
  } else if (typeof callback !== 'function') {
    throw new ERR_INVALID_CALLBACK(callback);
  } else if (options !== null && typeof options === 'object') {
    hints = options.hints >>> 0;
    family = options.family >>> 0;
    all = options.all === true;
    verbatim = options.verbatim === true;

    validateHints(hints);
  } else {
    family = options >>> 0;
  }

  if (family !== 0 && family !== 4 && family !== 6)
    throw new ERR_INVALID_OPT_VALUE('family', family);

  if (!hostname) {
    emitInvalidHostnameWarning(hostname);
    if (all) {
      process.nextTick(callback, null, []);
    } else {
      process.nextTick(callback, null, null, family === 6 ? 6 : 4);
    }
    return {};
  }

  const matchedFamily = isIP(hostname);
  if (matchedFamily) {
    if (all) {
      process.nextTick(
        callback, null, [{ address: hostname, family: matchedFamily }]);
    } else {
      process.nextTick(callback, null, hostname, matchedFamily);
    }
    return {};
  }

  const req = new GetAddrInfoReqWrap();
  req.callback = callback;
  req.family = family;
  req.hostname = hostname;
  req.oncomplete = all ? onlookupall : onlookup;

  const err = cares.getaddrinfo(
    req, toASCII(hostname), family, hints, verbatim
  );
  if (err) {
    process.nextTick(callback, dnsException(err, 'getaddrinfo', hostname));
    return {};
  }
  return req;
}

同样是一堆参数处理,毕竟JS模块的定位就是如此,但这里还是有需要重点关注的两个点,其一是构造了一个GetAddrInfoReqWrap实例,并调用getaddrinfo完成解析,其二是在这个实例上设置的oncomplete回调。先看回调,应该就是dns解析完成后会调用的函数,根据是否传入all参数决定设置为onlookup或者onlookupall。

function onlookup(err, addresses) {
  if (err) {
    return this.callback(dnsException(err, 'getaddrinfo', this.hostname));
  }
  if (this.family) {
    this.callback(null, addresses[0], this.family);
  } else {
    this.callback(null, addresses[0], isIP(addresses[0]));
  }
}


function onlookupall(err, addresses) {
  if (err) {
    return this.callback(dnsException(err, 'getaddrinfo', this.hostname));
  }

  const family = this.family;
  for (let i = 0; i < addresses.length; i++) {
    const addr = addresses[i];
    addresses[i] = {
      address: addr,
      family: family || isIP(addr)
    };
  }

  this.callback(null, addresses);
}

果然,回调就是调用我们传入的callback,根据all参数也会决定解析结果的数量。再进入GetAddrInfoReqWrap中,这个构造函数来自cares_wrap模块,看到wrap结尾的模块也都不陌生了,到cares_wrap.cc中查询getAddrInfo方法。

void GetAddrInfo(const FunctionCallbackInfo<Value>& args) {
  Environment* env = Environment::GetCurrent(args);

  CHECK(args[0]->IsObject());
  CHECK(args[1]->IsString());
  CHECK(args[2]->IsInt32());
  CHECK(args[4]->IsBoolean());
  Local<Object> req_wrap_obj = args[0].As<Object>();
  node::Utf8Value hostname(env->isolate(), args[1]);

  int32_t flags = 0;
  if (args[3]->IsInt32()) {
    flags = args[3].As<Int32>()->Value();
  }

  int family;

  switch (args[2].As<Int32>()->Value()) {
    case 0:
      family = AF_UNSPEC;
      break;
    case 4:
      family = AF_INET;
      break;
    case 6:
      family = AF_INET6;
      break;
    default:
      CHECK(0 && "bad address family");
  }

  auto req_wrap = std::make_unique<GetAddrInfoReqWrap>(env,
                                                       req_wrap_obj,
                                                       args[4]->IsTrue());

  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = family;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = flags;

  TRACE_EVENT_NESTABLE_ASYNC_BEGIN2(
      TRACING_CATEGORY_NODE2(dns, native), "lookup", req_wrap.get(),
      "hostname", TRACE_STR_COPY(*hostname),
      "family",
      family == AF_INET ? "ipv4" : family == AF_INET6 ? "ipv6" : "unspec");

  int err = req_wrap->Dispatch(uv_getaddrinfo,
                               AfterGetAddrInfo,
                               *hostname,
                               nullptr,
                               &hints);
  if (err == 0)
    // Release ownership of the pointer allowing the ownership to be transferred
    USE(req_wrap.release());

  args.GetReturnValue().Set(err);
}

这里又看到了dgram模块出现的req_wrap,同样用其包装libuv的dns解析方法uv_getnameinfo进行异步解析并传入afterGetAddrInfo作为回调,这个回调中唤醒complete的callback,也就是之前js中设定的onlookup或者onlookupAll了,至此lookup流程结束。

  // Make the callback into JavaScript
  req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);

可以看到思路和dgram模块类似,都是在JS模块中处理参数和设置回调,在C++模块中调用libuv接口,并触发complete把结果传入JS的回调再执行。

再看resolve方法,回到lib/dns.js

const resolveMap = ObjectCreate(null);
Resolver.prototype.resolveAny = resolveMap.ANY = resolver('queryAny');
Resolver.prototype.resolve4 = resolveMap.A = resolver('queryA');
Resolver.prototype.resolve6 = resolveMap.AAAA = resolver('queryAaaa');
Resolver.prototype.resolveCname = resolveMap.CNAME = resolver('queryCname');
Resolver.prototype.resolveMx = resolveMap.MX = resolver('queryMx');
Resolver.prototype.resolveNs = resolveMap.NS = resolver('queryNs');
Resolver.prototype.resolveTxt = resolveMap.TXT = resolver('queryTxt');
Resolver.prototype.resolveSrv = resolveMap.SRV = resolver('querySrv');
Resolver.prototype.resolvePtr = resolveMap.PTR = resolver('queryPtr');
Resolver.prototype.resolveNaptr = resolveMap.NAPTR = resolver('queryNaptr');
Resolver.prototype.resolveSoa = resolveMap.SOA = resolver('querySoa');
Resolver.prototype.reverse = resolver('getHostByAddr');

Resolver.prototype.resolve = resolve;

function resolve(hostname, rrtype, callback) {
  let resolver;
  if (typeof rrtype === 'string') {
    resolver = resolveMap[rrtype];
  } else if (typeof rrtype === 'function') {
    resolver = resolveMap.A;
    callback = rrtype;
  } else {
    throw new ERR_INVALID_ARG_TYPE('rrtype', 'string', rrtype);
  }

  if (typeof resolver === 'function') {
    return resolver.call(this, hostname, callback);
  } else {
    throw new ERR_INVALID_OPT_VALUE('rrtype', rrtype);
  }
}

这里注意到,各种各样的resolve都会聚焦到resolver中根据第一个参数决定resolve类型处理。那么看resolver实现。

function resolver(bindingName) {
  function query(name, /* options, */ callback) {
    let options;
    if (arguments.length > 2) {
      options = callback;
      callback = arguments[2];
    }

    validateString(name, 'name');
    if (typeof callback !== 'function') {
      throw new ERR_INVALID_CALLBACK(callback);
    }

    const req = new QueryReqWrap();
    req.bindingName = bindingName;
    req.callback = callback;
    req.hostname = name;
    req.oncomplete = onresolve;
    req.ttl = !!(options && options.ttl);
    const err = this._handle[bindingName](req, toASCII(name));
    if (err) throw dnsException(err, bindingName, name);
    return req;
  }
  ObjectDefineProperty(query, 'name', { value: bindingName });
  return query;
}

还是一样的套路,通过cares_wrap模块的QueryReqWrap构造请求对象,并在其oncomplete上设置onresolve方法,然后根据resolve类型调用Resolver上的对应_handle处理。

先看onresolve,应该也都能猜到了,就是调用我们传入的callback。

function onresolve(err, result, ttls) {
  if (ttls && this.ttl)
    result = result.map((address, index) => ({ address, ttl: ttls[index] }));

  if (err)
    this.callback(dnsException(err, this.bindingName, this.hostname));
  else
    this.callback(null, result);
}

再看Resolver上的_handle,Resolver位于internal/dns/utils.js

class Resolver {
  constructor() {
    this._handle = new ChannelWrap();
  }
  // ...
}

其_handle又来自cares_wrap.cc的ChannelWrap实例。

     env->SetProtoMethod(channel_wrap, "queryAny", Query<QueryAnyWrap>);
  env->SetProtoMethod(channel_wrap, "queryA", Query<QueryAWrap>);
  env->SetProtoMethod(channel_wrap, "queryAaaa", Query<QueryAaaaWrap>);
  env->SetProtoMethod(channel_wrap, "queryCname", Query<QueryCnameWrap>);
  env->SetProtoMethod(channel_wrap, "queryMx", Query<QueryMxWrap>);
  env->SetProtoMethod(channel_wrap, "queryNs", Query<QueryNsWrap>);
  env->SetProtoMethod(channel_wrap, "queryTxt", Query<QueryTxtWrap>);
  env->SetProtoMethod(channel_wrap, "querySrv", Query<QuerySrvWrap>);
  env->SetProtoMethod(channel_wrap, "queryPtr", Query<QueryPtrWrap>);
  env->SetProtoMethod(channel_wrap, "queryNaptr", Query<QueryNaptrWrap>);
  env->SetProtoMethod(channel_wrap, "querySoa", Query<QuerySoaWrap>);

可以看到,不同类型的resolve对应不同方法,当不传类型参数时,也就是基础的resolve会走queryA。

class QueryAWrap: public QueryWrap {
 public:
  QueryAWrap(ChannelWrap* channel, Local<Object> req_wrap_obj)
      : QueryWrap(channel, req_wrap_obj, "resolve4") {
  }

  int Send(const char* name) override {
    AresQuery(name, ns_c_in, ns_t_a);
    return 0;
  }

  SET_NO_MEMORY_INFO()
  SET_MEMORY_INFO_NAME(QueryAWrap)
  SET_SELF_SIZE(QueryAWrap)

 protected:
  void Parse(unsigned char* buf, int len) override {
    HandleScope handle_scope(env()->isolate());
    Context::Scope context_scope(env()->context());

    ares_addrttl addrttls[256];
    int naddrttls = arraysize(addrttls), status;
    Local<Array> ret = Array::New(env()->isolate());

    int type = ns_t_a;
    status = ParseGeneralReply(env(),
                               buf,
                               len,
                               &type,
                               ret,
                               addrttls,
                               &naddrttls);
    if (status != ARES_SUCCESS) {
      ParseError(status);
      return;
    }

    Local<Array> ttls = AddrTTLToArray<ares_addrttl>(env(),
                                                     addrttls,
                                                     naddrttls);

    CallOnComplete(ret, ttls);
  }
};

每一种QueryWrap都实现了自己的send和parse取得自己类型对应的解析结果,再通过CallOnComplete唤起complete回调。

  void CallOnComplete(Local<Value> answer,
                      Local<Value> extra = Local<Value>()) {
    HandleScope handle_scope(env()->isolate());
    Context::Scope context_scope(env()->context());
    Local<Value> argv[] = {
      Integer::New(env()->isolate(), 0),
      answer,
      extra
    };
    const int argc = arraysize(argv) - extra.IsEmpty();
    TRACE_EVENT_NESTABLE_ASYNC_END0(
        TRACING_CATEGORY_NODE2(dns, native), trace_name_, this);

    MakeCallback(env()->oncomplete_string(), argc, argv);
  }

果然,又是熟悉的oncomplete_string,resolve流程就走通了。那么现在需要了解的就是,各种类型的Wrap的send和parse是在哪里调用的?注意到调用_handle时这些Wrap都被传入了Query中,应该是在这里统一流程处理。

template <class Wrap>
static void Query(const FunctionCallbackInfo<Value>& args) {
  Environment* env = Environment::GetCurrent(args);
  ChannelWrap* channel;
  ASSIGN_OR_RETURN_UNWRAP(&channel, args.Holder());

  CHECK_EQ(false, args.IsConstructCall());
  CHECK(args[0]->IsObject());
  CHECK(args[1]->IsString());

  Local<Object> req_wrap_obj = args[0].As<Object>();
  Local<String> string = args[1].As<String>();
  auto wrap = std::make_unique<Wrap>(channel, req_wrap_obj);

  node::Utf8Value name(env->isolate(), string);
  channel->ModifyActivityQueryCount(1);
  int err = wrap->Send(*name);
  if (err) {
    channel->ModifyActivityQueryCount(-1);
  } else {
    // Release ownership of the pointer allowing the ownership to be transferred
    USE(wrap.release());
  }

  args.GetReturnValue().Set(err);
}

果然,在这里统一调用了Send,所有的QueryWrap的Send都会调用AresQuery,这个方法继承自QueryWrap。

 protected:
  void AresQuery(const char* name,
                 int dnsclass,
                 int type) {
    channel_->EnsureServers();
    TRACE_EVENT_NESTABLE_ASYNC_BEGIN1(
      TRACING_CATEGORY_NODE2(dns, native), trace_name_, this,
      "name", TRACE_STR_COPY(name));
    ares_query(channel_->cares_channel(), name, dnsclass, type, Callback,
               MakeCallbackPointer());
  }

查询完成调用Callback,主要到这里用的查询方法是ares_query,并不是来自libuv,而是来自node的依赖cares——用于异步dns解析的c语言库。

  static void Callback(void* arg, int status, int timeouts,
                       unsigned char* answer_buf, int answer_len) {
    QueryWrap* wrap = FromCallbackPointer(arg);
    if (wrap == nullptr) return;

    unsigned char* buf_copy = nullptr;
    if (status == ARES_SUCCESS) {
      buf_copy = node::Malloc<unsigned char>(answer_len);
      memcpy(buf_copy, answer_buf, answer_len);
    }

    wrap->response_data_ = std::make_unique<ResponseData>();
    ResponseData* data = wrap->response_data_.get();
    data->status = status;
    data->is_host = false;
    data->buf = MallocedBuffer<unsigned char>(buf_copy, answer_len);

    wrap->QueueResponseCallback(status);
  }

继续调用QueueResponseCallback,因为不同类型query可能是并行的,需要维护活动队列。


  void QueueResponseCallback(int status) {
    BaseObjectPtr<QueryWrap> strong_ref{this};
    env()->SetImmediate([this, strong_ref](Environment*) {
      AfterResponse();

      // Delete once strong_ref goes out of scope.
      Detach();
    });

    channel_->set_query_last_ok(status != ARES_ECONNREFUSED);
    channel_->ModifyActivityQueryCount(-1);
  }

异步调用AfterResponse,同时活动队列数量减1。


  void AfterResponse() {
    CHECK(response_data_);

    const int status = response_data_->status;

    if (status != ARES_SUCCESS) {
      ParseError(status);
    } else if (!response_data_->is_host) {
      Parse(response_data_->buf.data, response_data_->buf.size);
    } else {
      Parse(response_data_->host.get());
    }
  }

终于在这里完成了我们的parse过程,实现了_handle到send到parse到complete的串联。流程一致的情况下,关键就在于根据不同的resolveType实现不同的parse。

小结

这篇文章分析了node中的一些udp相关网络模块,有dgram和dns,其中dgram是常规udp通信,基于libuv实现,dns中根据是本地解析还是网络解析分别基于libuv和cares实现。下一篇文章继续分析基于tcp的其他网络模块——http,tls等。

-- EOF --

添加在分类「 前端开发 」下,并被添加 「Node.js」 标签。