Skip to content

Commit af8d873

Browse files
mint570divyagayathri-hcl
authored andcommitted
one to one sync
1 parent 863f4da commit af8d873

File tree

7 files changed

+240
-34
lines changed

7 files changed

+240
-34
lines changed

common/zmqclient.cpp

Lines changed: 88 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,18 @@ using namespace std;
1616
namespace swss {
1717

1818
ZmqClient::ZmqClient(const std::string& endpoint)
19-
:ZmqClient(endpoint, "")
19+
: ZmqClient(endpoint, "")
2020
{
2121
}
2222

2323
ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
24+
: m_waitTimeMs(0), m_oneToOneSync(false)
2425
{
2526
initialize(endpoint, vrf);
2627
}
2728

2829
ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs)
29-
: m_waitTimeMs(waitTimeMs)
30+
: m_waitTimeMs(waitTimeMs), m_oneToOneSync(m_waitTimeMs != 0)
3031
{
3132
initialize(endpoint);
3233
}
@@ -90,17 +91,26 @@ void ZmqClient::connect()
9091
zmq_ctx_destroy(m_context);
9192
}
9293

93-
// ZMQ Client/Server are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket
9494
m_context = zmq_ctx_new();
95-
m_socket = zmq_socket(m_context, ZMQ_PUSH);
96-
95+
if (m_oneToOneSync)
96+
{
97+
m_socket = zmq_socket(m_context, ZMQ_REQ);
98+
}
99+
else
100+
{
101+
m_socket = zmq_socket(m_context, ZMQ_PUSH);
102+
}
103+
97104
// timeout all pending send package, so zmq will not block in dtor of this class: http://api.zeromq.org/master:zmq-setsockopt
98105
int linger = 0;
99106
zmq_setsockopt(m_socket, ZMQ_LINGER, &linger, sizeof(linger));
100107

101-
// Increase send buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
102-
int high_watermark = MQ_WATERMARK;
103-
zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark));
108+
if (!m_oneToOneSync)
109+
{
110+
// Increase send buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
111+
int high_watermark = MQ_WATERMARK;
112+
zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark));
113+
}
104114

105115
if (!m_vrf.empty())
106116
{
@@ -150,7 +160,14 @@ void ZmqClient::sendMsg(
150160
std::lock_guard<std::mutex> lock(m_socketMutex);
151161

152162
// Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send
153-
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
163+
if (m_oneToOneSync)
164+
{
165+
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, 0);
166+
}
167+
else
168+
{
169+
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
170+
}
154171
}
155172
if (rc >= 0)
156173
{
@@ -202,11 +219,68 @@ void ZmqClient::sendMsg(
202219
throw system_error(make_error_code(errc::io_error), message);
203220
}
204221

205-
// TODO: To be implemented later, required for ZMQ_CLIENT & ZMQ_SERVER
206-
// socket types in response path.
207222
bool ZmqClient::wait(
208-
const std::string &dbName, const std::string &tableName,
209-
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos) {
210-
return false;
223+
std::string &dbName, std::string &tableName,
224+
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos) {
225+
226+
SWSS_LOG_ENTER();
227+
228+
if (!m_oneToOneSync)
229+
{
230+
return false;
231+
}
232+
233+
zmq_pollitem_t items [1] = { };
234+
items[0].socket = m_socket;
235+
items[0].events = ZMQ_POLLIN;
236+
237+
int rc;
238+
239+
for (int i = 0; true; ++i)
240+
{
241+
rc = zmq_poll(items, 1, (int)m_waitTimeMs);
242+
243+
if (rc == 0)
244+
{
245+
SWSS_LOG_ERROR("zmq_poll timed out");
246+
return false;
247+
}
248+
if (rc > 0)
249+
{
250+
break;
251+
}
252+
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
253+
{
254+
continue;
255+
}
256+
SWSS_LOG_THROW("zmq_poll failed, zmqerrno: %d", zmq_errno());
257+
}
258+
259+
for (int i = 0; true; ++i)
260+
{
261+
rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0);
262+
263+
if (rc < 0)
264+
{
265+
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
266+
{
267+
continue;
268+
}
269+
SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());
270+
}
271+
if (rc >= (int)m_sendbuffer.size())
272+
{
273+
SWSS_LOG_THROW(
274+
"zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
275+
(int)m_sendbuffer.size(), rc);
276+
}
277+
break;
278+
}
279+
280+
m_sendbuffer.at(rc) = 0; // make sure that we end string with zero before parse
281+
kcos.clear();
282+
BinarySerializer::deserializeBuffer(m_sendbuffer.data(), m_sendbuffer.size(), dbName, tableName, kcos);
283+
284+
return true;
211285
}
212286
}

common/zmqclient.h

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ class ZmqClient
1515

1616
ZmqClient(const std::string& endpoint);
1717
ZmqClient(const std::string& endpoint, const std::string& vrf);
18+
// If waitTimeMs is set to non-zero, it will enable one-to-one sync with the
19+
// server. It will use ZMQ_REQ and ZMQ_REP socket type. There can only be
20+
// one client and one server for a ZMQ socket.
1821
ZmqClient(const std::string& endpoint, uint32_t waitTimeMs);
1922
~ZmqClient();
2023

@@ -26,9 +29,10 @@ class ZmqClient
2629
const std::string& tableName,
2730
const std::vector<KeyOpFieldsValuesTuple>& kcos);
2831

29-
bool wait(const std::string& dbName,
30-
const std::string& tableName,
31-
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);
32+
// This method should only be used in one-to-one sync mode with the server.
33+
bool wait(std::string& dbName,
34+
std::string& tableName,
35+
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);
3236

3337
private:
3438
void initialize(const std::string& endpoint, const std::string& vrf = "");
@@ -43,8 +47,12 @@ class ZmqClient
4347

4448
bool m_connected;
4549

50+
// Timeout in ms to wait for response message from the server.
51+
// If this is set to zero, one-to-one sync is disabled.
4652
uint32_t m_waitTimeMs;
4753

54+
bool m_oneToOneSync;
55+
4856
std::mutex m_socketMutex;
4957

5058
std::vector<char> m_sendbuffer;

common/zmqproducerstatetable.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,9 @@ void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos
164164
}
165165
}
166166

167-
bool ZmqProducerStateTable::wait(const std::string& dbName,
168-
const std::string& tableName,
169-
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
167+
bool ZmqProducerStateTable::wait(std::string& dbName,
168+
std::string& tableName,
169+
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
170170
{
171171
return m_zmqClient.wait(dbName, tableName, kcos);
172172
}

common/zmqproducerstatetable.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ class ZmqProducerStateTable : public ProducerStateTable
3737
// Batched send that can include both SET and DEL requests.
3838
virtual void send(const std::vector<KeyOpFieldsValuesTuple> &kcos);
3939

40-
// To wait for the response from the peer.
41-
virtual bool wait(const std::string& dbName,
42-
const std::string& tableName,
43-
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);
40+
// This method should only be used if the ZmqClient enables one-to-one sync.
41+
virtual bool wait(std::string& dbName,
42+
std::string& tableName,
43+
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);
4444

4545
size_t dbUpdaterQueueSize();
4646
private:

common/zmqserver.cpp

Lines changed: 107 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,18 @@ using namespace std;
1313
namespace swss {
1414

1515
ZmqServer::ZmqServer(const std::string& endpoint)
16-
: ZmqServer(endpoint, "")
16+
: ZmqServer(endpoint, "", false)
1717
{
1818
}
1919

2020
ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
21+
: ZmqServer(endpoint, vrf, false)
22+
{
23+
}
24+
25+
ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf, bool oneToOneSync)
2126
: m_endpoint(endpoint),
22-
m_vrf(vrf)
27+
m_vrf(vrf), m_oneToOneSync(oneToOneSync), m_allowZmqPoll(true)
2328
{
2429
connect();
2530
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
@@ -31,6 +36,7 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
3136

3237
ZmqServer::~ZmqServer()
3338
{
39+
m_allowZmqPoll = true;
3440
m_runThread = false;
3541
m_mqPollThread->join();
3642

@@ -42,11 +48,22 @@ void ZmqServer::connect()
4248
{
4349
SWSS_LOG_ENTER();
4450
m_context = zmq_ctx_new();
45-
m_socket = zmq_socket(m_context, ZMQ_PULL);
4651

47-
// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
48-
int high_watermark = MQ_WATERMARK;
49-
zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
52+
if (m_oneToOneSync)
53+
{
54+
m_socket = zmq_socket(m_context, ZMQ_REP);
55+
}
56+
else
57+
{
58+
m_socket = zmq_socket(m_context, ZMQ_PULL);
59+
}
60+
61+
if (!m_oneToOneSync)
62+
{
63+
// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
64+
int high_watermark = MQ_WATERMARK;
65+
zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
66+
}
5067

5168
if (!m_vrf.empty())
5269
{
@@ -129,6 +146,8 @@ void ZmqServer::mqPollThread()
129146
SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str());
130147
while (m_runThread)
131148
{
149+
m_allowZmqPoll = false;
150+
132151
// receive message
133152
auto rc = zmq_poll(&poll_item, 1, 1000);
134153
if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN))
@@ -139,7 +158,14 @@ void ZmqServer::mqPollThread()
139158
}
140159

141160
// receive message
142-
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
161+
if (m_oneToOneSync)
162+
{
163+
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, 0);
164+
}
165+
else
166+
{
167+
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
168+
}
143169
if (rc < 0)
144170
{
145171
int zmq_err = zmq_errno();
@@ -166,15 +192,86 @@ void ZmqServer::mqPollThread()
166192

167193
// deserialize and write to redis:
168194
handleReceivedData(m_buffer.data(), rc);
195+
while (m_oneToOneSync && !m_allowZmqPoll)
196+
{
197+
usleep(10);
198+
}
169199
}
170200
SWSS_LOG_NOTICE("mqPollThread end");
171201
}
172202

173-
// TODO: To be implemented later, required for ZMQ_CLIENT & ZMQ_SERVER
174-
// socket types in response path.
175203
void ZmqServer::sendMsg(
176204
const std::string &dbName, const std::string &tableName,
177205
const std::vector<swss::KeyOpFieldsValuesTuple> &values) {
178-
return;
206+
if (!m_oneToOneSync)
207+
{
208+
return;
209+
}
210+
211+
int serializedlen = (int)BinarySerializer::serializeBuffer(
212+
m_buffer.data(),
213+
m_buffer.size(),
214+
dbName,
215+
tableName,
216+
values);
217+
218+
SWSS_LOG_DEBUG("sending: %d", serializedlen);
219+
int zmq_err = 0;
220+
int retry_delay = 10;
221+
int rc = 0;
222+
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
223+
{
224+
rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0);
225+
226+
if (rc >= 0)
227+
{
228+
m_allowZmqPoll = true;
229+
SWSS_LOG_DEBUG("zmq sent %d bytes", serializedlen);
230+
return;
231+
}
232+
233+
zmq_err = zmq_errno();
234+
// sleep (2 ^ retry time) * 10 ms
235+
retry_delay *= 2;
236+
if (zmq_err == EINTR
237+
|| zmq_err== EFSM)
238+
{
239+
// EINTR: interrupted by signal
240+
// EFSM: socket state not ready
241+
// For example when ZMQ socket still not receive reply message from last sended package.
242+
// There was state machine inside ZMQ socket, when the socket is not in ready to send state, this
243+
// error will happen.
244+
// for more detail, please check: http://api.zeromq.org/2-1:zmq-send
245+
SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err);
246+
247+
retry_delay = 0;
248+
}
249+
else if (zmq_err == EAGAIN)
250+
{
251+
// EAGAIN: ZMQ is full to need try again
252+
SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err);
253+
}
254+
else if (zmq_err == ETERM)
255+
{
256+
auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc);
257+
SWSS_LOG_ERROR("%s", message.c_str());
258+
throw system_error(make_error_code(errc::connection_reset), message);
259+
}
260+
else
261+
{
262+
// for other error, send failed immediately.
263+
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
264+
SWSS_LOG_ERROR("%s", message.c_str());
265+
throw system_error(make_error_code(errc::io_error), message);
266+
}
267+
268+
usleep(retry_delay * 1000);
269+
}
270+
271+
// failed after retry
272+
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
273+
SWSS_LOG_ERROR("%s", message.c_str());
274+
throw system_error(make_error_code(errc::io_error), message);
179275
}
276+
180277
}

0 commit comments

Comments
 (0)