Skip to content

Commit b838563

Browse files
authored
Merge pull request #134 from a4z/monitor
Problem: monitor_t::monitor function is blocking
2 parents b0e6d4b + d4da63f commit b838563

File tree

1 file changed

+150
-73
lines changed

1 file changed

+150
-73
lines changed

zmq.hpp

Lines changed: 150 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -703,120 +703,192 @@ namespace zmq
703703
class monitor_t
704704
{
705705
public:
706-
monitor_t() : socketPtr(NULL) {}
707-
virtual ~monitor_t() {}
706+
monitor_t() : socketPtr(NULL), monitor_socket{NULL} {}
707+
708+
virtual ~monitor_t()
709+
{
710+
if (socketPtr)
711+
zmq_socket_monitor(socketPtr, NULL, 0);
712+
713+
if (monitor_socket)
714+
zmq_close (monitor_socket);
715+
716+
}
717+
718+
719+
#ifdef ZMQ_HAS_RVALUE_REFS
720+
monitor_t(monitor_t&& rhs) ZMQ_NOTHROW :
721+
socketPtr(rhs.socketPtr),
722+
monitor_socket(rhs.monitor_socket)
723+
{
724+
rhs.socketPtr = NULL;
725+
rhs.monitor_socket = NULL;
726+
}
727+
728+
socket_t& operator=(socket_t&& rhs) ZMQ_DELETED_FUNCTION ;
729+
#endif
730+
731+
708732

709733
void monitor(socket_t &socket, std::string const& addr, int events = ZMQ_EVENT_ALL)
710734
{
711735
monitor(socket, addr.c_str(), events);
712736
}
713737

714738
void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
739+
{
740+
init (socket, addr_, events) ;
741+
while(true)
742+
{
743+
check_event(-1) ;
744+
}
745+
}
746+
747+
void init(socket_t &socket, std::string const& addr, int events = ZMQ_EVENT_ALL)
748+
{
749+
init(socket, addr.c_str(), events);
750+
}
751+
752+
void init(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
715753
{
716754
int rc = zmq_socket_monitor(socket.ptr, addr_, events);
717755
if (rc != 0)
718756
throw error_t ();
719757

720758
socketPtr = socket.ptr;
721-
void *s = zmq_socket (socket.ctxptr, ZMQ_PAIR);
722-
assert (s);
759+
monitor_socket = zmq_socket (socket.ctxptr, ZMQ_PAIR);
760+
assert (monitor_socket);
723761

724-
rc = zmq_connect (s, addr_);
762+
rc = zmq_connect (monitor_socket, addr_);
725763
assert (rc == 0);
726764

727765
on_monitor_started();
766+
}
767+
768+
bool check_event(int timeout = 0)
769+
{
770+
assert (monitor_socket);
771+
772+
zmq_msg_t eventMsg;
773+
zmq_msg_init (&eventMsg);
774+
775+
zmq::pollitem_t items [] = {
776+
{ monitor_socket, 0, ZMQ_POLLIN, 0 },
777+
};
778+
779+
zmq::poll (&items [0], 1, timeout);
728780

729-
while (true) {
730-
zmq_msg_t eventMsg;
731-
zmq_msg_init (&eventMsg);
732-
rc = zmq_msg_recv (&eventMsg, s, 0);
781+
if (items [0].revents & ZMQ_POLLIN)
782+
{
783+
int rc = zmq_msg_recv (&eventMsg, monitor_socket, 0);
733784
if (rc == -1 && zmq_errno() == ETERM)
734-
break;
785+
return false;
735786
assert (rc != -1);
787+
788+
}
789+
else
790+
{
791+
zmq_msg_close (&eventMsg);
792+
return false;
793+
}
794+
736795
#if ZMQ_VERSION_MAJOR >= 4
737-
const char* data = static_cast<const char*>(zmq_msg_data(&eventMsg));
738-
zmq_event_t msgEvent;
739-
memcpy(&msgEvent.event, data, sizeof(uint16_t)); data += sizeof(uint16_t);
740-
memcpy(&msgEvent.value, data, sizeof(int32_t));
741-
zmq_event_t* event = &msgEvent;
796+
const char* data = static_cast<const char*>(zmq_msg_data(&eventMsg));
797+
zmq_event_t msgEvent;
798+
memcpy(&msgEvent.event, data, sizeof(uint16_t)); data += sizeof(uint16_t);
799+
memcpy(&msgEvent.value, data, sizeof(int32_t));
800+
zmq_event_t* event = &msgEvent;
742801
#else
743-
zmq_event_t* event = static_cast<zmq_event_t*>(zmq_msg_data(&eventMsg));
802+
zmq_event_t* event = static_cast<zmq_event_t*>(zmq_msg_data(&eventMsg));
744803
#endif
745804

746805
#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
747-
zmq_msg_t addrMsg;
748-
zmq_msg_init (&addrMsg);
749-
rc = zmq_msg_recv (&addrMsg, s, 0);
750-
if (rc == -1 && zmq_errno() == ETERM)
751-
break;
752-
assert (rc != -1);
753-
const char* str = static_cast<const char*>(zmq_msg_data (&addrMsg));
754-
std::string address(str, str + zmq_msg_size(&addrMsg));
755-
zmq_msg_close (&addrMsg);
806+
zmq_msg_t addrMsg;
807+
zmq_msg_init (&addrMsg);
808+
int rc = zmq_msg_recv (&addrMsg, monitor_socket, 0);
809+
if (rc == -1 && zmq_errno() == ETERM)
810+
{
811+
zmq_msg_close (&eventMsg);
812+
return false;
813+
}
814+
815+
assert (rc != -1);
816+
const char* str = static_cast<const char*>(zmq_msg_data (&addrMsg));
817+
std::string address(str, str + zmq_msg_size(&addrMsg));
818+
zmq_msg_close (&addrMsg);
756819
#else
757-
// Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
758-
std::string address = event->data.connected.addr;
820+
// Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
821+
std::string address = event->data.connected.addr;
759822
#endif
760823

761824
#ifdef ZMQ_EVENT_MONITOR_STOPPED
762-
if (event->event == ZMQ_EVENT_MONITOR_STOPPED)
763-
break;
825+
if (event->event == ZMQ_EVENT_MONITOR_STOPPED)
826+
{
827+
zmq_msg_close (&eventMsg);
828+
return true;
829+
}
830+
764831
#endif
765832

766-
switch (event->event) {
767-
case ZMQ_EVENT_CONNECTED:
768-
on_event_connected(*event, address.c_str());
769-
break;
770-
case ZMQ_EVENT_CONNECT_DELAYED:
771-
on_event_connect_delayed(*event, address.c_str());
772-
break;
773-
case ZMQ_EVENT_CONNECT_RETRIED:
774-
on_event_connect_retried(*event, address.c_str());
775-
break;
776-
case ZMQ_EVENT_LISTENING:
777-
on_event_listening(*event, address.c_str());
778-
break;
779-
case ZMQ_EVENT_BIND_FAILED:
780-
on_event_bind_failed(*event, address.c_str());
781-
break;
782-
case ZMQ_EVENT_ACCEPTED:
783-
on_event_accepted(*event, address.c_str());
784-
break;
785-
case ZMQ_EVENT_ACCEPT_FAILED:
786-
on_event_accept_failed(*event, address.c_str());
787-
break;
788-
case ZMQ_EVENT_CLOSED:
789-
on_event_closed(*event, address.c_str());
790-
break;
791-
case ZMQ_EVENT_CLOSE_FAILED:
792-
on_event_close_failed(*event, address.c_str());
793-
break;
794-
case ZMQ_EVENT_DISCONNECTED:
795-
on_event_disconnected(*event, address.c_str());
796-
break;
833+
switch (event->event) {
834+
case ZMQ_EVENT_CONNECTED:
835+
on_event_connected(*event, address.c_str());
836+
break;
837+
case ZMQ_EVENT_CONNECT_DELAYED:
838+
on_event_connect_delayed(*event, address.c_str());
839+
break;
840+
case ZMQ_EVENT_CONNECT_RETRIED:
841+
on_event_connect_retried(*event, address.c_str());
842+
break;
843+
case ZMQ_EVENT_LISTENING:
844+
on_event_listening(*event, address.c_str());
845+
break;
846+
case ZMQ_EVENT_BIND_FAILED:
847+
on_event_bind_failed(*event, address.c_str());
848+
break;
849+
case ZMQ_EVENT_ACCEPTED:
850+
on_event_accepted(*event, address.c_str());
851+
break;
852+
case ZMQ_EVENT_ACCEPT_FAILED:
853+
on_event_accept_failed(*event, address.c_str());
854+
break;
855+
case ZMQ_EVENT_CLOSED:
856+
on_event_closed(*event, address.c_str());
857+
break;
858+
case ZMQ_EVENT_CLOSE_FAILED:
859+
on_event_close_failed(*event, address.c_str());
860+
break;
861+
case ZMQ_EVENT_DISCONNECTED:
862+
on_event_disconnected(*event, address.c_str());
863+
break;
797864
#ifdef ZMQ_BUILD_DRAFT_API
798-
case ZMQ_EVENT_HANDSHAKE_FAILED:
799-
on_event_handshake_failed(*event, address.c_str());
800-
break;
801-
case ZMQ_EVENT_HANDSHAKE_SUCCEED:
802-
on_event_handshake_succeed(*event, address.c_str());
803-
break;
865+
case ZMQ_EVENT_HANDSHAKE_FAILED:
866+
on_event_handshake_failed(*event, address.c_str());
867+
break;
868+
case ZMQ_EVENT_HANDSHAKE_SUCCEED:
869+
on_event_handshake_succeed(*event, address.c_str());
870+
break;
804871
#endif
805-
default:
806-
on_event_unknown(*event, address.c_str());
807-
break;
808-
}
809-
zmq_msg_close (&eventMsg);
872+
default:
873+
on_event_unknown(*event, address.c_str());
874+
break;
810875
}
811-
zmq_close (s);
812-
socketPtr = NULL;
876+
zmq_msg_close (&eventMsg);
877+
878+
return true ;
813879
}
814880

815881
#ifdef ZMQ_EVENT_MONITOR_STOPPED
816882
void abort()
817883
{
818884
if (socketPtr)
819885
zmq_socket_monitor(socketPtr, NULL, 0);
886+
887+
if (monitor_socket)
888+
zmq_close (monitor_socket);
889+
890+
socketPtr = NULL;
891+
monitor_socket = NULL;
820892
}
821893
#endif
822894
virtual void on_monitor_started() {}
@@ -834,7 +906,12 @@ namespace zmq
834906
virtual void on_event_handshake_succeed(const zmq_event_t &event_, const char* addr_) { (void) event_; (void) addr_; }
835907
virtual void on_event_unknown(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
836908
private:
909+
910+
monitor_t (const monitor_t&) ZMQ_DELETED_FUNCTION;
911+
void operator = (const monitor_t&) ZMQ_DELETED_FUNCTION;
912+
837913
void* socketPtr;
914+
void *monitor_socket ;
838915
};
839916

840917
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)

0 commit comments

Comments
 (0)