diff --git a/up-subscription/src/configuration.rs b/up-subscription/src/configuration.rs index 62ca347..af5e5b9 100644 --- a/up-subscription/src/configuration.rs +++ b/up-subscription/src/configuration.rs @@ -19,6 +19,9 @@ use up_rust::{ LocalUriProvider, UUri, }; +/// What uSubscription service uses as the resource ID for LocalUriProvider::get_source_uri() +pub(crate) const SOURCE_URI_RESOURCE_ID: u16 = 0x00FF; + /// Default subscription and notification command channel buffer size pub(crate) const DEFAULT_COMMAND_BUFFER_SIZE: usize = 1024; @@ -128,7 +131,7 @@ impl LocalUriProvider for USubscriptionConfiguration { &self.authority_name, USUBSCRIPTION_TYPE_ID, USUBSCRIPTION_VERSION_MAJOR, - 0x0, + SOURCE_URI_RESOURCE_ID, ) .expect("Error constructing usubscription UUri") } diff --git a/up-subscription/src/handlers/reset.rs b/up-subscription/src/handlers/reset.rs index 7230033..48c2c2a 100644 --- a/up-subscription/src/handlers/reset.rs +++ b/up-subscription/src/handlers/reset.rs @@ -89,6 +89,8 @@ mod tests { use super::*; use tokio::sync::mpsc::{self}; + use up_rust::UUri; + use crate::{helpers, tests::test_lib}; // [utest->dsn~usubscription-reset-protobuf~1] @@ -254,9 +256,12 @@ mod tests { helpers::init_once(); // create request and other required object(s) + let bad_source = + UUri::try_from("up://LOCAL/1000/1/F").expect("Error during test case setup"); + let request_payload = UPayload::try_from_protobuf(ResetRequest::default()).unwrap(); let message_attributes = UAttributes { - source: Some(test_lib::helpers::subscriber_uri1()).into(), + source: Some(bad_source).into(), ..Default::default() }; diff --git a/up-subscription/src/handlers/unregister_for_notifications.rs b/up-subscription/src/handlers/unregister_for_notifications.rs index 2b149fe..d4ae22f 100644 --- a/up-subscription/src/handlers/unregister_for_notifications.rs +++ b/up-subscription/src/handlers/unregister_for_notifications.rs @@ -70,6 +70,7 @@ impl RequestHandler for UnregisterNotificationsRequestHandler { // Interact with notification manager backend let se = NotificationEvent::RemoveNotifyee { subscriber: source.clone(), + topic: topic.clone(), }; if let Err(e) = self.notification_sender.send(se).await { @@ -139,8 +140,9 @@ mod tests { // validate subscription manager interaction let notification_event = notification_receiver.recv().await.unwrap(); match notification_event { - NotificationEvent::RemoveNotifyee { subscriber } => { + NotificationEvent::RemoveNotifyee { subscriber, topic } => { assert_eq!(subscriber, test_lib::helpers::subscriber_uri1()); + assert_eq!(topic, test_lib::helpers::local_topic1_uri()); } _ => panic!("Wrong event type"), } diff --git a/up-subscription/src/notification_manager.rs b/up-subscription/src/notification_manager.rs index 8fed864..596cd10 100644 --- a/up-subscription/src/notification_manager.rs +++ b/up-subscription/src/notification_manager.rs @@ -12,7 +12,6 @@ ********************************************************************************/ use log::*; -use std::collections::HashMap; use std::sync::Arc; use tokio::sync::{mpsc::Receiver, mpsc::Sender, oneshot, Notify}; @@ -21,7 +20,7 @@ use up_rust::{ usubscription_uri, SubscriberInfo, SubscriptionStatus, Update, RESOURCE_ID_SUBSCRIPTION_CHANGE, }, - UMessageBuilder, UTransport, UUID, + LocalUriProvider, UMessageBuilder, UTransport, UUID, }; use crate::{ @@ -31,6 +30,9 @@ use crate::{ USubscriptionConfiguration, }; +// From usubscription.proto, the uprotocol.notification_topic resource ID +pub(crate) const SOURCE_URI_RESOURCE_ID: u16 = 0x8000; + // This is the core business logic for tracking and sending subscription update notifications. It is currently implemented as a single // event-consuming function `notification_engine()`, which is supposed to be spawned into a task, and process the various notification // `Events` that it can receive via tokio mpsc channel. @@ -44,6 +46,7 @@ pub(crate) enum NotificationEvent { }, RemoveNotifyee { subscriber: SubscriberUUri, + topic: TopicUUri, }, StateChange { subscriber: Option, @@ -55,12 +58,12 @@ pub(crate) enum NotificationEvent { respond_to: oneshot::Sender>, }, GetNotificationTopics { - respond_to: oneshot::Sender>, + respond_to: oneshot::Sender>, }, // Purely for use during testing: force-set new notifyees ledger #[cfg(test)] SetNotificationTopics { - notification_topics_replacement: HashMap, + notification_topics_replacement: Vec<(SubscriberUUri, TopicUUri)>, respond_to: oneshot::Sender<()>, }, } @@ -93,9 +96,15 @@ impl PartialEq for NotificationEvent { }, ) => s1 == s2 && t1 == t2, ( - NotificationEvent::RemoveNotifyee { subscriber: s1 }, - NotificationEvent::RemoveNotifyee { subscriber: s2 }, - ) => s1 == s2, + NotificationEvent::RemoveNotifyee { + subscriber: s1, + topic: t1, + }, + NotificationEvent::RemoveNotifyee { + subscriber: s2, + topic: t2, + }, + ) => s1 == s2 && t1 == t2, // Don't care about the test-only variants _ => false, } @@ -129,6 +138,7 @@ pub(crate) async fn notification_engine( }, }; match event { + // [impl->req~usubscription-register-notifications~1] NotificationEvent::AddNotifyee { subscriber, topic } => { if !topic.is_event() { error!("Topic UUri is not a valid event target"); @@ -142,8 +152,9 @@ pub(crate) async fn notification_engine( } }; } - NotificationEvent::RemoveNotifyee { subscriber } => { - match notifications.remove_notifyee(&subscriber) { + // [impl->req~usubscription-unregister-notifications~1] + NotificationEvent::RemoveNotifyee { subscriber, topic } => { + match notifications.remove_notifyee(&subscriber, &topic) { Ok(_) => {} Err(e) => { error!("Persistency failure {e}") @@ -158,7 +169,7 @@ pub(crate) async fn notification_engine( } => { // [impl->dsn~usubscription-change-notification-type~1] let update = Update { - topic: Some(topic).into(), + topic: Some(topic.clone()).into(), subscriber: Some(SubscriberInfo { uri: subscriber.into(), ..Default::default() @@ -188,29 +199,27 @@ pub(crate) async fn notification_engine( } } - // Send Update message to any dedicated registered notification-subscribers + // Send Update notification message to any dedicated registered notification-subscribers // [impl->req~usubscription-register-notifications~1] - if let Ok(topics) = notifications.get_topics() { - for topic_entry in topics { + if let Ok(subscribers) = notifications.get_subscribers_registered_for_topic(&topic) + { + for subscribers_entry in subscribers { debug!( - "Sending notification to ({}): topic {}, subscriber {}, status {}", - topic_entry.to_uri(INCLUDE_SCHEMA), + "Sending notification to ({}), about topic {} changing state to {}", + subscribers_entry.to_uri(INCLUDE_SCHEMA), update .topic .as_ref() .unwrap_or_default() .to_uri(INCLUDE_SCHEMA), - update - .subscriber - .uri - .as_ref() - .unwrap_or_default() - .to_uri(INCLUDE_SCHEMA), update.status.as_ref().unwrap_or_default() ); - match UMessageBuilder::publish(topic_entry.clone()) - .build_with_protobuf_payload(&update) + match UMessageBuilder::notification( + configuration.get_resource_uri(SOURCE_URI_RESOURCE_ID), + subscribers_entry.clone(), + ) + .build_with_protobuf_payload(&update) { Ok(update_msg) => { let _r = up_transport.send(update_msg).await.inspect_err(|e| @@ -254,7 +263,7 @@ pub(crate) async fn notification_engine( // Convenience wrapper for sending state change notification messages // `susbcriber` is an Option, because in the case ob remote subscription state changes, there is no subscriber (other than local usubscription service) -pub(crate) async fn notify( +pub(crate) async fn notify_state_change( notification_sender: Sender, subscriber: Option, topic: TopicUUri, @@ -282,9 +291,9 @@ pub(crate) async fn notify( // Might return an empty list if data retrieval fails for some reason, but will perform the reset in any case. pub(crate) async fn reset( notification_sender: Sender, -) -> Result, Box> { +) -> Result, Box> { // Get current notification registrations - let (respond_to, receive_from) = oneshot::channel::>(); + let (respond_to, receive_from) = oneshot::channel::>(); notification_sender .send(NotificationEvent::GetNotificationTopics { respond_to }) .await?; diff --git a/up-subscription/src/persistency.rs b/up-subscription/src/persistency.rs index fff564d..84d83fb 100644 --- a/up-subscription/src/persistency.rs +++ b/up-subscription/src/persistency.rs @@ -12,7 +12,7 @@ ********************************************************************************/ use pickledb::{PickleDb, PickleDbDumpPolicy, SerializationMethod}; -use protobuf::{Enum, Message}; +use protobuf::Enum; use serde::de::Error; use std::collections::HashMap; use std::{convert::TryInto, path::PathBuf}; @@ -556,15 +556,23 @@ impl NotificationStore { topic: &TopicUUri, ) -> Result<(), PersistencyError> { let subscriber_string = subscriber.to_uri(Self::PERSIST_UP_SCHEMA); - let topic_bytes = serialize_uuri(topic) - .map_err(|e| PersistencyError::serialization_error(e.to_string()))?; + let topic_string = topic.to_uri(Self::PERSIST_UP_SCHEMA); - self.persistency - .set(&subscriber_string, &topic_bytes) - .map_err(|e| { + if !self.persistency.lexists(&topic_string) { + self.persistency.lcreate(&topic_string).map_err(|e| { PersistencyError::internal_error(format!( "Error setting notification configuration in persistency {e}" )) + })?; + } + + self.persistency + .ladd(&topic_string, &subscriber_string) + .map(|_| ()) + .ok_or_else(|| { + PersistencyError::internal_error( + "Error setting notification configuration in persistency", + ) }) } @@ -574,9 +582,16 @@ impl NotificationStore { pub(crate) fn remove_notifyee( &mut self, subscriber: &SubscriberUUri, + topic: &TopicUUri, ) -> Result<(), PersistencyError> { + let topic_string = topic.to_uri(Self::PERSIST_UP_SCHEMA); + if !self.persistency.lexists(&topic_string) { + return Ok(()); + } + + let subscriber_string = subscriber.to_uri(Self::PERSIST_UP_SCHEMA); self.persistency - .rem(&subscriber.to_uri(Self::PERSIST_UP_SCHEMA)) + .lrem_value(&topic_string, &subscriber_string) .map_err(|e| { PersistencyError::internal_error(format!( "Error setting notification configuration in persistency {e}" @@ -586,20 +601,33 @@ impl NotificationStore { Ok(()) } - /// Returns a list of all topic keys from custom notification persistency - /// * return a `Vec` list of topic UUris + /// Returns a list of all subscribers that have registered to be notified on change of topic state + /// + /// # Arguments + /// + /// * `topic` - UUri of the topic that subscribers registered to be notified about. + /// + /// # Returns + /// + /// * return a `Vec` list of subscriber UUris that want to be notified about topic state changes /// * returns a `PersistencyError` in case something went wrong with data serialization or storage - pub(crate) fn get_topics(&mut self) -> Result, PersistencyError> { + pub(crate) fn get_subscribers_registered_for_topic( + &mut self, + topic: &TopicUUri, + ) -> Result, PersistencyError> { + let topic_string = topic.to_uri(Self::PERSIST_UP_SCHEMA); + + if !self.persistency.lexists(&topic_string) { + return Ok(vec![]); + } + let mut result = vec![]; - for entry in self.persistency.iter() { - if let Some(bytes) = entry.get_value::>() { - let topic = deserialize_uuri(&bytes).map_err(|e| { - PersistencyError::serialization_error(format!( - "Error deserializing notification topic {e}" - )) - })?; - result.push(topic); + for entry in self.persistency.liter(&topic_string) { + if let Some(subscriber_string) = entry.get_item::() { + let subscriber = UUri::try_from(subscriber_string) + .map_err(|e| PersistencyError::serialization_error(e.to_string()))?; + result.push(subscriber); } } Ok(result) @@ -612,59 +640,63 @@ impl NotificationStore { for key in keys { self.persistency.rem(&key).map_err(|e| { PersistencyError::internal_error(format!( - "Error removing notification entries from persistency {e}" + "Error removing registered-for-notifications from persistency {e}" )) })?; } self.persistency.dump().map_err(|e| { PersistencyError::internal_error(format!( - "Error dumping cleared notifications to persistency {e}" + "Error dumping cleared registered-for-notification data to persistency {e}" )) })?; - Ok(()) } pub(crate) fn get_data( &self, - ) -> Result, Box> { + ) -> Result, Box> { #[allow(clippy::mutable_key_type)] - let mut map: HashMap = HashMap::new(); - - for kv in self.persistency.iter() { - if let Some(bytes) = kv.get_value::>() { - let value = deserialize_uuri(&bytes)?; - map.insert(UUri::try_from(kv.get_key())?, value); + let mut list: Vec<(SubscriberUUri, TopicUUri)> = Vec::new(); + + let topic_strings = self.persistency.get_all(); + for topic_string in topic_strings { + for entry in self.persistency.liter(&topic_string) { + if let Some(subscriber_string) = entry.get_item::() { + let subscriber = UUri::try_from(subscriber_string)?; + let topic = UUri::try_from(topic_string.clone())?; + list.push((subscriber, topic)); + } } } - Ok(map) + Ok(list) } #[cfg(test)] #[allow(clippy::mutable_key_type)] pub(crate) fn set_data( &mut self, - map: HashMap, + list: Vec<(SubscriberUUri, TopicUUri)>, ) -> Result<(), Box> { - for (key, value) in map { - let _r = self - .persistency - .set(&key.to_uri(PERSIST_UP_SCHEMA), &serialize_uuri(&value)?); + for (subscriber, topic) in list { + let subscriber_string = subscriber.to_uri(Self::PERSIST_UP_SCHEMA); + let topic_string = topic.to_uri(Self::PERSIST_UP_SCHEMA); + + if !self.persistency.lexists(&topic_string) { + self.persistency.lcreate(&topic_string).map_err(|e| { + PersistencyError::internal_error(format!( + "Error setting notification configuration in persistency {e}" + )) + })?; + } + + self.persistency.ladd(&topic_string, &subscriber_string); } + Ok(()) } } -// custom serialization functions -fn serialize_uuri(uuri: &UUri) -> Result, Box> { - Ok(uuri.write_to_bytes()?) -} - -fn deserialize_uuri(bytes: &[u8]) -> Result> { - Ok(UUri::parse_from_bytes(bytes)?) -} - fn serialize_topic_state( state: &TopicState, ) -> Result, Box> { @@ -735,32 +767,8 @@ mod tests { // manager business logic are located in tests/persistency_tests.rs use super::*; - use crate::test_lib::{self}; use test_case::test_case; - #[test_case(test_lib::helpers::subscriber_uri1(), &[10, 5, 76, 79, 67, 65, 76, 16, 128, 32, 24, 1, 32, 128, 32]; "Subscriber UUri")] - #[test_case(test_lib::helpers::local_topic1_uri(), &[10, 5, 76, 79, 67, 65, 76, 16, 128, 128, 64, 24, 1, 32, 199, 149, 2]; "Local UUri")] - #[test_case(test_lib::helpers::remote_topic1_uri(), &[10, 6, 82, 69, 77, 79, 84, 69, 16, 128, 160, 1, 24, 1, 32, 128, 64]; "Remote UUri")] - #[test_case(UUri::default(), &[]; "Empty UUri")] - #[tokio::test] - async fn test_serialize_deserialize_uuri(uri: UUri, bytes: &[u8]) { - helpers::init_once(); - - // To bytes - let serialized_bytes = serialize_uuri(&uri); - assert!(serialized_bytes.is_ok()); - - let serialized_bytes = serialized_bytes.unwrap(); - assert_eq!(serialized_bytes, bytes); - - // and back - let reconstructed_uri = deserialize_uuri(&serialized_bytes); - assert!(reconstructed_uri.is_ok()); - - let reconstructed_uri = reconstructed_uri.unwrap(); - assert_eq!(reconstructed_uri, uri); - } - #[test_case(TopicState::UNSUBSCRIBED, &[0,0,0,0]; "State UNSUBSCRIBED")] #[test_case(TopicState::SUBSCRIBE_PENDING, &[1,0,0,0]; "State SUBSCRIBE_PENDING")] #[test_case(TopicState::SUBSCRIBED, &[2,0,0,0]; "State SUBSCRIBED")] diff --git a/up-subscription/src/subscription_manager.rs b/up-subscription/src/subscription_manager.rs index 620afbe..812f63f 100644 --- a/up-subscription/src/subscription_manager.rs +++ b/up-subscription/src/subscription_manager.rs @@ -222,7 +222,7 @@ pub(crate) async fn handle_message( // [impl->dsn~usubscription-change-notification-update~1] Ok(result) => { // Send topic state change notification - notification_manager::notify( + notification_manager::notify_state_change( notification_sender.clone(), Some(subscriber.clone()), topic.clone(), @@ -257,7 +257,7 @@ pub(crate) async fn handle_message( Ok(result) => { // Send topic state change notification // [impl->dsn~usubscription-change-notification-update~1] - notification_manager::notify( + notification_manager::notify_state_change( notification_sender.clone(), Some(subscriber), topic, @@ -375,7 +375,7 @@ pub(crate) async fn handle_message( // Want to do this out off the main control flow helpers::spawn_and_log_error(async move { for subscriber in subscribers { - notification_manager::notify( + notification_manager::notify_state_change( notification_sender_clone.clone(), Some(subscriber), topic_clone.clone(), @@ -425,7 +425,7 @@ pub(crate) async fn handle_message( Ok(result) => { // Send topic state change notification // [impl->dsn~usubscription-change-notification-update~1] - notification_manager::notify( + notification_manager::notify_state_change( notification_sender.clone(), Some(subscriber), topic, @@ -622,7 +622,7 @@ async fn reset( helpers::spawn_and_log_error(async move { // Notify all topic subscribers for (subscriber, topic, _) in flattened_subscriptions { - notification_manager::notify( + notification_manager::notify_state_change( notification_sender.clone(), Some(subscriber), topic, @@ -636,7 +636,7 @@ async fn reset( // Notify all registered-for-notification clients for (subscriber, topic) in registered_notifactions { - notification_manager::notify( + notification_manager::notify_state_change( notification_sender.clone(), Some(subscriber), topic, diff --git a/up-subscription/src/tests/notification_manager_tests.rs b/up-subscription/src/tests/notification_manager_tests.rs index 9e9538e..a0bb832 100644 --- a/up-subscription/src/tests/notification_manager_tests.rs +++ b/up-subscription/src/tests/notification_manager_tests.rs @@ -13,7 +13,6 @@ #[cfg(test)] mod tests { - use std::collections::HashMap; use std::error::Error; use std::sync::Arc; use tokio::sync::{ @@ -25,13 +24,13 @@ mod tests { core::usubscription::{ usubscription_uri, State, SubscriptionStatus, Update, RESOURCE_ID_SUBSCRIPTION_CHANGE, }, - UMessage, UMessageBuilder, UUID, + LocalUriProvider, UMessage, UMessageBuilder, UUID, }; use crate::{ configuration::DEFAULT_COMMAND_BUFFER_SIZE, helpers, - notification_manager::{notification_engine, NotificationEvent}, + notification_manager::{notification_engine, NotificationEvent, SOURCE_URI_RESOURCE_ID}, test_lib, usubscription::{SubscriberUUri, TopicUUri}, USubscriptionConfiguration, @@ -43,18 +42,18 @@ mod tests { } impl CommandSender { - fn new(expected_message: Vec) -> Self { - let config = Arc::new( - USubscriptionConfiguration::create( - test_lib::helpers::LOCAL_AUTHORITY.to_string(), - None, - None, - false, - None, - ) - .unwrap(), - ); + fn get_config() -> USubscriptionConfiguration { + USubscriptionConfiguration::create( + test_lib::helpers::LOCAL_AUTHORITY.to_string(), + None, + None, + false, + None, + ) + .unwrap() + } + fn new(config: Arc, expected_message: Vec) -> Self { let shutdown_notification = Arc::new(Notify::new()); let (command_sender, command_receiver) = mpsc::channel::(DEFAULT_COMMAND_BUFFER_SIZE); @@ -85,10 +84,14 @@ mod tests { .await?) } - async fn remove_notifyee(&self, subscriber: SubscriberUUri) -> Result<(), Box> { + async fn remove_notifyee( + &self, + subscriber: SubscriberUUri, + topic: TopicUUri, + ) -> Result<(), Box> { Ok(self .command_sender - .send(NotificationEvent::RemoveNotifyee { subscriber }) + .send(NotificationEvent::RemoveNotifyee { subscriber, topic }) .await?) } @@ -114,9 +117,8 @@ mod tests { async fn get_notification_topics( &self, - ) -> Result, Box> { - let (respond_to, receive_from) = - oneshot::channel::>(); + ) -> Result, Box> { + let (respond_to, receive_from) = oneshot::channel::>(); self.command_sender .send(NotificationEvent::GetNotificationTopics { respond_to }) .await?; @@ -127,7 +129,7 @@ mod tests { #[allow(clippy::mutable_key_type)] async fn set_notification_topics( &self, - notification_topics_replacement: HashMap, + notification_topics_replacement: Vec<(SubscriberUUri, TopicUUri)>, ) -> Result<(), Box> { let (respond_to, receive_from) = oneshot::channel::<()>(); self.command_sender @@ -144,7 +146,7 @@ mod tests { #[tokio::test] async fn test_add_notifyee() { helpers::init_once(); - let command_sender = CommandSender::new(vec![]); + let command_sender = CommandSender::new(Arc::new(CommandSender::get_config()), vec![]); let expected_subscriber = test_lib::helpers::subscriber_info1().uri.unwrap(); let expected_topic = test_lib::helpers::local_topic1_uri(); @@ -161,26 +163,21 @@ mod tests { .expect("Error communicating with subscription manager"); assert_eq!(notification_topics.len(), 1); - assert!(notification_topics.contains_key(&expected_subscriber)); - assert_eq!( - *notification_topics.get(&expected_subscriber).unwrap(), - expected_topic - ); + assert!(notification_topics.contains(&(expected_subscriber, expected_topic))); } #[tokio::test] async fn test_remove_notifyee() { helpers::init_once(); - let command_sender = CommandSender::new(vec![]); + let command_sender = CommandSender::new(Arc::new(CommandSender::get_config()), vec![]); // prepare things let expected_subscriber = test_lib::helpers::subscriber_info1().uri.unwrap(); let expected_topic = test_lib::helpers::local_topic1_uri(); #[allow(clippy::mutable_key_type)] - let mut notification_topics_replacement: HashMap = - HashMap::new(); - notification_topics_replacement.insert(expected_subscriber.clone(), expected_topic.clone()); + let notification_topics_replacement: Vec<(SubscriberUUri, TopicUUri)> = + vec![(expected_subscriber.clone(), expected_topic.clone())]; command_sender .set_notification_topics(notification_topics_replacement) @@ -189,7 +186,7 @@ mod tests { // operation to test command_sender - .remove_notifyee(expected_subscriber.clone()) + .remove_notifyee(expected_subscriber.clone(), expected_topic.clone()) .await .expect("Error communicating with subscription manager"); @@ -233,7 +230,10 @@ mod tests { .build_with_protobuf_payload(&expected_update) .unwrap(); - let command_sender = CommandSender::new(vec![expected_message_general_channel]); + let command_sender = CommandSender::new( + Arc::new(CommandSender::get_config()), + vec![expected_message_general_channel], + ); // operation to test let r = command_sender @@ -245,4 +245,140 @@ mod tests { .await; assert!(r.is_ok()) } + + // [utest->req~usubscription-register-notifications~1] + #[tokio::test] + async fn test_state_change_direct_notification() { + helpers::init_once(); + + // prepare things + // this is the status&topic&subscriber that the notification is about + let changing_status = SubscriptionStatus { + state: State::SUBSCRIBED.into(), + ..Default::default() + }; + let changing_topic = test_lib::helpers::local_topic1_uri(); + let interested_subscriber = test_lib::helpers::subscriber_info1(); + let interested_subscriber_uri = interested_subscriber.uri.clone().unwrap(); + + // the update message that we're expecting + let expected_update = Update { + topic: Some(changing_topic.clone()).into(), + subscriber: Some(interested_subscriber.clone()).into(), + status: Some(changing_status.clone()).into(), + ..Default::default() + }; + + // this is the generic update channel notification, that always is sent + let expected_message_general_channel = + UMessageBuilder::publish(usubscription_uri(RESOURCE_ID_SUBSCRIPTION_CHANGE)) + .with_message_id(UUID::build()) + .build_with_protobuf_payload(&expected_update) + .unwrap(); + + // this is the expected direct notification message + let config = Arc::new(CommandSender::get_config()); + let expected_notitication_message = UMessageBuilder::notification( + config.get_resource_uri(SOURCE_URI_RESOURCE_ID), + interested_subscriber_uri.clone(), + ) + .with_message_id(UUID::build()) + .build_with_protobuf_payload(&expected_update) + .unwrap(); + + // Set up test rig, to expect both general-channel and direct notification messages + let command_sender = CommandSender::new( + config, + vec![ + expected_message_general_channel, + expected_notitication_message, + ], + ); + + command_sender + .add_notifyee(interested_subscriber_uri.clone(), changing_topic.clone()) + .await + .expect("Error preparing test case context"); + + // operation to test - we now expect two messages, the general-channel notification as well as the direct notification to subscriber1 + let r = command_sender + .state_change( + interested_subscriber_uri, + changing_topic.clone(), + changing_status, + ) + .await; + assert!(r.is_ok()) + } + + // [utest->req~usubscription-unregister-notifications~1] + #[tokio::test] + async fn test_unregister_direct_notification() { + helpers::init_once(); + + // prepare things + // this is the status&topic&subscriber that the notification is about + let changing_status = SubscriptionStatus { + state: State::SUBSCRIBED.into(), + ..Default::default() + }; + let changing_topic = test_lib::helpers::local_topic1_uri(); + let interested_subscriber = test_lib::helpers::subscriber_info1(); + let interested_subscriber_uri = interested_subscriber.uri.clone().unwrap(); + + // the update message that we're expecting + let expected_update = Update { + topic: Some(changing_topic.clone()).into(), + subscriber: Some(interested_subscriber.clone()).into(), + status: Some(changing_status.clone()).into(), + ..Default::default() + }; + + // this is the generic update channel notification, that always is sent + let expected_message_general_channel = + UMessageBuilder::publish(usubscription_uri(RESOURCE_ID_SUBSCRIPTION_CHANGE)) + .with_message_id(UUID::build()) + .build_with_protobuf_payload(&expected_update) + .unwrap(); + + // Set up test rig, to expect only general-channel message + let config = Arc::new(CommandSender::get_config()); + let command_sender = CommandSender::new(config, vec![expected_message_general_channel]); + + // pre-set notification manager with direct-notification request for topic1 by subscriber1 + command_sender + .set_notification_topics(vec![( + interested_subscriber_uri.clone(), + changing_topic.clone(), + )]) + .await + .expect("Error preparing test case context"); + + // check that we're really set up for 1 direct notification of subscriber1 for topic1 + let list = command_sender + .get_notification_topics() + .await + .expect("Error preparing test case context"); + assert_eq!(list.len(), 1); + assert_eq!( + list.first().expect("Error preparing test case context"), + &(interested_subscriber_uri.clone(), changing_topic.clone()) + ); + + // Unsubscribe subscriber1 for notifications on topic1 + command_sender + .remove_notifyee(interested_subscriber_uri.clone(), changing_topic.clone()) + .await + .expect("Error preparing test case context"); + + // operation to test - we're now only expecting the general-channel notification message + let r = command_sender + .state_change( + interested_subscriber_uri, + changing_topic.clone(), + changing_status, + ) + .await; + assert!(r.is_ok()) + } } diff --git a/up-subscription/src/tests/persistency_tests.rs b/up-subscription/src/tests/persistency_tests.rs index 7816d56..8d9dbf6 100644 --- a/up-subscription/src/tests/persistency_tests.rs +++ b/up-subscription/src/tests/persistency_tests.rs @@ -16,7 +16,7 @@ mod tests { use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{sleep, Duration}; - use crate::{persistency, test_lib, USubscriptionConfiguration}; + use crate::{helpers, persistency, test_lib, USubscriptionConfiguration}; fn get_configuration() -> USubscriptionConfiguration { USubscriptionConfiguration::create( @@ -31,6 +31,8 @@ mod tests { #[tokio::test] async fn test_get_and_prune_expiring_subscriptions() { + helpers::init_once(); + let mut subscriptions = persistency::SubscriptionsStore::new(&get_configuration()); // Prepare subscription persistency with two subscriptions, one with and one without expiry timestamp @@ -88,6 +90,8 @@ mod tests { // [utest->req~usubscription-reset~1] #[tokio::test] async fn test_reset_subscriptions() { + helpers::init_once(); + let mut subscriptions = persistency::SubscriptionsStore::new(&get_configuration()); let _ = subscriptions.add_subscription( @@ -116,6 +120,8 @@ mod tests { // [utest->req~usubscription-reset~1] #[tokio::test] async fn test_reset_remote_subscriptions() { + helpers::init_once(); + let mut remote_topics = persistency::RemoteTopicsStore::new(&get_configuration()); let _ = remote_topics.add_topic_or_get_state(&test_lib::helpers::local_topic1_uri()); @@ -136,6 +142,40 @@ mod tests { // [utest->req~usubscription-reset~1] #[tokio::test] async fn test_reset_notifications() { + helpers::init_once(); + + let mut notifications = persistency::NotificationStore::new(&get_configuration()); + + notifications + .add_notifyee( + &test_lib::helpers::subscriber_uri1(), + &test_lib::helpers::local_topic1_uri(), + ) + .expect("Error adding test data set"); + + notifications + .add_notifyee( + &test_lib::helpers::subscriber_uri2(), + &test_lib::helpers::local_topic2_uri(), + ) + .expect("Error adding test data set"); + + let data = notifications.get_data(); + assert!(data.is_ok()); + assert_eq!(data.unwrap().len(), 2); + + let r = notifications.reset(); + assert!(r.is_ok()); + + let data = notifications.get_data(); + assert!(data.is_ok()); + assert_eq!(data.unwrap().len(), 0); + } + + #[tokio::test] + async fn test_one_subscriber_multiple_topics() { + helpers::init_once(); + let mut notifications = persistency::NotificationStore::new(&get_configuration()); let _ = notifications.add_notifyee( @@ -143,19 +183,34 @@ mod tests { &test_lib::helpers::local_topic1_uri(), ); let _ = notifications.add_notifyee( - &test_lib::helpers::subscriber_uri2(), + &test_lib::helpers::subscriber_uri1(), &test_lib::helpers::local_topic2_uri(), ); + // Should be two entries let data = notifications.get_data(); assert!(data.is_ok()); assert_eq!(data.unwrap().len(), 2); + } - let r = notifications.reset(); - assert!(r.is_ok()); + #[tokio::test] + async fn test_multiple_subscribers_one_topic() { + helpers::init_once(); + + let mut notifications = persistency::NotificationStore::new(&get_configuration()); + + let _ = notifications.add_notifyee( + &test_lib::helpers::subscriber_uri1(), + &test_lib::helpers::local_topic1_uri(), + ); + let _ = notifications.add_notifyee( + &test_lib::helpers::subscriber_uri2(), + &test_lib::helpers::local_topic1_uri(), + ); + // Should be two entries let data = notifications.get_data(); assert!(data.is_ok()); - assert_eq!(data.unwrap().len(), 0); + assert_eq!(data.unwrap().len(), 2); } } diff --git a/up-subscription/src/tests/subscription_manager_tests.rs b/up-subscription/src/tests/subscription_manager_tests.rs index 1789bdf..c57a2fe 100644 --- a/up-subscription/src/tests/subscription_manager_tests.rs +++ b/up-subscription/src/tests/subscription_manager_tests.rs @@ -19,6 +19,7 @@ mod tests { use std::error::Error; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; + use std::vec; use test_case::test_case; use tokio::sync::{mpsc, mpsc::Sender, oneshot, Notify}; @@ -141,7 +142,7 @@ mod tests { let shutdown_notification_cloned = shutdown_notification.clone(); helpers::spawn_and_log_error(async move { #[allow(clippy::mutable_key_type)] - let mut notification_topics: HashMap = HashMap::new(); + let mut notification_topics: Vec<(SubscriberUUri, TopicUUri)> = Vec::new(); loop { tokio::select! { Some(event) = notification_receiver.recv() => { @@ -374,7 +375,7 @@ mod tests { #[allow(clippy::mutable_key_type)] async fn set_notification_topics( &self, - notification_topics_replacement: HashMap, + notification_topics_replacement: Vec<(SubscriberUUri, TopicUUri)>, ) -> Result<(), Box> { let (respond_to, receive_from) = oneshot::channel::<()>(); let command = NotificationEvent::SetNotificationTopics { @@ -1285,12 +1286,10 @@ mod tests { // Add a notification-registration to the mock backend, for which we subsequently expect a notifcation on reset #[allow(clippy::mutable_key_type)] - let mut notification_topics_replacement: HashMap = - HashMap::new(); - notification_topics_replacement.insert( + let notification_topics_replacement: Vec<(SubscriberUUri, TopicUUri)> = vec![( test_lib::helpers::subscriber_uri2(), test_lib::helpers::local_topic2_uri(), - ); + )]; assert!(command_sender .set_notification_topics(notification_topics_replacement) .await diff --git a/up-subscription/src/tests/test_lib.rs b/up-subscription/src/tests/test_lib.rs index 0aae8dd..a6f60b6 100644 --- a/up-subscription/src/tests/test_lib.rs +++ b/up-subscription/src/tests/test_lib.rs @@ -162,15 +162,15 @@ pub(crate) mod helpers { pub(crate) const SUBSCRIBER1_ID: u32 = 0x0000_1000; pub(crate) const SUBSCRIBER1_VERSION: u8 = 0x01; - pub(crate) const SUBSCRIBER1_RESOURCE: u16 = 0x1000; + pub(crate) const SUBSCRIBER1_RESOURCE: u16 = 0x0000; const SUBSCRIBER2_ID: u32 = 0x0000_2000; const SUBSCRIBER2_VERSION: u8 = 0x01; - const SUBSCRIBER2_RESOURCE: u16 = 0x1000; + const SUBSCRIBER2_RESOURCE: u16 = 0x0010; const SUBSCRIBER3_ID: u32 = 0x0000_3000; const SUBSCRIBER3_VERSION: u8 = 0x01; - const SUBSCRIBER3_RESOURCE: u16 = 0x1000; + const SUBSCRIBER3_RESOURCE: u16 = 0x0100; #[allow(dead_code)] // final decision on removing this to happen after functional spec alignment is complete const NOTIFICATION_TOPIC_ID: u32 = 0x001_0000;