Skip to content

Commit faf6671

Browse files
kurdybachabluca
authored andcommitted
Problem: poller can segfault when modified from registered handler. (#219)
* Problem: poller can segfault when modified from registred handler. It is possible that a user would like to add/remove sockets from handlers. As handlers and poll items might be removed while not being processed yet - we have a segfault situation. Provided unit test `remove_from_handler` demonstrates the problem. Solution: Modify internal poll item data structure only after processing of events is finished. Please not that events processing path performance remains the same when there are no modification (add/remove) to the poller (no rebuild) - main real use case. As an effect of changes `size()` method has been removed as it does not represent any meaningful information anymore. There are active and pending (waiting for rebuild) poll items so two different sizes. User can easily track on their side number of registered sockets if original size information is needed. `wait` method returns number of processed sockets now. It might be useful information to a user for no extra cost.
1 parent ac64eba commit faf6671

File tree

2 files changed

+84
-32
lines changed

2 files changed

+84
-32
lines changed

tests/poller.cpp

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
TEST(poller, create_destroy)
1010
{
1111
zmq::poller_t poller;
12-
ASSERT_EQ(0u, poller.size ());
1312
}
1413

1514
static_assert(!std::is_copy_constructible<zmq::poller_t>::value, "poller_t should not be copy-constructible");
@@ -20,9 +19,7 @@ TEST(poller, move_construct_empty)
2019
std::unique_ptr<zmq::poller_t> a{new zmq::poller_t};
2120
zmq::poller_t b = std::move(*a);
2221

23-
ASSERT_EQ(0u, a->size ());
2422
a.reset ();
25-
ASSERT_EQ(0u, b.size ());
2623
}
2724

2825
TEST(poller, move_assign_empty)
@@ -32,9 +29,7 @@ TEST(poller, move_assign_empty)
3229

3330
b = std::move(*a);
3431

35-
ASSERT_EQ(0u, a->size ());
3632
a.reset ();
37-
ASSERT_EQ(0u, b.size ());
3833
}
3934

4035
TEST(poller, move_construct_non_empty)
@@ -46,9 +41,7 @@ TEST(poller, move_construct_non_empty)
4641
a->add(socket, ZMQ_POLLIN, [](short) {});
4742
zmq::poller_t b = std::move(*a);
4843

49-
ASSERT_EQ(0u, a->size ());
5044
a.reset ();
51-
ASSERT_EQ(1u, b.size ());
5245
}
5346

5447
TEST(poller, move_assign_non_empty)
@@ -62,9 +55,7 @@ TEST(poller, move_assign_non_empty)
6255

6356
b = std::move(*a);
6457

65-
ASSERT_EQ(0u, a->size ());
6658
a.reset ();
67-
ASSERT_EQ(1u, b.size ());
6859
}
6960

7061
TEST(poller, add_handler)
@@ -246,7 +237,6 @@ TEST(poller, poller_add_invalid_socket_throws)
246237
zmq::socket_t b {std::move (a)};
247238
ASSERT_THROW (poller.add (a, ZMQ_POLLIN, zmq::poller_t::handler_t {}),
248239
zmq::error_t);
249-
ASSERT_EQ (0u, poller.size ());
250240
}
251241

252242
TEST(poller, poller_remove_invalid_socket_throws)
@@ -255,11 +245,9 @@ TEST(poller, poller_remove_invalid_socket_throws)
255245
zmq::socket_t socket {context, zmq::socket_type::router};
256246
zmq::poller_t poller;
257247
ASSERT_NO_THROW (poller.add (socket, ZMQ_POLLIN, zmq::poller_t::handler_t {}));
258-
ASSERT_EQ (1u, poller.size ());
259248
std::vector<zmq::socket_t> sockets;
260249
sockets.emplace_back (std::move (socket));
261250
ASSERT_THROW (poller.remove (socket), zmq::error_t);
262-
ASSERT_EQ (1u, poller.size ());
263251
}
264252

265253
TEST(poller, wait_on_added_empty_handler)
@@ -331,4 +319,58 @@ TEST(poller, poll_client_server)
331319
ASSERT_EQ(s.events, ZMQ_POLLIN | ZMQ_POLLOUT);
332320
}
333321

322+
TEST(poller, wait_one_return)
323+
{
324+
// Setup server and client
325+
server_client_setup s;
326+
327+
int count = 0;
328+
329+
// Setup poller
330+
zmq::poller_t poller;
331+
ASSERT_NO_THROW(poller.add(s.server, ZMQ_POLLIN, [&count](short) {
332+
++count;
333+
}));
334+
335+
// client sends message
336+
ASSERT_NO_THROW(s.client.send("Hi"));
337+
338+
// wait for message and verify events
339+
int result = poller.wait(std::chrono::milliseconds{500});
340+
ASSERT_EQ(count, result);
341+
}
342+
343+
TEST(poller, remove_from_handler)
344+
{
345+
constexpr auto ITER_NO = 10;
346+
347+
// Setup servers and clients
348+
std::vector<server_client_setup> setup_list;
349+
for (auto i = 0; i < ITER_NO; ++i)
350+
setup_list.emplace_back (server_client_setup{});
351+
352+
// Setup poller
353+
zmq::poller_t poller;
354+
for (auto i = 0; i < ITER_NO; ++i) {
355+
ASSERT_NO_THROW(poller.add(setup_list[i].server, ZMQ_POLLIN, [&,i](short events) {
356+
ASSERT_EQ(events, ZMQ_POLLIN);
357+
poller.remove(setup_list[ITER_NO - i -1].server);
358+
}));
359+
}
360+
// Clients send messages
361+
for (auto & s : setup_list) {
362+
ASSERT_NO_THROW(s.client.send("Hi"));
363+
}
364+
365+
// Wait for all servers to receive a message
366+
for (auto & s : setup_list) {
367+
zmq::pollitem_t items [] = { { s.server, 0, ZMQ_POLLIN, 0 } };
368+
zmq::poll (&items [0], 1);
369+
}
370+
371+
// Fire all handlers in one wait
372+
int count = poller.wait (std::chrono::milliseconds{-1});
373+
ASSERT_EQ(count, ITER_NO);
374+
}
375+
334376
#endif

zmq.hpp

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
#include <tuple>
7575
#include <functional>
7676
#include <unordered_map>
77+
#include <memory>
7778
#endif
7879

7980
// Detect whether the compiler supports C++11 rvalue references.
@@ -1054,10 +1055,9 @@ namespace zmq
10541055
{
10551056
auto it = std::end (handlers);
10561057
auto inserted = false;
1057-
if (handler)
1058-
std::tie(it, inserted) = handlers.emplace (socket.ptr, std::move (handler));
1059-
if (0 == zmq_poller_add (poller_ptr, socket.ptr, inserted ? &(it->second) : nullptr, events)) {
1060-
poller_events.emplace_back (zmq_poller_event_t ());
1058+
std::tie(it, inserted) = handlers.emplace (socket.ptr, std::make_shared<handler_t> (std::move (handler)));
1059+
if (0 == zmq_poller_add (poller_ptr, socket.ptr, inserted && *(it->second) ? it->second.get() : nullptr, events)) {
1060+
need_rebuild = true;
10611061
return;
10621062
}
10631063
// rollback
@@ -1070,7 +1070,7 @@ namespace zmq
10701070
{
10711071
if (0 == zmq_poller_remove (poller_ptr, socket.ptr)) {
10721072
handlers.erase (socket.ptr);
1073-
poller_events.pop_back ();
1073+
need_rebuild = true;
10741074
return;
10751075
}
10761076
throw error_t ();
@@ -1082,36 +1082,46 @@ namespace zmq
10821082
throw error_t ();
10831083
}
10841084

1085-
bool wait (std::chrono::milliseconds timeout)
1085+
int wait (std::chrono::milliseconds timeout)
10861086
{
1087-
int rc = zmq_poller_wait_all (poller_ptr, poller_events.data (), static_cast<int> (poller_events.size ()), static_cast<long>(timeout.count ()));
1088-
if (rc >= 0) {
1089-
std::for_each (poller_events.begin (), poller_events.begin () + rc, [](zmq_poller_event_t& event) {
1087+
if (need_rebuild) {
1088+
poller_events.clear ();
1089+
poller_handlers.clear ();
1090+
poller_events.reserve (handlers.size ());
1091+
poller_handlers.reserve (handlers.size ());
1092+
for (const auto &handler : handlers) {
1093+
poller_events.emplace_back (zmq_poller_event_t {});
1094+
poller_handlers.push_back (handler.second);
1095+
}
1096+
need_rebuild = false;
1097+
}
1098+
int rc = zmq_poller_wait_all (poller_ptr, poller_events.data (),
1099+
static_cast<int> (poller_events.size ()),
1100+
static_cast<long>(timeout.count ()));
1101+
if (rc > 0) {
1102+
std::for_each (poller_events.begin (), poller_events.begin () + rc,
1103+
[](zmq_poller_event_t& event) {
10901104
if (event.user_data != NULL)
10911105
(*reinterpret_cast<handler_t*> (event.user_data)) (event.events);
10921106
});
1093-
return true;
1107+
return rc;
10941108
}
1095-
10961109
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)
10971110
if (zmq_errno () == EAGAIN)
10981111
#else
10991112
if (zmq_errno () == ETIMEDOUT)
11001113
#endif
1101-
return false;
1114+
return 0;
11021115

11031116
throw error_t ();
11041117
}
11051118

1106-
size_t size ()
1107-
{
1108-
return poller_events.size();
1109-
}
1110-
11111119
private:
1112-
void *poller_ptr;
1113-
std::vector<zmq_poller_event_t> poller_events;
1114-
std::unordered_map<void*, handler_t> handlers;
1120+
void *poller_ptr {nullptr};
1121+
bool need_rebuild {false};
1122+
std::unordered_map<void*, std::shared_ptr<handler_t>> handlers {};
1123+
std::vector<zmq_poller_event_t> poller_events {};
1124+
std::vector<std::shared_ptr<handler_t>> poller_handlers {};
11151125
}; // class poller_t
11161126
#endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
11171127

0 commit comments

Comments
 (0)