Skip to content

Commit aac2964

Browse files
committed
dgram: add option or recvmmsg
1 parent 049664b commit aac2964

File tree

7 files changed

+175
-15
lines changed

7 files changed

+175
-15
lines changed

doc/api/dgram.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -968,6 +968,8 @@ changes:
968968
specified by the NAT.
969969
* `sendBlockList` {net.BlockList} `sendBlockList` can be used for disabling outbound
970970
access to specific IP addresses, IP ranges, or IP subnets.
971+
* `msgCount` {integer} `msgCount` can be used to receive multiple messages at once via `recvmmsg`.
972+
**Default:** `0`, disabled.
971973
* `callback` {Function} Attached as a listener for `'message'` events. Optional.
972974
* Returns: {dgram.Socket}
973975

lib/dgram.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ function Socket(type, listener) {
112112
let sendBufferSize;
113113
let receiveBlockList;
114114
let sendBlockList;
115-
115+
let msgCount;
116116
let options;
117117
if (type !== null && typeof type === 'object') {
118118
options = type;
@@ -138,9 +138,13 @@ function Socket(type, listener) {
138138
}
139139
sendBlockList = options.sendBlockList;
140140
}
141+
if (options.msgCount !== undefined) {
142+
validateUint32(options.msgCount, 'options.msgCount');
143+
msgCount = options.msgCount;
144+
}
141145
}
142146

143-
const handle = newHandle(type, lookup);
147+
const handle = newHandle(type, lookup, msgCount);
144148
handle[owner_symbol] = this;
145149

146150
this[async_id_symbol] = handle.getAsyncId();
@@ -162,6 +166,7 @@ function Socket(type, listener) {
162166
sendBufferSize,
163167
receiveBlockList,
164168
sendBlockList,
169+
msgCount,
165170
};
166171

167172
if (options?.signal !== undefined) {
@@ -380,6 +385,7 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
380385
}
381386

382387
if (cluster.isWorker && !exclusive) {
388+
// TODO(theanarkh): support recvmmsg
383389
bindServerHandle(this, {
384390
address: ip,
385391
port: port,

lib/internal/dgram.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ function lookup6(lookup, address, callback) {
2828
return lookup(address || '::1', 6, callback);
2929
}
3030

31-
function newHandle(type, lookup) {
31+
function newHandle(type, lookup, mmsgCount) {
3232
if (lookup === undefined) {
3333
if (dns === undefined) {
3434
dns = require('dns');
@@ -40,14 +40,14 @@ function newHandle(type, lookup) {
4040
}
4141

4242
if (type === 'udp4') {
43-
const handle = new UDP();
43+
const handle = new UDP(mmsgCount);
4444

4545
handle.lookup = FunctionPrototypeBind(lookup4, handle, lookup);
4646
return handle;
4747
}
4848

4949
if (type === 'udp6') {
50-
const handle = new UDP();
50+
const handle = new UDP(mmsgCount);
5151

5252
handle.lookup = FunctionPrototypeBind(lookup6, handle, lookup);
5353
handle.bind = handle.bind6;

src/udp_wrap.cc

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,20 +142,34 @@ void UDPWrapBase::RegisterExternalReferences(
142142
registry->Register(RecvStop);
143143
}
144144

145-
UDPWrap::UDPWrap(Environment* env, Local<Object> object)
145+
UDPWrap::UDPWrap(Environment* env, Local<Object> object, uint32_t msg_count)
146146
: HandleWrap(env,
147147
object,
148148
reinterpret_cast<uv_handle_t*>(&handle_),
149-
AsyncWrap::PROVIDER_UDPWRAP) {
149+
AsyncWrap::PROVIDER_UDPWRAP),
150+
msg_count_(msg_count),
151+
mmsg_buf_(uv_buf_init(nullptr, 0)) {
150152
object->SetAlignedPointerInInternalField(
151153
UDPWrapBase::kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));
152-
153-
int r = uv_udp_init(env->event_loop(), &handle_);
154+
int r;
155+
if (msg_count > 0) {
156+
r = uv_udp_init_ex(
157+
env->event_loop(), &handle_, AF_UNSPEC | UV_UDP_RECVMMSG);
158+
} else {
159+
r = uv_udp_init(env->event_loop(), &handle_);
160+
}
154161
CHECK_EQ(r, 0); // can't fail anyway
155-
156162
set_listener(this);
157163
}
158164

165+
UDPWrap::~UDPWrap() {
166+
// Libuv does not release the memory of memory which allocated
167+
// by handle->alloc_cb when we call close in handle->read_cb,
168+
// so we should release the memory here if necessary.
169+
if (using_recvmmsg()) {
170+
release_buf();
171+
}
172+
}
159173

160174
void UDPWrap::Initialize(Local<Object> target,
161175
Local<Value> unused,
@@ -270,8 +284,12 @@ void UDPWrap::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
270284

271285
void UDPWrap::New(const FunctionCallbackInfo<Value>& args) {
272286
CHECK(args.IsConstructCall());
287+
uint32_t msg_count = 0;
288+
if (args[0]->IsUint32()) {
289+
msg_count = args[0].As<Uint32>()->Value();
290+
}
273291
Environment* env = Environment::GetCurrent(args);
274-
new UDPWrap(env, args.This());
292+
new UDPWrap(env, args.This(), msg_count);
275293
}
276294

277295

@@ -741,6 +759,19 @@ void UDPWrap::OnAlloc(uv_handle_t* handle,
741759
}
742760

743761
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
762+
if (using_recvmmsg()) {
763+
CHECK(mmsg_buf_.base == nullptr);
764+
if (msg_count_ > SIZE_MAX / suggested_size) {
765+
return mmsg_buf_;
766+
}
767+
suggested_size *= msg_count_;
768+
void* base = malloc(suggested_size);
769+
if (base == nullptr) {
770+
return mmsg_buf_;
771+
}
772+
mmsg_buf_ = uv_buf_init(reinterpret_cast<char*>(base), suggested_size);
773+
return mmsg_buf_;
774+
}
744775
return env()->allocate_managed_buffer(suggested_size);
745776
}
746777

@@ -759,7 +790,17 @@ void UDPWrap::OnRecv(ssize_t nread,
759790
unsigned int flags) {
760791
Environment* env = this->env();
761792
Isolate* isolate = env->isolate();
762-
std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);
793+
std::unique_ptr<BackingStore> bs;
794+
795+
auto cleanup = OnScopeLeave([&]() {
796+
if (using_recvmmsg() && (nread <= 0 || (flags & UV_UDP_MMSG_FREE))) {
797+
release_buf();
798+
}
799+
});
800+
801+
if (!using_recvmmsg()) {
802+
bs = env->release_managed_buffer(buf_);
803+
}
763804
if (nread == 0 && addr == nullptr) {
764805
return;
765806
}
@@ -778,6 +819,10 @@ void UDPWrap::OnRecv(ssize_t nread,
778819
return;
779820
} else if (nread == 0) {
780821
bs = ArrayBuffer::NewBackingStore(isolate, 0);
822+
} else if (using_recvmmsg()) {
823+
bs = ArrayBuffer::NewBackingStore(
824+
isolate, nread, BackingStoreInitializationMode::kUninitialized);
825+
memcpy(bs->Data(), buf_.base, nread);
781826
} else if (static_cast<size_t>(nread) != bs->ByteLength()) {
782827
CHECK_LE(static_cast<size_t>(nread), bs->ByteLength());
783828
std::unique_ptr<BackingStore> old_bs = std::move(bs);

src/udp_wrap.h

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,15 @@ class UDPWrap final : public HandleWrap,
158158
const uv_buf_t& buf,
159159
const sockaddr* addr,
160160
unsigned int flags) override;
161+
bool using_recvmmsg() {
162+
return uv_udp_using_recvmmsg(reinterpret_cast<uv_udp_t*>(&handle_));
163+
}
164+
void release_buf() {
165+
if (mmsg_buf_.base != nullptr) {
166+
free(mmsg_buf_.base);
167+
mmsg_buf_ = uv_buf_init(nullptr, 0);
168+
}
169+
}
161170
ReqWrap<uv_udp_send_t>* CreateSendWrap(size_t msg_size) override;
162171
void OnSendDone(ReqWrap<uv_udp_send_t>* wrap, int status) override;
163172

@@ -178,7 +187,11 @@ class UDPWrap final : public HandleWrap,
178187
static v8::MaybeLocal<v8::Object> Instantiate(Environment* env,
179188
AsyncWrap* parent,
180189
SocketType type);
181-
SET_NO_MEMORY_INFO()
190+
void MemoryInfo(MemoryTracker* tracker) const override {
191+
if (mmsg_buf_.base != nullptr) {
192+
tracker->TrackFieldWithSize("mmsg_buf", mmsg_buf_.len);
193+
}
194+
}
182195
SET_MEMORY_INFO_NAME(UDPWrap)
183196
SET_SELF_SIZE(UDPWrap)
184197

@@ -189,7 +202,9 @@ class UDPWrap final : public HandleWrap,
189202
int (*F)(const typename T::HandleType*, sockaddr*, int*)>
190203
friend void GetSockOrPeerName(const v8::FunctionCallbackInfo<v8::Value>&);
191204

192-
UDPWrap(Environment* env, v8::Local<v8::Object> object);
205+
UDPWrap(Environment* env, v8::Local<v8::Object> object, uint32_t msg_count);
206+
207+
~UDPWrap();
193208

194209
static void DoBind(const v8::FunctionCallbackInfo<v8::Value>& args,
195210
int family);
@@ -213,7 +228,8 @@ class UDPWrap final : public HandleWrap,
213228
unsigned int flags);
214229

215230
uv_udp_t handle_;
216-
231+
uint32_t msg_count_;
232+
uv_buf_t mmsg_buf_;
217233
bool current_send_has_callback_;
218234
v8::Local<v8::Object> current_send_req_wrap_;
219235
};
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const dgram = require('dgram');
5+
const message_to_send = 'A message to send';
6+
7+
const server = dgram.createSocket({ type: 'udp4', msgCount: 3 });
8+
const client = dgram.createSocket({ type: 'udp4' });
9+
10+
client.on('close', common.mustCall());
11+
server.on('close', common.mustCall());
12+
13+
server.on('message', common.mustCall((msg) => {
14+
assert.strictEqual(msg.toString(), message_to_send.toString());
15+
// The server will release the memory which allocated by handle->alloc_cb
16+
server.close();
17+
client.close();
18+
}));
19+
20+
server.on('listening', common.mustCall(() => {
21+
for (let i = 0; i < 2; i++) {
22+
client.send(message_to_send,
23+
0,
24+
message_to_send.length,
25+
server.address().port,
26+
server.address().address);
27+
}
28+
}));
29+
30+
server.bind(0);
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const dgram = require('dgram');
5+
const message_to_send = 'A message to send';
6+
7+
[
8+
-1,
9+
1.1,
10+
NaN,
11+
undefined,
12+
{},
13+
[],
14+
null,
15+
function() {},
16+
Symbol(),
17+
true,
18+
Infinity,
19+
].forEach((msgCount) => {
20+
try {
21+
dgram.createSocket({ type: 'udp4', msgCount });
22+
} catch (e) {
23+
assert.ok(/ERR_OUT_OF_RANGE|ERR_INVALID_ARG_TYPE/i.test(e.code));
24+
}
25+
});
26+
27+
[
28+
0,
29+
1,
30+
].forEach((msgCount) => {
31+
const socket = dgram.createSocket({ type: 'udp4', msgCount });
32+
socket.close();
33+
});
34+
35+
const server = dgram.createSocket({ type: 'udp4', msgCount: 3 });
36+
const client = dgram.createSocket({ type: 'udp4' });
37+
38+
client.on('close', common.mustCall());
39+
server.on('close', common.mustCall());
40+
41+
let done = 0;
42+
const count = 2;
43+
server.on('message', common.mustCall((msg) => {
44+
assert.strictEqual(msg.toString(), message_to_send.toString());
45+
if (++done === count) {
46+
client.close();
47+
server.close();
48+
}
49+
}, count));
50+
51+
server.on('listening', common.mustCall(() => {
52+
for (let i = 0; i < count; i++) {
53+
client.send(message_to_send,
54+
0,
55+
message_to_send.length,
56+
server.address().port,
57+
server.address().address);
58+
}
59+
}));
60+
61+
server.bind(0);

0 commit comments

Comments
 (0)