diff --git a/src/callbacks.cc b/src/callbacks.cc index c8f3b5c1..7df2392f 100644 --- a/src/callbacks.cc +++ b/src/callbacks.cc @@ -48,8 +48,7 @@ v8::Local TopicPartitionListToV8Array( return tp_array; } -Dispatcher::Dispatcher() { - async = NULL; +Dispatcher::Dispatcher(): async(nullptr, async_deleter) { uv_mutex_init(&async_lock); } @@ -66,8 +65,9 @@ Dispatcher::~Dispatcher() { // Only run this if we aren't already listening void Dispatcher::Activate() { if (!async) { - async = new uv_async_t; - uv_async_init(uv_default_loop(), async, AsyncMessage_); + async = std::unique_ptr( + new uv_async_t(), async_deleter); + uv_async_init(uv_default_loop(), async.get(), AsyncMessage_); async->data = this; } @@ -75,10 +75,7 @@ void Dispatcher::Activate() { // Should be able to run this regardless of whether it is active or not void Dispatcher::Deactivate() { - if (async) { - uv_close(reinterpret_cast(async), NULL); - async = NULL; - } + async.reset(); } bool Dispatcher::HasCallbacks() { @@ -87,7 +84,7 @@ bool Dispatcher::HasCallbacks() { void Dispatcher::Execute() { if (async) { - uv_async_send(async); + uv_async_send(async.get()); } } @@ -119,6 +116,17 @@ void Dispatcher::RemoveCallback(const v8::Local &cb) { } } +// Custom deleter for uv_async_t smart pointers +void Dispatcher::async_deleter(uv_async_t* async) { + uv_close( + reinterpret_cast(async), + // Release memory after uv_close() has finished. + [](uv_handle_t* handle) { + delete reinterpret_cast(handle); + } + ); +} + event_t::event_t(const RdKafka::Event &event) { message = ""; fac = ""; diff --git a/src/callbacks.h b/src/callbacks.h index 8fcb3311..7fa0f2c8 100644 --- a/src/callbacks.h +++ b/src/callbacks.h @@ -12,6 +12,7 @@ #include #include +#include #include #include @@ -49,7 +50,9 @@ class Dispatcher { dispatcher->Flush(); } - uv_async_t *async; + static inline void async_deleter(uv_async_t* async); + + std::unique_ptr async; }; struct event_t {