Skip to content

Commit 5f40e9e

Browse files
committed
amqp: Send discovery message periodically
1 parent dc998e1 commit 5f40e9e

File tree

1 file changed

+39
-3
lines changed

1 file changed

+39
-3
lines changed

src/msgflo.cpp

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,22 @@ class AbstractEngine {
193193
vector<ParticipantRegistration> registrations;
194194
};
195195

196+
// C-style subclassing
197+
// used to pass context for libev timer callback
198+
struct EvTimerWrapper {
199+
200+
public:
201+
struct ev_timer timer;
202+
std::function<void (void)> callback;
203+
};
204+
205+
static void timeout_cb(struct ev_loop *loop, ev_timer *timer, int revent) {
206+
EvTimerWrapper *wrapper = (EvTimerWrapper *)timer;
207+
if (wrapper->callback) {
208+
wrapper->callback();
209+
}
210+
}
211+
196212
class AmqpEngine final : public Engine, protected AbstractEngine<AmqpEngine> {
197213

198214
struct AmqpMessage final : public AbstractMessage {
@@ -216,11 +232,18 @@ class AmqpEngine final : public Engine, protected AbstractEngine<AmqpEngine> {
216232
};
217233

218234
public:
219-
AmqpEngine(const string &url)
220-
: Engine(), loop(EV_DEFAULT), handler(loop), connection(&handler, AMQP::Address(url)), channel(&connection) {
235+
AmqpEngine(const string &url, EngineConfig config)
236+
: Engine()
237+
, loop(EV_DEFAULT)
238+
, handler(loop)
239+
, connection(&handler, AMQP::Address(url))
240+
, channel(&connection)
241+
, discoveryPeriod(config.discoveryPeriod/3)
242+
{
221243
channel.setQos(1); // TODO: is this prefech?
222244

223245
channel.onReady([&]() {
246+
connected = true;
224247
for(auto &r: registrations) {
225248
for (const auto &port : r.inports) {
226249
setupInPort(r, port);
@@ -241,6 +264,16 @@ class AmqpEngine final : public Engine, protected AbstractEngine<AmqpEngine> {
241264
}
242265

243266
virtual void launch() override {
267+
discoveryTimer.callback = [this]() {
268+
if (not connected) {
269+
return;
270+
}
271+
for(auto &r: registrations) {
272+
this->sendDiscoveryMessage(r);
273+
}
274+
};
275+
ev_timer_init(&discoveryTimer.timer, timeout_cb, discoveryPeriod, discoveryPeriod);
276+
ev_timer_start(loop, &discoveryTimer.timer);
244277
ev_run(loop, 0);
245278
}
246279

@@ -290,6 +323,9 @@ class AmqpEngine final : public Engine, protected AbstractEngine<AmqpEngine> {
290323
AMQP::LibEvHandler handler;
291324
AMQP::TcpConnection connection;
292325
AMQP::TcpChannel channel;
326+
int64_t discoveryPeriod;
327+
EvTimerWrapper discoveryTimer;
328+
bool connected = false;
293329
};
294330

295331
using msg_flo_mqtt_client = mqtt_client<trygvis::mqtt_support::mqtt_client_personality::polling>;
@@ -534,7 +570,7 @@ shared_ptr<Engine> createEngine(const EngineConfig config) {
534570

535571
return make_shared<MosquittoEngine>(config, host, port, keep_alive, client_id, clean_session);
536572
} else if (string_starts_with(url, "amqp://")) {
537-
return make_shared<AmqpEngine>(url);
573+
return make_shared<AmqpEngine>(url, config);
538574
}
539575

540576
throw std::runtime_error("Unsupported URL scheme: " + url);

0 commit comments

Comments
 (0)