Skip to content

Commit c7acf56

Browse files
authored
Merge pull request #20 from msgflo/periodic-discover
Send discovery messages periodicically
2 parents 444eed6 + 5f40e9e commit c7acf56

File tree

2 files changed

+79
-10
lines changed

2 files changed

+79
-10
lines changed

include/msgflo.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,10 @@ class Engine {
117117

118118
class EngineConfig {
119119
public:
120-
EngineConfig() : _debugOutput(false) {
120+
EngineConfig()
121+
: _debugOutput(false)
122+
, discoveryPeriod(60)
123+
{
121124
_debugOutput = std::getenv("MSGFLO_CPP_DEBUG") ? true : false;
122125
}
123126

@@ -139,9 +142,10 @@ class EngineConfig {
139142
return _url;
140143
}
141144

142-
private:
145+
public:
143146
bool _debugOutput;
144147
std::string _url;
148+
int discoveryPeriod; // seconds
145149
};
146150

147151
std::shared_ptr<Engine> createEngine(const EngineConfig config);

src/msgflo.cpp

Lines changed: 73 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ std::string random_string( size_t length )
2727
return str;
2828
}
2929

30+
int64_t millis_monotonic(void)
31+
{
32+
struct timespec spec;
33+
clock_gettime(CLOCK_MONOTONIC, &spec);
34+
const int64_t ms = (spec.tv_sec*1000) + round(spec.tv_nsec / 1.0e6);
35+
return ms;
36+
}
37+
3038
std::string string_to_upper_copy(const std::string &str) {
3139
std::string ret;
3240
ret.resize(str.size());
@@ -185,6 +193,22 @@ class AbstractEngine {
185193
vector<ParticipantRegistration> registrations;
186194
};
187195

196+
// C-style subclassing
197+
// used to pass context for libev timer callback
198+
struct EvTimerWrapper {
199+
200+
public:
201+
struct ev_timer timer;
202+
std::function<void (void)> callback;
203+
};
204+
205+
static void timeout_cb(struct ev_loop *loop, ev_timer *timer, int revent) {
206+
EvTimerWrapper *wrapper = (EvTimerWrapper *)timer;
207+
if (wrapper->callback) {
208+
wrapper->callback();
209+
}
210+
}
211+
188212
class AmqpEngine final : public Engine, protected AbstractEngine<AmqpEngine> {
189213

190214
struct AmqpMessage final : public AbstractMessage {
@@ -208,11 +232,18 @@ class AmqpEngine final : public Engine, protected AbstractEngine<AmqpEngine> {
208232
};
209233

210234
public:
211-
AmqpEngine(const string &url)
212-
: Engine(), loop(EV_DEFAULT), handler(loop), connection(&handler, AMQP::Address(url)), channel(&connection) {
235+
AmqpEngine(const string &url, EngineConfig config)
236+
: Engine()
237+
, loop(EV_DEFAULT)
238+
, handler(loop)
239+
, connection(&handler, AMQP::Address(url))
240+
, channel(&connection)
241+
, discoveryPeriod(config.discoveryPeriod/3)
242+
{
213243
channel.setQos(1); // TODO: is this prefech?
214244

215245
channel.onReady([&]() {
246+
connected = true;
216247
for(auto &r: registrations) {
217248
for (const auto &port : r.inports) {
218249
setupInPort(r, port);
@@ -233,6 +264,16 @@ class AmqpEngine final : public Engine, protected AbstractEngine<AmqpEngine> {
233264
}
234265

235266
virtual void launch() override {
267+
discoveryTimer.callback = [this]() {
268+
if (not connected) {
269+
return;
270+
}
271+
for(auto &r: registrations) {
272+
this->sendDiscoveryMessage(r);
273+
}
274+
};
275+
ev_timer_init(&discoveryTimer.timer, timeout_cb, discoveryPeriod, discoveryPeriod);
276+
ev_timer_start(loop, &discoveryTimer.timer);
236277
ev_run(loop, 0);
237278
}
238279

@@ -282,6 +323,9 @@ class AmqpEngine final : public Engine, protected AbstractEngine<AmqpEngine> {
282323
AMQP::LibEvHandler handler;
283324
AMQP::TcpConnection connection;
284325
AMQP::TcpChannel channel;
326+
int64_t discoveryPeriod;
327+
EvTimerWrapper discoveryTimer;
328+
bool connected = false;
285329
};
286330

287331
using msg_flo_mqtt_client = mqtt_client<trygvis::mqtt_support::mqtt_client_personality::polling>;
@@ -315,8 +359,12 @@ class MosquittoEngine final : public Engine, protected mqtt_event_listener, prot
315359

316360
public:
317361
MosquittoEngine(const EngineConfig config, const string &host, const int port,
318-
const int keep_alive, const string &client_id, const bool clean_session) :
319-
_debugOutput(config.debugOutput()), client(this, host, port, keep_alive, client_id, clean_session) {
362+
const int keep_alive, const string &client_id, const bool clean_session)
363+
: _debugOutput(config.debugOutput())
364+
, client(this, host, port, keep_alive, client_id, clean_session)
365+
, discoveryLastSent(0)
366+
, discoveryPeriod(config.discoveryPeriod/3)
367+
{
320368
client.connect();
321369
}
322370

@@ -343,6 +391,14 @@ class MosquittoEngine final : public Engine, protected mqtt_event_listener, prot
343391
run = true;
344392

345393
while (run) {
394+
const auto t = millis_monotonic()/1000;
395+
if (connected and (t - discoveryLastSent) > discoveryPeriod) {
396+
for(auto &r: registrations) {
397+
sendDiscoveryMessage(r);
398+
}
399+
discoveryLastSent = t;
400+
}
401+
346402
client.poll();
347403
}
348404
}
@@ -374,22 +430,31 @@ class MosquittoEngine final : public Engine, protected mqtt_event_listener, prot
374430
}
375431

376432
virtual void on_connect(int rc) override {
433+
connected = true;
377434
for(auto &r: registrations) {
378435
for (auto &p : r.inports) {
379436
on_msg("Connecting port " + p.id + " to mqtt topic " + p.queue);
380437
client.subscribe(nullptr, p.queue, 0);
381438
}
382-
383-
string data = json11::Json(r.discoveryMessage).dump();
384-
client.publish(nullptr, "fbp", 0, false, data);
439+
sendDiscoveryMessage(r);
385440
}
441+
discoveryLastSent = millis_monotonic()/1000;
442+
}
443+
444+
private:
445+
void sendDiscoveryMessage(const ParticipantRegistration &r) {
446+
const string data = json11::Json(r.discoveryMessage).dump();
447+
client.publish(nullptr, "fbp", 0, false, data);
386448
}
387449

388450
private:
389451
const bool _debugOutput;
390452
atomic_bool run;
391453
msg_flo_mqtt_client client;
392454
vector<ParticipantRegistration> registrations;
455+
bool connected;
456+
int64_t discoveryLastSent;
457+
const int64_t discoveryPeriod;
393458
};
394459

395460
shared_ptr<Engine> createEngine(const EngineConfig config) {
@@ -505,7 +570,7 @@ shared_ptr<Engine> createEngine(const EngineConfig config) {
505570

506571
return make_shared<MosquittoEngine>(config, host, port, keep_alive, client_id, clean_session);
507572
} else if (string_starts_with(url, "amqp://")) {
508-
return make_shared<AmqpEngine>(url);
573+
return make_shared<AmqpEngine>(url, config);
509574
}
510575

511576
throw std::runtime_error("Unsupported URL scheme: " + url);

0 commit comments

Comments
 (0)