Skip to content

Commit 01eebc1

Browse files
authored
Merge pull request #3836 from albinsuresh/refactor/add-topic-filters
refactor: add topic filters to combine them
2 parents bafbd05 + 0214dd5 commit 01eebc1

File tree

10 files changed

+56
-28
lines changed

10 files changed

+56
-28
lines changed

crates/common/mqtt_channel/src/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl SubscriberOps for SubscriberHandle {
8383
{
8484
let mut subs = self.subscriptions.lock().unwrap();
8585
for topic in &topics {
86-
subs.add(topic)?;
86+
subs.try_add(topic)?;
8787
}
8888
}
8989
self.client

crates/common/mqtt_channel/src/topics.rs

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use std::collections::HashSet;
88
use std::convert::TryInto;
99
use std::fmt::Display;
1010
use std::fmt::Formatter;
11+
use std::ops::Add;
12+
use std::ops::AddAssign;
1113

1214
/// An MQTT topic
1315
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
@@ -93,14 +95,10 @@ impl TopicFilter {
9395
}
9496

9597
/// Check if the pattern is valid and add it to this topic filter.
96-
pub fn add(&mut self, pattern: &str) -> Result<(), MqttError> {
97-
let pattern = String::from(pattern);
98-
if rumqttc::valid_filter(&pattern) {
99-
self.patterns.push(pattern);
100-
Ok(())
101-
} else {
102-
Err(MqttError::InvalidFilter { pattern })
103-
}
98+
pub fn try_add(&mut self, pattern: &str) -> Result<(), MqttError> {
99+
let other = TopicFilter::new(pattern)?;
100+
*self += other;
101+
Ok(())
104102
}
105103

106104
/// Assuming the pattern is valid and add it to this topic filter.
@@ -240,7 +238,7 @@ impl TryInto<TopicFilter> for Vec<&str> {
240238
fn try_into(self) -> Result<TopicFilter, Self::Error> {
241239
let mut filter = TopicFilter::empty();
242240
for pattern in self.into_iter() {
243-
filter.add(pattern)?
241+
filter.try_add(pattern)?
244242
}
245243
Ok(filter)
246244
}
@@ -258,7 +256,7 @@ impl TryInto<TopicFilter> for Vec<String> {
258256
fn try_into(self) -> Result<TopicFilter, Self::Error> {
259257
let mut filter = TopicFilter::empty();
260258
for pattern in self.into_iter() {
261-
filter.add(pattern.as_str())?
259+
filter.try_add(pattern.as_str())?
262260
}
263261
Ok(filter)
264262
}
@@ -270,13 +268,28 @@ impl AsRef<str> for Topic {
270268
}
271269
}
272270

271+
impl Add for TopicFilter {
272+
type Output = TopicFilter;
273+
274+
fn add(mut self, other: TopicFilter) -> TopicFilter {
275+
self += other;
276+
self
277+
}
278+
}
279+
280+
impl AddAssign for TopicFilter {
281+
fn add_assign(&mut self, rhs: Self) {
282+
self.add_all(rhs);
283+
}
284+
}
285+
273286
impl TryInto<TopicFilter> for HashSet<String> {
274287
type Error = MqttError;
275288

276289
fn try_into(self) -> Result<TopicFilter, Self::Error> {
277290
let mut filter = TopicFilter::empty();
278291
for pattern in self.into_iter() {
279-
filter.add(pattern.as_str())?
292+
filter.try_add(pattern.as_str())?
280293
}
281294
Ok(filter)
282295
}
@@ -345,4 +358,23 @@ mod tests {
345358
let removed = topics.remove_overlapping_patterns();
346359
assert!(removed.is_empty());
347360
}
361+
362+
#[test]
363+
fn test_adding_topic_filters() {
364+
let filter1 = TopicFilter::new_unchecked("a/b/c");
365+
let filter2 = TopicFilter::new_unchecked("d/e/f");
366+
367+
let combined = filter1.clone() + filter2.clone();
368+
assert!(combined.accept_topic_name("a/b/c"));
369+
assert!(combined.accept_topic_name("d/e/f"));
370+
}
371+
372+
#[test]
373+
fn test_add_assign_topic_filters() {
374+
let mut filter = TopicFilter::new_unchecked("a/b/c");
375+
filter += TopicFilter::new_unchecked("d/e/f");
376+
377+
assert!(filter.accept_topic_name("a/b/c"));
378+
assert!(filter.accept_topic_name("d/e/f"));
379+
}
348380
}

crates/core/tedge_mapper/src/aws/mapper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl TEdgeComponent for AwsMapper {
103103
fn get_topic_filter(aws_config: &TEdgeConfigReaderAws) -> TopicFilter {
104104
let mut topics = TopicFilter::empty();
105105
for topic in aws_config.topics.0.clone() {
106-
if topics.add(&topic).is_err() {
106+
if topics.try_add(&topic).is_err() {
107107
warn!("The configured topic '{topic}' is invalid and ignored.");
108108
}
109109
}

crates/core/tedge_mapper/src/az/mapper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl TEdgeComponent for AzureMapper {
111111
fn get_topic_filter(az_config: &TEdgeConfigReaderAz) -> TopicFilter {
112112
let mut topics = TopicFilter::empty();
113113
for topic in az_config.topics.0.clone() {
114-
if topics.add(&topic).is_err() {
114+
if topics.try_add(&topic).is_err() {
115115
warn!("The configured topic '{topic}' is invalid and ignored.");
116116
}
117117
}

crates/extensions/c8y_mapper_ext/src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ impl C8yMapperConfig {
221221

222222
// Add user configurable external topic filters
223223
for topic in c8y_config.topics.0.clone() {
224-
if topics.add(&topic).is_err() {
224+
if topics.try_add(&topic).is_err() {
225225
warn!("The configured topic '{topic}' is invalid and ignored.");
226226
}
227227
}
@@ -293,7 +293,7 @@ impl C8yMapperConfig {
293293
bridge_config,
294294
) {
295295
for topic in operations.topics_for_operations() {
296-
topic_filter.add(&topic)?;
296+
topic_filter.try_add(&topic)?;
297297
}
298298
}
299299

crates/extensions/tedge_config_manager/src/lib.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,9 @@ impl ConfigManagerBuilder {
148148

149149
/// List of MQTT topic filters the log actor has to subscribe to
150150
fn subscriptions(config: &ConfigManagerConfig) -> TopicFilter {
151-
let mut topic_filter = TopicFilter::empty();
152-
topic_filter.add_all(config.config_snapshot_topic.clone());
151+
let mut topic_filter = config.config_snapshot_topic.clone();
153152
if config.config_update_enabled {
154-
topic_filter.add_all(config.config_update_topic.clone());
153+
topic_filter += config.config_update_topic.clone();
155154
}
156155
topic_filter
157156
}

crates/extensions/tedge_flows/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ fn topic_filters(patterns: Vec<String>) -> Result<TopicFilter, ConfigError> {
281281
let mut topics = TopicFilter::empty();
282282
for pattern in patterns {
283283
topics
284-
.add(pattern.as_str())
284+
.try_add(pattern.as_str())
285285
.map_err(|_| ConfigError::IncorrectTopicFilter(pattern.clone()))?;
286286
}
287287
Ok(topics)

crates/extensions/tedge_log_manager/src/config.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,13 @@ impl LogManagerConfig {
7070
ChannelFilter::Command(OperationType::LogUpload),
7171
);
7272

73-
let mut log_metadata_sync_topics = mqtt_schema.topics(
73+
let log_metadata_sync_topics = mqtt_schema.topics(
7474
EntityFilter::Entity(&mqtt_device_topic_id),
7575
ChannelFilter::Command(OperationType::SoftwareUpdate),
76-
);
77-
log_metadata_sync_topics.add_all(mqtt_schema.topics(
76+
) + mqtt_schema.topics(
7877
EntityFilter::Entity(&mqtt_device_topic_id),
7978
ChannelFilter::Command(OperationType::ConfigUpdate),
80-
));
79+
);
8180

8281
Ok(Self {
8382
mqtt_schema,

crates/extensions/tedge_log_manager/src/lib.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,7 @@ impl LogManagerBuilder {
149149

150150
/// List of MQTT topic filters the log actor has to subscribe to
151151
fn subscriptions(config: &LogManagerConfig) -> TopicFilter {
152-
let mut topics = config.logfile_request_topic.clone();
153-
topics.add_all(config.log_metadata_sync_topics.clone());
154-
topics
152+
config.logfile_request_topic.clone() + config.log_metadata_sync_topics.clone()
155153
}
156154

157155
/// Extract a log actor request from an MQTT message

crates/extensions/tedge_mqtt_ext/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ impl MqttActorBuilder {
272272
pub(crate) fn build_actor(self) -> MqttActor {
273273
let mut topic_filter = TopicFilter::empty();
274274
for pattern in &self.subscription_diff.subscribe {
275-
topic_filter.add(pattern).unwrap();
275+
topic_filter.try_add(pattern).unwrap();
276276
tracing::info!(target: "MQTT sub", "{pattern}");
277277
}
278278

0 commit comments

Comments
 (0)