diff --git a/doc/api/dgram.md b/doc/api/dgram.md index 8ea655f645bc4d..8100ec353e1a90 100644 --- a/doc/api/dgram.md +++ b/doc/api/dgram.md @@ -968,6 +968,8 @@ changes: specified by the NAT. * `sendBlockList` {net.BlockList} `sendBlockList` can be used for disabling outbound access to specific IP addresses, IP ranges, or IP subnets. + * `msgCount` {integer} `msgCount` can be used to receive multiple messages at once via `recvmmsg`. + **Default:** `0`, disabled. * `callback` {Function} Attached as a listener for `'message'` events. Optional. * Returns: {dgram.Socket} diff --git a/lib/dgram.js b/lib/dgram.js index 95184035a53024..7e3430a017cc65 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -112,7 +112,7 @@ function Socket(type, listener) { let sendBufferSize; let receiveBlockList; let sendBlockList; - + let msgCount; let options; if (type !== null && typeof type === 'object') { options = type; @@ -138,9 +138,13 @@ function Socket(type, listener) { } sendBlockList = options.sendBlockList; } + if (options.msgCount !== undefined) { + validateUint32(options.msgCount, 'options.msgCount'); + msgCount = options.msgCount; + } } - const handle = newHandle(type, lookup); + const handle = newHandle(type, lookup, msgCount); handle[owner_symbol] = this; this[async_id_symbol] = handle.getAsyncId(); @@ -162,6 +166,7 @@ function Socket(type, listener) { sendBufferSize, receiveBlockList, sendBlockList, + msgCount, }; if (options?.signal !== undefined) { @@ -380,6 +385,7 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) { } if (cluster.isWorker && !exclusive) { + // TODO(theanarkh): support recvmmsg bindServerHandle(this, { address: ip, port: port, diff --git a/lib/internal/dgram.js b/lib/internal/dgram.js index bae5da1c1f0def..8eeef948d31be4 100644 --- a/lib/internal/dgram.js +++ b/lib/internal/dgram.js @@ -28,7 +28,7 @@ function lookup6(lookup, address, callback) { return lookup(address || '::1', 6, callback); } -function newHandle(type, lookup) { +function newHandle(type, lookup, mmsgCount) { if (lookup === undefined) { if (dns === undefined) { dns = require('dns'); @@ -40,14 +40,14 @@ function newHandle(type, lookup) { } if (type === 'udp4') { - const handle = new UDP(); + const handle = new UDP(mmsgCount); handle.lookup = FunctionPrototypeBind(lookup4, handle, lookup); return handle; } if (type === 'udp6') { - const handle = new UDP(); + const handle = new UDP(mmsgCount); handle.lookup = FunctionPrototypeBind(lookup6, handle, lookup); handle.bind = handle.bind6; diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc index 0821d07b5cde7c..c0de86fc5833bc 100644 --- a/src/udp_wrap.cc +++ b/src/udp_wrap.cc @@ -142,20 +142,34 @@ void UDPWrapBase::RegisterExternalReferences( registry->Register(RecvStop); } -UDPWrap::UDPWrap(Environment* env, Local object) +UDPWrap::UDPWrap(Environment* env, Local object, uint32_t msg_count) : HandleWrap(env, object, reinterpret_cast(&handle_), - AsyncWrap::PROVIDER_UDPWRAP) { + AsyncWrap::PROVIDER_UDPWRAP), + msg_count_(msg_count), + mmsg_buf_(uv_buf_init(nullptr, 0)) { object->SetAlignedPointerInInternalField( UDPWrapBase::kUDPWrapBaseField, static_cast(this)); - - int r = uv_udp_init(env->event_loop(), &handle_); + int r; + if (msg_count > 0) { + r = uv_udp_init_ex( + env->event_loop(), &handle_, AF_UNSPEC | UV_UDP_RECVMMSG); + } else { + r = uv_udp_init(env->event_loop(), &handle_); + } CHECK_EQ(r, 0); // can't fail anyway - set_listener(this); } +UDPWrap::~UDPWrap() { + // Libuv does not release the memory of memory which allocated + // by handle->alloc_cb when we call close in handle->read_cb, + // so we should release the memory here if necessary. + if (using_recvmmsg()) { + release_buf(); + } +} void UDPWrap::Initialize(Local target, Local unused, @@ -270,8 +284,12 @@ void UDPWrap::RegisterExternalReferences(ExternalReferenceRegistry* registry) { void UDPWrap::New(const FunctionCallbackInfo& args) { CHECK(args.IsConstructCall()); + uint32_t msg_count = 0; + if (args[0]->IsUint32()) { + msg_count = args[0].As()->Value(); + } Environment* env = Environment::GetCurrent(args); - new UDPWrap(env, args.This()); + new UDPWrap(env, args.This(), msg_count); } @@ -741,6 +759,19 @@ void UDPWrap::OnAlloc(uv_handle_t* handle, } uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) { + if (using_recvmmsg()) { + CHECK(mmsg_buf_.base == nullptr); + if (msg_count_ > SIZE_MAX / suggested_size) { + return mmsg_buf_; + } + suggested_size *= msg_count_; + void* base = malloc(suggested_size); + if (base == nullptr) { + return mmsg_buf_; + } + mmsg_buf_ = uv_buf_init(reinterpret_cast(base), suggested_size); + return mmsg_buf_; + } return env()->allocate_managed_buffer(suggested_size); } @@ -759,7 +790,17 @@ void UDPWrap::OnRecv(ssize_t nread, unsigned int flags) { Environment* env = this->env(); Isolate* isolate = env->isolate(); - std::unique_ptr bs = env->release_managed_buffer(buf_); + std::unique_ptr bs; + + auto cleanup = OnScopeLeave([&]() { + if (using_recvmmsg() && (nread <= 0 || (flags & UV_UDP_MMSG_FREE))) { + release_buf(); + } + }); + + if (!using_recvmmsg()) { + bs = env->release_managed_buffer(buf_); + } if (nread == 0 && addr == nullptr) { return; } @@ -778,6 +819,10 @@ void UDPWrap::OnRecv(ssize_t nread, return; } else if (nread == 0) { bs = ArrayBuffer::NewBackingStore(isolate, 0); + } else if (using_recvmmsg()) { + bs = ArrayBuffer::NewBackingStore( + isolate, nread, BackingStoreInitializationMode::kUninitialized); + memcpy(bs->Data(), buf_.base, nread); } else if (static_cast(nread) != bs->ByteLength()) { CHECK_LE(static_cast(nread), bs->ByteLength()); std::unique_ptr old_bs = std::move(bs); diff --git a/src/udp_wrap.h b/src/udp_wrap.h index c0914dbf3a7f3f..6e3628b73e6595 100644 --- a/src/udp_wrap.h +++ b/src/udp_wrap.h @@ -158,6 +158,15 @@ class UDPWrap final : public HandleWrap, const uv_buf_t& buf, const sockaddr* addr, unsigned int flags) override; + bool using_recvmmsg() { + return uv_udp_using_recvmmsg(reinterpret_cast(&handle_)); + } + void release_buf() { + if (mmsg_buf_.base != nullptr) { + free(mmsg_buf_.base); + mmsg_buf_ = uv_buf_init(nullptr, 0); + } + } ReqWrap* CreateSendWrap(size_t msg_size) override; void OnSendDone(ReqWrap* wrap, int status) override; @@ -178,7 +187,11 @@ class UDPWrap final : public HandleWrap, static v8::MaybeLocal Instantiate(Environment* env, AsyncWrap* parent, SocketType type); - SET_NO_MEMORY_INFO() + void MemoryInfo(MemoryTracker* tracker) const override { + if (mmsg_buf_.base != nullptr) { + tracker->TrackFieldWithSize("mmsg_buf", mmsg_buf_.len); + } + } SET_MEMORY_INFO_NAME(UDPWrap) SET_SELF_SIZE(UDPWrap) @@ -189,7 +202,9 @@ class UDPWrap final : public HandleWrap, int (*F)(const typename T::HandleType*, sockaddr*, int*)> friend void GetSockOrPeerName(const v8::FunctionCallbackInfo&); - UDPWrap(Environment* env, v8::Local object); + UDPWrap(Environment* env, v8::Local object, uint32_t msg_count); + + ~UDPWrap(); static void DoBind(const v8::FunctionCallbackInfo& args, int family); @@ -213,7 +228,8 @@ class UDPWrap final : public HandleWrap, unsigned int flags); uv_udp_t handle_; - + uint32_t msg_count_; + uv_buf_t mmsg_buf_; bool current_send_has_callback_; v8::Local current_send_req_wrap_; }; diff --git a/test/parallel/test-dgram-udp-recvmmsg-close.js b/test/parallel/test-dgram-udp-recvmmsg-close.js new file mode 100644 index 00000000000000..fba0d991e0caf2 --- /dev/null +++ b/test/parallel/test-dgram-udp-recvmmsg-close.js @@ -0,0 +1,30 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const dgram = require('dgram'); +const message_to_send = 'A message to send'; + +const server = dgram.createSocket({ type: 'udp4', msgCount: 3 }); +const client = dgram.createSocket({ type: 'udp4' }); + +client.on('close', common.mustCall()); +server.on('close', common.mustCall()); + +server.on('message', common.mustCall((msg) => { + assert.strictEqual(msg.toString(), message_to_send.toString()); + // The server will release the memory which allocated by handle->alloc_cb + server.close(); + client.close(); +})); + +server.on('listening', common.mustCall(() => { + for (let i = 0; i < 2; i++) { + client.send(message_to_send, + 0, + message_to_send.length, + server.address().port, + server.address().address); + } +})); + +server.bind(0); diff --git a/test/parallel/test-dgram-udp-recvmmsg.js b/test/parallel/test-dgram-udp-recvmmsg.js new file mode 100644 index 00000000000000..64d7d50347a900 --- /dev/null +++ b/test/parallel/test-dgram-udp-recvmmsg.js @@ -0,0 +1,61 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const dgram = require('dgram'); +const message_to_send = 'A message to send'; + +[ + -1, + 1.1, + NaN, + undefined, + {}, + [], + null, + function() {}, + Symbol(), + true, + Infinity, +].forEach((msgCount) => { + try { + dgram.createSocket({ type: 'udp4', msgCount }); + } catch (e) { + assert.ok(/ERR_OUT_OF_RANGE|ERR_INVALID_ARG_TYPE/i.test(e.code)); + } +}); + +[ + 0, + 1, +].forEach((msgCount) => { + const socket = dgram.createSocket({ type: 'udp4', msgCount }); + socket.close(); +}); + +const server = dgram.createSocket({ type: 'udp4', msgCount: 3 }); +const client = dgram.createSocket({ type: 'udp4' }); + +client.on('close', common.mustCall()); +server.on('close', common.mustCall()); + +let done = 0; +const count = 2; +server.on('message', common.mustCall((msg) => { + assert.strictEqual(msg.toString(), message_to_send.toString()); + if (++done === count) { + client.close(); + server.close(); + } +}, count)); + +server.on('listening', common.mustCall(() => { + for (let i = 0; i < count; i++) { + client.send(message_to_send, + 0, + message_to_send.length, + server.address().port, + server.address().address); + } +})); + +server.bind(0);