Skip to content

Commit 6da334f

Browse files
committed
Defer subacks to lazy subscriptions until the remote side acknowledged
When a client places subscriptions that match lazy subscriptions that need to be expanded, we need to wait with sending the SUBACK until the other end(s) have sent their SUBACKS. This is so that when clients trigger subsequent actions on it, the subscription will actually exist.
1 parent 64c938f commit 6da334f

23 files changed

+329
-48
lines changed

FlashMQTests/tst_maintests.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -979,20 +979,20 @@ void MainTests::testSavingSessions()
979979
const std::string topic1 = "one/two/three";
980980
std::vector<std::string> subtopics;
981981
subtopics = splitTopic(topic1);
982-
store->addSubscription(c1->getSession(), subtopics, 0, true, false, "", 0);
982+
store->addSubscription(c1->getSession(), 0, subtopics, 0, true, false, "", 0);
983983

984984
const std::string topic2 = "four/five/six";
985985
subtopics = splitTopic(topic2);
986-
store->addSubscription(c2->getSession(), subtopics, 0, false, true, "", 0);
987-
store->addSubscription(c1->getSession(), subtopics, 0, false, false, "", 94612);
986+
store->addSubscription(c2->getSession(), 0, subtopics, 0, false, true, "", 0);
987+
store->addSubscription(c1->getSession(), 0, subtopics, 0, false, false, "", 94612);
988988

989989
const std::string topic3 = "";
990990
subtopics = splitTopic(topic3);
991-
store->addSubscription(c2->getSession(), subtopics, 0, false, false, "", 0);
991+
store->addSubscription(c2->getSession(), 0, subtopics, 0, false, false, "", 0);
992992

993993
const std::string topic4 = "#";
994994
subtopics = splitTopic(topic4);
995-
store->addSubscription(c2->getSession(), subtopics, 0, false, false, "", 0);
995+
store->addSubscription(c2->getSession(), 0, subtopics, 0, false, false, "", 0);
996996

997997
Publish publish("a/b/c", "Hello Barry", 1);
998998
publish.client_id = "ClientIdFromFakePublisher";
@@ -3083,7 +3083,7 @@ void MainTests::testTopicMatchingInSubscriptionTreeHelper(const std::string &sub
30833083
client->setClientProperties(ProtocolVersion::Mqtt5, "mytestclient", {}, "myusername", true, 60);
30843084
store.registerClientAndKickExistingOne(client);
30853085

3086-
store.addSubscription(client->getSession(), subscribe_subtopics, 0, false, false, "", 0);
3086+
store.addSubscription(client->getSession(), 0, subscribe_subtopics, 0, false, false, "", 0);
30873087

30883088
std::vector<ReceivingSubscriber> receivers;
30893089
store.publishRecursively(publish_subtopics.begin(), publish_subtopics.end(), store.root.get(), receivers, "fakeclientid", {});

client.cpp

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,43 @@ void Client::clearRegistrationData()
699699
this->registrationData.reset();
700700
}
701701

702+
void Client::stageOrSendSubAck(const std::shared_ptr<Client> &this_client, SubAckAction &&action, const size_t expandedCount)
703+
{
704+
assert(this == this_client.get());
705+
706+
if (expandedCount == 0)
707+
{
708+
sendSubAck(action);
709+
return;
710+
}
711+
712+
const uint16_t packid = action.mPacketId;
713+
714+
auto result = stagedSubAcks.insert({packid, std::move(action)});
715+
if (!std::get<bool>(result))
716+
{
717+
logger->log(LOG_WARNING)
718+
<< "stageOrSendSubAck: staged SUBACK with id " << action.mPacketId << " is already exists (id reused). This is probabaly a bug in the client. "
719+
<< "Replying directly instead of staging.";
720+
sendSubAck(action);
721+
return;
722+
}
723+
}
724+
725+
void Client::sendStagedSuback(const uint16_t packetId)
726+
{
727+
assert(this->threadData.lock()->thread_id == pthread_self());
728+
729+
auto pos = stagedSubAcks.find(packetId);
730+
731+
if (pos == stagedSubAcks.end())
732+
return;
733+
734+
const SubAckAction x(std::move(pos->second));
735+
stagedSubAcks.erase(pos);
736+
sendSubAck(x);
737+
}
738+
702739
/**
703740
* @brief Client::stageConnack saves the success connack for later use.
704741
* @param c
@@ -936,6 +973,28 @@ void Client::setAddr(const std::string &address)
936973
addr.setAddress(address);
937974
}
938975

976+
void Client::sendSubAck(const SubAckAction &action)
977+
{
978+
auto store = globals->subscriptionStore;
979+
980+
/*
981+
* Writing the suback here, as opposed to before adding subscriptions way back in the calling context, means the client may
982+
* have already received packets that match the subscription. That should not be a problem, because the spec says:
983+
*
984+
* "The Server is permitted to start sending PUBLISH packets matching the Subscription before the Server sends the SUBACK packet."
985+
*/
986+
SubAck subAck(this->protocolVersion, action.mPacketId, action.mResponseCodes);
987+
MqttPacket response(subAck);
988+
writeMqttPacket(response);
989+
990+
auto session = getSession();
991+
992+
for (const DeferredRetainedSending &d : action.mRetainedSending)
993+
{
994+
store->giveClientRetainedMessages(session, d.mSubtopics, d.mQos, d.mSubscriptionIdentifier);
995+
}
996+
}
997+
939998
bool Client::tryAcmeRedirect()
940999
{
9411000
if (authenticated)

client.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ class Client
118118
int disconnectReasonLogLevel = LOG_NOTICE;
119119
std::chrono::time_point<std::chrono::steady_clock> lastActivity = std::chrono::steady_clock::now();
120120

121+
std::unordered_map<uint16_t, SubAckAction> stagedSubAcks;
122+
121123
std::string ssl_version;
122124
std::string clientid;
123125
std::string username;
@@ -161,6 +163,7 @@ class Client
161163
void setReadyForWriting(bool val, MutexLocked<WriteBuf> &writebuf);
162164
void setReadyForReading(bool val);
163165
void setAddr(const std::string &address);
166+
void sendSubAck(const SubAckAction &action);
164167

165168
public:
166169
uint8_t preAuthPacketCounter = 0;
@@ -254,6 +257,8 @@ class Client
254257
const std::unique_ptr<StowedClientRegistrationData> &getRegistrationData() const;
255258
void clearRegistrationData();
256259

260+
void stageOrSendSubAck(const std::shared_ptr<Client> &this_client, SubAckAction &&action, const size_t expandedCount);
261+
void sendStagedSuback(const uint16_t packetId);
257262
void stageConnack(std::unique_ptr<ConnAck> &&c);
258263
void sendConnackSuccess();
259264
void sendConnackDeny(ReasonCodes reason);

enums.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,11 @@ enum class HaProxyMode
6060
HaProxyClientVerficiationWithAuthn
6161
};
6262

63+
enum class AddSubscriptionType
64+
{
65+
Invalid,
66+
NewSubscription,
67+
ExistingSubscription
68+
};
69+
6370
#endif // ENUMS_H

flashmq_plugin.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ bool flashmq_plugin_add_subscription(
138138
std::string topicDummy;
139139
parseSubscriptionShare(subtopics, shareName, topicDummy);
140140

141-
const AddSubscriptionType result = store->addSubscription(session_locked, subtopics, qos, noLocal, retainAsPublished, shareName, subscriptionIdentifier);
141+
const auto [result, expand_count] = store->addSubscription(session_locked, 0, subtopics, qos, noLocal, retainAsPublished, shareName, subscriptionIdentifier);
142142
return result == AddSubscriptionType::Invalid ? false : true;
143143
}
144144

lazysubscriptions.cpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -279,29 +279,31 @@ void LazySubscriptions::collectClients(
279279
}
280280
}
281281

282-
void LazySubscriptions::expandLazySubscriptions(
283-
TrackedSubscriptionMutationTask task, const std::shared_ptr<Session> originating_session,
282+
size_t LazySubscriptions::expandLazySubscriptions(
283+
TrackedSubscriptionMutationTask task, const std::shared_ptr<Session> &originating_session, const uint16_t originating_packetId,
284284
const std::vector<std::string> &subtopics, const uint8_t qos)
285285
{
286286
assert(subtopics.size() == 0 || subtopics.at(0) != "$share");
287287

288288
// Don't match the bridge's own internal subscriptions to ourself (the 'publish' lines in the config).
289289
if (originating_session->getClientType() == ClientType::LocalBridge)
290-
return;
290+
return 0;
291291

292292
std::vector<ReceivingLazySubscriber> collected_receivers;
293293

294294
{
295295
auto data = root.shared_lock();
296296

297297
if ((*data)->children.empty())
298-
return;
298+
return 0;
299299

300-
collectClients(subtopics.begin(), subtopics.end(), data->get(), data->get()->node_hash, collected_receivers, -1, -1);
300+
collectClients(subtopics.begin(), subtopics.end(), data->get(), 0, collected_receivers, -1, -1);
301301
}
302302

303303
if (collected_receivers.empty())
304-
return;
304+
return 0;
305+
306+
size_t count = 0;
305307

306308
for(ReceivingLazySubscriber &receiver : collected_receivers)
307309
{
@@ -321,13 +323,17 @@ void LazySubscriptions::expandLazySubscriptions(
321323
continue;
322324

323325
const bool doWakeup = tracked_subs->addTrackedSubscriptionMutation(
324-
TrackedSubscriptionMutation(pattern, effective_qos, originating_session->getClientId(), originating_session, task));
326+
TrackedSubscriptionMutation(pattern, effective_qos, originating_session->getClientId(), originating_session, originating_packetId, task));
327+
328+
count++;
325329

326330
if (doWakeup)
327331
{
328332
receiver.thread.lock()->queueProcessTrackedSubscriptionMutations(bridgeState, ProcessTrackedSubscriptionMutationsModifier::FirstFinishResending);
329333
}
330334
}
335+
336+
return count;
331337
}
332338

333339
void registerLazySubscriptions(std::shared_ptr<BridgeState> &bridgeState)

lazysubscriptions.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ class LazySubscriptions
7373
void addSubscription(
7474
const std::shared_ptr<BridgeState> &bridgeState, const std::string &pattern,
7575
uint8_t qos, const std::string &distribution_group_name);
76-
void expandLazySubscriptions(
77-
TrackedSubscriptionMutationTask task, const std::shared_ptr<Session> originating_session,
76+
size_t expandLazySubscriptions(
77+
TrackedSubscriptionMutationTask task, const std::shared_ptr<Session> &originating_session, const uint16_t originating_packetId,
7878
const std::vector<std::string> &subtopics, const uint8_t qos);
7979
};
8080

lockedweakptr.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ class LockedWeakPtr
2626
return p.lock();
2727
}
2828

29+
std::weak_ptr<T> get_weak()
30+
{
31+
std::lock_guard<std::mutex> l(m);
32+
return p;
33+
}
34+
2935
bool expired()
3036
{
3137
std::lock_guard<std::mutex> l(m);

mainapp.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -623,10 +623,10 @@ void MainApp::start()
623623
websocketsubscriber->setFakeUpgraded();
624624
subscriptionStore->registerClientAndKickExistingOne(websocketsubscriber);
625625
subtopics = splitTopic("#");
626-
subscriptionStore->addSubscription(websocketsubscriber->getSession(), subtopics, 0, false, false, empty, 0);
626+
subscriptionStore->addSubscription(websocketsubscriber->getSession(), 0, subtopics, 0, false, false, empty, 0);
627627

628628
subscriptionStore->registerClientAndKickExistingOne(subscriber);
629-
subscriptionStore->addSubscription(subscriber->getSession(), subtopics, 0, false, false, empty, 0);
629+
subscriptionStore->addSubscription(subscriber->getSession(), 0, subtopics, 0, false, false, empty, 0);
630630

631631
if (connectionProtocol == ConnectionProtocol::WebsocketMqtt && strContains(fuzzFilePathLower, "upgrade"))
632632
{

mqttpacket.cpp

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1388,7 +1388,7 @@ void MqttPacket::handleConnAck(std::shared_ptr<Client> &sender)
13881388
std::string _;
13891389
parseSubscriptionShare(subtopics, shareName, _);
13901390
const bool no_local = shareName.empty(); // See above about no-local.
1391-
store->addSubscription(session, subtopics, pub.qos, no_local, true, shareName, 0);
1391+
store->addSubscription(session, 0, subtopics, pub.qos, no_local, true, shareName, 0);
13921392
}
13931393

13941394
ThreadGlobals::getThreadData()->publishBridgeState(bridgeState, true, {});
@@ -1794,11 +1794,10 @@ void MqttPacket::handleSubscribe(std::shared_ptr<Client> &sender)
17941794
throw ProtocolError("No topics specified to subscribe to.", ReasonCodes::MalformedPacket);
17951795
}
17961796

1797-
SubAck subAck(this->protocolVersion, packet_id, subs_reponse_codes);
1798-
MqttPacket response(subAck);
1799-
sender->writeMqttPacket(response);
1800-
18011797
std::shared_ptr<Session> session = sender->getSession();
1798+
std::vector<DeferredRetainedSending> retainedSending;
1799+
1800+
size_t expandedCount = 0;
18021801

18031802
// Adding the subscription will also send publishes for retained messages, so that's why we're doing it at the end.
18041803
for(const SubscriptionTuple &tup : deferredSubscribes)
@@ -1808,18 +1807,22 @@ void MqttPacket::handleSubscribe(std::shared_ptr<Client> &sender)
18081807

18091808
auto store = globals->subscriptionStore;
18101809

1811-
const AddSubscriptionType add_type = store->addSubscription(
1812-
session, tup.subtopics, tup.qos, tup.noLocal, tup.retainAsPublished, tup.shareName, tup.subscriptionIdentifier);
1810+
auto [addType, oneExpandedCount] = store->addSubscription(
1811+
session, packet_id, tup.subtopics, tup.qos, tup.noLocal, tup.retainAsPublished, tup.shareName, tup.subscriptionIdentifier);
1812+
1813+
expandedCount += oneExpandedCount;
18131814

18141815
if (tup.authResult == AuthResult::success && tup.shareName.empty())
18151816
{
18161817
if ((tup.retainHandling == RetainHandling::SendRetainedMessagesAtSubscribe) ||
1817-
(tup.retainHandling == RetainHandling::SendRetainedMessagesAtNewSubscribeOnly && add_type == AddSubscriptionType::NewSubscription) )
1818+
(tup.retainHandling == RetainHandling::SendRetainedMessagesAtNewSubscribeOnly && addType == AddSubscriptionType::NewSubscription) )
18181819
{
1819-
store->giveClientRetainedMessages(session, tup.subtopics, tup.qos, tup.subscriptionIdentifier);
1820+
retainedSending.emplace_back(tup.subtopics, tup.qos, tup.subscriptionIdentifier);
18201821
}
18211822
}
18221823
}
1824+
1825+
sender->stageOrSendSubAck(sender, {std::move(retainedSending), std::move(subs_reponse_codes), packet_id}, expandedCount);
18231826
}
18241827

18251828
void MqttPacket::handleSubAck(std::shared_ptr<Client> &sender)
@@ -1847,7 +1850,7 @@ void MqttPacket::handleSubAck(std::shared_ptr<Client> &sender)
18471850

18481851
if (tracked_subs)
18491852
{
1850-
tracked_subs->removeMatchingInFlightTrackedSubscriptions(data.packet_id);
1853+
tracked_subs->handledSubackActions(data.packet_id);
18511854

18521855
if (tracked_subs->requiresProcessingTrackedSubscriptions())
18531856
{

0 commit comments

Comments
 (0)