Skip to content

Commit 17e1d97

Browse files
authored
Merge pull request #215 from kurdybacha/poller_modify
Problem: poller_t does not support modify
2 parents dcdf828 + 3fcd58d commit 17e1d97

File tree

2 files changed

+112
-42
lines changed

2 files changed

+112
-42
lines changed

tests/poller.cpp

Lines changed: 106 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ TEST(poller, remove_unregistered_throws)
113113
ASSERT_THROW(poller.remove(socket), zmq::error_t);
114114
}
115115

116-
/// \todo this should lead to an exception instead
117116
TEST(poller, remove_registered_empty)
118117
{
119118
zmq::context_t context;
@@ -156,76 +155,87 @@ class loopback_ip4_binder
156155
std::string endpoint_;
157156
};
158157

159-
} //namespace
160-
161-
TEST(poller, poll_basic)
158+
struct server_client_setup
162159
{
160+
server_client_setup ()
161+
{
162+
init ();
163+
}
164+
165+
void init()
166+
{
167+
endpoint = loopback_ip4_binder {server}.endpoint ();
168+
ASSERT_NO_THROW (client.connect (endpoint));
169+
}
170+
171+
zmq::poller_t::handler_t handler = [&](short e) {
172+
events = e;
173+
};
174+
163175
zmq::context_t context;
176+
zmq::socket_t server {context, zmq::socket_type::router};
177+
zmq::socket_t client {context, zmq::socket_type::dealer};
178+
std::string endpoint;
179+
short events = 0;
180+
};
164181

165-
zmq::socket_t vent{context, zmq::socket_type::push};
166-
auto endpoint = loopback_ip4_binder(vent).endpoint();
182+
} //namespace
167183

168-
zmq::socket_t sink{context, zmq::socket_type::pull};
169-
ASSERT_NO_THROW(sink.connect(endpoint));
184+
TEST(poller, poll_basic)
185+
{
186+
server_client_setup s;
170187

171-
ASSERT_NO_THROW(vent.send("Hi"));
188+
ASSERT_NO_THROW(s.client.send("Hi"));
172189

173190
zmq::poller_t poller;
174191
bool message_received = false;
175192
zmq::poller_t::handler_t handler = [&message_received](short events) {
176193
ASSERT_TRUE(0 != (events & ZMQ_POLLIN));
177194
message_received = true;
178195
};
179-
ASSERT_NO_THROW(poller.add(sink, ZMQ_POLLIN, handler));
196+
ASSERT_NO_THROW(poller.add(s.server, ZMQ_POLLIN, handler));
180197
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{-1}));
181198
ASSERT_TRUE(message_received);
182199
}
183200

184201
/// \todo this contains multiple test cases that should be split up
185202
TEST(poller, client_server)
186203
{
187-
zmq::context_t context;
188204
const std::string send_msg = "Hi";
189205

190-
// Setup server
191-
zmq::socket_t server{context, zmq::socket_type::router};
192-
auto endpoint = loopback_ip4_binder(server).endpoint();
206+
// Setup server and client
207+
server_client_setup s;
193208

194209
// Setup poller
195210
zmq::poller_t poller;
196-
bool got_pollin = false;
197-
bool got_pollout = false;
198-
zmq::poller_t::handler_t handler = [&](short events) {
199-
if (0 != (events & ZMQ_POLLIN)) {
211+
short events;
212+
zmq::poller_t::handler_t handler = [&](short e) {
213+
if (0 != (e & ZMQ_POLLIN)) {
200214
zmq::message_t zmq_msg;
201-
ASSERT_NO_THROW(server.recv(&zmq_msg)); // skip msg id
202-
ASSERT_NO_THROW(server.recv(&zmq_msg)); // get message
215+
ASSERT_NO_THROW(s.server.recv(&zmq_msg)); // skip msg id
216+
ASSERT_NO_THROW(s.server.recv(&zmq_msg)); // get message
203217
std::string recv_msg(zmq_msg.data<char>(),
204218
zmq_msg.size());
205219
ASSERT_EQ(send_msg, recv_msg);
206-
got_pollin = true;
207-
} else if (0 != (events & ZMQ_POLLOUT)) {
208-
got_pollout = true;
209-
} else {
220+
} else if (0 != (e & ~ZMQ_POLLOUT)) {
210221
ASSERT_TRUE(false) << "Unexpected event type " << events;
211222
}
223+
events = e;
212224
};
213-
ASSERT_NO_THROW(poller.add(server, ZMQ_POLLIN, handler));
214225

215-
// Setup client and send message
216-
zmq::socket_t client{context, zmq::socket_type::dealer};
217-
ASSERT_NO_THROW(client.connect(endpoint));
218-
ASSERT_NO_THROW(client.send(send_msg));
226+
ASSERT_NO_THROW(poller.add(s.server, ZMQ_POLLIN, handler));
227+
228+
// client sends message
229+
ASSERT_NO_THROW(s.client.send(send_msg));
219230

220231
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{-1}));
221-
ASSERT_TRUE(got_pollin);
222-
ASSERT_FALSE(got_pollout);
232+
ASSERT_EQ(events, ZMQ_POLLIN);
223233

224234
// Re-add server socket with pollout flag
225-
ASSERT_NO_THROW(poller.remove(server));
226-
ASSERT_NO_THROW(poller.add(server, ZMQ_POLLIN | ZMQ_POLLOUT, handler));
235+
ASSERT_NO_THROW(poller.remove(s.server));
236+
ASSERT_NO_THROW(poller.add(s.server, ZMQ_POLLIN | ZMQ_POLLOUT, handler));
227237
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{-1}));
228-
ASSERT_TRUE(got_pollout);
238+
ASSERT_EQ(events, ZMQ_POLLOUT);
229239
}
230240

231241
TEST(poller, poller_add_invalid_socket_throws)
@@ -253,18 +263,72 @@ TEST(poller, poller_remove_invalid_socket_throws)
253263
}
254264

255265
TEST(poller, wait_on_added_empty_handler)
266+
{
267+
server_client_setup s;
268+
ASSERT_NO_THROW(s.client.send("Hi"));
269+
zmq::poller_t poller;
270+
std::function<void(void)> handler;
271+
ASSERT_NO_THROW(poller.add(s.server, ZMQ_POLLIN, handler));
272+
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{-1}));
273+
}
274+
275+
TEST(poller, modify_empty_throws)
256276
{
257277
zmq::context_t context;
258-
zmq::socket_t vent{context, zmq::socket_type::push};
259-
auto endpoint = loopback_ip4_binder(vent).endpoint();
278+
zmq::socket_t socket {context, zmq::socket_type::push};
279+
zmq::poller_t poller;
280+
ASSERT_THROW (poller.modify (socket, ZMQ_POLLIN), zmq::error_t);
281+
}
260282

261-
zmq::socket_t sink{context, zmq::socket_type::pull};
262-
ASSERT_NO_THROW(sink.connect(endpoint));
263-
ASSERT_NO_THROW(vent.send("Hi"));
283+
TEST(poller, modify_invalid_socket_throws)
284+
{
285+
zmq::context_t context;
286+
zmq::socket_t a {context, zmq::socket_type::push};
287+
zmq::socket_t b {std::move (a)};
288+
zmq::poller_t poller;
289+
ASSERT_THROW (poller.modify (a, ZMQ_POLLIN), zmq::error_t);
290+
}
264291

292+
TEST(poller, modify_not_added_throws)
293+
{
294+
zmq::context_t context;
295+
zmq::socket_t a {context, zmq::socket_type::push};
296+
zmq::socket_t b {context, zmq::socket_type::push};
265297
zmq::poller_t poller;
266-
std::function<void(void)> handler;
267-
ASSERT_NO_THROW(poller.add(sink, ZMQ_POLLIN, handler));
268-
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{-1}));
298+
ASSERT_NO_THROW (poller.add (a, ZMQ_POLLIN, zmq::poller_t::handler_t {}));
299+
ASSERT_THROW (poller.modify (b, ZMQ_POLLIN), zmq::error_t);
300+
}
301+
302+
TEST(poller, modify_simple)
303+
{
304+
zmq::context_t context;
305+
zmq::socket_t a {context, zmq::socket_type::push};
306+
zmq::poller_t poller;
307+
ASSERT_NO_THROW (poller.add (a, ZMQ_POLLIN, zmq::poller_t::handler_t {}));
308+
ASSERT_NO_THROW (poller.modify (a, ZMQ_POLLIN|ZMQ_POLLOUT));
309+
}
310+
311+
TEST(poller, poll_client_server)
312+
{
313+
// Setup server and client
314+
server_client_setup s;
315+
316+
// Setup poller
317+
zmq::poller_t poller;
318+
ASSERT_NO_THROW(poller.add(s.server, ZMQ_POLLIN, s.handler));
319+
320+
// client sends message
321+
ASSERT_NO_THROW(s.client.send("Hi"));
322+
323+
// wait for message and verify events
324+
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{500}));
325+
ASSERT_TRUE(s.events == ZMQ_POLLIN);
326+
ASSERT_EQ(s.events, ZMQ_POLLIN);
327+
328+
// Modify server socket with pollout flag
329+
ASSERT_NO_THROW(poller.modify(s.server, ZMQ_POLLIN | ZMQ_POLLOUT));
330+
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{500}));
331+
ASSERT_EQ(s.events, ZMQ_POLLIN | ZMQ_POLLOUT);
269332
}
333+
270334
#endif

zmq.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,6 +1083,12 @@ namespace zmq
10831083
throw error_t ();
10841084
}
10851085

1086+
void modify (zmq::socket_t &socket, short events)
1087+
{
1088+
if (0 != zmq_poller_modify (poller_ptr, socket.ptr, events))
1089+
throw error_t ();
1090+
}
1091+
10861092
bool wait (std::chrono::milliseconds timeout)
10871093
{
10881094
int rc = zmq_poller_wait_all (poller_ptr, poller_events.data (), static_cast<int> (poller_events.size ()), static_cast<long>(timeout.count ()));

0 commit comments

Comments
 (0)