Skip to content

Commit dc998e1

Browse files
committed
mqtt: Send discovery message periodically
1 parent 444eed6 commit dc998e1

File tree

2 files changed

+40
-7
lines changed

2 files changed

+40
-7
lines changed

include/msgflo.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,10 @@ class Engine {
117117

118118
class EngineConfig {
119119
public:
120-
EngineConfig() : _debugOutput(false) {
120+
EngineConfig()
121+
: _debugOutput(false)
122+
, discoveryPeriod(60)
123+
{
121124
_debugOutput = std::getenv("MSGFLO_CPP_DEBUG") ? true : false;
122125
}
123126

@@ -139,9 +142,10 @@ class EngineConfig {
139142
return _url;
140143
}
141144

142-
private:
145+
public:
143146
bool _debugOutput;
144147
std::string _url;
148+
int discoveryPeriod; // seconds
145149
};
146150

147151
std::shared_ptr<Engine> createEngine(const EngineConfig config);

src/msgflo.cpp

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ std::string random_string( size_t length )
2727
return str;
2828
}
2929

30+
int64_t millis_monotonic(void)
31+
{
32+
struct timespec spec;
33+
clock_gettime(CLOCK_MONOTONIC, &spec);
34+
const int64_t ms = (spec.tv_sec*1000) + round(spec.tv_nsec / 1.0e6);
35+
return ms;
36+
}
37+
3038
std::string string_to_upper_copy(const std::string &str) {
3139
std::string ret;
3240
ret.resize(str.size());
@@ -315,8 +323,12 @@ class MosquittoEngine final : public Engine, protected mqtt_event_listener, prot
315323

316324
public:
317325
MosquittoEngine(const EngineConfig config, const string &host, const int port,
318-
const int keep_alive, const string &client_id, const bool clean_session) :
319-
_debugOutput(config.debugOutput()), client(this, host, port, keep_alive, client_id, clean_session) {
326+
const int keep_alive, const string &client_id, const bool clean_session)
327+
: _debugOutput(config.debugOutput())
328+
, client(this, host, port, keep_alive, client_id, clean_session)
329+
, discoveryLastSent(0)
330+
, discoveryPeriod(config.discoveryPeriod/3)
331+
{
320332
client.connect();
321333
}
322334

@@ -343,6 +355,14 @@ class MosquittoEngine final : public Engine, protected mqtt_event_listener, prot
343355
run = true;
344356

345357
while (run) {
358+
const auto t = millis_monotonic()/1000;
359+
if (connected and (t - discoveryLastSent) > discoveryPeriod) {
360+
for(auto &r: registrations) {
361+
sendDiscoveryMessage(r);
362+
}
363+
discoveryLastSent = t;
364+
}
365+
346366
client.poll();
347367
}
348368
}
@@ -374,22 +394,31 @@ class MosquittoEngine final : public Engine, protected mqtt_event_listener, prot
374394
}
375395

376396
virtual void on_connect(int rc) override {
397+
connected = true;
377398
for(auto &r: registrations) {
378399
for (auto &p : r.inports) {
379400
on_msg("Connecting port " + p.id + " to mqtt topic " + p.queue);
380401
client.subscribe(nullptr, p.queue, 0);
381402
}
382-
383-
string data = json11::Json(r.discoveryMessage).dump();
384-
client.publish(nullptr, "fbp", 0, false, data);
403+
sendDiscoveryMessage(r);
385404
}
405+
discoveryLastSent = millis_monotonic()/1000;
406+
}
407+
408+
private:
409+
void sendDiscoveryMessage(const ParticipantRegistration &r) {
410+
const string data = json11::Json(r.discoveryMessage).dump();
411+
client.publish(nullptr, "fbp", 0, false, data);
386412
}
387413

388414
private:
389415
const bool _debugOutput;
390416
atomic_bool run;
391417
msg_flo_mqtt_client client;
392418
vector<ParticipantRegistration> registrations;
419+
bool connected;
420+
int64_t discoveryLastSent;
421+
const int64_t discoveryPeriod;
393422
};
394423

395424
shared_ptr<Engine> createEngine(const EngineConfig config) {

0 commit comments

Comments
 (0)