@@ -2374,112 +2374,6 @@ class monitor_t
2374
2374
2375
2375
return process_event (items[0 ].revents );
2376
2376
}
2377
-
2378
- bool process_event (short events)
2379
- {
2380
- zmq::message_t eventMsg;
2381
-
2382
- if (events & ZMQ_POLLIN) {
2383
- int rc = zmq_msg_recv (eventMsg.handle (), _monitor_socket.handle (), 0 );
2384
- if (rc == -1 && zmq_errno () == ETERM)
2385
- return false ;
2386
- assert (rc != -1 );
2387
-
2388
- } else {
2389
- return false ;
2390
- }
2391
-
2392
- #if ZMQ_VERSION_MAJOR >= 4
2393
- const char *data = static_cast <const char *>(eventMsg.data ());
2394
- zmq_event_t msgEvent;
2395
- memcpy (&msgEvent.event , data, sizeof (uint16_t ));
2396
- data += sizeof (uint16_t );
2397
- memcpy (&msgEvent.value , data, sizeof (int32_t ));
2398
- zmq_event_t *event = &msgEvent;
2399
- #else
2400
- zmq_event_t *event = static_cast <zmq_event_t *>(eventMsg.data ());
2401
- #endif
2402
-
2403
- #ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
2404
- zmq::message_t addrMsg;
2405
- int rc = zmq_msg_recv (addrMsg.handle (), _monitor_socket.handle (), 0 );
2406
- if (rc == -1 && zmq_errno () == ETERM) {
2407
- return false ;
2408
- }
2409
-
2410
- assert (rc != -1 );
2411
- std::string address = addrMsg.to_string ();
2412
- #else
2413
- // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
2414
- std::string address = event->data .connected .addr ;
2415
- #endif
2416
-
2417
- #ifdef ZMQ_EVENT_MONITOR_STOPPED
2418
- if (event->event == ZMQ_EVENT_MONITOR_STOPPED) {
2419
- return false ;
2420
- }
2421
-
2422
- #endif
2423
-
2424
- switch (event->event ) {
2425
- case ZMQ_EVENT_CONNECTED:
2426
- on_event_connected (*event, address.c_str ());
2427
- break ;
2428
- case ZMQ_EVENT_CONNECT_DELAYED:
2429
- on_event_connect_delayed (*event, address.c_str ());
2430
- break ;
2431
- case ZMQ_EVENT_CONNECT_RETRIED:
2432
- on_event_connect_retried (*event, address.c_str ());
2433
- break ;
2434
- case ZMQ_EVENT_LISTENING:
2435
- on_event_listening (*event, address.c_str ());
2436
- break ;
2437
- case ZMQ_EVENT_BIND_FAILED:
2438
- on_event_bind_failed (*event, address.c_str ());
2439
- break ;
2440
- case ZMQ_EVENT_ACCEPTED:
2441
- on_event_accepted (*event, address.c_str ());
2442
- break ;
2443
- case ZMQ_EVENT_ACCEPT_FAILED:
2444
- on_event_accept_failed (*event, address.c_str ());
2445
- break ;
2446
- case ZMQ_EVENT_CLOSED:
2447
- on_event_closed (*event, address.c_str ());
2448
- break ;
2449
- case ZMQ_EVENT_CLOSE_FAILED:
2450
- on_event_close_failed (*event, address.c_str ());
2451
- break ;
2452
- case ZMQ_EVENT_DISCONNECTED:
2453
- on_event_disconnected (*event, address.c_str ());
2454
- break ;
2455
- #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3))
2456
- case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
2457
- on_event_handshake_failed_no_detail (*event, address.c_str ());
2458
- break ;
2459
- case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
2460
- on_event_handshake_failed_protocol (*event, address.c_str ());
2461
- break ;
2462
- case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
2463
- on_event_handshake_failed_auth (*event, address.c_str ());
2464
- break ;
2465
- case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
2466
- on_event_handshake_succeeded (*event, address.c_str ());
2467
- break ;
2468
- #elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
2469
- case ZMQ_EVENT_HANDSHAKE_FAILED:
2470
- on_event_handshake_failed (*event, address.c_str ());
2471
- break ;
2472
- case ZMQ_EVENT_HANDSHAKE_SUCCEED:
2473
- on_event_handshake_succeed (*event, address.c_str ());
2474
- break ;
2475
- #endif
2476
- default :
2477
- on_event_unknown (*event, address.c_str ());
2478
- break ;
2479
- }
2480
-
2481
- return true ;
2482
- }
2483
2377
2484
2378
#ifdef ZMQ_EVENT_MONITOR_STOPPED
2485
2379
void abort ()
@@ -2588,12 +2482,120 @@ class monitor_t
2588
2482
(void ) addr_;
2589
2483
}
2590
2484
2485
+ protected:
2486
+ bool process_event (short events)
2487
+ {
2488
+ zmq::message_t eventMsg;
2489
+
2490
+ if (events & ZMQ_POLLIN) {
2491
+ int rc = zmq_msg_recv (eventMsg.handle (), _monitor_socket.handle (), 0 );
2492
+ if (rc == -1 && zmq_errno () == ETERM)
2493
+ return false ;
2494
+ assert (rc != -1 );
2495
+
2496
+ } else {
2497
+ return false ;
2498
+ }
2499
+
2500
+ #if ZMQ_VERSION_MAJOR >= 4
2501
+ const char *data = static_cast <const char *>(eventMsg.data ());
2502
+ zmq_event_t msgEvent;
2503
+ memcpy (&msgEvent.event , data, sizeof (uint16_t ));
2504
+ data += sizeof (uint16_t );
2505
+ memcpy (&msgEvent.value , data, sizeof (int32_t ));
2506
+ zmq_event_t *event = &msgEvent;
2507
+ #else
2508
+ zmq_event_t *event = static_cast <zmq_event_t *>(eventMsg.data ());
2509
+ #endif
2510
+
2511
+ #ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
2512
+ zmq::message_t addrMsg;
2513
+ int rc = zmq_msg_recv (addrMsg.handle (), _monitor_socket.handle (), 0 );
2514
+ if (rc == -1 && zmq_errno () == ETERM) {
2515
+ return false ;
2516
+ }
2517
+
2518
+ assert (rc != -1 );
2519
+ std::string address = addrMsg.to_string ();
2520
+ #else
2521
+ // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
2522
+ std::string address = event->data .connected .addr ;
2523
+ #endif
2524
+
2525
+ #ifdef ZMQ_EVENT_MONITOR_STOPPED
2526
+ if (event->event == ZMQ_EVENT_MONITOR_STOPPED) {
2527
+ return false ;
2528
+ }
2529
+
2530
+ #endif
2531
+
2532
+ switch (event->event ) {
2533
+ case ZMQ_EVENT_CONNECTED:
2534
+ on_event_connected (*event, address.c_str ());
2535
+ break ;
2536
+ case ZMQ_EVENT_CONNECT_DELAYED:
2537
+ on_event_connect_delayed (*event, address.c_str ());
2538
+ break ;
2539
+ case ZMQ_EVENT_CONNECT_RETRIED:
2540
+ on_event_connect_retried (*event, address.c_str ());
2541
+ break ;
2542
+ case ZMQ_EVENT_LISTENING:
2543
+ on_event_listening (*event, address.c_str ());
2544
+ break ;
2545
+ case ZMQ_EVENT_BIND_FAILED:
2546
+ on_event_bind_failed (*event, address.c_str ());
2547
+ break ;
2548
+ case ZMQ_EVENT_ACCEPTED:
2549
+ on_event_accepted (*event, address.c_str ());
2550
+ break ;
2551
+ case ZMQ_EVENT_ACCEPT_FAILED:
2552
+ on_event_accept_failed (*event, address.c_str ());
2553
+ break ;
2554
+ case ZMQ_EVENT_CLOSED:
2555
+ on_event_closed (*event, address.c_str ());
2556
+ break ;
2557
+ case ZMQ_EVENT_CLOSE_FAILED:
2558
+ on_event_close_failed (*event, address.c_str ());
2559
+ break ;
2560
+ case ZMQ_EVENT_DISCONNECTED:
2561
+ on_event_disconnected (*event, address.c_str ());
2562
+ break ;
2563
+ #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3))
2564
+ case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
2565
+ on_event_handshake_failed_no_detail (*event, address.c_str ());
2566
+ break ;
2567
+ case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
2568
+ on_event_handshake_failed_protocol (*event, address.c_str ());
2569
+ break ;
2570
+ case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
2571
+ on_event_handshake_failed_auth (*event, address.c_str ());
2572
+ break ;
2573
+ case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
2574
+ on_event_handshake_succeeded (*event, address.c_str ());
2575
+ break ;
2576
+ #elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
2577
+ case ZMQ_EVENT_HANDSHAKE_FAILED:
2578
+ on_event_handshake_failed (*event, address.c_str ());
2579
+ break ;
2580
+ case ZMQ_EVENT_HANDSHAKE_SUCCEED:
2581
+ on_event_handshake_succeed (*event, address.c_str ());
2582
+ break ;
2583
+ #endif
2584
+ default :
2585
+ on_event_unknown (*event, address.c_str ());
2586
+ break ;
2587
+ }
2588
+
2589
+ return true ;
2590
+ }
2591
+
2592
+ socket_t _monitor_socket;
2593
+
2591
2594
private:
2592
2595
monitor_t (const monitor_t &) ZMQ_DELETED_FUNCTION;
2593
2596
void operator =(const monitor_t &) ZMQ_DELETED_FUNCTION;
2594
2597
2595
2598
socket_ref _socket;
2596
- socket_t _monitor_socket;
2597
2599
2598
2600
void close () ZMQ_NOTHROW
2599
2601
{
0 commit comments