Skip to content

Commit 85a14af

Browse files
authored
Merge pull request #612 from dietelTiMaMi/feature/ExposeMonitorSocketForActivePoller
Feature/expose monitor socket for active poller
2 parents 945d60c + 7273016 commit 85a14af

File tree

2 files changed

+205
-105
lines changed

2 files changed

+205
-105
lines changed

tests/monitor.cpp

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
class mock_monitor_t : public zmq::monitor_t
1010
{
11-
public:
11+
public:
1212

1313
void on_event_connected(const zmq_event_t &, const char *) ZMQ_OVERRIDE
1414
{
@@ -89,7 +89,7 @@ TEST_CASE("monitor init abort", "[monitor]")
8989
{
9090
class mock_monitor : public mock_monitor_t
9191
{
92-
public:
92+
public:
9393
mock_monitor(std::function<void(void)> handle_connected) :
9494
handle_connected{std::move(handle_connected)}
9595
{
@@ -128,7 +128,7 @@ TEST_CASE("monitor init abort", "[monitor]")
128128
{
129129
std::unique_lock<std::mutex> lock(mutex);
130130
CHECK(cond_var.wait_for(lock, std::chrono::seconds(1),
131-
[&done] { return done; }));
131+
[&done] { return done; }));
132132
}
133133
CHECK(monitor.connected == 1);
134134
monitor.abort();
@@ -150,3 +150,95 @@ TEST_CASE("monitor from move assigned socket", "[monitor]")
150150
// failing
151151
}
152152
#endif
153+
154+
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) \
155+
&& !defined(ZMQ_CPP11_PARTIAL) && defined(ZMQ_HAVE_POLLER)
156+
#include "zmq_addon.hpp"
157+
158+
TEST_CASE("poll monitor events using active poller", "[monitor]")
159+
{
160+
// define necessary class for test
161+
class test_monitor : public zmq::monitor_t
162+
{
163+
public:
164+
void init(zmq::socket_t &socket,
165+
const char *const addr_,
166+
int events = ZMQ_EVENT_ALL)
167+
{
168+
zmq::monitor_t::init(socket, addr_, events);
169+
}
170+
171+
void addToPoller(zmq::active_poller_t &inActivePoller)
172+
{
173+
inActivePoller.add(
174+
monitor_socket(), zmq::event_flags::pollin,
175+
[&](zmq::event_flags ef) { process_event(static_cast<short>(ef)); });
176+
}
177+
178+
void on_event_accepted(const zmq_event_t &event_, const char *addr_) override
179+
{
180+
clientAccepted++;
181+
}
182+
void on_event_disconnected(const zmq_event_t &event,
183+
const char *const addr) override
184+
{
185+
clientDisconnected++;
186+
}
187+
188+
int clientAccepted = 0;
189+
int clientDisconnected = 0;
190+
};
191+
192+
//Arrange
193+
int messageCounter = 0;
194+
const char monitorAddress[] = "inproc://monitor-server";
195+
196+
auto addToPoller = [&](zmq::socket_t &socket, zmq::active_poller_t &poller) {
197+
poller.add(socket, zmq::event_flags::pollin, [&](zmq::event_flags ef) {
198+
zmq::message_t msg;
199+
auto result = socket.recv(msg, zmq::recv_flags::dontwait);
200+
messageCounter++;
201+
});
202+
};
203+
204+
common_server_client_setup sockets(false);
205+
206+
test_monitor monitor;
207+
monitor.init(sockets.server, monitorAddress,
208+
ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED);
209+
210+
zmq::active_poller_t poller;
211+
monitor.addToPoller(poller);
212+
addToPoller(sockets.server, poller);
213+
214+
sockets.init();
215+
sockets.client.send(zmq::message_t(0), zmq::send_flags::dontwait);
216+
CHECK(monitor.clientAccepted == 0);
217+
CHECK(monitor.clientDisconnected == 0);
218+
219+
//Act
220+
for (int i = 0; i < 100; i++) {
221+
poller.wait(std::chrono::milliseconds(50));
222+
if (monitor.clientAccepted > 0) {
223+
break;
224+
}
225+
}
226+
CHECK(monitor.clientAccepted == 1);
227+
CHECK(monitor.clientDisconnected == 0);
228+
229+
sockets.client.close();
230+
231+
for (int i = 0; i < 100; i++) {
232+
poller.wait(std::chrono::milliseconds(50));
233+
if (monitor.clientDisconnected > 0) {
234+
break;
235+
}
236+
}
237+
sockets.server.close();
238+
239+
// Assert
240+
CHECK(messageCounter == 1);
241+
CHECK(monitor.clientAccepted == 1);
242+
CHECK(monitor.clientDisconnected == 1);
243+
}
244+
#endif

zmq.hpp

Lines changed: 110 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -2362,8 +2362,6 @@ class monitor_t
23622362
{
23632363
assert(_monitor_socket);
23642364

2365-
zmq::message_t eventMsg;
2366-
23672365
zmq::pollitem_t items[] = {
23682366
{_monitor_socket.handle(), 0, ZMQ_POLLIN, 0},
23692367
};
@@ -2374,106 +2372,7 @@ class monitor_t
23742372
zmq::poll(&items[0], 1, timeout);
23752373
#endif
23762374

2377-
if (items[0].revents & ZMQ_POLLIN) {
2378-
int rc = zmq_msg_recv(eventMsg.handle(), _monitor_socket.handle(), 0);
2379-
if (rc == -1 && zmq_errno() == ETERM)
2380-
return false;
2381-
assert(rc != -1);
2382-
2383-
} else {
2384-
return false;
2385-
}
2386-
2387-
#if ZMQ_VERSION_MAJOR >= 4
2388-
const char *data = static_cast<const char *>(eventMsg.data());
2389-
zmq_event_t msgEvent;
2390-
memcpy(&msgEvent.event, data, sizeof(uint16_t));
2391-
data += sizeof(uint16_t);
2392-
memcpy(&msgEvent.value, data, sizeof(int32_t));
2393-
zmq_event_t *event = &msgEvent;
2394-
#else
2395-
zmq_event_t *event = static_cast<zmq_event_t *>(eventMsg.data());
2396-
#endif
2397-
2398-
#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
2399-
zmq::message_t addrMsg;
2400-
int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(), 0);
2401-
if (rc == -1 && zmq_errno() == ETERM) {
2402-
return false;
2403-
}
2404-
2405-
assert(rc != -1);
2406-
std::string address = addrMsg.to_string();
2407-
#else
2408-
// Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
2409-
std::string address = event->data.connected.addr;
2410-
#endif
2411-
2412-
#ifdef ZMQ_EVENT_MONITOR_STOPPED
2413-
if (event->event == ZMQ_EVENT_MONITOR_STOPPED) {
2414-
return false;
2415-
}
2416-
2417-
#endif
2418-
2419-
switch (event->event) {
2420-
case ZMQ_EVENT_CONNECTED:
2421-
on_event_connected(*event, address.c_str());
2422-
break;
2423-
case ZMQ_EVENT_CONNECT_DELAYED:
2424-
on_event_connect_delayed(*event, address.c_str());
2425-
break;
2426-
case ZMQ_EVENT_CONNECT_RETRIED:
2427-
on_event_connect_retried(*event, address.c_str());
2428-
break;
2429-
case ZMQ_EVENT_LISTENING:
2430-
on_event_listening(*event, address.c_str());
2431-
break;
2432-
case ZMQ_EVENT_BIND_FAILED:
2433-
on_event_bind_failed(*event, address.c_str());
2434-
break;
2435-
case ZMQ_EVENT_ACCEPTED:
2436-
on_event_accepted(*event, address.c_str());
2437-
break;
2438-
case ZMQ_EVENT_ACCEPT_FAILED:
2439-
on_event_accept_failed(*event, address.c_str());
2440-
break;
2441-
case ZMQ_EVENT_CLOSED:
2442-
on_event_closed(*event, address.c_str());
2443-
break;
2444-
case ZMQ_EVENT_CLOSE_FAILED:
2445-
on_event_close_failed(*event, address.c_str());
2446-
break;
2447-
case ZMQ_EVENT_DISCONNECTED:
2448-
on_event_disconnected(*event, address.c_str());
2449-
break;
2450-
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3))
2451-
case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
2452-
on_event_handshake_failed_no_detail(*event, address.c_str());
2453-
break;
2454-
case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
2455-
on_event_handshake_failed_protocol(*event, address.c_str());
2456-
break;
2457-
case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
2458-
on_event_handshake_failed_auth(*event, address.c_str());
2459-
break;
2460-
case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
2461-
on_event_handshake_succeeded(*event, address.c_str());
2462-
break;
2463-
#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
2464-
case ZMQ_EVENT_HANDSHAKE_FAILED:
2465-
on_event_handshake_failed(*event, address.c_str());
2466-
break;
2467-
case ZMQ_EVENT_HANDSHAKE_SUCCEED:
2468-
on_event_handshake_succeed(*event, address.c_str());
2469-
break;
2470-
#endif
2471-
default:
2472-
on_event_unknown(*event, address.c_str());
2473-
break;
2474-
}
2475-
2476-
return true;
2375+
return process_event(items[0].revents);
24772376
}
24782377

24792378
#ifdef ZMQ_EVENT_MONITOR_STOPPED
@@ -2583,6 +2482,115 @@ class monitor_t
25832482
(void) addr_;
25842483
}
25852484

2485+
protected:
2486+
bool process_event(short events)
2487+
{
2488+
zmq::message_t eventMsg;
2489+
2490+
if (events & ZMQ_POLLIN) {
2491+
int rc = zmq_msg_recv(eventMsg.handle(), _monitor_socket.handle(), 0);
2492+
if (rc == -1 && zmq_errno() == ETERM)
2493+
return false;
2494+
assert(rc != -1);
2495+
2496+
} else {
2497+
return false;
2498+
}
2499+
2500+
#if ZMQ_VERSION_MAJOR >= 4
2501+
const char *data = static_cast<const char *>(eventMsg.data());
2502+
zmq_event_t msgEvent;
2503+
memcpy(&msgEvent.event, data, sizeof(uint16_t));
2504+
data += sizeof(uint16_t);
2505+
memcpy(&msgEvent.value, data, sizeof(int32_t));
2506+
zmq_event_t *event = &msgEvent;
2507+
#else
2508+
zmq_event_t *event = static_cast<zmq_event_t *>(eventMsg.data());
2509+
#endif
2510+
2511+
#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
2512+
zmq::message_t addrMsg;
2513+
int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(), 0);
2514+
if (rc == -1 && zmq_errno() == ETERM) {
2515+
return false;
2516+
}
2517+
2518+
assert(rc != -1);
2519+
std::string address = addrMsg.to_string();
2520+
#else
2521+
// Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
2522+
std::string address = event->data.connected.addr;
2523+
#endif
2524+
2525+
#ifdef ZMQ_EVENT_MONITOR_STOPPED
2526+
if (event->event == ZMQ_EVENT_MONITOR_STOPPED) {
2527+
return false;
2528+
}
2529+
2530+
#endif
2531+
2532+
switch (event->event) {
2533+
case ZMQ_EVENT_CONNECTED:
2534+
on_event_connected(*event, address.c_str());
2535+
break;
2536+
case ZMQ_EVENT_CONNECT_DELAYED:
2537+
on_event_connect_delayed(*event, address.c_str());
2538+
break;
2539+
case ZMQ_EVENT_CONNECT_RETRIED:
2540+
on_event_connect_retried(*event, address.c_str());
2541+
break;
2542+
case ZMQ_EVENT_LISTENING:
2543+
on_event_listening(*event, address.c_str());
2544+
break;
2545+
case ZMQ_EVENT_BIND_FAILED:
2546+
on_event_bind_failed(*event, address.c_str());
2547+
break;
2548+
case ZMQ_EVENT_ACCEPTED:
2549+
on_event_accepted(*event, address.c_str());
2550+
break;
2551+
case ZMQ_EVENT_ACCEPT_FAILED:
2552+
on_event_accept_failed(*event, address.c_str());
2553+
break;
2554+
case ZMQ_EVENT_CLOSED:
2555+
on_event_closed(*event, address.c_str());
2556+
break;
2557+
case ZMQ_EVENT_CLOSE_FAILED:
2558+
on_event_close_failed(*event, address.c_str());
2559+
break;
2560+
case ZMQ_EVENT_DISCONNECTED:
2561+
on_event_disconnected(*event, address.c_str());
2562+
break;
2563+
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3))
2564+
case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
2565+
on_event_handshake_failed_no_detail(*event, address.c_str());
2566+
break;
2567+
case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
2568+
on_event_handshake_failed_protocol(*event, address.c_str());
2569+
break;
2570+
case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
2571+
on_event_handshake_failed_auth(*event, address.c_str());
2572+
break;
2573+
case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
2574+
on_event_handshake_succeeded(*event, address.c_str());
2575+
break;
2576+
#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
2577+
case ZMQ_EVENT_HANDSHAKE_FAILED:
2578+
on_event_handshake_failed(*event, address.c_str());
2579+
break;
2580+
case ZMQ_EVENT_HANDSHAKE_SUCCEED:
2581+
on_event_handshake_succeed(*event, address.c_str());
2582+
break;
2583+
#endif
2584+
default:
2585+
on_event_unknown(*event, address.c_str());
2586+
break;
2587+
}
2588+
2589+
return true;
2590+
}
2591+
2592+
socket_ref monitor_socket() {return _monitor_socket;}
2593+
25862594
private:
25872595
monitor_t(const monitor_t &) ZMQ_DELETED_FUNCTION;
25882596
void operator=(const monitor_t &) ZMQ_DELETED_FUNCTION;

0 commit comments

Comments
 (0)