Skip to content

Commit 141ca83

Browse files
Add requirement markers (eclipse-uprotocol#19)
Add openfasttrace requirement implementation and test markers to the codebase.
1 parent 5cd4bfb commit 141ca83

12 files changed

+82
-4
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ This codebase is approaching beta state, mirroring progress in up-spec and up-ru
1414
- [x] create a usubscription-cli module, a simple command-line frontend for running up-subscription
1515
- [x] set up a devcontainer
1616
- [x] feed back learnings and clarifications into up-spec usubscription documentation
17+
- [x] implement subscription expiry
1718
- [ ] implement any changes necessary to align with update/re-write of uSubscription specification
18-
- [ ] create a little demo application for interacting with up-subscription
1919
- [ ] tests for subscriber persistency
20-
- [ ] implement subscription expiry
20+
- [ ] create a little demo application for interacting with up-subscription
2121

2222
## Getting Started
2323

up-subscription/src/handlers/fetch_subscribers.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ impl RequestHandler for FetchSubscribersRequestHandler {
4949
message_attributes: &UAttributes,
5050
request_payload: Option<UPayload>,
5151
) -> Result<Option<UPayload>, ServiceInvocationError> {
52+
// [impl->dsn~usubscription-fetch-subscribers-protobuf~1]
5253
let (fetch_subscribers_request, _source) =
5354
helpers::extract_inputs::<FetchSubscribersRequest>(
5455
RESOURCE_ID_FETCH_SUBSCRIBERS,
@@ -114,6 +115,7 @@ mod tests {
114115

115116
use crate::{helpers, tests::test_lib};
116117

118+
// [utest->dsn~usubscription-fetch-subscribers-protobuf~1]
117119
#[tokio::test]
118120
async fn test_subscribe_success() {
119121
helpers::init_once();

up-subscription/src/handlers/fetch_subscriptions.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ impl RequestHandler for FetchSubscriptionsRequestHandler {
5151
message_attributes: &UAttributes,
5252
request_payload: Option<UPayload>,
5353
) -> Result<Option<UPayload>, ServiceInvocationError> {
54+
// [impl->dsn~usubscription-fetch-subscriptions-protobuf~1]
5455
let (fetch_subscriptions_request, _source) =
5556
helpers::extract_inputs::<FetchSubscriptionsRequest>(
5657
RESOURCE_ID_FETCH_SUBSCRIPTIONS,
@@ -152,6 +153,7 @@ mod tests {
152153

153154
use crate::{helpers, tests::test_lib};
154155

156+
// [utest->dsn~usubscription-fetch-subscriptions-protobuf~1]
155157
#[tokio::test]
156158
async fn test_fetch_subscriptions_success() {
157159
helpers::init_once();

up-subscription/src/handlers/register_for_notifications.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ impl RequestHandler for RegisterNotificationsRequestHandler {
4545
message_attributes: &UAttributes,
4646
request_payload: Option<UPayload>,
4747
) -> Result<Option<UPayload>, ServiceInvocationError> {
48+
// [impl->dsn~usubscription-register-notifications-protobuf~1]
4849
let (register_for_notifications_request, source) =
4950
helpers::extract_inputs::<NotificationsRequest>(
5051
RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS,
@@ -87,6 +88,7 @@ mod tests {
8788

8889
use crate::{helpers, tests::test_lib};
8990

91+
// [utest->dsn~usubscription-register-notifications-protobuf~1]
9092
#[tokio::test]
9193
async fn test_register_notification_success() {
9294
helpers::init_once();

up-subscription/src/handlers/subscribe.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ mod tests {
122122

123123
use crate::{helpers, tests::test_lib};
124124

125+
// [utest->dsn~usubscription-subscribe-protobuf~1]
125126
#[tokio::test]
126127
async fn test_subscribe_success() {
127128
helpers::init_once();

up-subscription/src/handlers/unregister_for_notifications.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ impl RequestHandler for UnregisterNotificationsRequestHandler {
4545
message_attributes: &UAttributes,
4646
request_payload: Option<UPayload>,
4747
) -> Result<Option<UPayload>, ServiceInvocationError> {
48+
// [impl->dsn~usubscription-unregister-notifications-protobuf~1]
4849
let (_subscription_request, source) = helpers::extract_inputs::<NotificationsRequest>(
4950
RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS,
5051
resource_id,
@@ -81,6 +82,7 @@ mod tests {
8182

8283
use crate::{helpers, tests::test_lib};
8384

85+
// [utest->dsn~usubscription-unregister-notifications-protobuf~1]
8486
#[tokio::test]
8587
async fn test_unregister_notification_success() {
8688
helpers::init_once();

up-subscription/src/handlers/unsubscribe.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ impl RequestHandler for UnubscribeRequestHandler {
4545
message_attributes: &UAttributes,
4646
request_payload: Option<UPayload>,
4747
) -> Result<Option<UPayload>, ServiceInvocationError> {
48+
// [impl->dsn~usubscription-unsubscribe-protobuf~1]
4849
let (unsubscribe_request, source) = helpers::extract_inputs::<UnsubscribeRequest>(
4950
RESOURCE_ID_UNSUBSCRIBE,
5051
resource_id,
@@ -95,6 +96,7 @@ mod tests {
9596

9697
use crate::{helpers, tests::test_lib};
9798

99+
// [utest->dsn~usubscription-unsubscribe-protobuf~1]
98100
#[tokio::test]
99101
async fn test_unsubscribe_success() {
100102
helpers::init_once();

up-subscription/src/notification_manager.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ pub(crate) async fn notification_engine(
118118
status,
119119
respond_to,
120120
} => {
121+
// [impl->dsn~usubscription-change-notification-type~1]
121122
let update = Update {
122123
topic: Some(topic).into(),
123124
subscriber: Some(SubscriberInfo {
@@ -131,6 +132,7 @@ pub(crate) async fn notification_engine(
131132

132133
// Send Update message to general notification channel
133134
// as per usubscription.proto RegisterForNotifications(NotificationsRequest)
135+
// [impl->dsn~usubscription-change-notification-topic~1]
134136
match UMessageBuilder::publish(usubscription_uri(RESOURCE_ID_SUBSCRIPTION_CHANGE))
135137
.with_message_id(UUID::build())
136138
.build_with_protobuf_payload(&update)
@@ -139,6 +141,7 @@ pub(crate) async fn notification_engine(
139141
error!("Error building global update notification message: {e}");
140142
}
141143
Ok(update_msg) => {
144+
// [impl->dsn~usubscription-change-notification-topic~1]
142145
if let Err(e) = up_transport.send(update_msg).await {
143146
error!(
144147
"Error sending global subscription-change update notification: {e}"
@@ -148,6 +151,7 @@ pub(crate) async fn notification_engine(
148151
}
149152

150153
// Send Update message to any dedicated registered notification-subscribers
154+
// [impl->req~usubscription-register-notifications~1]
151155
if let Ok(topics) = notifications.get_topics() {
152156
for topic_entry in topics {
153157
debug!(

up-subscription/src/persistency.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,12 @@ impl SubscriptionsStore {
112112
let topic_string = &topic.to_uri(PERSIST_UP_SCHEMA);
113113

114114
Ok(
115+
// [impl->req~usubscription-subscribe-multiple~1]
115116
if let Some(mut subscriber_list) = self
116117
.persistency
117118
.get::<HashMap<SubscriberAsString, Option<ExpiryTimestamp>>>(topic_string)
118119
{
120+
// [impl->req~usubscription-subscribe-expiration-extension~1]
119121
subscriber_list.insert(subscriber_string.clone(), expiry);
120122
self.persistency
121123
.set(topic_string, &subscriber_list)
@@ -182,6 +184,8 @@ impl SubscriptionsStore {
182184
/// Return a list of all subscribers of given topic
183185
/// * returns `Vec<SubscriberUUri>` that contains all subscriber UUris registered for the topic
184186
/// * returns a `PersistencyError` in case of problems with serialization of data or manipulation of persist storage
187+
// [impl->req~usubscription-fetch-subscribers-stable-sorting~1]
188+
// [impl->req~usubscription-fetch-subscriptions-stable-sorting~1]
185189
pub(crate) fn get_topic_subscribers(
186190
&self,
187191
topic: &TopicUUri,
@@ -210,6 +214,7 @@ impl SubscriptionsStore {
210214
/// Return a list of all topics subscribed to by given subscriber
211215
/// * returns `Vec<TopicUUri>` that contains all topics subscribed to by subscriber
212216
/// * returns a `PersistencyError` in case of problems with serialization of data or manipulation of persist storage
217+
// [impl->req~usubscription-fetch-subscriptions-stable-sorting~1]
213218
pub(crate) fn get_subscriber_topics(
214219
&self,
215220
subscriber: &SubscriberUUri,
@@ -270,6 +275,7 @@ impl SubscriptionsStore {
270275
/// This function does two things
271276
/// - remove any subscription relationships from persistency that have an expiration timestamp that lies in the past
272277
/// - return all remaining subscription relationships which have an expiration timestamp that has not yet expired
278+
// [impl->req~usubscription-subscribe-no-expiration~1]
273279
pub(crate) fn get_and_prune_expiring_subscriptions(
274280
&mut self,
275281
) -> Result<Vec<(SubscriberUUri, TopicUUri, ExpiryTimestamp)>, PersistencyError> {
@@ -446,6 +452,7 @@ impl RemoteTopicsStore {
446452
))
447453
})?
448454
} else {
455+
// [impl->req~usubscription-subscribe-remote-pending~1]
449456
self.set_topic_state(topic, TopicState::SUBSCRIBE_PENDING)?
450457
})
451458
}

up-subscription/src/subscription_manager.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ enum Event {
145145

146146
// Core business logic of subscription management - includes container data types for tracking subscriptions and remote subscriptions.
147147
// Interfacing with this purely works via channels, so we do not have to deal with mutexes and similar concepts.
148+
// [impl->req~usubscription-unsubscribe-notifications~1]
149+
// [impl->dsn~usubscription-state-machine~1]
148150
pub(crate) async fn handle_message(
149151
configuration: Arc<USubscriptionConfiguration>,
150152
transport: Arc<dyn UTransport>,
@@ -165,6 +167,8 @@ pub(crate) async fn handle_message(
165167
mpsc::channel::<InternalSubscriptionEvent>(INTERNAL_COMMAND_BUFFER_SIZE);
166168

167169
// At startup, set up timed unsubscribe for any persisted subscriptions that define an expiration timestamp
170+
// [impl->req~usubscription-subscribe-expiration~1]
171+
// [impl->req~usubscription-subscribe-no-expiration~1]
168172
match subscriptions.get_and_prune_expiring_subscriptions() {
169173
Ok(list) => {
170174
for (subscriber, topic, expiry_millis) in list {
@@ -210,6 +214,7 @@ pub(crate) async fn handle_message(
210214
expiry,
211215
respond_to,
212216
} => {
217+
// [impl->req~usubscription-subscribe~1]
213218
match add_subscription(
214219
configuration.clone(),
215220
transport.clone(),
@@ -220,6 +225,7 @@ pub(crate) async fn handle_message(
220225
topic.clone(),
221226
expiry,
222227
) {
228+
// [impl->req~usubscription-subscribe-notifications~1]
223229
Ok(result) => {
224230
// Send topic state change notification
225231
notification_manager::notify(
@@ -244,6 +250,7 @@ pub(crate) async fn handle_message(
244250
topic,
245251
respond_to,
246252
} => {
253+
// [impl->req~usubscription-unsubscribe~1]
247254
match remove_subscription(
248255
configuration.clone(),
249256
transport.clone(),
@@ -277,6 +284,7 @@ pub(crate) async fn handle_message(
277284
offset,
278285
respond_to,
279286
} => match fetch_subscribers(&subscriptions, topic, offset) {
287+
// [impl->req~usubscription-fetch-subscribers~1]
280288
Ok(result) => {
281289
if respond_to.send(result).is_err() {
282290
error!("Problem with internal communication");
@@ -422,10 +430,13 @@ fn add_subscription(
422430
let _ = topic_subscribers.add_subscription(&subscriber, &topic, expiry)?;
423431

424432
// For REMOTE topics, we explicitly track state due to _PENDING scenarios
433+
// [impl->req~usubscription-subscribe-multiple~1]
425434
let state = if topic.is_remote_authority(&uri_provider.get_authority()) {
426435
let state = remote_topics.add_topic_or_get_state(&topic)?;
427436

428437
// if this remote topic is not yet SUBSCRIBED, perform remote subscription
438+
// [impl->req~usubscription-subscribe-remote~1]
439+
// [impl->req~usubscription-subscribe-unsubscribe-pending~1]
429440
if state != TopicState::SUBSCRIBED {
430441
let topic_clone = topic.clone();
431442
let internal_cmd_sender_clone = internal_cmd_sender.clone();
@@ -447,6 +458,8 @@ fn add_subscription(
447458
};
448459

449460
// Set up timed unsubscribe in case expiration timestamp is set
461+
// [impl->req~usubscription-subscribe-expiration~1]
462+
// [impl->req~usubscription-subscribe-no-expiration~1]
450463
if let Some(expiry_millis) = expiry {
451464
schedule_unsubscribe(
452465
expiry_millis,
@@ -473,6 +486,8 @@ fn remove_subscription(
473486
topic: TopicUUri,
474487
) -> Result<SubscriptionStatus, persistency::PersistencyError> {
475488
// if this was the last subscriber to topic and topic is remote
489+
// [impl->req~usubscription-unsubscribe-last-remote~1]
490+
// [impl->req~usubscription-unsubscribe-subscribe-pending~1]
476491
if topic_subscribers.remove_subscription(&subscriber, &topic)?
477492
&& topic.is_remote_authority(&uri_provider.get_authority())
478493
{
@@ -486,6 +501,8 @@ fn remove_subscription(
486501
});
487502
}
488503

504+
// [impl->req~usubscription-unsubscribe-multiple~1]
505+
// [impl->req~usubscription-unsubscribe-remote-unsubscribed~1]
489506
Ok(SubscriptionStatus {
490507
// Whatever happens with the remote topic state - as far as the local client is concerned, it has now UNSUBSCRIBED from this topic
491508
state: TopicState::UNSUBSCRIBED.into(),
@@ -503,11 +520,13 @@ fn fetch_subscribers(
503520
// the remote topic is already fully SUBSCRIBED, of still SUSBCRIBED_PENDING
504521
let mut subscribers = topic_subscribers.get_topic_subscribers(&topic)?;
505522

523+
// [impl->req~usubscription-fetch-subscribers-offset~1]
506524
if let Some(offset) = offset {
507525
subscribers.drain(..offset as usize);
508526
}
509527

510528
// split up result list, to make sense of has_more_records field
529+
// [impl->req~usubscription-fetch-subscribers-has-more-records~1]
511530
let has_more = if subscribers.len() > UP_MAX_FETCH_SUBSCRIBERS_LEN {
512531
subscribers.truncate(UP_MAX_FETCH_SUBSCRIBERS_LEN);
513532
true
@@ -526,6 +545,7 @@ fn fetch_subscriptions(
526545
offset: Option<u32>,
527546
) -> Result<SubscriptionsResponse, persistency::PersistencyError> {
528547
let mut results: Vec<SubscriptionEntry> = match request {
548+
// [impl->req~usubscription-fetch-subscriptions-by-subscriber~1]
529549
RequestKind::Subscriber(subscriber) => topic_subscribers
530550
.get_subscriber_topics(&subscriber)?
531551
.iter()
@@ -543,6 +563,7 @@ fn fetch_subscriptions(
543563
})
544564
.collect(),
545565

566+
// [impl->req~usubscription-fetch-subscriptions-by-topic~1]
546567
RequestKind::Topic(topic) => topic_subscribers
547568
.get_topic_subscribers(&topic)?
548569
.iter()
@@ -561,11 +582,13 @@ fn fetch_subscriptions(
561582
.collect(),
562583
};
563584

585+
// [impl->req~usubscription-fetch-subscriptions-offset~1]
564586
if let Some(offset) = offset {
565587
results.drain(..offset as usize);
566588
}
567589

568590
// split up result list, to make sense of has_more_records field
591+
// [impl->req~usubscription-fetch-subscriptions-has-more-records~1]
569592
let mut has_more = false;
570593
if results.len() > UP_MAX_FETCH_SUBSCRIPTIONS_LEN {
571594
results.truncate(UP_MAX_FETCH_SUBSCRIPTIONS_LEN);
@@ -582,6 +605,7 @@ async fn remote_subscribe(
582605
transport: Arc<dyn UTransport>,
583606
internal_cmd_sender: Sender<InternalSubscriptionEvent>,
584607
) -> Result<(), UStatus> {
608+
// [impl->dsn~usubscription-subscribe-remote-subscriber-change~1]
585609
let rpc_client: Arc<dyn RpcClient> = Arc::new(
586610
InMemoryRpcClient::new(transport, uri_provider)
587611
.await
@@ -595,6 +619,7 @@ async fn remote_subscribe(
595619
};
596620

597621
// send request
622+
// [impl->req~usubscription-remote-max-timeout~1]
598623
let subscription_response: SubscriptionResponse = rpc_client
599624
.invoke_proto_method(
600625
make_remote_subscribe_uuri(&subscription_request.topic),
@@ -610,6 +635,7 @@ async fn remote_subscribe(
610635
})?;
611636

612637
// deal with response
638+
// [impl->req~usubscription-subscribe-remote-response~1]
613639
if subscription_response.is_state(TopicState::SUBSCRIBED) {
614640
debug!("Got remote subscription response, state SUBSCRIBED");
615641

@@ -634,6 +660,7 @@ async fn remote_unsubscribe(
634660
internal_cmd_sender: Sender<InternalSubscriptionEvent>,
635661
) -> Result<(), UStatus> {
636662
let rpc_client: Arc<dyn RpcClient> = Arc::new(
663+
// [impl->dsn~usubscription-unsubscribe-remote-subscriber-change~1]
637664
InMemoryRpcClient::new(transport, uri_provider)
638665
.await
639666
.map_err(|e| UStatus::fail_with_code(UCode::INTERNAL, e.to_string()))?,
@@ -751,6 +778,7 @@ mod tests {
751778
mock_provider
752779
}
753780

781+
// [utest->req~usubscription-subscribe-expiration~1]
754782
#[tokio::test]
755783
async fn test_schedule_future_unsubscribe() {
756784
let (internal_cmd_sender, mut internal_cmd_receiver) =
@@ -786,6 +814,7 @@ mod tests {
786814
}
787815
}
788816

817+
// [utest->req~usubscription-subscribe-expiration~1]
789818
#[tokio::test]
790819
async fn test_schedule_past_unsubscribe() {
791820
let (internal_cmd_sender, mut internal_cmd_receiver) =

0 commit comments

Comments
 (0)