Node源码解析——udp
源码阅读最好遵循一个从架构到细节的流程。现在我们已经了解了node的整体架构以及模块间的职责划分。接下来可以再深入一步,看看node内部模块间又有怎样的区别和联系。这篇文章就从之前写过的代表tcp的net的平行模块入手——udp相关模块。
我们知道net基于tcp,从tcp延伸开来又有一些常用网络模块,之上有http/https,平行有udp,udp之上又有dns。当然这些常用网络模块在node中也有实现,其架构如下图。
那么接下来就一一看其实现。这篇文章先看udp相关。
dgram
在node中基于tcp实现的网络模块为net,同样平行的自然有基于udp实现的网络模块,那就是dgram。相比于tcp,udp是无连接,不可靠的,流程也会简单很多,服务端和客户端只需要创建socket,绑定网络信息,然后发送和接受数据即可。我们一步一步看。先看创建入口,在lib/dgram.js
的createSocket。
function createSocket(type, listener) {
return new Socket(type, listener);
}
很简单,就是new一个udp的socket实例,这里传入两个参数,其一是该socket的相关选项信息,比如是udp4或upd6,是否仅支持ipv6等,其二就是该socket响应message事件的回调。看一下这个socket构造函数内容。
function Socket(type, listener) {
EventEmitter.call(this);
let lookup;
let recvBufferSize;
let sendBufferSize;
let options;
if (type !== null && typeof type === 'object') {
options = type;
type = options.type;
lookup = options.lookup;
recvBufferSize = options.recvBufferSize;
sendBufferSize = options.sendBufferSize;
}
const handle = newHandle(type, lookup);
handle[owner_symbol] = this;
this[async_id_symbol] = handle.getAsyncId();
this.type = type;
if (typeof listener === 'function')
this.on('message', listener);
this[kStateSymbol] = {
handle,
receiving: false,
bindState: BIND_STATE_UNBOUND,
connectState: CONNECT_STATE_DISCONNECTED,
queue: undefined,
reuseAddr: options && options.reuseAddr, // Use UV_UDP_REUSEADDR if true.
ipv6Only: options && options.ipv6Only,
recvBufferSize,
sendBufferSize
};
}
ObjectSetPrototypeOf(Socket.prototype, EventEmitter.prototype);
ObjectSetPrototypeOf(Socket, EventEmitter);
可以注意到,这个socket没有继承stream,而是直接从EventEmitter继承而来。同时其handle创建来自于newHandle这个函数,追查到internal/dgram.js
。
function lookup4(lookup, address, callback) {
return lookup(address || '127.0.0.1', 4, callback);
}
function lookup6(lookup, address, callback) {
return lookup(address || '::1', 6, callback);
}
function newHandle(type, lookup) {
if (lookup === undefined) {
if (dns === undefined) {
dns = require('dns');
}
lookup = dns.lookup;
} else if (typeof lookup !== 'function') {
throw new ERR_INVALID_ARG_TYPE('lookup', 'Function', lookup);
}
if (type === 'udp4') {
const handle = new UDP();
handle.lookup = lookup4.bind(handle, lookup);
return handle;
}
if (type === 'udp6') {
const handle = new UDP();
handle.lookup = lookup6.bind(handle, lookup);
handle.bind = handle.bind6;
handle.connect = handle.connect6;
handle.send = handle.send6;
return handle;
}
throw new ERR_SOCKET_BAD_TYPE();
}
这里会设定handle用来进行IP地址查询的函数,默认是dns查询。根据udp4或udp6会用UDP类创建实例并包装查询函数传入符合规范的参数。UDP则是来自C++编写的udp_wrap了,net模块中也用到了对应的tcp_wrap。到这里也应该明白了,网络通信的底层工作实际上都会聚焦到tcp_wrap,udp_wrap这样的C++模块完成,我们JS模块部分主要就是起一个连接和参数处理作用,职责划分非常明确。
看完了创建,就可以进入地址绑定流程了,看原型上的bind方法。
Socket.prototype.bind = function(port_, address_ /* , callback */) {
// ...
startListening(this);
return this
}
函数很长,但都是些错误处理,参数处理以及worker进程端口共享之类的判断不属于流程重点,只需要知道最后会通过startListening启动监听。
function startListening(socket) {
const state = socket[kStateSymbol];
state.handle.onmessage = onMessage;
// Todo: handle errors
state.handle.recvStart();
state.receiving = true;
state.bindState = BIND_STATE_BOUND;
if (state.recvBufferSize)
bufferSize(socket, state.recvBufferSize, RECV_BUFFER);
if (state.sendBufferSize)
bufferSize(socket, state.sendBufferSize, SEND_BUFFER);
socket.emit('listening');
}
在handle上设置onMessage接受数据,并调用handle的recvStart启动udp数据接收,再在JS抛出listening事件。这个recvStart也就是在udp_wrap.cc
里的实现了。
void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) {
UDPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);
// UV_EALREADY means that the socket is already bound but that's okay
if (err == UV_EALREADY)
err = 0;
args.GetReturnValue().Set(err);
}
这里也能看出来在C++模块中用到了libuv的网络通信实现异步。端口绑定完成,下一步就是通过send方法进行数据传输。
Socket.prototype.send = function(buffer,
offset,
length,
port,
address,
callback) {
// ...
const afterDns = (ex, ip) => {
defaultTriggerAsyncIdScope(
this[async_id_symbol],
doSend,
ex, this, ip, list, address, port, callback
);
};
if (!connected) {
state.handle.lookup(address, afterDns);
} else {
afterDns(null, null);
}
};
同样是一段很长的参数和错误处理代码,提取出最关键的部分,会使用handle的lookup函数解析地址并回调afterDns函数,这里会用node的async_hooks模块记录这个异步动作,并调用doSend方法。
function doSend(ex, self, ip, list, address, port, callback) {
const state = self[kStateSymbol];
if (ex) {
if (typeof callback === 'function') {
process.nextTick(callback, ex);
return;
}
process.nextTick(() => self.emit('error', ex));
return;
} else if (!state.handle) {
return;
}
const req = new SendWrap();
req.list = list; // Keep reference alive.
req.address = address;
req.port = port;
if (callback) {
req.callback = callback;
req.oncomplete = afterSend;
}
let err;
if (port)
err = state.handle.send(req, list, list.length, port, ip, !!callback);
else
err = state.handle.send(req, list, list.length, !!callback);
if (err >= 1) {
// Synchronous finish. The return code is msg_length + 1 so that we can
// distinguish between synchronous success and asynchronous success.
if (callback)
process.nextTick(callback, null, err - 1);
return;
}
if (err && callback) {
// Don't emit as error, dgram_legacy.js compatibility
const ex = exceptionWithHostPort(err, 'send', address, port);
process.nextTick(callback, ex);
}
}
doSend中,会把发送的数据用udp_wrap的SendWrap包装一层统一请求操作规范,再通过handle的send方法调用libuv的对应模块发送。SendWrap的结构可以在req_wrap.h
中找到,其继承自AsyncWrap和ReqWrapBase。
class ReqWrapBase {
public:
explicit inline ReqWrapBase(Environment* env);
virtual ~ReqWrapBase() = default;
virtual void Cancel() = 0;
virtual AsyncWrap* GetAsyncWrap() = 0;
private:
friend int GenDebugSymbols();
friend class Environment;
ListNode<ReqWrapBase> req_wrap_queue_;
};
再回到udp_wrap看send方法。
void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
Environment* env = Environment::GetCurrent(args);
UDPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
CHECK(args.Length() == 4 || args.Length() == 6);
CHECK(args[0]->IsObject());
CHECK(args[1]->IsArray());
CHECK(args[2]->IsUint32());
bool sendto = args.Length() == 6;
if (sendto) {
// send(req, list, list.length, port, address, hasCallback)
CHECK(args[3]->IsUint32());
CHECK(args[4]->IsString());
CHECK(args[5]->IsBoolean());
} else {
// send(req, list, list.length, hasCallback)
CHECK(args[3]->IsBoolean());
}
Local<Object> req_wrap_obj = args[0].As<Object>();
Local<Array> chunks = args[1].As<Array>();
// it is faster to fetch the length of the
// array in js-land
size_t count = args[2].As<Uint32>()->Value();
const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();
size_t msg_size = 0;
MaybeStackBuffer<uv_buf_t, 16> bufs(count);
// construct uv_buf_t array
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
size_t length = Buffer::Length(chunk);
bufs[i] = uv_buf_init(Buffer::Data(chunk), length);
msg_size += length;
}
int err = 0;
struct sockaddr_storage addr_storage;
sockaddr* addr = nullptr;
if (sendto) {
const unsigned short port = args[3].As<Uint32>()->Value();
node::Utf8Value address(env->isolate(), args[4]);
err = sockaddr_for_family(family, address.out(), port, &addr_storage);
if (err == 0) {
addr = reinterpret_cast<sockaddr*>(&addr_storage);
}
}
uv_buf_t* bufs_ptr = *bufs;
if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
if (err == UV_ENOSYS || err == UV_EAGAIN) {
err = 0;
} else if (err >= 0) {
size_t sent = err;
while (count > 0 && bufs_ptr->len <= sent) {
sent -= bufs_ptr->len;
bufs_ptr++;
count--;
}
if (count > 0) {
CHECK_LT(sent, bufs_ptr->len);
bufs_ptr->base += sent;
bufs_ptr->len -= sent;
} else {
CHECK_EQ(static_cast<size_t>(err), msg_size);
// + 1 so that the JS side can distinguish 0-length async sends from
// 0-length sync sends.
args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
return;
}
}
}
if (err == 0) {
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
req_wrap->msg_size = msg_size;
err = req_wrap->Dispatch(uv_udp_send,
&wrap->handle_,
bufs_ptr,
count,
addr,
OnSend);
if (err)
delete req_wrap;
}
args.GetReturnValue().Set(err);
}
通过之前构造好的的SendWrap实例调用dispatch发送数据,而在SendWrap内部就是调用了libuv的uv_udp_send进行具体的数据操作了,当数据发送完成后,会调用onSend触发一个complete。
void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req->data)};
if (req_wrap->have_callback()) {
Environment* env = req_wrap->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Value> arg[] = {
Integer::New(env->isolate(), status),
Integer::New(env->isolate(), req_wrap->msg_size),
};
req_wrap->MakeCallback(env->oncomplete_string(), 2, arg);
}
onComplete在上面的JS模块lib/dgram.js
的doSend中赋值为afterSend。
function afterSend(err, sent) {
if (err) {
err = exceptionWithHostPort(err, 'send', this.address, this.port);
} else {
err = null;
}
this.callback(err, sent);
}
在这里完成send的回调。
send完成,就是另一端接受数据了,之前也提到startListening的时候会在handle上绑定onmessage,那么就去udp_wrap查这个onmessage的调用时机。可以预见到就在和onSend对应的onRecv中。
void UDPWrap::OnRecv(uv_udp_t* handle,
ssize_t nread,
const uv_buf_t* buf_,
const struct sockaddr* addr,
unsigned int flags) {
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
Environment* env = wrap->env();
AllocatedBuffer buf(env, *buf_);
if (nread == 0 && addr == nullptr) {
return;
}
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Object> wrap_obj = wrap->object();
Local<Value> argv[] = {
Integer::New(env->isolate(), nread),
wrap_obj,
Undefined(env->isolate()),
Undefined(env->isolate())
};
if (nread < 0) {
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
return;
}
buf.Resize(nread);
argv[2] = buf.ToBuffer().ToLocalChecked();
argv[3] = AddressToJS(env, addr);
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
}
果然,接受数据成功后会有一个onmessage的回调,对应到JS模块的onMessage
function onMessage(nread, handle, buf, rinfo) {
const self = handle[owner_symbol];
if (nread < 0) {
return self.emit('error', errnoException(nread, 'recvmsg'));
}
rinfo.size = buf.length; // compatibility
self.emit('message', buf, rinfo);
}
通过监听message事件即可接收到发送的数据,至此dgram的数据通信流程结束。
dns
好了,已经了解了udp的模块,趁热打铁,进一步分析dns模块。dns应该都很熟悉,就不详细介绍其功能,简单来说就是通过基于udp的dns请求将网址发送到dns服务器解析出其ip地址再返回。dns解析又分为本地解析和网络解析,本地解析会用到本机上的hosts以及各种缓存,当本地解析未取得结果时,就把请求转发到网络dns服务器进行网络解析。对应到的两个dns模块api也就是lookup和resolve了。
先看lookup,在lib/dns.js
找到其实现。
function lookup(hostname, options, callback) {
let hints = 0;
let family = -1;
let all = false;
let verbatim = false;
// Parse arguments
if (hostname && typeof hostname !== 'string') {
throw new ERR_INVALID_ARG_TYPE('hostname', 'string', hostname);
} else if (typeof options === 'function') {
callback = options;
family = 0;
} else if (typeof callback !== 'function') {
throw new ERR_INVALID_CALLBACK(callback);
} else if (options !== null && typeof options === 'object') {
hints = options.hints >>> 0;
family = options.family >>> 0;
all = options.all === true;
verbatim = options.verbatim === true;
validateHints(hints);
} else {
family = options >>> 0;
}
if (family !== 0 && family !== 4 && family !== 6)
throw new ERR_INVALID_OPT_VALUE('family', family);
if (!hostname) {
emitInvalidHostnameWarning(hostname);
if (all) {
process.nextTick(callback, null, []);
} else {
process.nextTick(callback, null, null, family === 6 ? 6 : 4);
}
return {};
}
const matchedFamily = isIP(hostname);
if (matchedFamily) {
if (all) {
process.nextTick(
callback, null, [{ address: hostname, family: matchedFamily }]);
} else {
process.nextTick(callback, null, hostname, matchedFamily);
}
return {};
}
const req = new GetAddrInfoReqWrap();
req.callback = callback;
req.family = family;
req.hostname = hostname;
req.oncomplete = all ? onlookupall : onlookup;
const err = cares.getaddrinfo(
req, toASCII(hostname), family, hints, verbatim
);
if (err) {
process.nextTick(callback, dnsException(err, 'getaddrinfo', hostname));
return {};
}
return req;
}
同样是一堆参数处理,毕竟JS模块的定位就是如此,但这里还是有需要重点关注的两个点,其一是构造了一个GetAddrInfoReqWrap实例,并调用getaddrinfo完成解析,其二是在这个实例上设置的oncomplete回调。先看回调,应该就是dns解析完成后会调用的函数,根据是否传入all参数决定设置为onlookup或者onlookupall。
function onlookup(err, addresses) {
if (err) {
return this.callback(dnsException(err, 'getaddrinfo', this.hostname));
}
if (this.family) {
this.callback(null, addresses[0], this.family);
} else {
this.callback(null, addresses[0], isIP(addresses[0]));
}
}
function onlookupall(err, addresses) {
if (err) {
return this.callback(dnsException(err, 'getaddrinfo', this.hostname));
}
const family = this.family;
for (let i = 0; i < addresses.length; i++) {
const addr = addresses[i];
addresses[i] = {
address: addr,
family: family || isIP(addr)
};
}
this.callback(null, addresses);
}
果然,回调就是调用我们传入的callback,根据all参数也会决定解析结果的数量。再进入GetAddrInfoReqWrap中,这个构造函数来自cares_wrap模块,看到wrap结尾的模块也都不陌生了,到cares_wrap.cc
中查询getAddrInfo方法。
void GetAddrInfo(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args[0]->IsObject());
CHECK(args[1]->IsString());
CHECK(args[2]->IsInt32());
CHECK(args[4]->IsBoolean());
Local<Object> req_wrap_obj = args[0].As<Object>();
node::Utf8Value hostname(env->isolate(), args[1]);
int32_t flags = 0;
if (args[3]->IsInt32()) {
flags = args[3].As<Int32>()->Value();
}
int family;
switch (args[2].As<Int32>()->Value()) {
case 0:
family = AF_UNSPEC;
break;
case 4:
family = AF_INET;
break;
case 6:
family = AF_INET6;
break;
default:
CHECK(0 && "bad address family");
}
auto req_wrap = std::make_unique<GetAddrInfoReqWrap>(env,
req_wrap_obj,
args[4]->IsTrue());
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = family;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = flags;
TRACE_EVENT_NESTABLE_ASYNC_BEGIN2(
TRACING_CATEGORY_NODE2(dns, native), "lookup", req_wrap.get(),
"hostname", TRACE_STR_COPY(*hostname),
"family",
family == AF_INET ? "ipv4" : family == AF_INET6 ? "ipv6" : "unspec");
int err = req_wrap->Dispatch(uv_getaddrinfo,
AfterGetAddrInfo,
*hostname,
nullptr,
&hints);
if (err == 0)
// Release ownership of the pointer allowing the ownership to be transferred
USE(req_wrap.release());
args.GetReturnValue().Set(err);
}
这里又看到了dgram模块出现的req_wrap,同样用其包装libuv的dns解析方法uv_getnameinfo进行异步解析并传入afterGetAddrInfo作为回调,这个回调中唤醒complete的callback,也就是之前js中设定的onlookup或者onlookupAll了,至此lookup流程结束。
// Make the callback into JavaScript
req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
可以看到思路和dgram模块类似,都是在JS模块中处理参数和设置回调,在C++模块中调用libuv接口,并触发complete把结果传入JS的回调再执行。
再看resolve方法,回到lib/dns.js
。
const resolveMap = ObjectCreate(null);
Resolver.prototype.resolveAny = resolveMap.ANY = resolver('queryAny');
Resolver.prototype.resolve4 = resolveMap.A = resolver('queryA');
Resolver.prototype.resolve6 = resolveMap.AAAA = resolver('queryAaaa');
Resolver.prototype.resolveCname = resolveMap.CNAME = resolver('queryCname');
Resolver.prototype.resolveMx = resolveMap.MX = resolver('queryMx');
Resolver.prototype.resolveNs = resolveMap.NS = resolver('queryNs');
Resolver.prototype.resolveTxt = resolveMap.TXT = resolver('queryTxt');
Resolver.prototype.resolveSrv = resolveMap.SRV = resolver('querySrv');
Resolver.prototype.resolvePtr = resolveMap.PTR = resolver('queryPtr');
Resolver.prototype.resolveNaptr = resolveMap.NAPTR = resolver('queryNaptr');
Resolver.prototype.resolveSoa = resolveMap.SOA = resolver('querySoa');
Resolver.prototype.reverse = resolver('getHostByAddr');
Resolver.prototype.resolve = resolve;
function resolve(hostname, rrtype, callback) {
let resolver;
if (typeof rrtype === 'string') {
resolver = resolveMap[rrtype];
} else if (typeof rrtype === 'function') {
resolver = resolveMap.A;
callback = rrtype;
} else {
throw new ERR_INVALID_ARG_TYPE('rrtype', 'string', rrtype);
}
if (typeof resolver === 'function') {
return resolver.call(this, hostname, callback);
} else {
throw new ERR_INVALID_OPT_VALUE('rrtype', rrtype);
}
}
这里注意到,各种各样的resolve都会聚焦到resolver中根据第一个参数决定resolve类型处理。那么看resolver实现。
function resolver(bindingName) {
function query(name, /* options, */ callback) {
let options;
if (arguments.length > 2) {
options = callback;
callback = arguments[2];
}
validateString(name, 'name');
if (typeof callback !== 'function') {
throw new ERR_INVALID_CALLBACK(callback);
}
const req = new QueryReqWrap();
req.bindingName = bindingName;
req.callback = callback;
req.hostname = name;
req.oncomplete = onresolve;
req.ttl = !!(options && options.ttl);
const err = this._handle[bindingName](req, toASCII(name));
if (err) throw dnsException(err, bindingName, name);
return req;
}
ObjectDefineProperty(query, 'name', { value: bindingName });
return query;
}
还是一样的套路,通过cares_wrap模块的QueryReqWrap构造请求对象,并在其oncomplete上设置onresolve方法,然后根据resolve类型调用Resolver上的对应_handle处理。
先看onresolve,应该也都能猜到了,就是调用我们传入的callback。
function onresolve(err, result, ttls) {
if (ttls && this.ttl)
result = result.map((address, index) => ({ address, ttl: ttls[index] }));
if (err)
this.callback(dnsException(err, this.bindingName, this.hostname));
else
this.callback(null, result);
}
再看Resolver上的_handle,Resolver位于internal/dns/utils.js
class Resolver {
constructor() {
this._handle = new ChannelWrap();
}
// ...
}
其_handle又来自cares_wrap.cc
的ChannelWrap实例。
env->SetProtoMethod(channel_wrap, "queryAny", Query<QueryAnyWrap>);
env->SetProtoMethod(channel_wrap, "queryA", Query<QueryAWrap>);
env->SetProtoMethod(channel_wrap, "queryAaaa", Query<QueryAaaaWrap>);
env->SetProtoMethod(channel_wrap, "queryCname", Query<QueryCnameWrap>);
env->SetProtoMethod(channel_wrap, "queryMx", Query<QueryMxWrap>);
env->SetProtoMethod(channel_wrap, "queryNs", Query<QueryNsWrap>);
env->SetProtoMethod(channel_wrap, "queryTxt", Query<QueryTxtWrap>);
env->SetProtoMethod(channel_wrap, "querySrv", Query<QuerySrvWrap>);
env->SetProtoMethod(channel_wrap, "queryPtr", Query<QueryPtrWrap>);
env->SetProtoMethod(channel_wrap, "queryNaptr", Query<QueryNaptrWrap>);
env->SetProtoMethod(channel_wrap, "querySoa", Query<QuerySoaWrap>);
可以看到,不同类型的resolve对应不同方法,当不传类型参数时,也就是基础的resolve会走queryA。
class QueryAWrap: public QueryWrap {
public:
QueryAWrap(ChannelWrap* channel, Local<Object> req_wrap_obj)
: QueryWrap(channel, req_wrap_obj, "resolve4") {
}
int Send(const char* name) override {
AresQuery(name, ns_c_in, ns_t_a);
return 0;
}
SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(QueryAWrap)
SET_SELF_SIZE(QueryAWrap)
protected:
void Parse(unsigned char* buf, int len) override {
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
ares_addrttl addrttls[256];
int naddrttls = arraysize(addrttls), status;
Local<Array> ret = Array::New(env()->isolate());
int type = ns_t_a;
status = ParseGeneralReply(env(),
buf,
len,
&type,
ret,
addrttls,
&naddrttls);
if (status != ARES_SUCCESS) {
ParseError(status);
return;
}
Local<Array> ttls = AddrTTLToArray<ares_addrttl>(env(),
addrttls,
naddrttls);
CallOnComplete(ret, ttls);
}
};
每一种QueryWrap都实现了自己的send和parse取得自己类型对应的解析结果,再通过CallOnComplete唤起complete回调。
void CallOnComplete(Local<Value> answer,
Local<Value> extra = Local<Value>()) {
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
Local<Value> argv[] = {
Integer::New(env()->isolate(), 0),
answer,
extra
};
const int argc = arraysize(argv) - extra.IsEmpty();
TRACE_EVENT_NESTABLE_ASYNC_END0(
TRACING_CATEGORY_NODE2(dns, native), trace_name_, this);
MakeCallback(env()->oncomplete_string(), argc, argv);
}
果然,又是熟悉的oncomplete_string,resolve流程就走通了。那么现在需要了解的就是,各种类型的Wrap的send和parse是在哪里调用的?注意到调用_handle时这些Wrap都被传入了Query中,应该是在这里统一流程处理。
template <class Wrap>
static void Query(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
ChannelWrap* channel;
ASSIGN_OR_RETURN_UNWRAP(&channel, args.Holder());
CHECK_EQ(false, args.IsConstructCall());
CHECK(args[0]->IsObject());
CHECK(args[1]->IsString());
Local<Object> req_wrap_obj = args[0].As<Object>();
Local<String> string = args[1].As<String>();
auto wrap = std::make_unique<Wrap>(channel, req_wrap_obj);
node::Utf8Value name(env->isolate(), string);
channel->ModifyActivityQueryCount(1);
int err = wrap->Send(*name);
if (err) {
channel->ModifyActivityQueryCount(-1);
} else {
// Release ownership of the pointer allowing the ownership to be transferred
USE(wrap.release());
}
args.GetReturnValue().Set(err);
}
果然,在这里统一调用了Send,所有的QueryWrap的Send都会调用AresQuery,这个方法继承自QueryWrap。
protected:
void AresQuery(const char* name,
int dnsclass,
int type) {
channel_->EnsureServers();
TRACE_EVENT_NESTABLE_ASYNC_BEGIN1(
TRACING_CATEGORY_NODE2(dns, native), trace_name_, this,
"name", TRACE_STR_COPY(name));
ares_query(channel_->cares_channel(), name, dnsclass, type, Callback,
MakeCallbackPointer());
}
查询完成调用Callback,主要到这里用的查询方法是ares_query,并不是来自libuv,而是来自node的依赖cares——用于异步dns解析的c语言库。
static void Callback(void* arg, int status, int timeouts,
unsigned char* answer_buf, int answer_len) {
QueryWrap* wrap = FromCallbackPointer(arg);
if (wrap == nullptr) return;
unsigned char* buf_copy = nullptr;
if (status == ARES_SUCCESS) {
buf_copy = node::Malloc<unsigned char>(answer_len);
memcpy(buf_copy, answer_buf, answer_len);
}
wrap->response_data_ = std::make_unique<ResponseData>();
ResponseData* data = wrap->response_data_.get();
data->status = status;
data->is_host = false;
data->buf = MallocedBuffer<unsigned char>(buf_copy, answer_len);
wrap->QueueResponseCallback(status);
}
继续调用QueueResponseCallback,因为不同类型query可能是并行的,需要维护活动队列。
void QueueResponseCallback(int status) {
BaseObjectPtr<QueryWrap> strong_ref{this};
env()->SetImmediate([this, strong_ref](Environment*) {
AfterResponse();
// Delete once strong_ref goes out of scope.
Detach();
});
channel_->set_query_last_ok(status != ARES_ECONNREFUSED);
channel_->ModifyActivityQueryCount(-1);
}
异步调用AfterResponse,同时活动队列数量减1。
void AfterResponse() {
CHECK(response_data_);
const int status = response_data_->status;
if (status != ARES_SUCCESS) {
ParseError(status);
} else if (!response_data_->is_host) {
Parse(response_data_->buf.data, response_data_->buf.size);
} else {
Parse(response_data_->host.get());
}
}
终于在这里完成了我们的parse过程,实现了_handle到send到parse到complete的串联。流程一致的情况下,关键就在于根据不同的resolveType实现不同的parse。
小结
这篇文章分析了node中的一些udp相关网络模块,有dgram和dns,其中dgram是常规udp通信,基于libuv实现,dns中根据是本地解析还是网络解析分别基于libuv和cares实现。下一篇文章继续分析基于tcp的其他网络模块——http,tls等。
-- EOF --