Skip to content

Commit 6436c16

Browse files
izik1SharksT
andauthored
feat: include timestamp_received when opted into (#53)
* feat: include `timestamp_received` when opted into Signed-off-by: Skyler Ross <skyler@launchbadge.com> * fix: stop replacing the new user_properties with the old one. * chore: apply suggestions --------- Signed-off-by: Skyler Ross <skyler@launchbadge.com> Co-authored-by: Felipe Pessoa <felipe.pessoa@launchbadge.com>
1 parent af7ef03 commit 6436c16

File tree

9 files changed

+843
-46
lines changed

9 files changed

+843
-46
lines changed

Cargo.lock

Lines changed: 55 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ members = [".", "rumqttd-protocol", "fuzz"]
55
# Rename our in-tree copy of `rumqttd` to prevent confusion with the published crate.
66
rumqttd-protocol = { path = "rumqttd-protocol", package = "rumqttd" }
77

8+
[workspace.dependencies.tokio]
9+
version = "1.35.1"
10+
features = ["full"]
11+
812
[package]
913
name = "foxmq"
1014
version = "0.1.0"
@@ -25,6 +29,7 @@ argon2 = { version = "0.5.3", features = ["std"] }
2529
bytes = "1.5.0"
2630
color-eyre = "0.6.2"
2731
dialoguer = "0.11.0"
32+
tokio = { workspace = true }
2833
dotenvy = "0.15.7"
2934
hex = "0.4.3"
3035
num_enum = "0.7.2"
@@ -43,14 +48,12 @@ arbitrary = { version = "1", optional = true, features = ["derive"] }
4348
# TODO: upgrade RusTLS in TCE and message-queue [TG-461]
4449
tokio-rustls = "0.24.1"
4550
rustls-pemfile = "1.0.4"
51+
time = { version = "0.3.36", features = ["formatting"] }
4652

4753
[dependencies.clap]
4854
version = "4.5.0"
4955
features = ["derive", "env"]
5056

51-
[dependencies.tokio]
52-
version = "1.35.1"
53-
features = ["full"]
5457

5558
[dependencies.tracing-subscriber]
5659
version = "0.3.18"

src/mqtt/broker/connection.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ use crate::mqtt::connect::ConnectPacket;
2929
use crate::mqtt::mailbox::OpenMailbox;
3030
use crate::mqtt::packets::{IncomingPacketSet, IncomingSub, IncomingUnsub, PacketId};
3131
use crate::mqtt::publish::ValidateError;
32-
use crate::mqtt::router::{FilterProperties, RouterConnection, RouterMessage, SubscriptionId};
32+
use crate::mqtt::router::{
33+
FilterProperties, RouterConnection, RouterMessage, SubscribeRequest, SubscriptionId,
34+
};
3335
use crate::mqtt::session::{Session, SessionStore};
3436
use crate::mqtt::trie::Filter;
3537
use crate::mqtt::KeepAlive;
@@ -414,6 +416,7 @@ impl<S: MqttSocket> Connection<S> {
414416
mail.delivery_meta,
415417
Some(mail.packet_id),
416418
&mail.subscription_ids,
419+
mail.include_broker_timestamps,
417420
))
418421
.await?;
419422

@@ -426,6 +429,7 @@ impl<S: MqttSocket> Connection<S> {
426429
PublishMeta::new(QoS::AtMostOnce, mail.retain(), false),
427430
None,
428431
mail.subscription_ids(),
432+
mail.include_broker_timestamps,
429433
))
430434
.await?;
431435
}
@@ -833,12 +837,25 @@ impl<S: MqttSocket> Connection<S> {
833837
has_valid_filters = true;
834838
}
835839

840+
let include_broker_timestamps = sub_props.as_ref().is_some_and(|props| {
841+
props.user_properties.iter().any(|(k, v)| {
842+
k == "include_broker_timestamps" && v.parse::<bool>().unwrap_or(false)
843+
})
844+
});
845+
836846
if has_valid_filters {
837847
self.incoming_packets
838848
.insert_sub(packet_id, IncomingSub { return_codes })
839849
.expect("BUG: we should have checked `.contains()` above");
840850

841-
router.subscribe(packet_id, sub_id, filters).await;
851+
router
852+
.subscribe(SubscribeRequest {
853+
packet_id,
854+
sub_id,
855+
filters,
856+
include_broker_timestamps,
857+
})
858+
.await;
842859
} else {
843860
self.send(Packet::SubAck(
844861
SubAck {

src/mqtt/mailbox.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ pub struct OrderedMail {
5757
pub delivery_meta: PublishMeta,
5858
pub subscription_ids: Vec<SubscriptionId>,
5959
pub publish: Arc<PublishTrasaction>,
60+
pub include_broker_timestamps: bool,
6061
}
6162

6263
/// A QoS 0 PUBLISH.
@@ -65,6 +66,7 @@ pub struct OrderedMail {
6566
pub struct UnorderedMail {
6667
pub kind: UnorderedMailKind,
6768
pub publish: Arc<PublishTrasaction>,
69+
pub include_broker_timestamps: bool,
6870
}
6971

7072
/// A compact way to represent the RETAIN flag for a QoS 0 publish.
@@ -98,6 +100,7 @@ pub struct Release(#[allow(dead_code)] pub PacketId);
98100
struct Delivery {
99101
delivery_meta: PublishMeta,
100102
sub_id: Option<SubscriptionId>,
103+
include_broker_timestamps: bool,
101104
publish: Arc<PublishTrasaction>,
102105
}
103106

@@ -154,6 +157,7 @@ impl MailSender {
154157
subscription_qos: QoS,
155158
retain: bool,
156159
sub_id: Option<SubscriptionId>,
160+
include_broker_timestamps: bool,
157161
publish: Arc<PublishTrasaction>,
158162
) -> bool {
159163
let effective_qos = cmp::min(subscription_qos, publish.meta.qos());
@@ -173,6 +177,7 @@ impl MailSender {
173177
.send(Delivery {
174178
delivery_meta: PublishMeta::new(effective_qos, retain, false),
175179
sub_id,
180+
include_broker_timestamps,
176181
publish,
177182
})
178183
.is_ok()
@@ -238,6 +243,7 @@ impl OpenMailbox<'_> {
238243
// See doc comment on `UnorderedMailKind` for details.
239244
kind: UnorderedMailKind::NotRetained { subscription_ids },
240245
publish,
246+
include_broker_timestamps: _,
241247
}) = self.unordered_mail.back_mut()
242248
{
243249
// If this is a duplicate delivery, coalesce into one PUBLISH.
@@ -265,6 +271,7 @@ impl OpenMailbox<'_> {
265271
}
266272
},
267273
publish: delivery.publish,
274+
include_broker_timestamps: delivery.include_broker_timestamps,
268275
});
269276
}
270277

@@ -305,6 +312,7 @@ impl OpenMailbox<'_> {
305312
packet_id: self.mailbox.next_packet_id.wrapping_increment(),
306313
subscription_ids: Vec::from_iter(delivery.sub_id),
307314
publish: delivery.publish,
315+
include_broker_timestamps: delivery.include_broker_timestamps,
308316
});
309317
}
310318

src/mqtt/publish.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use bytes::Bytes;
22
use std::num::NonZeroU32;
33
use std::ops::Not;
4+
use time::format_description::well_known::Rfc3339;
45

56
use tashi_collections::FnvHashMap;
67

@@ -350,6 +351,7 @@ pub fn txn_to_packet(
350351
delivery_meta: PublishMeta,
351352
packet_id: Option<PacketId>,
352353
sub_ids: &[SubscriptionId],
354+
include_broker_timestamps: bool,
353355
) -> Packet {
354356
Packet::Publish(
355357
Publish::with_all(
@@ -369,13 +371,27 @@ pub fn txn_to_packet(
369371
};
370372
}
371373

374+
let mut user_properties =
375+
clone_prop!(user_properties).map_or(vec![], |props: UserProperties| props.0);
376+
377+
if include_broker_timestamps {
378+
let timestamp_received =
379+
time::OffsetDateTime::from_unix_timestamp(txn.timestamp_received.0 as i64)
380+
.expect("Time overflow");
381+
let timestamp_received = timestamp_received
382+
.format(&Rfc3339)
383+
.expect("formatting error");
384+
385+
user_properties.push(("timestamp_received".to_owned(), timestamp_received))
386+
}
387+
372388
PublishProperties {
373389
payload_format_indicator: clone_prop!(payload_format_indicator),
374390
message_expiry_interval: clone_prop!(message_expiry_interval),
375391
topic_alias: None,
376392
response_topic: clone_prop!(response_topic),
377393
correlation_data: clone_prop!(correlation_data).map(|bytes| bytes.0),
378-
user_properties: clone_prop!(user_properties).map_or(vec![], |props| props.0),
394+
user_properties,
379395

380396
// TODO: now that `rumqttd-protocol` is in-tree, just change the type there
381397
subscription_identifiers: sub_ids.iter().copied().map(Into::into).collect(),

0 commit comments

Comments
 (0)