@@ -142,20 +142,34 @@ void UDPWrapBase::RegisterExternalReferences(
142142 registry->Register (RecvStop);
143143}
144144
145- UDPWrap::UDPWrap (Environment* env, Local<Object> object)
145+ UDPWrap::UDPWrap (Environment* env, Local<Object> object, uint32_t msg_count )
146146 : HandleWrap(env,
147147 object,
148148 reinterpret_cast <uv_handle_t *>(&handle_),
149- AsyncWrap::PROVIDER_UDPWRAP) {
149+ AsyncWrap::PROVIDER_UDPWRAP),
150+ msg_count_ (msg_count),
151+ mmsg_buf_(uv_buf_init(nullptr , 0 )) {
150152 object->SetAlignedPointerInInternalField (
151153 UDPWrapBase::kUDPWrapBaseField , static_cast <UDPWrapBase*>(this ));
152-
153- int r = uv_udp_init (env->event_loop (), &handle_);
154+ int r;
155+ if (msg_count > 0 ) {
156+ r = uv_udp_init_ex (
157+ env->event_loop (), &handle_, AF_UNSPEC | UV_UDP_RECVMMSG);
158+ } else {
159+ r = uv_udp_init (env->event_loop (), &handle_);
160+ }
154161 CHECK_EQ (r, 0 ); // can't fail anyway
155-
156162 set_listener (this );
157163}
158164
165+ UDPWrap::~UDPWrap () {
166+ // Libuv does not release the memory of memory which allocated
167+ // by handle->alloc_cb when we call close in handle->read_cb,
168+ // so we should release the memory here if necessary.
169+ if (using_recvmmsg ()) {
170+ release_buf ();
171+ }
172+ }
159173
160174void UDPWrap::Initialize (Local<Object> target,
161175 Local<Value> unused,
@@ -270,8 +284,12 @@ void UDPWrap::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
270284
271285void UDPWrap::New (const FunctionCallbackInfo<Value>& args) {
272286 CHECK (args.IsConstructCall ());
287+ uint32_t msg_count = 0 ;
288+ if (args[0 ]->IsUint32 ()) {
289+ msg_count = args[0 ].As <Uint32>()->Value ();
290+ }
273291 Environment* env = Environment::GetCurrent (args);
274- new UDPWrap (env, args.This ());
292+ new UDPWrap (env, args.This (), msg_count );
275293}
276294
277295
@@ -741,6 +759,20 @@ void UDPWrap::OnAlloc(uv_handle_t* handle,
741759}
742760
743761uv_buf_t UDPWrap::OnAlloc (size_t suggested_size) {
762+ if (using_recvmmsg ()) {
763+ CHECK (mmsg_buf_.base == nullptr );
764+ if (msg_count_ > SIZE_MAX / suggested_size) {
765+ return mmsg_buf_;
766+ }
767+ suggested_size *= msg_count_;
768+ void * base = malloc (suggested_size);
769+ if (base == nullptr ) {
770+ return mmsg_buf_;
771+ }
772+ mmsg_buf_ = uv_buf_init (reinterpret_cast <char *>(base),
773+ suggested_size);
774+ return mmsg_buf_;
775+ }
744776 return env ()->allocate_managed_buffer (suggested_size);
745777}
746778
@@ -759,7 +791,17 @@ void UDPWrap::OnRecv(ssize_t nread,
759791 unsigned int flags) {
760792 Environment* env = this ->env ();
761793 Isolate* isolate = env->isolate ();
762- std::unique_ptr<BackingStore> bs = env->release_managed_buffer (buf_);
794+ std::unique_ptr<BackingStore> bs;
795+
796+ auto cleanup = OnScopeLeave ([&]() {
797+ if (using_recvmmsg () && (nread <= 0 || (flags & UV_UDP_MMSG_FREE))) {
798+ release_buf ();
799+ }
800+ });
801+
802+ if (!using_recvmmsg ()) {
803+ bs = env->release_managed_buffer (buf_);
804+ }
763805 if (nread == 0 && addr == nullptr ) {
764806 return ;
765807 }
@@ -778,6 +820,10 @@ void UDPWrap::OnRecv(ssize_t nread,
778820 return ;
779821 } else if (nread == 0 ) {
780822 bs = ArrayBuffer::NewBackingStore (isolate, 0 );
823+ } else if (using_recvmmsg ()) {
824+ bs = ArrayBuffer::NewBackingStore (
825+ isolate, nread, BackingStoreInitializationMode::kUninitialized );
826+ memcpy (bs->Data (), buf_.base , nread);
781827 } else if (static_cast <size_t >(nread) != bs->ByteLength ()) {
782828 CHECK_LE (static_cast <size_t >(nread), bs->ByteLength ());
783829 std::unique_ptr<BackingStore> old_bs = std::move (bs);
0 commit comments