@@ -64,6 +64,28 @@ public function configureZmq(): void
6464 }
6565 }
6666
67+ public function disconnect (): void
68+ {
69+ // If we are connected, then
70+ if ($ this ->socket !== null ) {
71+ try {
72+ $ this ->socket ->disconnect ($ this ->relayOldMessages );
73+ } catch (\Exception $ exception ) {
74+ $ this ->socket = null ;
75+ $ this ->relayOldMessages = null ;
76+
77+ $ this ->logger ->critical ('Unable to disconnect from old message relay: '
78+ . $ this ->relayOldMessages . ', e = ' . $ exception ->getMessage ());
79+ }
80+ }
81+ }
82+
83+ private function reconnect (): void
84+ {
85+ $ this ->disconnect ();
86+ $ this ->configureZmq ();
87+ }
88+
6789 public function isRelay (): bool
6890 {
6991 return !empty ($ this ->relayMessages ) && $ this ->relayMessages !== 'false ' ;
@@ -87,7 +109,12 @@ public function relay(Message $message): void
87109 try {
88110 $ this ->socket ->send (json_encode ($ message ));
89111 } catch (\ZMQSocketException $ socketException ) {
90- $ this ->logger ->error ('relay: [ ' . $ socketException ->getCode () . '] ' . $ socketException ->getMessage ());
112+ $ this ->logger ->error ('relay: send [ ' . $ socketException ->getCode () . '] ' . $ socketException ->getMessage ());
113+ if ($ socketException ->getCode () === 156384763 ) {
114+ // Socket state error, reconnect.
115+ // We will drop this message
116+ $ this ->reconnect ();
117+ }
91118 return ;
92119 }
93120
@@ -101,7 +128,12 @@ public function relay(Message $message): void
101128 break ;
102129 }
103130 } catch (\ZMQSocketException $ socketException ) {
104- $ this ->logger ->error ('relay: [ ' . $ socketException ->getCode () . '] ' . $ socketException ->getMessage ());
131+ $ this ->logger ->error ('relay: recv [ ' . $ socketException ->getCode () . '] ' . $ socketException ->getMessage ());
132+ if ($ socketException ->getCode () === 156384763 ) {
133+ // Socket state error, reconnect.
134+ // We will drop this message
135+ $ this ->reconnect ();
136+ }
105137 break ;
106138 }
107139
0 commit comments