Skip to content

Commit ecc5df0

Browse files
committed
dgram: add option or recvmmsg
1 parent 049664b commit ecc5df0

File tree

7 files changed

+135
-13
lines changed

7 files changed

+135
-13
lines changed

doc/api/dgram.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -968,6 +968,8 @@ changes:
968968
specified by the NAT.
969969
* `sendBlockList` {net.BlockList} `sendBlockList` can be used for disabling outbound
970970
access to specific IP addresses, IP ranges, or IP subnets.
971+
* `msgCount` {integer} `msgCount` can be used to receive multiple messages at once via `recvmmsg`.
972+
**Default:** `0`, disabled.
971973
* `callback` {Function} Attached as a listener for `'message'` events. Optional.
972974
* Returns: {dgram.Socket}
973975

lib/dgram.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ function Socket(type, listener) {
112112
let sendBufferSize;
113113
let receiveBlockList;
114114
let sendBlockList;
115-
115+
let msgCount;
116116
let options;
117117
if (type !== null && typeof type === 'object') {
118118
options = type;
@@ -138,9 +138,13 @@ function Socket(type, listener) {
138138
}
139139
sendBlockList = options.sendBlockList;
140140
}
141+
if (options.msgCount) {
142+
validateUint32(options.msgCount, 'options.msgCount', true);
143+
msgCount = options.msgCount;
144+
}
141145
}
142146

143-
const handle = newHandle(type, lookup);
147+
const handle = newHandle(type, lookup, msgCount);
144148
handle[owner_symbol] = this;
145149

146150
this[async_id_symbol] = handle.getAsyncId();
@@ -162,6 +166,7 @@ function Socket(type, listener) {
162166
sendBufferSize,
163167
receiveBlockList,
164168
sendBlockList,
169+
msgCount,
165170
};
166171

167172
if (options?.signal !== undefined) {
@@ -380,6 +385,7 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
380385
}
381386

382387
if (cluster.isWorker && !exclusive) {
388+
// TODO(theanarkh): support recvmmsg
383389
bindServerHandle(this, {
384390
address: ip,
385391
port: port,

lib/internal/dgram.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ function lookup6(lookup, address, callback) {
2828
return lookup(address || '::1', 6, callback);
2929
}
3030

31-
function newHandle(type, lookup) {
31+
function newHandle(type, lookup, mmsgCount) {
3232
if (lookup === undefined) {
3333
if (dns === undefined) {
3434
dns = require('dns');
@@ -40,14 +40,14 @@ function newHandle(type, lookup) {
4040
}
4141

4242
if (type === 'udp4') {
43-
const handle = new UDP();
43+
const handle = new UDP(mmsgCount);
4444

4545
handle.lookup = FunctionPrototypeBind(lookup4, handle, lookup);
4646
return handle;
4747
}
4848

4949
if (type === 'udp6') {
50-
const handle = new UDP();
50+
const handle = new UDP(mmsgCount);
5151

5252
handle.lookup = FunctionPrototypeBind(lookup6, handle, lookup);
5353
handle.bind = handle.bind6;

src/udp_wrap.cc

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,20 +142,35 @@ void UDPWrapBase::RegisterExternalReferences(
142142
registry->Register(RecvStop);
143143
}
144144

145-
UDPWrap::UDPWrap(Environment* env, Local<Object> object)
145+
UDPWrap::UDPWrap(Environment* env, Local<Object> object, uint32_t msg_count)
146146
: HandleWrap(env,
147147
object,
148148
reinterpret_cast<uv_handle_t*>(&handle_),
149149
AsyncWrap::PROVIDER_UDPWRAP) {
150150
object->SetAlignedPointerInInternalField(
151151
UDPWrapBase::kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));
152-
153-
int r = uv_udp_init(env->event_loop(), &handle_);
152+
int r;
153+
if (msg_count > 0) {
154+
r = uv_udp_init_ex(env->event_loop(),
155+
&handle_,
156+
AF_UNSPEC | UV_UDP_RECVMMSG);
157+
} else {
158+
r = uv_udp_init(env->event_loop(), &handle_);
159+
}
154160
CHECK_EQ(r, 0); // can't fail anyway
155-
161+
msg_count_ = msg_count;
162+
mmsg_buf_ = uv_buf_init(nullptr, 0);
156163
set_listener(this);
157164
}
158165

166+
UDPWrap::~UDPWrap() {
167+
// Libuv does not release the memory of memory which allocated
168+
// by handle->alloc_cb when we call close in handle->read_cb,
169+
// so we should release the memory here if necessary.
170+
if (using_recvmmsg()) {
171+
release_buf();
172+
}
173+
}
159174

160175
void UDPWrap::Initialize(Local<Object> target,
161176
Local<Value> unused,
@@ -270,8 +285,12 @@ void UDPWrap::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
270285

271286
void UDPWrap::New(const FunctionCallbackInfo<Value>& args) {
272287
CHECK(args.IsConstructCall());
288+
uint32_t msg_count = 0;
289+
if (args[0]->IsUint32()) {
290+
msg_count = args[0].As<Uint32>()->Value();
291+
}
273292
Environment* env = Environment::GetCurrent(args);
274-
new UDPWrap(env, args.This());
293+
new UDPWrap(env, args.This(), msg_count);
275294
}
276295

277296

@@ -741,6 +760,12 @@ void UDPWrap::OnAlloc(uv_handle_t* handle,
741760
}
742761

743762
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
763+
if (using_recvmmsg()) {
764+
suggested_size *= msg_count_;
765+
mmsg_buf_ = uv_buf_init(reinterpret_cast<char*>(malloc(suggested_size)),
766+
suggested_size);
767+
return mmsg_buf_;
768+
}
744769
return env()->allocate_managed_buffer(suggested_size);
745770
}
746771

@@ -759,7 +784,17 @@ void UDPWrap::OnRecv(ssize_t nread,
759784
unsigned int flags) {
760785
Environment* env = this->env();
761786
Isolate* isolate = env->isolate();
762-
std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);
787+
std::unique_ptr<BackingStore> bs;
788+
789+
auto cleanup = OnScopeLeave([&]() {
790+
if (using_recvmmsg() && (nread <= 0 || (flags & UV_UDP_MMSG_FREE))) {
791+
release_buf();
792+
}
793+
});
794+
795+
if (!using_recvmmsg()) {
796+
bs = env->release_managed_buffer(buf_);
797+
}
763798
if (nread == 0 && addr == nullptr) {
764799
return;
765800
}
@@ -778,6 +813,10 @@ void UDPWrap::OnRecv(ssize_t nread,
778813
return;
779814
} else if (nread == 0) {
780815
bs = ArrayBuffer::NewBackingStore(isolate, 0);
816+
} else if (using_recvmmsg()) {
817+
bs = ArrayBuffer::NewBackingStore(
818+
isolate, nread, BackingStoreInitializationMode::kUninitialized);
819+
memcpy(bs->Data(), buf_.base, nread);
781820
} else if (static_cast<size_t>(nread) != bs->ByteLength()) {
782821
CHECK_LE(static_cast<size_t>(nread), bs->ByteLength());
783822
std::unique_ptr<BackingStore> old_bs = std::move(bs);

src/udp_wrap.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,15 @@ class UDPWrap final : public HandleWrap,
158158
const uv_buf_t& buf,
159159
const sockaddr* addr,
160160
unsigned int flags) override;
161+
bool using_recvmmsg() {
162+
return uv_udp_using_recvmmsg(reinterpret_cast<uv_udp_t*>(&handle_));
163+
}
164+
void release_buf() {
165+
if (mmsg_buf_.base != nullptr) {
166+
free(mmsg_buf_.base);
167+
mmsg_buf_ = uv_buf_init(nullptr, 0);
168+
}
169+
}
161170
ReqWrap<uv_udp_send_t>* CreateSendWrap(size_t msg_size) override;
162171
void OnSendDone(ReqWrap<uv_udp_send_t>* wrap, int status) override;
163172

@@ -189,7 +198,9 @@ class UDPWrap final : public HandleWrap,
189198
int (*F)(const typename T::HandleType*, sockaddr*, int*)>
190199
friend void GetSockOrPeerName(const v8::FunctionCallbackInfo<v8::Value>&);
191200

192-
UDPWrap(Environment* env, v8::Local<v8::Object> object);
201+
UDPWrap(Environment* env, v8::Local<v8::Object> object, uint32_t msg_count);
202+
203+
~UDPWrap();
193204

194205
static void DoBind(const v8::FunctionCallbackInfo<v8::Value>& args,
195206
int family);
@@ -213,7 +224,8 @@ class UDPWrap final : public HandleWrap,
213224
unsigned int flags);
214225

215226
uv_udp_t handle_;
216-
227+
uint32_t msg_count_;
228+
uv_buf_t mmsg_buf_;
217229
bool current_send_has_callback_;
218230
v8::Local<v8::Object> current_send_req_wrap_;
219231
};
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const dgram = require('dgram');
5+
const message_to_send = 'A message to send';
6+
7+
const server = dgram.createSocket({ type: 'udp4', msgCount: 3 });
8+
const client = dgram.createSocket({ type: 'udp4' });
9+
10+
client.on('close', common.mustCall());
11+
server.on('close', common.mustCall());
12+
13+
server.on('message', common.mustCall((msg) => {
14+
assert.strictEqual(msg.toString(), message_to_send.toString());
15+
// The server will release the memory which allocated by handle->alloc_cb
16+
server.close();
17+
client.close();
18+
}));
19+
20+
server.on('listening', common.mustCall(() => {
21+
for (let i = 0; i < 2; i++) {
22+
client.send(message_to_send,
23+
0,
24+
message_to_send.length,
25+
server.address().port,
26+
server.address().address);
27+
}
28+
}));
29+
30+
server.bind(0);
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const dgram = require('dgram');
5+
const message_to_send = 'A message to send';
6+
7+
const server = dgram.createSocket({ type: 'udp4', msgCount: 3 });
8+
const client = dgram.createSocket({ type: 'udp4' });
9+
10+
client.on('close', common.mustCall());
11+
server.on('close', common.mustCall());
12+
13+
let done = 0;
14+
const count = 2;
15+
server.on('message', common.mustCall((msg) => {
16+
assert.strictEqual(msg.toString(), message_to_send.toString());
17+
if (++done === count) {
18+
client.close();
19+
server.close();
20+
}
21+
}, count));
22+
23+
server.on('listening', common.mustCall(() => {
24+
for (let i = 0; i < count; i++) {
25+
client.send(message_to_send,
26+
0,
27+
message_to_send.length,
28+
server.address().port,
29+
server.address().address);
30+
}
31+
}));
32+
33+
server.bind(0);

0 commit comments

Comments
 (0)