Skip to content

Commit 5cc635f

Browse files
authored
[Rust][Protocol][Telemetry] Retain (#793)
1 parent f94067b commit 5cc635f

File tree

2 files changed

+207
-1
lines changed

2 files changed

+207
-1
lines changed

rust/azure_iot_operations_protocol/src/telemetry/sender.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ pub struct Message<T: PayloadSerialize> {
189189
/// Cloud event of the telemetry message.
190190
#[builder(default = "None")]
191191
cloud_event: Option<CloudEvent>,
192+
/// Indicates whether the message should be retained or not.
193+
#[builder(default = "false")]
194+
retain: bool,
192195
}
193196

194197
impl<T: PayloadSerialize> MessageBuilder<T> {
@@ -475,7 +478,7 @@ where
475478
.publish_with_properties(
476479
message_topic,
477480
message.qos,
478-
false,
481+
message.retain,
479482
message.serialized_payload.payload,
480483
publish_properties,
481484
)
@@ -732,4 +735,38 @@ mod tests {
732735

733736
assert!(message_builder_result.is_err());
734737
}
738+
739+
#[test]
740+
fn test_message_defaults() {
741+
let mut mock_telemetry_payload = MockPayload::new();
742+
mock_telemetry_payload
743+
.expect_serialize()
744+
.returning(|| {
745+
Ok(SerializedPayload {
746+
payload: String::new().into(),
747+
content_type: "application/json".to_string(),
748+
format_indicator: FormatIndicator::Utf8EncodedCharacterData,
749+
})
750+
})
751+
.times(1);
752+
753+
let message_builder_result = MessageBuilder::default()
754+
.payload(mock_telemetry_payload)
755+
.unwrap()
756+
.build();
757+
758+
assert!(message_builder_result.is_ok());
759+
let m = message_builder_result.unwrap();
760+
761+
assert!(!m.retain);
762+
assert_eq!(
763+
m.qos,
764+
azure_iot_operations_mqtt::control_packet::QoS::AtLeastOnce
765+
);
766+
assert_eq!(m.message_expiry, Duration::from_secs(10));
767+
assert!(m.custom_user_data.is_empty());
768+
assert!(m.topic_tokens.is_empty());
769+
assert!(m.cloud_event.is_none());
770+
assert!(m.serialized_payload.payload.is_empty());
771+
}
735772
}

rust/azure_iot_operations_protocol/tests/telemetry_network_tests.rs

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,3 +450,172 @@ async fn telemetry_complex_send_receive_network_tests() {
450450
.is_ok()
451451
);
452452
}
453+
454+
fn setup_session_and_handle(client_id: &str) -> (Session, SessionExitHandle) {
455+
let _ = Builder::new()
456+
.filter_level(log::LevelFilter::max())
457+
.format_timestamp(None)
458+
.filter_module("rumqttc", log::LevelFilter::Info)
459+
.filter_module("azure_iot_operations", log::LevelFilter::Info)
460+
.try_init();
461+
462+
let connection_settings = MqttConnectionSettingsBuilder::default()
463+
.client_id(client_id)
464+
.hostname("localhost")
465+
.tcp_port(1883u16)
466+
.keep_alive(Duration::from_secs(5))
467+
.clean_start(true)
468+
.use_tls(false)
469+
.build()
470+
.unwrap();
471+
let session_options = SessionOptionsBuilder::default()
472+
.connection_settings(connection_settings)
473+
.build()
474+
.unwrap();
475+
let session = Session::new(session_options).unwrap();
476+
477+
let exit_handle: SessionExitHandle = session.create_exit_handle();
478+
(session, exit_handle)
479+
}
480+
481+
/// Tests telemetry retained message scenario
482+
/// Publisher sends a retained message in 1 session.
483+
/// Subscriber receives the retained message in another session.
484+
/// If retain is set to true, the message should be received even if the publisher is not connected
485+
#[tokio::test]
486+
async fn telemetry_retained_message_test() {
487+
// Skip if network tests are disabled
488+
if env::var("ENABLE_NETWORK_TESTS").is_err() {
489+
log::warn!("This test is skipped. Set ENABLE_NETWORK_TESTS to run.");
490+
return;
491+
}
492+
493+
let application_context = ApplicationContextBuilder::default().build().unwrap();
494+
495+
let _ = Builder::new()
496+
.filter_level(log::LevelFilter::max())
497+
.format_timestamp(None)
498+
.filter_module("rumqttc", log::LevelFilter::Info)
499+
.filter_module("azure_iot_operations", log::LevelFilter::Info)
500+
.try_init();
501+
502+
let retained_topic = "protocol/tests/retained/telemetry";
503+
let publisher_id = "telemetry_retained_publisher-rust";
504+
let subscriber_id = "telemetry_retained_subscriber-rust";
505+
506+
let retained_payload = DataPayload {
507+
external_temperature: 25.5,
508+
internal_temperature: 21.0,
509+
};
510+
511+
let sender_options = telemetry::sender::OptionsBuilder::default()
512+
.topic_pattern(retained_topic)
513+
.build()
514+
.unwrap();
515+
516+
let receiver_options = telemetry::receiver::OptionsBuilder::default()
517+
.topic_pattern(retained_topic)
518+
.auto_ack(false) // Manual ack for testing
519+
.build()
520+
.unwrap();
521+
522+
// === 1: Publisher sending a retained message ===
523+
let (publisher_session, pub_session_exit_handle) = setup_session_and_handle(publisher_id);
524+
let publisher_monitor = publisher_session.create_connection_monitor();
525+
526+
let publisher: telemetry::Sender<DataPayload, _> = telemetry::Sender::new(
527+
application_context.clone(),
528+
publisher_session.create_managed_client(),
529+
sender_options.clone(),
530+
)
531+
.unwrap();
532+
533+
// Run the publisher session and test in parallel
534+
let publisher_test = tokio::task::spawn(async move {
535+
publisher_monitor.connected().await;
536+
log::info!("Publisher connected");
537+
538+
let retained_message = telemetry::sender::MessageBuilder::default()
539+
.payload(retained_payload)
540+
.unwrap()
541+
.qos(QoS::AtLeastOnce)
542+
.retain(true) // Set the retain flag
543+
.build()
544+
.unwrap();
545+
546+
assert!(publisher.send(retained_message).await.is_ok());
547+
log::info!("Published retained message");
548+
549+
// Give the broker time to store the retained message
550+
tokio::time::sleep(Duration::from_secs(2)).await;
551+
552+
pub_session_exit_handle.try_exit().await.unwrap();
553+
});
554+
555+
// Run publisher test and session
556+
assert!(
557+
tokio::try_join!(
558+
async move { publisher_test.await.map_err(|e| { e.to_string() }) },
559+
async move { publisher_session.run().await.map_err(|e| { e.to_string() }) }
560+
)
561+
.is_ok()
562+
);
563+
564+
// === 2. Subscriber receives the retained message ===
565+
let (subscriber_session, sub_session_exit_handle) = setup_session_and_handle(subscriber_id);
566+
567+
let mut subscriber: telemetry::Receiver<DataPayload, _> = telemetry::Receiver::new(
568+
application_context.clone(),
569+
subscriber_session.create_managed_client(),
570+
receiver_options.clone(),
571+
)
572+
.unwrap();
573+
574+
let subscriber_test = tokio::task::spawn(async move {
575+
// Test was hanging and Panic or any assert after this task was not working when retain = false
576+
// Had to include a time out because of that
577+
let receive_telemetry_task = tokio::task::spawn({
578+
async move {
579+
let mut count = 0;
580+
if let Some(Ok((message, ack_token))) =
581+
tokio::time::timeout(Duration::from_secs(5), subscriber.recv())
582+
.await
583+
.unwrap()
584+
{
585+
log::info!("Received retained message");
586+
count += 1;
587+
// if auto-ack is true and QoS is 1, this should be Some
588+
assert!(ack_token.is_some());
589+
590+
// Verify the message content matches what was published
591+
assert_eq!(
592+
message.payload, retained_payload,
593+
"Payload mismatch in retained message"
594+
);
595+
596+
// Acknowledge message since QoS 1
597+
ack_token.unwrap().ack().await.unwrap().await.unwrap();
598+
log::info!("Acknowledged message");
599+
}
600+
// only the 1 expected messages should occur (checks that recv() didn't return None when it shouldn't have)
601+
assert_eq!(count, 1);
602+
// cleanup should be successful
603+
assert!(subscriber.shutdown().await.is_ok());
604+
}
605+
});
606+
607+
assert!(receive_telemetry_task.await.is_ok());
608+
609+
sub_session_exit_handle.try_exit().await.unwrap();
610+
});
611+
612+
assert!(
613+
tokio::try_join!(
614+
async move { subscriber_test.await.map_err(|e| { e.to_string() }) },
615+
async move { subscriber_session.run().await.map_err(|e| e.to_string()) }
616+
)
617+
.is_ok()
618+
);
619+
620+
log::info!("Retained message test completed successfully");
621+
}

0 commit comments

Comments
 (0)