@@ -142,20 +142,34 @@ void UDPWrapBase::RegisterExternalReferences(
142142 registry->Register (RecvStop);
143143}
144144
145- UDPWrap::UDPWrap (Environment* env, Local<Object> object)
145+ UDPWrap::UDPWrap (Environment* env, Local<Object> object, uint32_t msg_count )
146146 : HandleWrap(env,
147147 object,
148148 reinterpret_cast <uv_handle_t *>(&handle_),
149149 AsyncWrap::PROVIDER_UDPWRAP) {
150150 object->SetAlignedPointerInInternalField (
151151 UDPWrapBase::kUDPWrapBaseField , static_cast <UDPWrapBase*>(this ));
152-
153- int r = uv_udp_init (env->event_loop (), &handle_);
152+ int r;
153+ if (msg_count > 0 ) {
154+ r = uv_udp_init_ex (
155+ env->event_loop (), &handle_, AF_UNSPEC | UV_UDP_RECVMMSG);
156+ } else {
157+ r = uv_udp_init (env->event_loop (), &handle_);
158+ }
154159 CHECK_EQ (r, 0 ); // can't fail anyway
155-
160+ msg_count_ = msg_count;
161+ mmsg_buf_ = uv_buf_init (nullptr , 0 );
156162 set_listener (this );
157163}
158164
165+ UDPWrap::~UDPWrap () {
166+ // Libuv does not release the memory of memory which allocated
167+ // by handle->alloc_cb when we call close in handle->read_cb,
168+ // so we should release the memory here if necessary.
169+ if (using_recvmmsg ()) {
170+ release_buf ();
171+ }
172+ }
159173
160174void UDPWrap::Initialize (Local<Object> target,
161175 Local<Value> unused,
@@ -270,8 +284,12 @@ void UDPWrap::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
270284
271285void UDPWrap::New (const FunctionCallbackInfo<Value>& args) {
272286 CHECK (args.IsConstructCall ());
287+ uint32_t msg_count = 0 ;
288+ if (args[0 ]->IsUint32 ()) {
289+ msg_count = args[0 ].As <Uint32>()->Value ();
290+ }
273291 Environment* env = Environment::GetCurrent (args);
274- new UDPWrap (env, args.This ());
292+ new UDPWrap (env, args.This (), msg_count );
275293}
276294
277295
@@ -741,6 +759,12 @@ void UDPWrap::OnAlloc(uv_handle_t* handle,
741759}
742760
743761uv_buf_t UDPWrap::OnAlloc (size_t suggested_size) {
762+ if (using_recvmmsg ()) {
763+ suggested_size *= msg_count_;
764+ mmsg_buf_ = uv_buf_init (reinterpret_cast <char *>(malloc (suggested_size)),
765+ suggested_size);
766+ return mmsg_buf_;
767+ }
744768 return env ()->allocate_managed_buffer (suggested_size);
745769}
746770
@@ -759,7 +783,17 @@ void UDPWrap::OnRecv(ssize_t nread,
759783 unsigned int flags) {
760784 Environment* env = this ->env ();
761785 Isolate* isolate = env->isolate ();
762- std::unique_ptr<BackingStore> bs = env->release_managed_buffer (buf_);
786+ std::unique_ptr<BackingStore> bs;
787+
788+ auto cleanup = OnScopeLeave ([&]() {
789+ if (using_recvmmsg () && (nread <= 0 || (flags & UV_UDP_MMSG_FREE))) {
790+ release_buf ();
791+ }
792+ });
793+
794+ if (!using_recvmmsg ()) {
795+ bs = env->release_managed_buffer (buf_);
796+ }
763797 if (nread == 0 && addr == nullptr ) {
764798 return ;
765799 }
@@ -778,6 +812,10 @@ void UDPWrap::OnRecv(ssize_t nread,
778812 return ;
779813 } else if (nread == 0 ) {
780814 bs = ArrayBuffer::NewBackingStore (isolate, 0 );
815+ } else if (using_recvmmsg ()) {
816+ bs = ArrayBuffer::NewBackingStore (
817+ isolate, nread, BackingStoreInitializationMode::kUninitialized );
818+ memcpy (bs->Data (), buf_.base , nread);
781819 } else if (static_cast <size_t >(nread) != bs->ByteLength ()) {
782820 CHECK_LE (static_cast <size_t >(nread), bs->ByteLength ());
783821 std::unique_ptr<BackingStore> old_bs = std::move (bs);
0 commit comments