Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 88 additions & 14 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ using namespace std;
namespace swss {

// Creates a client for the given endpoint with no VRF.
// Delegates to the (endpoint, vrf) constructor with an empty VRF name.
ZmqClient::ZmqClient(const std::string& endpoint)
    : ZmqClient(endpoint, "")
{
}

// Creates a client for the given endpoint and VRF.
// The reply timeout is zero and one-to-one sync is disabled for clients
// built through this constructor.
ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
    : m_waitTimeMs(0),
      m_oneToOneSync(false)
{
    initialize(endpoint, vrf);
}

// Creates a client for the given endpoint with a reply timeout.
// A non-zero waitTimeMs enables one-to-one sync with the server; zero leaves
// it disabled.
ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs)
    : m_waitTimeMs(waitTimeMs),
      // Derive the flag from the parameter rather than from m_waitTimeMs so
      // the result does not depend on member declaration order.
      m_oneToOneSync(waitTimeMs != 0)
{
    initialize(endpoint);
}
Expand Down Expand Up @@ -90,17 +91,26 @@ void ZmqClient::connect()
zmq_ctx_destroy(m_context);
}

// ZMQ Client/Server are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket
m_context = zmq_ctx_new();
m_socket = zmq_socket(m_context, ZMQ_PUSH);

if (m_oneToOneSync)
{
m_socket = zmq_socket(m_context, ZMQ_REQ);
}
else
{
m_socket = zmq_socket(m_context, ZMQ_PUSH);
}

// Set linger to 0 so pending outgoing messages are discarded on close and zmq does not block in the dtor of this class: http://api.zeromq.org/master:zmq-setsockopt
int linger = 0;
zmq_setsockopt(m_socket, ZMQ_LINGER, &linger, sizeof(linger));

// Increase send buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark));
if (!m_oneToOneSync)
{
// Increase send buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark));
}

if (!m_vrf.empty())
{
Expand Down Expand Up @@ -150,7 +160,14 @@ void ZmqClient::sendMsg(
std::lock_guard<std::mutex> lock(m_socketMutex);

// Use non-blocking mode to use all bandwidth (one-to-one sync mode sends blocking instead): http://api.zeromq.org/2-1%3Azmq-send
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
if (m_oneToOneSync)
{
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, 0);
}
else
{
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
}
}
if (rc >= 0)
{
Expand Down Expand Up @@ -202,11 +219,68 @@ void ZmqClient::sendMsg(
throw system_error(make_error_code(errc::io_error), message);
}

// TODO: To be implemented later, required for ZMQ_CLIENT & ZMQ_SERVER
// socket types in response path.
bool ZmqClient::wait(
const std::string &dbName, const std::string &tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos) {
return false;
std::string& dbName, std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos) {

SWSS_LOG_ENTER();

if (!m_oneToOneSync)
{
return false;
}

zmq_pollitem_t items [1] = { };
items[0].socket = m_socket;
items[0].events = ZMQ_POLLIN;

int rc;

for (int i = 0; true; ++i)
{
rc = zmq_poll(items, 1, (int)m_waitTimeMs);

if (rc == 0)
{
SWSS_LOG_ERROR("zmq_poll timed out");
return false;
}
if (rc > 0)
{
break;
}
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
continue;
}
SWSS_LOG_THROW("zmq_poll failed, zmqerrno: %d", zmq_errno());
}

for (int i = 0; true; ++i)
{
rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0);

if (rc < 0)
{
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
continue;
}
SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());
}
if (rc >= (int)m_sendbuffer.size())
{
SWSS_LOG_THROW(
"zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
(int)m_sendbuffer.size(), rc);
}
break;
}

m_sendbuffer.at(rc) = 0; // make sure that we end string with zero before parse
kcos.clear();
BinarySerializer::deserializeBuffer(m_sendbuffer.data(), m_sendbuffer.size(), dbName, tableName, kcos);

return true;
}
}
14 changes: 11 additions & 3 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ class ZmqClient

ZmqClient(const std::string& endpoint);
ZmqClient(const std::string& endpoint, const std::string& vrf);
// If waitTimeMs is set to non-zero, it will enable one-to-one sync with the
// server. It will use ZMQ_REQ and ZMQ_REP socket type. There can only be
// one client and one server for a ZMQ socket.
ZmqClient(const std::string& endpoint, uint32_t waitTimeMs);
~ZmqClient();

Expand All @@ -26,9 +29,10 @@ class ZmqClient
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos);

bool wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);
// This method should only be used in one-to-one sync mode with the server.
bool wait(std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

private:
void initialize(const std::string& endpoint, const std::string& vrf = "");
Expand All @@ -43,8 +47,12 @@ class ZmqClient

bool m_connected;

// Timeout in ms to wait for response message from the server.
// If this is set to zero, one-to-one sync is disabled.
uint32_t m_waitTimeMs;

bool m_oneToOneSync;

std::mutex m_socketMutex;

std::vector<char> m_sendbuffer;
Expand Down
7 changes: 4 additions & 3 deletions common/zmqproducerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,10 @@ void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos
}
}

bool ZmqProducerStateTable::wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)

bool ZmqProducerStateTable::wait(std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
return m_zmqClient.wait(dbName, tableName, kcos);
}
Expand Down
8 changes: 4 additions & 4 deletions common/zmqproducerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ class ZmqProducerStateTable : public ProducerStateTable
// Batched send that can include both SET and DEL requests.
virtual void send(const std::vector<KeyOpFieldsValuesTuple> &kcos);

// To wait for the response from the peer.
virtual bool wait(const std::string& dbName,
const std::string& tableName,
const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);
// This method should only be used if the ZmqClient enables one-to-one sync.
virtual bool wait(std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

size_t dbUpdaterQueueSize();
private:
Expand Down
117 changes: 107 additions & 10 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ using namespace std;
namespace swss {

// Creates a server bound to the given endpoint with no VRF and one-to-one
// sync disabled. Delegates to the (endpoint, vrf, oneToOneSync) constructor.
ZmqServer::ZmqServer(const std::string& endpoint)
    : ZmqServer(endpoint, "", false)
{
}

// Creates a server for the given endpoint and VRF with one-to-one sync
// disabled, by delegating to the three-argument constructor.
ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
    : ZmqServer(endpoint, vrf, /*oneToOneSync=*/false)
{
}

ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf, bool oneToOneSync)
: m_endpoint(endpoint),
m_vrf(vrf)
m_vrf(vrf), m_oneToOneSync(oneToOneSync), m_allowZmqPoll(true)
{
connect();
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
Expand All @@ -31,6 +36,7 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)

ZmqServer::~ZmqServer()
{
m_allowZmqPoll = true;
m_runThread = false;
m_mqPollThread->join();

Expand All @@ -42,11 +48,22 @@ void ZmqServer::connect()
{
SWSS_LOG_ENTER();
m_context = zmq_ctx_new();
m_socket = zmq_socket(m_context, ZMQ_PULL);

// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
if (m_oneToOneSync)
{
m_socket = zmq_socket(m_context, ZMQ_REP);
}
else
{
m_socket = zmq_socket(m_context, ZMQ_PULL);
}

if (!m_oneToOneSync)
{
// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));
}

if (!m_vrf.empty())
{
Expand Down Expand Up @@ -129,6 +146,8 @@ void ZmqServer::mqPollThread()
SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str());
while (m_runThread)
{
m_allowZmqPoll = false;

// receive message
auto rc = zmq_poll(&poll_item, 1, 1000);
if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN))
Expand All @@ -139,7 +158,14 @@ void ZmqServer::mqPollThread()
}

// receive message
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
if (m_oneToOneSync)
{
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, 0);
}
else
{
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
}
if (rc < 0)
{
int zmq_err = zmq_errno();
Expand All @@ -166,15 +192,86 @@ void ZmqServer::mqPollThread()

// deserialize and write to redis:
handleReceivedData(m_buffer.data(), rc);
while (m_oneToOneSync && !m_allowZmqPoll)
{
usleep(10);
}
}
SWSS_LOG_NOTICE("mqPollThread end");
}

// TODO: To be implemented later, required for ZMQ_CLIENT & ZMQ_SERVER
// socket types in response path.
void ZmqServer::sendMsg(
const std::string &dbName, const std::string &tableName,
const std::vector<swss::KeyOpFieldsValuesTuple> &values) {
return;
if (!m_oneToOneSync)
{
return;
}

int serializedlen = (int)BinarySerializer::serializeBuffer(
m_buffer.data(),
m_buffer.size(),
dbName,
tableName,
values);

SWSS_LOG_DEBUG("sending: %d", serializedlen);
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
{
rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0);

if (rc >= 0)
{
m_allowZmqPoll = true;
SWSS_LOG_DEBUG("zmq sent %d bytes", serializedlen);
return;
}

zmq_err = zmq_errno();
// sleep (2 ^ retry time) * 10 ms
retry_delay *= 2;
if (zmq_err == EINTR
|| zmq_err== EFSM)
{
// EINTR: interrupted by signal
// EFSM: socket state not ready
// For example when ZMQ socket still not receive reply message from last sended package.
// There was state machine inside ZMQ socket, when the socket is not in ready to send state, this
// error will happen.
// for more detail, please check: http://api.zeromq.org/2-1:zmq-send
SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err);

retry_delay = 0;
}
else if (zmq_err == EAGAIN)
{
// EAGAIN: ZMQ is full to need try again
SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err);
}
else if (zmq_err == ETERM)
{
auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::connection_reset), message);
}
else
{
// for other error, send failed immediately.
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}

usleep(retry_delay * 1000);
}

// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}

}
Loading
Loading