Skip to content

Commit 2aba0bb

Browse files
authored
Merge pull request #221 from kurdybacha/poller-size
Problem: poller move operations not complete
2 parents 3281509 + 625a0eb commit 2aba0bb

File tree

2 files changed

+115
-39
lines changed

2 files changed

+115
-39
lines changed

tests/poller.cpp

Lines changed: 88 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,34 @@
99
TEST(poller, create_destroy)
1010
{
1111
zmq::poller_t poller;
12+
ASSERT_TRUE(poller.empty ());
1213
}
1314

1415
static_assert(!std::is_copy_constructible<zmq::poller_t>::value, "poller_t should not be copy-constructible");
1516
static_assert(!std::is_copy_assignable<zmq::poller_t>::value, "poller_t should not be copy-assignable");
1617

1718
TEST(poller, move_construct_empty)
1819
{
19-
std::unique_ptr<zmq::poller_t> a{new zmq::poller_t};
20-
zmq::poller_t b = std::move(*a);
21-
20+
std::unique_ptr<zmq::poller_t> a {new zmq::poller_t};
21+
ASSERT_TRUE(a->empty ());
22+
zmq::poller_t b = std::move (*a);
23+
ASSERT_TRUE(b.empty ());
24+
ASSERT_EQ(0u, a->size ());
25+
ASSERT_EQ(0u, b.size ());
2226
a.reset ();
2327
}
2428

2529
TEST(poller, move_assign_empty)
2630
{
2731
std::unique_ptr<zmq::poller_t> a{new zmq::poller_t};
32+
ASSERT_TRUE(a->empty());
2833
zmq::poller_t b;
29-
34+
ASSERT_TRUE(b.empty());
3035
b = std::move(*a);
31-
36+
ASSERT_EQ(0u, a->size ());
37+
ASSERT_EQ(0u, b.size ());
38+
ASSERT_TRUE(a->empty());
39+
ASSERT_TRUE(b.empty());
3240
a.reset ();
3341
}
3442

@@ -39,8 +47,13 @@ TEST(poller, move_construct_non_empty)
3947

4048
std::unique_ptr<zmq::poller_t> a{new zmq::poller_t};
4149
a->add(socket, ZMQ_POLLIN, [](short) {});
42-
zmq::poller_t b = std::move(*a);
43-
50+
ASSERT_FALSE(a->empty ());
51+
ASSERT_EQ(1u, a->size ());
52+
zmq::poller_t b = std::move (*a);
53+
ASSERT_TRUE(a->empty ());
54+
ASSERT_EQ(0u, a->size ());
55+
ASSERT_FALSE(b.empty ());
56+
ASSERT_EQ(1u, b.size ());
4457
a.reset ();
4558
}
4659

@@ -51,10 +64,14 @@ TEST(poller, move_assign_non_empty)
5164

5265
std::unique_ptr<zmq::poller_t> a{new zmq::poller_t};
5366
a->add(socket, ZMQ_POLLIN, [](short) {});
67+
ASSERT_FALSE(a->empty());
68+
ASSERT_EQ(1u, a->size ());
5469
zmq::poller_t b;
55-
5670
b = std::move(*a);
57-
71+
ASSERT_TRUE(a->empty ());
72+
ASSERT_EQ(0u, a->size ());
73+
ASSERT_FALSE(b.empty ());
74+
ASSERT_EQ(1u, b.size ());
5875
a.reset ();
5976
}
6077

@@ -75,6 +92,8 @@ TEST(poller, add_handler_invalid_events_type)
7592
zmq::poller_t::handler_t handler;
7693
short invalid_events_type = 2 << 10;
7794
ASSERT_NO_THROW(poller.add(socket, invalid_events_type, handler));
95+
ASSERT_FALSE(poller.empty ());
96+
ASSERT_EQ(1u, poller.size ());
7897
}
7998

8099
TEST(poller, add_handler_twice_throws)
@@ -229,7 +248,7 @@ TEST(poller, client_server)
229248
ASSERT_EQ(events, ZMQ_POLLOUT);
230249
}
231250

232-
TEST(poller, poller_add_invalid_socket_throws)
251+
TEST(poller, add_invalid_socket_throws)
233252
{
234253
zmq::context_t context;
235254
zmq::poller_t poller;
@@ -239,15 +258,17 @@ TEST(poller, poller_add_invalid_socket_throws)
239258
zmq::error_t);
240259
}
241260

242-
TEST(poller, poller_remove_invalid_socket_throws)
261+
TEST(poller, remove_invalid_socket_throws)
243262
{
244263
zmq::context_t context;
245264
zmq::socket_t socket {context, zmq::socket_type::router};
246265
zmq::poller_t poller;
247266
ASSERT_NO_THROW (poller.add (socket, ZMQ_POLLIN, zmq::poller_t::handler_t {}));
267+
ASSERT_EQ (1u, poller.size ());
248268
std::vector<zmq::socket_t> sockets;
249269
sockets.emplace_back (std::move (socket));
250270
ASSERT_THROW (poller.remove (socket), zmq::error_t);
271+
ASSERT_EQ (1u, poller.size ());
251272
}
252273

253274
TEST(poller, wait_on_added_empty_handler)
@@ -340,6 +361,59 @@ TEST(poller, wait_one_return)
340361
ASSERT_EQ(count, result);
341362
}
342363

364+
TEST(poller, wait_on_move_constructed_poller)
365+
{
366+
server_client_setup s;
367+
ASSERT_NO_THROW (s.client.send ("Hi"));
368+
zmq::poller_t a;
369+
zmq::poller_t::handler_t handler;
370+
ASSERT_NO_THROW (a.add (s.server, ZMQ_POLLIN, handler));
371+
ASSERT_EQ(1u, a.size ());
372+
zmq::poller_t b {std::move (a)};
373+
ASSERT_EQ(1u, b.size ());
374+
ASSERT_NO_THROW (b.wait (std::chrono::milliseconds {-1}));
375+
}
376+
377+
TEST(poller, wait_on_move_assign_poller)
378+
{
379+
server_client_setup s;
380+
ASSERT_NO_THROW (s.client.send ("Hi"));
381+
zmq::poller_t a;
382+
zmq::poller_t::handler_t handler;
383+
ASSERT_NO_THROW (a.add (s.server, ZMQ_POLLIN, handler));
384+
ASSERT_EQ(1u, a.size ());
385+
zmq::poller_t b;
386+
ASSERT_EQ(0u, b.size ());
387+
b = {std::move (a)};
388+
ASSERT_EQ(1u, b.size ());
389+
ASSERT_NO_THROW (b.wait (std::chrono::milliseconds {-1}));
390+
}
391+
392+
TEST(poller, received_on_move_construced_poller)
393+
{
394+
// Setup server and client
395+
server_client_setup s;
396+
int count = 0;
397+
// Setup poller a
398+
zmq::poller_t a;
399+
ASSERT_NO_THROW(a.add(s.server, ZMQ_POLLIN, [&count](short) {
400+
++count;
401+
}));
402+
// client sends message
403+
ASSERT_NO_THROW(s.client.send("Hi"));
404+
// wait for message and verify it is received
405+
a.wait(std::chrono::milliseconds{500});
406+
ASSERT_EQ(1u, count);
407+
// Move construct poller b
408+
zmq::poller_t b{std::move(a)};
409+
// client sends message again
410+
ASSERT_NO_THROW(s.client.send("Hi"));
411+
// wait for message and verify it is received
412+
b.wait(std::chrono::milliseconds{500});
413+
ASSERT_EQ(2u, count);
414+
}
415+
416+
343417
TEST(poller, remove_from_handler)
344418
{
345419
constexpr auto ITER_NO = 10;
@@ -354,9 +428,11 @@ TEST(poller, remove_from_handler)
354428
for (auto i = 0; i < ITER_NO; ++i) {
355429
ASSERT_NO_THROW(poller.add(setup_list[i].server, ZMQ_POLLIN, [&,i](short events) {
356430
ASSERT_EQ(events, ZMQ_POLLIN);
357-
poller.remove(setup_list[ITER_NO - i -1].server);
431+
poller.remove(setup_list[ITER_NO-i-1].server);
432+
ASSERT_EQ(ITER_NO-i-1, poller.size());
358433
}));
359434
}
435+
ASSERT_EQ(ITER_NO, poller.size());
360436
// Clients send messages
361437
for (auto & s : setup_list) {
362438
ASSERT_NO_THROW(s.client.send("Hi"));

zmq.hpp

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,36 +1018,19 @@ namespace zmq
10181018
class poller_t
10191019
{
10201020
public:
1021-
poller_t () : poller_ptr (zmq_poller_new ())
1021+
poller_t ()
10221022
{
10231023
if (!poller_ptr)
10241024
throw error_t ();
10251025
}
10261026

1027-
~poller_t ()
1028-
{
1029-
if (poller_ptr)
1030-
{
1031-
int rc = zmq_poller_destroy (&poller_ptr);
1032-
assert(rc == 0);
1033-
}
1034-
}
1027+
~poller_t () = default;
10351028

10361029
poller_t(const poller_t&) = delete;
10371030
poller_t &operator=(const poller_t&) = delete;
1038-
poller_t(poller_t&& src)
1039-
: poller_ptr(src.poller_ptr)
1040-
, poller_events(std::move (src.poller_events))
1041-
{
1042-
src.poller_ptr = NULL;
1043-
}
1044-
poller_t &operator=(poller_t&& src)
1045-
{
1046-
poller_ptr = src.poller_ptr;
1047-
poller_events = std::move (src.poller_events);
1048-
src.poller_ptr = NULL;
1049-
return *this;
1050-
}
1031+
1032+
poller_t(poller_t&& src) = default;
1033+
poller_t &operator=(poller_t&& src) = default;
10511034

10521035
using handler_t = std::function<void(short)>;
10531036

@@ -1056,7 +1039,7 @@ namespace zmq
10561039
auto it = std::end (handlers);
10571040
auto inserted = false;
10581041
std::tie(it, inserted) = handlers.emplace (socket.ptr, std::make_shared<handler_t> (std::move (handler)));
1059-
if (0 == zmq_poller_add (poller_ptr, socket.ptr, inserted && *(it->second) ? it->second.get() : nullptr, events)) {
1042+
if (0 == zmq_poller_add (poller_ptr.get (), socket.ptr, inserted && *(it->second) ? it->second.get() : nullptr, events)) {
10601043
need_rebuild = true;
10611044
return;
10621045
}
@@ -1068,7 +1051,7 @@ namespace zmq
10681051

10691052
void remove (zmq::socket_t &socket)
10701053
{
1071-
if (0 == zmq_poller_remove (poller_ptr, socket.ptr)) {
1054+
if (0 == zmq_poller_remove (poller_ptr.get (), socket.ptr)) {
10721055
handlers.erase (socket.ptr);
10731056
need_rebuild = true;
10741057
return;
@@ -1078,7 +1061,7 @@ namespace zmq
10781061

10791062
void modify (zmq::socket_t &socket, short events)
10801063
{
1081-
if (0 != zmq_poller_modify (poller_ptr, socket.ptr, events))
1064+
if (0 != zmq_poller_modify (poller_ptr.get (), socket.ptr, events))
10821065
throw error_t ();
10831066
}
10841067

@@ -1095,7 +1078,7 @@ namespace zmq
10951078
}
10961079
need_rebuild = false;
10971080
}
1098-
int rc = zmq_poller_wait_all (poller_ptr, poller_events.data (),
1081+
int rc = zmq_poller_wait_all (poller_ptr.get (), poller_events.data (),
10991082
static_cast<int> (poller_events.size ()),
11001083
static_cast<long>(timeout.count ()));
11011084
if (rc > 0) {
@@ -1116,8 +1099,25 @@ namespace zmq
11161099
throw error_t ();
11171100
}
11181101

1102+
bool empty () const
1103+
{
1104+
return handlers.empty ();
1105+
}
1106+
1107+
size_t size () const
1108+
{
1109+
return handlers.size ();
1110+
}
1111+
11191112
private:
1120-
void *poller_ptr {nullptr};
1113+
std::unique_ptr<void, std::function<void(void*)>> poller_ptr
1114+
{
1115+
zmq_poller_new (),
1116+
[](void *ptr) {
1117+
int rc = zmq_poller_destroy (&ptr);
1118+
ZMQ_ASSERT (rc == 0);
1119+
}
1120+
};
11211121
bool need_rebuild {false};
11221122
std::unordered_map<void*, std::shared_ptr<handler_t>> handlers {};
11231123
std::vector<zmq_poller_event_t> poller_events {};

0 commit comments

Comments
 (0)