Skip to content

Commit 1975818

Browse files
kurdybachabluca
authored andcommitted
Problem: poller's handler not aware of event type. (#200)
* Problem: Poller's handler not aware of event type. It was possible to register a handler for more than one event types but impossible to distinguish which event is being handled. Now events are passed to a handler. Besides that some other changes: * new test covering changes in the poller * modified existing tests to cover changes in the poller * defined handler_t in poller_t scope for more convinient use and simpler code * helper loopback binder to be re-used in tests * Problem: CMake build fails on Windows Issue #199 It seems that with GCC on Linux <array> is implicitly included by one of stl includes already in zmq.hpp but it breaks on Windows with Visual Studio. Adding explicit include for array. Can not verify right now but this change is a good practice so creating a pull request. * Poller: array include not between C++11 guards
1 parent 65475cb commit 1975818

File tree

2 files changed

+86
-15
lines changed

2 files changed

+86
-15
lines changed

tests/poller.cpp

Lines changed: 82 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
#if defined(ZMQ_CPP11) && defined(ZMQ_BUILD_DRAFT_API)
55

6+
#include <array>
7+
68
TEST(poller, create_destroy)
79
{
810
zmq::poller_t poller;
@@ -26,7 +28,7 @@ TEST(poller, add_handler)
2628
zmq::context_t context;
2729
zmq::socket_t socket{context, zmq::socket_type::router};
2830
zmq::poller_t poller;
29-
std::function<void()> handler = [](){};
31+
zmq::poller_t::handler_t handler;
3032
ASSERT_NO_THROW(poller.add(socket, ZMQ_POLLIN, handler));
3133
}
3234

@@ -35,7 +37,7 @@ TEST(poller, add_handler_invalid_events_type)
3537
zmq::context_t context;
3638
zmq::socket_t socket{context, zmq::socket_type::router};
3739
zmq::poller_t poller;
38-
std::function<void()> handler = [](){};
40+
zmq::poller_t::handler_t handler;
3941
short invalid_events_type = 2 << 10;
4042
ASSERT_NO_THROW(poller.add(socket, invalid_events_type, handler));
4143
}
@@ -45,7 +47,7 @@ TEST(poller, add_handler_twice_throws)
4547
zmq::context_t context;
4648
zmq::socket_t socket{context, zmq::socket_type::router};
4749
zmq::poller_t poller;
48-
std::function<void()> handler = [](){};
50+
zmq::poller_t::handler_t handler;
4951
poller.add(socket, ZMQ_POLLIN, handler);
5052
ASSERT_THROW(poller.add(socket, ZMQ_POLLIN, handler), zmq::error_t);
5153
}
@@ -69,26 +71,46 @@ TEST(poller, remove_registered)
6971
zmq::context_t context;
7072
zmq::socket_t socket{context, zmq::socket_type::router};
7173
zmq::poller_t poller;
72-
std::function<void()> handler = [](){};
74+
zmq::poller_t::handler_t handler;
7375
poller.add(socket, ZMQ_POLLIN, handler);
7476
ASSERT_NO_THROW(poller.remove(socket));
7577
}
7678

79+
namespace {
80+
81+
class loopback_ip4_binder
82+
{
83+
public:
84+
loopback_ip4_binder(zmq::socket_t &socket) { bind(socket); }
85+
std::string endpoint() { return endpoint_; }
86+
private:
87+
// Helper function used in constructor
88+
// as Gtest allows only void returning functions
89+
// and constructor/destructor are not.
90+
void bind(zmq::socket_t &socket)
91+
{
92+
ASSERT_NO_THROW(socket.bind("tcp://127.0.0.1:*"));
93+
std::array<char, 100> endpoint{};
94+
size_t endpoint_size = endpoint.size();
95+
ASSERT_NO_THROW(socket.getsockopt(ZMQ_LAST_ENDPOINT, endpoint.data(),
96+
&endpoint_size));
97+
ASSERT_TRUE(endpoint_size < endpoint.size());
98+
endpoint_ = std::string{endpoint.data()};
99+
}
100+
std::string endpoint_;
101+
};
102+
103+
} //namespace
104+
77105
TEST(poller, poll_basic)
78106
{
79107
zmq::context_t context;
80108

81109
zmq::socket_t vent{context, zmq::socket_type::push};
82-
ASSERT_NO_THROW(vent.bind("tcp://127.0.0.1:*"));
110+
auto endpoint = loopback_ip4_binder(vent).endpoint();
83111

84112
zmq::socket_t sink{context, zmq::socket_type::pull};
85-
// TODO: this should be simpler...
86-
std::array<char, 100> endpoint{};
87-
size_t endpoint_size = endpoint.size();
88-
ASSERT_NO_THROW(vent.getsockopt(ZMQ_LAST_ENDPOINT, endpoint.data(),
89-
&endpoint_size));
90-
ASSERT_TRUE(endpoint_size < endpoint.size());
91-
ASSERT_NO_THROW(sink.connect(endpoint.data()));
113+
ASSERT_NO_THROW(sink.connect(endpoint));
92114

93115
std::string message = "H";
94116

@@ -97,12 +119,59 @@ TEST(poller, poll_basic)
97119

98120
zmq::poller_t poller;
99121
bool message_received = false;
100-
std::function<void()> handler = [&message_received]() {
122+
zmq::poller_t::handler_t handler = [&message_received](short events) {
123+
ASSERT_TRUE(0 != (events & ZMQ_POLLIN));
101124
message_received = true;
102125
};
103126
ASSERT_NO_THROW(poller.add(sink, ZMQ_POLLIN, handler));
104127
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{-1}));
105128
ASSERT_TRUE(message_received);
106129
}
107130

131+
TEST(poller, client_server)
132+
{
133+
zmq::context_t context;
134+
std::string send_msg = "Hi";
135+
136+
// Setup server
137+
zmq::socket_t server{context, zmq::socket_type::router};
138+
auto endpoint = loopback_ip4_binder(server).endpoint();
139+
140+
// Setup poller
141+
zmq::poller_t poller;
142+
bool got_pollin = false;
143+
bool got_pollout = false;
144+
zmq::poller_t::handler_t handler = [&](short events) {
145+
if (0 != (events & ZMQ_POLLIN)) {
146+
zmq::message_t zmq_msg;
147+
ASSERT_NO_THROW(server.recv(&zmq_msg)); // skip msg id
148+
ASSERT_NO_THROW(server.recv(&zmq_msg)); // get message
149+
std::string recv_msg(zmq_msg.data<char>(),
150+
zmq_msg.size());
151+
ASSERT_EQ(send_msg, recv_msg);
152+
got_pollin = true;
153+
} else if (0 != (events & ZMQ_POLLOUT)) {
154+
got_pollout = true;
155+
} else {
156+
ASSERT_TRUE(false) << "Unexpected event type " << events;
157+
}
158+
};
159+
ASSERT_NO_THROW(poller.add(server, ZMQ_POLLIN, handler));
160+
161+
// Setup client and send message
162+
zmq::socket_t client{context, zmq::socket_type::dealer};
163+
ASSERT_NO_THROW(client.connect(endpoint));
164+
ASSERT_NO_THROW(client.send(std::begin(send_msg), std::end(send_msg)));
165+
166+
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{-1}));
167+
ASSERT_TRUE(got_pollin);
168+
ASSERT_FALSE(got_pollout);
169+
170+
// Re-add server socket with pollout flag
171+
ASSERT_NO_THROW(poller.remove(server));
172+
ASSERT_NO_THROW(poller.add(server, ZMQ_POLLIN | ZMQ_POLLOUT, handler));
173+
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{-1}));
174+
ASSERT_TRUE(got_pollout);
175+
}
176+
108177
#endif

zmq.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,7 +1020,9 @@ namespace zmq
10201020
return *this;
10211021
}
10221022

1023-
void add (zmq::socket_t &socket, short events, std::function<void(void)> &handler)
1023+
using handler_t = std::function<void(short)>;
1024+
1025+
void add (zmq::socket_t &socket, short events, handler_t &handler)
10241026
{
10251027
if (0 == zmq_poller_add (poller_ptr, socket.ptr, handler ? &handler : NULL, events)) {
10261028
poller_events.emplace_back (zmq_poller_event_t ());
@@ -1044,7 +1046,7 @@ namespace zmq
10441046
if (rc >= 0) {
10451047
std::for_each (poller_events.begin (), poller_events.begin () + rc, [](zmq_poller_event_t& event) {
10461048
if (event.user_data != NULL)
1047-
(*reinterpret_cast<std::function<void(void)>*> (event.user_data)) ();
1049+
(*reinterpret_cast<handler_t*> (event.user_data)) (event.events);
10481050
});
10491051
return true;
10501052
}

0 commit comments

Comments
 (0)