Skip to content

Commit fed6358

Browse files
one to one sync
1 parent 863f4da commit fed6358

File tree

7 files changed

+250
-37
lines changed

7 files changed

+250
-37
lines changed

common/zmqclient.cpp

Lines changed: 89 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,19 @@ using namespace std;
1616
namespace swss {
1717

1818
// Connects to `endpoint` without a VRF and with one-to-one sync disabled.
// Forwards to the (endpoint, vrf) constructor declared in zmqclient.h.
ZmqClient::ZmqClient(const std::string& endpoint)
    : ZmqClient(endpoint, "")
{
}

// Connects to `endpoint` through `vrf` with one-to-one sync disabled.
// m_oneToOneSync is given a defined value here because connect() branches
// on it when choosing the socket type; leaving it uninitialized would make
// that choice undefined.
ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
    : m_waitTimeMs(0), m_oneToOneSync(false)
{
    initialize(endpoint, vrf);
}

// Connects to `endpoint` without a VRF.
// A non-zero waitTimeMs enables one-to-one sync with the server; the value
// is the timeout, in milliseconds, that wait() passes to zmq_poll.
// A zero waitTimeMs leaves one-to-one sync disabled.
ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs)
    : m_waitTimeMs(waitTimeMs), m_oneToOneSync(waitTimeMs != 0)
{
    // This overload has no vrf parameter; initialize() defaults vrf to "".
    initialize(endpoint);
}
3333

3434
ZmqClient::~ZmqClient()
@@ -90,17 +90,26 @@ void ZmqClient::connect()
9090
zmq_ctx_destroy(m_context);
9191
}
9292

93-
// ZMQ Client/Server are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket
9493
m_context = zmq_ctx_new();
95-
m_socket = zmq_socket(m_context, ZMQ_PUSH);
96-
94+
if (m_oneToOneSync)
95+
{
96+
m_socket = zmq_socket(m_context, ZMQ_REQ);
97+
}
98+
else
99+
{
100+
m_socket = zmq_socket(m_context, ZMQ_PUSH);
101+
}
102+
97103
// Set linger to 0 so pending outgoing messages are discarded on close and zmq will not block in the dtor of this class: http://api.zeromq.org/master:zmq-setsockopt
98104
int linger = 0;
99105
zmq_setsockopt(m_socket, ZMQ_LINGER, &linger, sizeof(linger));
100106

101-
// Increase send buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
102-
int high_watermark = MQ_WATERMARK;
103-
zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark));
107+
if (!m_oneToOneSync)
108+
{
109+
// Increase send buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
110+
int high_watermark = MQ_WATERMARK;
111+
zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark));
112+
}
104113

105114
if (!m_vrf.empty())
106115
{
@@ -150,7 +159,14 @@ void ZmqClient::sendMsg(
150159
std::lock_guard<std::mutex> lock(m_socketMutex);
151160

152161
// Use non-blocking mode to use all available bandwidth (one-to-one sync mode sends in blocking mode instead): http://api.zeromq.org/2-1%3Azmq-send
153-
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
162+
if (m_oneToOneSync)
163+
{
164+
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, 0);
165+
}
166+
else
167+
{
168+
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
169+
}
154170
}
155171
if (rc >= 0)
156172
{
@@ -205,8 +221,67 @@ void ZmqClient::sendMsg(
205221
// TODO: To be implemented later, required for ZMQ_CLIENT & ZMQ_SERVER
206222
// socket types in response path.
207223
bool ZmqClient::wait(
208-
const std::string &dbName, const std::string &tableName,
209-
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos) {
210-
return false;
224+
std::string &dbName, std::string &tableName,
225+
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos) {
226+
227+
SWSS_LOG_ENTER();
228+
229+
if (!m_oneToOneSync)
230+
{
231+
return false;
232+
}
233+
234+
zmq_pollitem_t items [1] = { };
235+
items[0].socket = m_socket;
236+
items[0].events = ZMQ_POLLIN;
237+
238+
int rc;
239+
240+
for (int i = 0; true; ++i)
241+
{
242+
rc = zmq_poll(items, 1, (int)m_waitTimeMs);
243+
244+
if (rc == 0)
245+
{
246+
SWSS_LOG_ERROR("zmq_poll timed out");
247+
return false;
248+
}
249+
if (rc > 0)
250+
{
251+
break;
252+
}
253+
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
254+
{
255+
continue;
256+
}
257+
SWSS_LOG_THROW("zmq_poll failed, zmqerrno: %d", zmq_errno());
258+
}
259+
260+
for (int i = 0; true; ++i)
261+
{
262+
rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0);
263+
264+
if (rc < 0)
265+
{
266+
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
267+
{
268+
continue;
269+
}
270+
SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());
271+
}
272+
if (rc >= (int)m_sendbuffer.size())
273+
{
274+
SWSS_LOG_THROW(
275+
"zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
276+
(int)m_sendbuffer.size(), rc);
277+
}
278+
break;
279+
}
280+
281+
m_sendbuffer.at(rc) = 0; // make sure that we end string with zero before parse
282+
kcos.clear();
283+
BinarySerializer::deserializeBuffer(m_sendbuffer.data(), m_sendbuffer.size(), dbName, tableName, kcos);
284+
285+
return true;
211286
}
212287
}

common/zmqclient.h

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ class ZmqClient
1515

1616
ZmqClient(const std::string& endpoint);
1717
ZmqClient(const std::string& endpoint, const std::string& vrf);
18+
// If waitTimeMs is set to non-zero, it will enable one-to-one sync with the
19+
// server. It will use ZMQ_REQ and ZMQ_REP socket type. There can only be
20+
// one client and one server for a ZMQ socket.
1821
ZmqClient(const std::string& endpoint, uint32_t waitTimeMs);
1922
~ZmqClient();
2023

@@ -26,9 +29,10 @@ class ZmqClient
2629
const std::string& tableName,
2730
const std::vector<KeyOpFieldsValuesTuple>& kcos);
2831

29-
bool wait(const std::string& dbName,
30-
const std::string& tableName,
31-
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);
32+
// This method should only be used in one-to-one sync mode with the server.
33+
bool wait(std::string& dbName,
34+
std::string& tableName,
35+
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);
3236

3337
private:
3438
void initialize(const std::string& endpoint, const std::string& vrf = "");
@@ -43,8 +47,12 @@ class ZmqClient
4347

4448
bool m_connected;
4549

50+
// Timeout in ms to wait for response message from the server.
51+
// If this is set to zero, one-to-one sync is disabled.
4652
uint32_t m_waitTimeMs;
4753

54+
bool m_oneToOneSync;
55+
4856
std::mutex m_socketMutex;
4957

5058
std::vector<char> m_sendbuffer;

common/zmqproducerstatetable.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,9 @@ void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos
164164
}
165165
}
166166

167-
bool ZmqProducerStateTable::wait(const std::string& dbName,
168-
const std::string& tableName,
169-
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
167+
bool ZmqProducerStateTable::wait(std::string& dbName,
168+
std::string& tableName,
169+
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
170170
{
171171
return m_zmqClient.wait(dbName, tableName, kcos);
172172
}

common/zmqproducerstatetable.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ class ZmqProducerStateTable : public ProducerStateTable
3737
// Batched send that can include both SET and DEL requests.
3838
virtual void send(const std::vector<KeyOpFieldsValuesTuple> &kcos);
3939

40-
// To wait for the response from the peer.
41-
virtual bool wait(const std::string& dbName,
42-
const std::string& tableName,
43-
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);
40+
// This method should only be used if the ZmqClient enables one-to-one sync.
41+
virtual bool wait(std::string& dbName,
42+
std::string& tableName,
43+
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);
4444

4545
size_t dbUpdaterQueueSize();
4646
private:

common/zmqserver.cpp

Lines changed: 103 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ using namespace std;
1313
namespace swss {
1414

1515
// Convenience constructor: no VRF, one-to-one sync disabled.
// Delegates to the (endpoint, vrf, oneToOneSync) constructor.
ZmqServer::ZmqServer(const std::string& endpoint)
    : ZmqServer(endpoint, "", false)
{
}
1919

20-
ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
20+
ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf, bool oneToOneSync)
2121
: m_endpoint(endpoint),
22-
m_vrf(vrf)
22+
m_vrf(vrf), m_oneToOneSync(oneToOneSync), m_allowZmqPoll(true)
2323
{
2424
connect();
2525
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
@@ -31,6 +31,7 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
3131

3232
ZmqServer::~ZmqServer()
3333
{
34+
m_allowZmqPoll = true;
3435
m_runThread = false;
3536
m_mqPollThread->join();
3637

@@ -42,11 +43,22 @@ void ZmqServer::connect()
4243
{
4344
SWSS_LOG_ENTER();
4445
m_context = zmq_ctx_new();
45-
m_socket = zmq_socket(m_context, ZMQ_PULL);
4646

47-
// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
48-
int high_watermark = MQ_WATERMARK;
49-
zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
47+
if (m_oneToOneSync)
48+
{
49+
m_socket = zmq_socket(m_context, ZMQ_REP);
50+
}
51+
else
52+
{
53+
m_socket = zmq_socket(m_context, ZMQ_PULL);
54+
}
55+
56+
if (!m_oneToOneSync)
57+
{
58+
// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
59+
int high_watermark = MQ_WATERMARK;
60+
zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
61+
}
5062

5163
if (!m_vrf.empty())
5264
{
@@ -129,6 +141,8 @@ void ZmqServer::mqPollThread()
129141
SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str());
130142
while (m_runThread)
131143
{
144+
m_allowZmqPoll = false;
145+
132146
// receive message
133147
auto rc = zmq_poll(&poll_item, 1, 1000);
134148
if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN))
@@ -139,7 +153,14 @@ void ZmqServer::mqPollThread()
139153
}
140154

141155
// receive message
142-
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
156+
if (m_oneToOneSync)
157+
{
158+
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, 0);
159+
}
160+
else
161+
{
162+
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
163+
}
143164
if (rc < 0)
144165
{
145166
int zmq_err = zmq_errno();
@@ -166,6 +187,10 @@ void ZmqServer::mqPollThread()
166187

167188
// deserialize and write to redis:
168189
handleReceivedData(m_buffer.data(), rc);
190+
while (m_oneToOneSync && !m_allowZmqPoll)
191+
{
192+
usleep(10);
193+
}
169194
}
170195
SWSS_LOG_NOTICE("mqPollThread end");
171196
}
@@ -175,6 +200,75 @@ void ZmqServer::mqPollThread()
175200
void ZmqServer::sendMsg(
176201
const std::string &dbName, const std::string &tableName,
177202
const std::vector<swss::KeyOpFieldsValuesTuple> &values) {
178-
return;
203+
if (!m_oneToOneSync)
204+
{
205+
return;
206+
}
207+
208+
int serializedlen = (int)BinarySerializer::serializeBuffer(
209+
m_buffer.data(),
210+
m_buffer.size(),
211+
dbName,
212+
tableName,
213+
values);
214+
215+
SWSS_LOG_DEBUG("sending: %d", serializedlen);
216+
int zmq_err = 0;
217+
int retry_delay = 10;
218+
int rc = 0;
219+
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
220+
{
221+
rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0);
222+
223+
if (rc >= 0)
224+
{
225+
m_allowZmqPoll = true;
226+
SWSS_LOG_DEBUG("zmq sent %d bytes", serializedlen);
227+
return;
228+
}
229+
230+
zmq_err = zmq_errno();
231+
// sleep (2 ^ retry time) * 10 ms
232+
retry_delay *= 2;
233+
if (zmq_err == EINTR
234+
|| zmq_err== EFSM)
235+
{
236+
// EINTR: interrupted by signal
237+
// EFSM: socket state not ready
238+
// For example when ZMQ socket still not receive reply message from last sended package.
239+
// There was state machine inside ZMQ socket, when the socket is not in ready to send state, this
240+
// error will happen.
241+
// for more detail, please check: http://api.zeromq.org/2-1:zmq-send
242+
SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err);
243+
244+
retry_delay = 0;
245+
}
246+
else if (zmq_err == EAGAIN)
247+
{
248+
// EAGAIN: ZMQ is full to need try again
249+
SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err);
250+
}
251+
else if (zmq_err == ETERM)
252+
{
253+
auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc);
254+
SWSS_LOG_ERROR("%s", message.c_str());
255+
throw system_error(make_error_code(errc::connection_reset), message);
256+
}
257+
else
258+
{
259+
// for other error, send failed immediately.
260+
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
261+
SWSS_LOG_ERROR("%s", message.c_str());
262+
throw system_error(make_error_code(errc::io_error), message);
263+
}
264+
265+
usleep(retry_delay * 1000);
266+
}
267+
268+
// failed after retry
269+
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
270+
SWSS_LOG_ERROR("%s", message.c_str());
271+
throw system_error(make_error_code(errc::io_error), message);
179272
}
273+
180274
}

0 commit comments

Comments
 (0)