Skip to content

Commit ac17c35

Browse files
bridge: move ZMQ handling over (#37118)
1 parent bcb13a7 commit ac17c35

File tree

6 files changed

+256
-15
lines changed

6 files changed

+256
-15
lines changed

cereal/SConscript

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ cereal = env.Library('cereal', [f'gen/cpp/{s}.c++' for s in schema_files])
1313

1414
# Build messaging
1515
services_h = env.Command(['services.h'], ['services.py'], 'python3 ' + cereal_dir.path + '/services.py > $TARGET')
16-
env.Program('messaging/bridge', ['messaging/bridge.cc', 'messaging/msgq_to_zmq.cc'], LIBS=[msgq, common, 'pthread'])
16+
env.Program('messaging/bridge', ['messaging/bridge.cc', 'messaging/msgq_to_zmq.cc', 'messaging/bridge_zmq.cc'], LIBS=[msgq, common, 'pthread'])
1717

1818
socketmaster = env.Library('socketmaster', ['messaging/socketmaster.cc'])
1919

cereal/messaging/bridge.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ void msgq_to_zmq(const std::vector<std::string> &endpoints, const std::string &i
2525
}
2626

2727
void zmq_to_msgq(const std::vector<std::string> &endpoints, const std::string &ip) {
28-
auto poller = std::make_unique<ZMQPoller>();
28+
auto poller = std::make_unique<BridgeZmqPoller>();
2929
auto pub_context = std::make_unique<MSGQContext>();
30-
auto sub_context = std::make_unique<ZMQContext>();
31-
std::map<SubSocket *, PubSocket *> sub2pub;
30+
auto sub_context = std::make_unique<BridgeZmqContext>();
31+
std::map<BridgeZmqSubSocket *, MSGQPubSocket *> sub2pub;
3232

3333
for (auto endpoint : endpoints) {
3434
auto pub_sock = new MSGQPubSocket();
35-
auto sub_sock = new ZMQSubSocket();
35+
auto sub_sock = new BridgeZmqSubSocket();
3636
size_t queue_size = services.at(endpoint).queue_size;
3737
pub_sock->connect(pub_context.get(), endpoint, true, queue_size);
3838
sub_sock->connect(sub_context.get(), endpoint, ip, false);

cereal/messaging/bridge_zmq.cc

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
#include "cereal/messaging/bridge_zmq.h"
2+
3+
#include <cassert>
4+
#include <cstring>
5+
#include <unistd.h>
6+
7+
static size_t fnv1a_hash(const std::string &str) {
8+
const size_t fnv_prime = 0x100000001b3;
9+
size_t hash_value = 0xcbf29ce484222325;
10+
for (char c : str) {
11+
hash_value ^= (unsigned char)c;
12+
hash_value *= fnv_prime;
13+
}
14+
return hash_value;
15+
}
16+
17+
// FIXME: This is a hack to get the port number from the socket name, might have collisions.
18+
static int get_port(std::string endpoint) {
19+
size_t hash_value = fnv1a_hash(endpoint);
20+
int start_port = 8023;
21+
int max_port = 65535;
22+
return start_port + (hash_value % (max_port - start_port));
23+
}
24+
25+
BridgeZmqContext::BridgeZmqContext() {
26+
context = zmq_ctx_new();
27+
}
28+
29+
BridgeZmqContext::~BridgeZmqContext() {
30+
if (context != nullptr) {
31+
zmq_ctx_term(context);
32+
}
33+
}
34+
35+
void BridgeZmqMessage::init(size_t sz) {
36+
size = sz;
37+
data = new char[size];
38+
}
39+
40+
void BridgeZmqMessage::init(char *d, size_t sz) {
41+
size = sz;
42+
data = new char[size];
43+
memcpy(data, d, size);
44+
}
45+
46+
void BridgeZmqMessage::close() {
47+
if (size > 0) {
48+
delete[] data;
49+
}
50+
data = nullptr;
51+
size = 0;
52+
}
53+
54+
BridgeZmqMessage::~BridgeZmqMessage() {
55+
close();
56+
}
57+
58+
int BridgeZmqSubSocket::connect(BridgeZmqContext *context, std::string endpoint, std::string address, bool conflate, bool check_endpoint) {
59+
sock = zmq_socket(context->getRawContext(), ZMQ_SUB);
60+
if (sock == nullptr) {
61+
return -1;
62+
}
63+
64+
zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0);
65+
66+
if (conflate) {
67+
int arg = 1;
68+
zmq_setsockopt(sock, ZMQ_CONFLATE, &arg, sizeof(int));
69+
}
70+
71+
int reconnect_ivl = 500;
72+
zmq_setsockopt(sock, ZMQ_RECONNECT_IVL_MAX, &reconnect_ivl, sizeof(reconnect_ivl));
73+
74+
full_endpoint = "tcp://" + address + ":";
75+
if (check_endpoint) {
76+
full_endpoint += std::to_string(get_port(endpoint));
77+
} else {
78+
full_endpoint += endpoint;
79+
}
80+
81+
return zmq_connect(sock, full_endpoint.c_str());
82+
}
83+
84+
void BridgeZmqSubSocket::setTimeout(int timeout) {
85+
zmq_setsockopt(sock, ZMQ_RCVTIMEO, &timeout, sizeof(int));
86+
}
87+
88+
Message *BridgeZmqSubSocket::receive(bool non_blocking) {
89+
zmq_msg_t msg;
90+
assert(zmq_msg_init(&msg) == 0);
91+
92+
int flags = non_blocking ? ZMQ_DONTWAIT : 0;
93+
int rc = zmq_msg_recv(&msg, sock, flags);
94+
95+
Message *ret = nullptr;
96+
if (rc >= 0) {
97+
ret = new BridgeZmqMessage;
98+
ret->init((char *)zmq_msg_data(&msg), zmq_msg_size(&msg));
99+
}
100+
101+
zmq_msg_close(&msg);
102+
return ret;
103+
}
104+
105+
BridgeZmqSubSocket::~BridgeZmqSubSocket() {
106+
if (sock != nullptr) {
107+
zmq_close(sock);
108+
}
109+
}
110+
111+
int BridgeZmqPubSocket::connect(BridgeZmqContext *context, std::string endpoint, bool check_endpoint) {
112+
sock = zmq_socket(context->getRawContext(), ZMQ_PUB);
113+
if (sock == nullptr) {
114+
return -1;
115+
}
116+
117+
full_endpoint = "tcp://*:";
118+
if (check_endpoint) {
119+
full_endpoint += std::to_string(get_port(endpoint));
120+
} else {
121+
full_endpoint += endpoint;
122+
}
123+
124+
// ZMQ pub sockets cannot be shared between processes, so we need to ensure pid stays the same.
125+
pid = getpid();
126+
127+
return zmq_bind(sock, full_endpoint.c_str());
128+
}
129+
130+
int BridgeZmqPubSocket::sendMessage(Message *message) {
131+
assert(pid == getpid());
132+
return zmq_send(sock, message->getData(), message->getSize(), ZMQ_DONTWAIT);
133+
}
134+
135+
int BridgeZmqPubSocket::send(char *data, size_t size) {
136+
assert(pid == getpid());
137+
return zmq_send(sock, data, size, ZMQ_DONTWAIT);
138+
}
139+
140+
BridgeZmqPubSocket::~BridgeZmqPubSocket() {
141+
if (sock != nullptr) {
142+
zmq_close(sock);
143+
}
144+
}
145+
146+
void BridgeZmqPoller::registerSocket(BridgeZmqSubSocket *socket) {
147+
assert(num_polls + 1 < (sizeof(polls) / sizeof(polls[0])));
148+
polls[num_polls].socket = socket->getRawSocket();
149+
polls[num_polls].events = ZMQ_POLLIN;
150+
151+
sockets.push_back(socket);
152+
num_polls++;
153+
}
154+
155+
std::vector<BridgeZmqSubSocket *> BridgeZmqPoller::poll(int timeout) {
156+
std::vector<BridgeZmqSubSocket *> ret;
157+
158+
int rc = zmq_poll(polls, num_polls, timeout);
159+
if (rc < 0) {
160+
return ret;
161+
}
162+
163+
for (size_t i = 0; i < num_polls; i++) {
164+
if (polls[i].revents) {
165+
ret.push_back(sockets[i]);
166+
}
167+
}
168+
169+
return ret;
170+
}

cereal/messaging/bridge_zmq.h

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#pragma once
2+
3+
#include <cstddef>
4+
#include <string>
5+
#include <vector>
6+
7+
#include <zmq.h>
8+
9+
#include "msgq/ipc.h"
10+
11+
class BridgeZmqContext {
12+
public:
13+
BridgeZmqContext();
14+
void *getRawContext() { return context; }
15+
~BridgeZmqContext();
16+
17+
private:
18+
void *context = nullptr;
19+
};
20+
21+
class BridgeZmqMessage : public Message {
22+
public:
23+
void init(size_t size) override;
24+
void init(char *data, size_t size) override;
25+
void close() override;
26+
size_t getSize() override { return size; }
27+
char *getData() override { return data; }
28+
~BridgeZmqMessage() override;
29+
30+
private:
31+
char *data = nullptr;
32+
size_t size = 0;
33+
};
34+
35+
class BridgeZmqSubSocket {
36+
public:
37+
int connect(BridgeZmqContext *context, std::string endpoint, std::string address, bool conflate = false, bool check_endpoint = true);
38+
void setTimeout(int timeout);
39+
Message *receive(bool non_blocking = false);
40+
void *getRawSocket() { return sock; }
41+
~BridgeZmqSubSocket();
42+
43+
private:
44+
void *sock = nullptr;
45+
std::string full_endpoint;
46+
};
47+
48+
class BridgeZmqPubSocket {
49+
public:
50+
int connect(BridgeZmqContext *context, std::string endpoint, bool check_endpoint = true);
51+
int sendMessage(Message *message);
52+
int send(char *data, size_t size);
53+
void *getRawSocket() { return sock; }
54+
~BridgeZmqPubSocket();
55+
56+
private:
57+
void *sock = nullptr;
58+
std::string full_endpoint;
59+
int pid = -1;
60+
};
61+
62+
class BridgeZmqPoller {
63+
public:
64+
void registerSocket(BridgeZmqSubSocket *socket);
65+
std::vector<BridgeZmqSubSocket *> poll(int timeout);
66+
67+
private:
68+
static constexpr size_t MAX_BRIDGE_ZMQ_POLLERS = 128;
69+
std::vector<BridgeZmqSubSocket *> sockets;
70+
zmq_pollitem_t polls[MAX_BRIDGE_ZMQ_POLLERS] = {};
71+
size_t num_polls = 0;
72+
};

cereal/messaging/msgq_to_zmq.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ static std::string recv_zmq_msg(void *sock) {
2222
}
2323

2424
void MsgqToZmq::run(const std::vector<std::string> &endpoints, const std::string &ip) {
25-
zmq_context = std::make_unique<ZMQContext>();
25+
zmq_context = std::make_unique<BridgeZmqContext>();
2626
msgq_context = std::make_unique<MSGQContext>();
2727

2828
// Create ZMQPubSockets for each endpoint
2929
for (const auto &endpoint : endpoints) {
3030
auto &socket_pair = socket_pairs.emplace_back();
3131
socket_pair.endpoint = endpoint;
32-
socket_pair.pub_sock = std::make_unique<ZMQPubSocket>();
32+
socket_pair.pub_sock = std::make_unique<BridgeZmqPubSocket>();
3333
int ret = socket_pair.pub_sock->connect(zmq_context.get(), endpoint);
3434
if (ret != 0) {
3535
printf("Failed to create ZMQ publisher for [%s]: %s\n", endpoint.c_str(), zmq_strerror(zmq_errno()));
@@ -49,7 +49,7 @@ void MsgqToZmq::run(const std::vector<std::string> &endpoints, const std::string
4949

5050
for (auto sub_sock : msgq_poller->poll(100)) {
5151
// Process messages for each socket
52-
ZMQPubSocket *pub_sock = sub2pub.at(sub_sock);
52+
BridgeZmqPubSocket *pub_sock = sub2pub.at(sub_sock);
5353
for (int i = 0; i < MAX_MESSAGES_PER_SOCKET; ++i) {
5454
auto msg = std::unique_ptr<Message>(sub_sock->receive(true));
5555
if (!msg) break;
@@ -72,7 +72,7 @@ void MsgqToZmq::zmqMonitorThread() {
7272
// Set up ZMQ monitor for each pub socket
7373
for (int i = 0; i < socket_pairs.size(); ++i) {
7474
std::string addr = "inproc://op-bridge-monitor-" + std::to_string(i);
75-
zmq_socket_monitor(socket_pairs[i].pub_sock->sock, addr.c_str(), ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED);
75+
zmq_socket_monitor(socket_pairs[i].pub_sock->getRawSocket(), addr.c_str(), ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED);
7676

7777
void *monitor_socket = zmq_socket(zmq_context->getRawContext(), ZMQ_PAIR);
7878
zmq_connect(monitor_socket, addr.c_str());
@@ -130,7 +130,7 @@ void MsgqToZmq::zmqMonitorThread() {
130130

131131
// Clean up monitor sockets
132132
for (int i = 0; i < pollitems.size(); ++i) {
133-
zmq_socket_monitor(socket_pairs[i].pub_sock->sock, nullptr, 0);
133+
zmq_socket_monitor(socket_pairs[i].pub_sock->getRawSocket(), nullptr, 0);
134134
zmq_close(pollitems[i].socket);
135135
}
136136
cv.notify_one();

cereal/messaging/msgq_to_zmq.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,8 @@
77
#include <string>
88
#include <vector>
99

10-
#define private public
1110
#include "msgq/impl_msgq.h"
12-
#include "msgq/impl_zmq.h"
11+
#include "cereal/messaging/bridge_zmq.h"
1312

1413
class MsgqToZmq {
1514
public:
@@ -22,16 +21,16 @@ class MsgqToZmq {
2221

2322
struct SocketPair {
2423
std::string endpoint;
25-
std::unique_ptr<ZMQPubSocket> pub_sock;
24+
std::unique_ptr<BridgeZmqPubSocket> pub_sock;
2625
std::unique_ptr<MSGQSubSocket> sub_sock;
2726
int connected_clients = 0;
2827
};
2928

3029
std::unique_ptr<MSGQContext> msgq_context;
31-
std::unique_ptr<ZMQContext> zmq_context;
30+
std::unique_ptr<BridgeZmqContext> zmq_context;
3231
std::mutex mutex;
3332
std::condition_variable cv;
3433
std::unique_ptr<MSGQPoller> msgq_poller;
35-
std::map<SubSocket *, ZMQPubSocket *> sub2pub;
34+
std::map<SubSocket *, BridgeZmqPubSocket *> sub2pub;
3635
std::vector<SocketPair> socket_pairs;
3736
};

0 commit comments

Comments
 (0)