@@ -13,13 +13,18 @@ using namespace std;
1313namespace swss {
1414
1515ZmqServer::ZmqServer (const std::string& endpoint)
16- : ZmqServer(endpoint, " " )
16+ : ZmqServer(endpoint, " " , false )
1717{
1818}
1919
2020ZmqServer::ZmqServer (const std::string& endpoint, const std::string& vrf)
21+ : ZmqServer(endpoint, vrf, false )
22+ {
23+ }
24+
25+ ZmqServer::ZmqServer (const std::string& endpoint, const std::string& vrf, bool oneToOneSync)
2126 : m_endpoint(endpoint),
22- m_vrf (vrf)
27+ m_vrf (vrf), m_oneToOneSync(oneToOneSync), m_allowZmqPoll( true )
2328{
2429 connect ();
2530 m_buffer.resize (MQ_RESPONSE_MAX_COUNT);
@@ -31,6 +36,7 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
3136
3237ZmqServer::~ZmqServer ()
3338{
39+ m_allowZmqPoll = true ;
3440 m_runThread = false ;
3541 m_mqPollThread->join ();
3642
@@ -42,11 +48,22 @@ void ZmqServer::connect()
4248{
4349 SWSS_LOG_ENTER ();
4450 m_context = zmq_ctx_new ();
45- m_socket = zmq_socket (m_context, ZMQ_PULL);
4651
47- // Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
48- int high_watermark = MQ_WATERMARK;
49- zmq_setsockopt (m_socket, ZMQ_RCVHWM, &high_watermark, sizeof (high_watermark));
52+ if (m_oneToOneSync)
53+ {
54+ m_socket = zmq_socket (m_context, ZMQ_REP);
55+ }
56+ else
57+ {
58+ m_socket = zmq_socket (m_context, ZMQ_PULL);
59+ }
60+
61+ if (!m_oneToOneSync)
62+ {
63+ // Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
64+ int high_watermark = MQ_WATERMARK;
65+ zmq_setsockopt (m_socket, ZMQ_RCVHWM, &high_watermark, sizeof (high_watermark));
66+ }
5067
5168 if (!m_vrf.empty ())
5269 {
@@ -129,6 +146,8 @@ void ZmqServer::mqPollThread()
129146 SWSS_LOG_NOTICE (" bind to zmq endpoint: %s" , m_endpoint.c_str ());
130147 while (m_runThread)
131148 {
149+ m_allowZmqPoll = false ;
150+
132151 // receive message
133152 auto rc = zmq_poll (&poll_item, 1 , 1000 );
134153 if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN))
@@ -139,7 +158,14 @@ void ZmqServer::mqPollThread()
139158 }
140159
141160 // receive message
142- rc = zmq_recv (m_socket, m_buffer.data (), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
161+ if (m_oneToOneSync)
162+ {
163+ rc = zmq_recv (m_socket, m_buffer.data (), MQ_RESPONSE_MAX_COUNT, 0 );
164+ }
165+ else
166+ {
167+ rc = zmq_recv (m_socket, m_buffer.data (), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
168+ }
143169 if (rc < 0 )
144170 {
145171 int zmq_err = zmq_errno ();
@@ -166,15 +192,86 @@ void ZmqServer::mqPollThread()
166192
167193 // deserialize and write to redis:
168194 handleReceivedData (m_buffer.data (), rc);
195+ while (m_oneToOneSync && !m_allowZmqPoll)
196+ {
197+ usleep (10 );
198+ }
169199 }
170200 SWSS_LOG_NOTICE (" mqPollThread end" );
171201}
172202
173- // TODO: To be implemented later, required for ZMQ_CLIENT & ZMQ_SERVER
174- // socket types in response path.
175203void ZmqServer::sendMsg (
176204 const std::string &dbName, const std::string &tableName,
177205 const std::vector<swss::KeyOpFieldsValuesTuple> &values) {
178- return ;
206+ if (!m_oneToOneSync)
207+ {
208+ return ;
209+ }
210+
211+ int serializedlen = (int )BinarySerializer::serializeBuffer (
212+ m_buffer.data (),
213+ m_buffer.size (),
214+ dbName,
215+ tableName,
216+ values);
217+
218+ SWSS_LOG_DEBUG (" sending: %d" , serializedlen);
219+ int zmq_err = 0 ;
220+ int retry_delay = 10 ;
221+ int rc = 0 ;
222+ for (int i = 0 ; i <= MQ_MAX_RETRY; ++i)
223+ {
224+ rc = zmq_send (m_socket, m_buffer.data (), serializedlen, 0 );
225+
226+ if (rc >= 0 )
227+ {
228+ m_allowZmqPoll = true ;
229+ SWSS_LOG_DEBUG (" zmq sent %d bytes" , serializedlen);
230+ return ;
231+ }
232+
233+ zmq_err = zmq_errno ();
234+ // sleep (2 ^ retry time) * 10 ms
235+ retry_delay *= 2 ;
236+ if (zmq_err == EINTR
237+ || zmq_err== EFSM)
238+ {
239+ // EINTR: interrupted by signal
240+ // EFSM: socket state not ready
241+ // For example when ZMQ socket still not receive reply message from last sended package.
242+ // There was state machine inside ZMQ socket, when the socket is not in ready to send state, this
243+ // error will happen.
244+ // for more detail, please check: http://api.zeromq.org/2-1:zmq-send
245+ SWSS_LOG_DEBUG (" zmq send retry, endpoint: %s, error: %d" , m_endpoint.c_str (), zmq_err);
246+
247+ retry_delay = 0 ;
248+ }
249+ else if (zmq_err == EAGAIN)
250+ {
251+ // EAGAIN: ZMQ is full to need try again
252+ SWSS_LOG_WARN (" zmq is full, will retry in %d ms, endpoint: %s, error: %d" , retry_delay, m_endpoint.c_str (), zmq_err);
253+ }
254+ else if (zmq_err == ETERM)
255+ {
256+ auto message = " zmq connection break, endpoint: " + m_endpoint + " , error: " + to_string (rc);
257+ SWSS_LOG_ERROR (" %s" , message.c_str ());
258+ throw system_error (make_error_code (errc::connection_reset), message);
259+ }
260+ else
261+ {
262+ // for other error, send failed immediately.
263+ auto message = " zmq send failed, endpoint: " + m_endpoint + " , error: " + to_string (rc);
264+ SWSS_LOG_ERROR (" %s" , message.c_str ());
265+ throw system_error (make_error_code (errc::io_error), message);
266+ }
267+
268+ usleep (retry_delay * 1000 );
269+ }
270+
271+ // failed after retry
272+ auto message = " zmq send failed, endpoint: " + m_endpoint + " , zmqerrno: " + to_string (zmq_err) + " :" + zmq_strerror (zmq_err) + " , msg length:" + to_string (serializedlen);
273+ SWSS_LOG_ERROR (" %s" , message.c_str ());
274+ throw system_error (make_error_code (errc::io_error), message);
179275}
276+
180277}
0 commit comments