8
8
9
9
class mock_monitor_t : public zmq ::monitor_t
10
10
{
11
- public:
11
+ public:
12
12
13
13
void on_event_connected (const zmq_event_t &, const char *) ZMQ_OVERRIDE
14
14
{
@@ -89,7 +89,7 @@ TEST_CASE("monitor init abort", "[monitor]")
89
89
{
90
90
class mock_monitor : public mock_monitor_t
91
91
{
92
- public:
92
+ public:
93
93
mock_monitor (std::function<void (void )> handle_connected) :
94
94
handle_connected{std::move (handle_connected)}
95
95
{
@@ -128,7 +128,7 @@ TEST_CASE("monitor init abort", "[monitor]")
128
128
{
129
129
std::unique_lock<std::mutex> lock (mutex);
130
130
CHECK (cond_var.wait_for (lock, std::chrono::seconds (1 ),
131
- [&done] { return done; }));
131
+ [&done] { return done; }));
132
132
}
133
133
CHECK (monitor.connected == 1 );
134
134
monitor.abort ();
@@ -150,3 +150,90 @@ TEST_CASE("monitor from move assigned socket", "[monitor]")
150
150
// failing
151
151
}
152
152
#endif
153
+
154
+ #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) \
155
+ && !defined(ZMQ_CPP11_PARTIAL) && defined(ZMQ_HAVE_POLLER)
156
+ #include " zmq_addon.hpp"
157
+ using namespace std ::literals::chrono_literals;
158
+
159
+ TEST_CASE (" poll monitor events using active poller" , " [monitor]" )
160
+ {
161
+ // define necessary class for test
162
+ class test_monitor : public zmq ::monitor_t
163
+ {
164
+ public:
165
+ void init (zmq::socket_t &socket,
166
+ const char *const addr_,
167
+ int events = ZMQ_EVENT_ALL)
168
+ {
169
+ zmq::monitor_t::init (socket, addr_, events);
170
+ }
171
+
172
+ void addToPoller (zmq::active_poller_t &inActivePoller)
173
+ {
174
+ inActivePoller.add (
175
+ _monitor_socket, zmq::event_flags::pollin,
176
+ [&](zmq::event_flags ef) { process_event (static_cast <short >(ef)); });
177
+ }
178
+
179
+ void on_event_accepted (const zmq_event_t &event_,
180
+ const char *addr_) override
181
+ {
182
+ clientAccepted++;
183
+ }
184
+ void on_event_disconnected (const zmq_event_t &event,
185
+ const char *const addr) override
186
+ {
187
+ clientDisconnected++;
188
+ }
189
+
190
+ int clientAccepted = 0 ;
191
+ int clientDisconnected = 0 ;
192
+ };
193
+
194
+ // Arrange
195
+ int messageCounter = 0 ;
196
+ const char monitorAddress[] = " inproc://monitor-server" ;
197
+
198
+ auto addToPoller = [&](zmq::socket_t &socket, zmq::active_poller_t &poller) {
199
+ poller.add (socket, zmq::event_flags::pollin, [&](zmq::event_flags ef) {
200
+ zmq::message_t msg;
201
+ auto result = socket.recv (msg, zmq::recv_flags::dontwait);
202
+ messageCounter++;
203
+ });
204
+ };
205
+
206
+ common_server_client_setup sockets (false );
207
+
208
+ test_monitor monitor;
209
+ monitor.init (sockets.server , monitorAddress);
210
+
211
+ zmq::active_poller_t poller;
212
+ monitor.addToPoller (poller);
213
+ addToPoller (sockets.server , poller);
214
+
215
+ sockets.init ();
216
+ sockets.client .send (zmq::message_t (0 ), zmq::send_flags::dontwait);
217
+ CHECK (monitor.clientAccepted == 0 );
218
+ CHECK (monitor.clientDisconnected == 0 );
219
+
220
+ // Act
221
+ for (int i = 0 ; i < 10 ; i++) {
222
+ poller.wait (10ms);
223
+ }
224
+ CHECK (monitor.clientAccepted == 1 );
225
+ CHECK (monitor.clientDisconnected == 0 );
226
+
227
+ sockets.client .close ();
228
+
229
+ for (int i = 0 ; i < 10 ; i++) {
230
+ poller.wait (10ms);
231
+ }
232
+ sockets.server .close ();
233
+
234
+ // Assert
235
+ CHECK (messageCounter == 1 );
236
+ CHECK (monitor.clientAccepted == 1 );
237
+ CHECK (monitor.clientDisconnected == 1 );
238
+ }
239
+ #endif
0 commit comments