Skip to content

Commit 39d7eca

Browse files
committed
Add support for lazy subscriptions
This allows defining bridges with broad subscription patterns that will forward incoming matching subscriptions, instead of the whole pattern. This allows for significant reduction in traffic by only receiving traffic clients are interested in.
1 parent 549f8eb commit 39d7eca

22 files changed

+1456
-16
lines changed

CMakeLists.shared

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ set(FLASHMQ_HEADERS
9292
${RELPATH}sharedmutexowned.h
9393
${RELPATH}reentrantmap.h
9494
${RELPATH}checkeduniqueptr.h
95+
${RELPATH}lazysubscriptions.h
96+
${RELPATH}trackedsubscriptions.h
97+
${RELPATH}trackedsubscriptionstate.h
9598
)
9699

97100
set(FLASHMQ_IMPLS
@@ -159,4 +162,7 @@ set(FLASHMQ_IMPLS
159162
${RELPATH}fmqssl.cpp
160163
${RELPATH}persistencefunctions.cpp
161164
${RELPATH}clientacceptqueue.cpp
165+
${RELPATH}lazysubscriptions.cpp
166+
${RELPATH}trackedsubscriptions.cpp
167+
${RELPATH}trackedsubscriptionstate.cpp
162168
)

bridgeconfig.cpp

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ bool BridgeTopicPath::operator==(const BridgeTopicPath &other) const
8787
return this->topic == other.topic && this->qos == other.qos;
8888
}
8989

90-
9190
BridgeState::BridgeState(const BridgeConfig &config) :
9291
c(config)
9392
{
@@ -210,6 +209,39 @@ void BridgeState::resetReconnectCounter()
210209
intervalLogged = 0;
211210
}
212211

212+
void BridgeState::constructTrackedSubscriptions()
213+
{
214+
if (mTrackedSubscriptions)
215+
return;
216+
mTrackedSubscriptions = std::make_unique<TrackedSubscriptionState>();
217+
}
218+
219+
CheckedUniquePtr<TrackedSubscriptionState> &BridgeState::getTrackedSubscriptions()
220+
{
221+
return mTrackedSubscriptions;
222+
}
223+
224+
/**
225+
* @brief BridgeState::stealTrackedSubscriptions
226+
* @param source
227+
*
228+
* We only move the tracked subscriptions, not the mutations. The idea is that when a new BridgeState is being created (because
229+
* of a config reload), that will already have registered itself as lazy subscriber and begun to receive subscription mutations.
230+
* Those mutations will be processed as soon as the MQTT connection is approved. Before that, this stealing should take
231+
* place. Processing those mutations on the tracked subscriptions we take, should be consistent enough.
232+
*/
233+
void BridgeState::stealTrackedSubscriptions(BridgeState &source)
234+
{
235+
if (!mTrackedSubscriptions || !source.mTrackedSubscriptions)
236+
return;
237+
238+
Logger::getInstance()->log(LOG_WARNING)
239+
<< "Migrating 'tracked subscriptions' from the old connection of " << source.c.clientidPrefix << "' is somewhat experimental. "
240+
<< "There is a chance (un)subscriptions received during the connection transition aren't correctly processed.";
241+
242+
this->mTrackedSubscriptions->replaceTrackedSubscriptions(source.mTrackedSubscriptions->stealTrackedSubscriptions());
243+
}
244+
213245
/**
214246
* @brief BridgeConfig::setClientId is for setting the client ID on start to the one from a saved state. That's why it only works when the prefix matches.
215247
* @param prefix
@@ -254,6 +286,11 @@ void BridgeConfig::setSharedSubscriptionName(const std::string &share_name)
254286
{
255287
setSharedSubscriptionName(publishes, share_name);
256288
setSharedSubscriptionName(subscribes, share_name);
289+
290+
for (auto &l : lazySubscriptions)
291+
{
292+
l.share_name = share_name;
293+
}
257294
}
258295

259296
void BridgeConfig::setSharedSubscriptionName(std::vector<BridgeTopicPath> &topics, const std::string &share_name)
@@ -317,8 +354,8 @@ void BridgeConfig::isValid()
317354
if (address.empty())
318355
throw ConfigFileException("No address specified in bridge");
319356

320-
if (publishes.empty() && subscribes.empty())
321-
throw ConfigFileException("No subscribe or publish paths defined in bridge.");
357+
if (publishes.empty() && subscribes.empty() && lazySubscriptions.empty())
358+
throw ConfigFileException("No (lazy) subscribe or publish paths defined in bridge.");
322359

323360
if (!caDir.empty() && !caFile.empty())
324361
throw ConfigFileException("Specify only one 'ca_file' or 'ca_dir'");
@@ -351,6 +388,11 @@ void BridgeConfig::isValid()
351388
std::for_each(publishes.begin(), publishes.end(), check);
352389
std::for_each(subscribes.begin(), subscribes.end(), check);
353390
}
391+
392+
if (!lazySubscriptions.empty() && protocolVersion < ProtocolVersion::Mqtt5)
393+
{
394+
throw ConfigFileException("Using lazy subscriptions needs at least MQTT5");
395+
}
354396
}
355397

356398
std::vector<BridgeConfig> BridgeConfig::multiply() const
@@ -365,7 +407,11 @@ std::vector<BridgeConfig> BridgeConfig::multiply() const
365407
{
366408
result.push_back(*this);
367409

368-
if (this->connection_count > 1)
410+
/*
411+
* Always give lazy subscriptions a share name as to avoid problems when you change connection_count
412+
* from 1 to > 1 and reload FlashMQ.
413+
*/
414+
if (this->connection_count > 1 || !this->lazySubscriptions.empty())
369415
{
370416
result.back().setSharedSubscriptionName(share_name);
371417

@@ -399,7 +445,7 @@ bool BridgeConfig::operator ==(const BridgeConfig &other) const
399445
&& this->useSavedClientId == other.useSavedClientId && this->maxOutgoingTopicAliases == other.maxOutgoingTopicAliases
400446
&& this->maxIncomingTopicAliases == other.maxIncomingTopicAliases && this->tcpNoDelay == other.tcpNoDelay
401447
&& this->local_prefix == other.local_prefix && this->remote_prefix == other.remote_prefix && this->connection_count == other.connection_count
402-
&& this->maxBufferSize == other.maxBufferSize;
448+
&& this->maxBufferSize == other.maxBufferSize && this->lazySubscriptions == other.lazySubscriptions;
403449
}
404450

405451
bool BridgeConfig::operator !=(const BridgeConfig &other) const
@@ -409,3 +455,26 @@ bool BridgeConfig::operator !=(const BridgeConfig &other) const
409455
}
410456

411457

458+
459+
BridgeLazySubscription::BridgeLazySubscription(const std::string &pattern, uint8_t qos) :
460+
pattern(pattern),
461+
qos(qos)
462+
{
463+
464+
}
465+
466+
bool BridgeLazySubscription::operator==(const BridgeLazySubscription &other) const
467+
{
468+
return this->pattern == other.pattern && this->qos == other.qos && this->share_name == other.share_name;
469+
}
470+
471+
void BridgeLazySubscription::isValid() const
472+
{
473+
if (pattern.empty() || !(isValidSubscribePath(pattern) && pattern.back() == '#'))
474+
throw ConfigFileException("The pattern '" + pattern + "' is not a valid lazy subscription pattern. It must end with a multi-level wildcard.");
475+
476+
if (qos > 2)
477+
throw ConfigFileException("QoS " + std::to_string(qos) + " is not a valid QoS for a lazy subscription");
478+
}
479+
480+

bridgeconfig.h

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
#include "dnsresolver.h"
1010
#include "sslctxmanager.h"
1111
#include "utils.h"
12+
#include "trackedsubscriptionstate.h"
13+
#include "checkeduniqueptr.h"
1214

1315
enum class BridgeTLSMode
1416
{
@@ -26,6 +28,22 @@ struct BridgeTopicPath
2628
bool operator==(const BridgeTopicPath &other) const;
2729
};
2830

31+
struct BridgeLazySubscription
32+
{
33+
std::string pattern;
34+
uint8_t qos = 0;
35+
std::string share_name;
36+
37+
BridgeLazySubscription() = default;
38+
BridgeLazySubscription(const std::string &pattern, uint8_t qos);
39+
40+
bool operator==(const BridgeLazySubscription &other) const;
41+
42+
void isValid() const;
43+
44+
};
45+
46+
2947
/**
3048
* @brief The BridgeClientGroupIds class manages the random IDs used in fmq_client_group_id and shared
3149
* subscription names for them.
@@ -52,6 +70,7 @@ class BridgeClientGroupIds
5270
void saveShareNames(const std::string &path) const;
5371
};
5472

73+
5574
class BridgeConfig
5675
{
5776
std::string clientid;
@@ -98,6 +117,8 @@ class BridgeConfig
98117
std::optional<std::string> local_prefix;
99118
std::optional<std::string> remote_prefix;
100119

120+
std::list<BridgeLazySubscription> lazySubscriptions;
121+
101122
void setClientId(const std::string &prefix, const std::string &id);
102123
const std::string &getClientid() const;
103124
const std::optional<std::string> &getFmqClientGroupId() const;
@@ -109,13 +130,22 @@ class BridgeConfig
109130
bool operator !=(const BridgeConfig &other) const;
110131
};
111132

133+
/**
134+
* Lifetime rules:
135+
*
136+
* - Outlives bridge connects/disconnects/reconnects. It's meant to be one level above Client in that respect.
137+
* - Gets recreated when the bridge config changes and you issue a config reload (SIGHUP). Some data may be
138+
* taken from the existin one for the same client id prefix.
139+
*/
112140
class BridgeState
113141
{
114142
bool sslInitialized = false;
115143
std::chrono::time_point<std::chrono::steady_clock> lastReconnectAttempt;
116144
int reconnectCounter = 0;
117145
const int baseReconnectInterval = (get_random_int<int>() % 30) + 30;
118146
int intervalLogged = 0;
147+
CheckedUniquePtr<TrackedSubscriptionState> mTrackedSubscriptions;
148+
119149
public:
120150
const BridgeConfig c;
121151
std::weak_ptr<Session> session;
@@ -132,6 +162,9 @@ class BridgeState
132162
bool timeForNewReconnectAttempt();
133163
void registerReconnect();
134164
void resetReconnectCounter();
165+
void constructTrackedSubscriptions();
166+
CheckedUniquePtr<TrackedSubscriptionState> &getTrackedSubscriptions();
167+
void stealTrackedSubscriptions(BridgeState &other);
135168
};
136169

137170
#endif // BRIDGECONFIG_H

configfileparser.cpp

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,9 @@ ConfigFileParser::ConfigFileParser(const std::string &path) :
302302
validBridgeKeys.insert("minimum_tls_version");
303303
validBridgeKeys.insert("connection_count");
304304
validBridgeKeys.insert("max_buffer_size");
305+
306+
validBridgeLazySubscriptionKeys.insert("subscribe");
307+
validBridgeLazySubscriptionKeys.insert("qos");
305308
}
306309

307310
std::list<std::string> ConfigFileParser::readFileRecursively(const std::string &path) const
@@ -421,11 +424,12 @@ void ConfigFileParser::loadFile(bool test)
421424
std::stack<ConfigParseLevel> curParseLevel;
422425
std::shared_ptr<Listener> curListener;
423426
std::optional<BridgeConfig> curBridge;
427+
std::optional<BridgeLazySubscription> curLazySubscription;
424428
std::list<BridgeConfig> preMultipliedBridges;
425429
Settings tmpSettings;
426430

427431
curParseLevel.push(ConfigParseLevel::Root);
428-
const std::set<std::string> blockNames {"listen", "bridge"};
432+
const std::set<std::string> blockNames {"listen", "bridge", "lazy_subscription"};
429433

430434
// Then once we know the config file is valid, process it.
431435
for (std::string &line : lines)
@@ -451,6 +455,14 @@ void ConfigFileParser::loadFile(bool test)
451455
curParseLevel.push(ConfigParseLevel::Bridge);
452456
curBridge = std::make_optional<BridgeConfig>();
453457
}
458+
else if (testKeyValidity(key, "lazy_subscription", blockNames))
459+
{
460+
if (curParseLevel.top() != ConfigParseLevel::Bridge)
461+
throw ConfigFileException("Block '" + key + "' can only be defined in a bridge");
462+
463+
curParseLevel.push(ConfigParseLevel::BridgeLazySubscriptions);
464+
curLazySubscription = std::make_optional<BridgeLazySubscription>();
465+
}
454466
else
455467
{
456468
std::ostringstream oss;
@@ -494,6 +506,12 @@ void ConfigFileParser::loadFile(bool test)
494506
curBridge->isValid();
495507
preMultipliedBridges.push_back(std::move(curBridge.value()));
496508
}
509+
else if (curParseLevel.top() == ConfigParseLevel::BridgeLazySubscriptions)
510+
{
511+
curLazySubscription.value().isValid();
512+
curBridge.value().lazySubscriptions.emplace_back(std::move(curLazySubscription.value()));
513+
curLazySubscription.reset();
514+
}
497515

498516
curParseLevel.pop();
499517

@@ -934,6 +952,26 @@ void ConfigFileParser::loadFile(bool test)
934952
testCorrectNumberOfValues(key, number_of_expected_values, values);
935953
continue;
936954
}
955+
else if (curParseLevel.top() == ConfigParseLevel::BridgeLazySubscriptions)
956+
{
957+
if (testKeyValidity(key, "subscribe", validBridgeLazySubscriptionKeys))
958+
{
959+
if (!isValidUtf8(valueTrimmed) || !isValidSubscribePath(valueTrimmed))
960+
throw ConfigFileException("Path '" + valueTrimmed + "' is not a valid subscribe pattern");
961+
962+
if (!curLazySubscription.value().pattern.empty())
963+
throw ConfigFileException("Multiple 'subscribe' lines for lazy subscriptions is not valid. Use multiple 'lazy_subscription' blocks.");
964+
965+
curLazySubscription.value().pattern = valueTrimmed;
966+
}
967+
else if (testKeyValidity(key, "qos", validBridgeLazySubscriptionKeys))
968+
{
969+
curLazySubscription.value().qos = value_to_int_ranged<uint8_t>(key, valueTrimmed, 0, 2);
970+
}
971+
972+
testCorrectNumberOfValues(key, number_of_expected_values, values);
973+
continue;
974+
}
937975

938976
const std::string plugin_opt_ = "plugin_opt_";
939977
if (startsWith(key, plugin_opt_))

configfileparser.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ enum class ConfigParseLevel
2626
{
2727
Root,
2828
Listen,
29-
Bridge
29+
Bridge,
30+
BridgeLazySubscriptions
3031
};
3132

3233
template<typename T>
@@ -101,6 +102,7 @@ class ConfigFileParser
101102
std::set<std::string> validKeys;
102103
std::set<std::string> validListenKeys;
103104
std::set<std::string> validBridgeKeys;
105+
std::set<std::string> validBridgeLazySubscriptionKeys;
104106

105107
const std::regex key_value_regex = std::regex("^([\\w\\-]+)\\s+(.+)$");
106108
const std::regex block_regex_start = std::regex("^([a-zA-Z0-9_\\-]+) *\\{$");

globals.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33

44
#include <memory>
55
#include <pthread.h>
6+
#include <optional>
67

78
#include "subscriptionstore.h"
9+
#include "lazysubscriptions.h"
810
#include "globalstats.h"
911
#include "bridgeconfig.h"
1012
#include "checkedsharedptr.h"
@@ -26,6 +28,7 @@ class Globals
2628
GlobalStats stats;
2729
BridgeClientGroupIds bridgeClientGroupIds;
2830
MutexOwned<std::vector<std::shared_ptr<ThreadData>>> threadDatas;
31+
std::optional<LazySubscriptions> lazySubscriptions;
2932

3033
CheckedSharedPtr<ThreadData> getDeterministicThreadData();
3134
};

0 commit comments

Comments
 (0)