Skip to content

Commit bf47be0

Browse files
committed
Problem: poller_t adds an abstraction layer on zmq_poller_*
Solution: extract base_poller_t from poller_t, which provides a direct mapping of zmq_poller_* to C++ only
1 parent cdef8bc commit bf47be0

File tree

2 files changed

+99
-56
lines changed

2 files changed

+99
-56
lines changed

tests/poller.cpp

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ TEST(poller, poll_basic)
204204
message_received = true;
205205
};
206206
ASSERT_NO_THROW(poller.add(s.server, ZMQ_POLLIN, handler));
207-
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{-1}));
207+
ASSERT_EQ(1, poller.wait(std::chrono::milliseconds{-1}));
208208
ASSERT_TRUE(message_received);
209209
}
210210

@@ -237,13 +237,13 @@ TEST(poller, client_server)
237237
// client sends message
238238
ASSERT_NO_THROW(s.client.send(send_msg));
239239

240-
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{-1}));
240+
ASSERT_EQ(1, poller.wait(std::chrono::milliseconds{-1}));
241241
ASSERT_EQ(events, ZMQ_POLLIN);
242242

243243
// Re-add server socket with pollout flag
244244
ASSERT_NO_THROW(poller.remove(s.server));
245245
ASSERT_NO_THROW(poller.add(s.server, ZMQ_POLLIN | ZMQ_POLLOUT, handler));
246-
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{-1}));
246+
ASSERT_EQ(1, poller.wait(std::chrono::milliseconds{-1}));
247247
ASSERT_EQ(events, ZMQ_POLLOUT);
248248
}
249249

@@ -335,7 +335,7 @@ TEST(poller, poll_client_server)
335335

336336
// Modify server socket with pollout flag
337337
ASSERT_NO_THROW(poller.modify(s.server, ZMQ_POLLIN | ZMQ_POLLOUT));
338-
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{500}));
338+
ASSERT_EQ(1, poller.wait(std::chrono::milliseconds{500}));
339339
ASSERT_EQ(s.events, ZMQ_POLLIN | ZMQ_POLLOUT);
340340
}
341341

@@ -356,8 +356,8 @@ TEST(poller, wait_one_return)
356356
ASSERT_NO_THROW(s.client.send("Hi"));
357357

358358
// wait for message and verify events
359-
int result = poller.wait(std::chrono::milliseconds{500});
360-
ASSERT_EQ(count, result);
359+
ASSERT_EQ(1, poller.wait(std::chrono::milliseconds{500}));
360+
ASSERT_EQ(1u, count);
361361
}
362362

363363
TEST(poller, wait_on_move_constructed_poller)
@@ -401,14 +401,14 @@ TEST(poller, received_on_move_construced_poller)
401401
// client sends message
402402
ASSERT_NO_THROW(s.client.send("Hi"));
403403
// wait for message and verify it is received
404-
a.wait(std::chrono::milliseconds{500});
404+
ASSERT_EQ(1, a.wait(std::chrono::milliseconds{500}));
405405
ASSERT_EQ(1u, count);
406406
// Move construct poller b
407407
zmq::poller_t b{std::move(a)};
408408
// client sends message again
409409
ASSERT_NO_THROW(s.client.send("Hi"));
410410
// wait for message and verify it is received
411-
b.wait(std::chrono::milliseconds{500});
411+
ASSERT_EQ(1, b.wait(std::chrono::milliseconds{500}));
412412
ASSERT_EQ(2u, count);
413413
}
414414

@@ -424,12 +424,14 @@ TEST(poller, remove_from_handler)
424424

425425
// Setup poller
426426
zmq::poller_t poller;
427+
int count = 0;
427428
for (auto i = 0; i < ITER_NO; ++i) {
428429
ASSERT_NO_THROW(poller.add(setup_list[i].server, ZMQ_POLLIN, [&,i](short events) {
429430
ASSERT_EQ(events, ZMQ_POLLIN);
430431
poller.remove(setup_list[ITER_NO-i-1].server);
431432
ASSERT_EQ(ITER_NO-i-1, poller.size());
432433
}));
434+
++count;
433435
}
434436
ASSERT_EQ(ITER_NO, poller.size());
435437
// Clients send messages
@@ -444,8 +446,8 @@ TEST(poller, remove_from_handler)
444446
}
445447

446448
// Fire all handlers in one wait
447-
int count = poller.wait (std::chrono::milliseconds{-1});
448-
ASSERT_EQ(count, ITER_NO);
449+
ASSERT_EQ(ITER_NO, poller.wait (std::chrono::milliseconds{-1}));
450+
ASSERT_EQ(ITER_NO, count);
449451
}
450452

451453
#endif

zmq.hpp

Lines changed: 87 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,6 @@ namespace zmq
577577
class socket_t
578578
{
579579
friend class monitor_t;
580-
friend class poller_t;
581580
public:
582581
inline socket_t(context_t& context_, int type_)
583582
{
@@ -1019,6 +1018,67 @@ namespace zmq
10191018
};
10201019

10211020
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
1021+
template <typename T = void>
1022+
class base_poller_t
1023+
{
1024+
public:
1025+
void add (zmq::socket_t &socket, short events, T *user_data)
1026+
{
1027+
if (0 != zmq_poller_add (poller_ptr.get (), static_cast<void*>(socket), user_data, events))
1028+
{
1029+
throw error_t ();
1030+
}
1031+
}
1032+
1033+
void remove (zmq::socket_t &socket)
1034+
{
1035+
if (0 != zmq_poller_remove (poller_ptr.get (), static_cast<void*>(socket)))
1036+
{
1037+
throw error_t ();
1038+
}
1039+
}
1040+
1041+
void modify (zmq::socket_t &socket, short events)
1042+
{
1043+
if (0 != zmq_poller_modify (poller_ptr.get (), static_cast<void*>(socket), events))
1044+
{
1045+
throw error_t ();
1046+
}
1047+
}
1048+
1049+
int wait_all (std::vector<zmq_poller_event_t> &poller_events, const std::chrono::microseconds timeout)
1050+
{
1051+
int rc = zmq_poller_wait_all (poller_ptr.get (), poller_events.data (),
1052+
static_cast<int> (poller_events.size ()),
1053+
static_cast<long>(timeout.count ()));
1054+
if (rc > 0)
1055+
return rc;
1056+
1057+
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)
1058+
if (zmq_errno () == EAGAIN)
1059+
#else
1060+
if (zmq_errno () == ETIMEDOUT)
1061+
#endif
1062+
return 0;
1063+
1064+
throw error_t ();
1065+
}
1066+
private:
1067+
std::unique_ptr<void, std::function<void(void*)>> poller_ptr
1068+
{
1069+
[]() {
1070+
auto poller_new = zmq_poller_new ();
1071+
if (poller_new)
1072+
return poller_new;
1073+
throw error_t ();
1074+
}(),
1075+
[](void *ptr) {
1076+
int rc = zmq_poller_destroy (&ptr);
1077+
ZMQ_ASSERT (rc == 0);
1078+
}
1079+
};
1080+
};
1081+
10221082
class poller_t
10231083
{
10241084
public:
@@ -1035,33 +1095,35 @@ namespace zmq
10351095

10361096
void add (zmq::socket_t &socket, short events, handler_t handler)
10371097
{
1038-
auto it = std::end (handlers);
1039-
auto inserted = false;
1040-
std::tie(it, inserted) = handlers.emplace (socket.ptr, std::make_shared<handler_t> (std::move (handler)));
1041-
if (0 == zmq_poller_add (poller_ptr.get (), socket.ptr, inserted && *(it->second) ? it->second.get() : nullptr, events)) {
1042-
need_rebuild = true;
1043-
return;
1098+
auto it = decltype (handlers)::iterator {};
1099+
auto inserted = bool {};
1100+
std::tie(it, inserted) = handlers.emplace (static_cast<void*>(socket), std::make_shared<handler_t> (std::move (handler)));
1101+
try
1102+
{
1103+
base_poller.add (socket, events, inserted && *(it->second) ? it->second.get() : nullptr);
1104+
need_rebuild |= inserted;
1105+
}
1106+
catch (const zmq::error_t&)
1107+
{
1108+
// rollback
1109+
if (inserted)
1110+
{
1111+
handlers.erase (static_cast<void*>(socket));
1112+
}
1113+
throw;
10441114
}
1045-
// rollback
1046-
if (inserted)
1047-
handlers.erase (socket.ptr);
1048-
throw error_t ();
10491115
}
10501116

10511117
void remove (zmq::socket_t &socket)
10521118
{
1053-
if (0 == zmq_poller_remove (poller_ptr.get (), socket.ptr)) {
1054-
handlers.erase (socket.ptr);
1055-
need_rebuild = true;
1056-
return;
1057-
}
1058-
throw error_t ();
1119+
base_poller.remove (socket);
1120+
handlers.erase (static_cast<void*>(socket));
1121+
need_rebuild = true;
10591122
}
10601123

10611124
void modify (zmq::socket_t &socket, short events)
10621125
{
1063-
if (0 != zmq_poller_modify (poller_ptr.get (), socket.ptr, events))
1064-
throw error_t ();
1126+
base_poller.modify (socket, events);
10651127
}
10661128

10671129
int wait (std::chrono::milliseconds timeout)
@@ -1077,25 +1139,15 @@ namespace zmq
10771139
}
10781140
need_rebuild = false;
10791141
}
1080-
int rc = zmq_poller_wait_all (poller_ptr.get (), poller_events.data (),
1081-
static_cast<int> (poller_events.size ()),
1082-
static_cast<long>(timeout.count ()));
1083-
if (rc > 0) {
1084-
std::for_each (poller_events.begin (), poller_events.begin () + rc,
1142+
const int count = base_poller.wait_all (poller_events, timeout);
1143+
if (count != 0) {
1144+
std::for_each (poller_events.begin (), poller_events.begin () + count,
10851145
[](zmq_poller_event_t& event) {
10861146
if (event.user_data != NULL)
10871147
(*reinterpret_cast<handler_t*> (event.user_data)) (event.events);
10881148
});
1089-
return rc;
10901149
}
1091-
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)
1092-
if (zmq_errno () == EAGAIN)
1093-
#else
1094-
if (zmq_errno () == ETIMEDOUT)
1095-
#endif
1096-
return 0;
1097-
1098-
throw error_t ();
1150+
return count;
10991151
}
11001152

11011153
bool empty () const
@@ -1109,20 +1161,9 @@ namespace zmq
11091161
}
11101162

11111163
private:
1112-
std::unique_ptr<void, std::function<void(void*)>> poller_ptr
1113-
{
1114-
[]() {
1115-
auto poller_new = zmq_poller_new ();
1116-
if (poller_new)
1117-
return poller_new;
1118-
throw error_t ();
1119-
}(),
1120-
[](void *ptr) {
1121-
int rc = zmq_poller_destroy (&ptr);
1122-
ZMQ_ASSERT (rc == 0);
1123-
}
1124-
};
11251164
bool need_rebuild {false};
1165+
1166+
base_poller_t<handler_t> base_poller {};
11261167
std::unordered_map<void*, std::shared_ptr<handler_t>> handlers {};
11271168
std::vector<zmq_poller_event_t> poller_events {};
11281169
std::vector<std::shared_ptr<handler_t>> poller_handlers {};

0 commit comments

Comments
 (0)