Skip to content

Commit 5cd4bfb

Browse files
Add implementation of timed subscriptions (eclipse-uprotocol#18)
* Add implementation of timed subscriptions * Add type aliases to improve code clarity
1 parent 41cd68a commit 5cd4bfb

File tree

9 files changed

+665
-227
lines changed

9 files changed

+665
-227
lines changed

up-subscription/src/common/helpers.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
use log::*;
1515
use std::future::Future;
1616
use std::sync::Once;
17-
use tokio::task;
17+
use std::time::{SystemTime, UNIX_EPOCH};
18+
use tokio::{task, time::Duration};
1819

1920
use up_rust::{
2021
communication::{ServiceInvocationError, UPayload},
@@ -74,3 +75,18 @@ where
7475

7576
Ok((request, source.clone()))
7677
}
78+
79+
pub(crate) fn duration_until_timestamp(future_timestamp_millis: u128) -> Option<Duration> {
80+
let now = SystemTime::now()
81+
.duration_since(UNIX_EPOCH)
82+
.ok()?
83+
.as_millis();
84+
85+
if future_timestamp_millis > now {
86+
Some(Duration::from_millis(
87+
(future_timestamp_millis - now) as u64,
88+
))
89+
} else {
90+
None // Timestamp is in the past
91+
}
92+
}

up-subscription/src/notification_manager.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@ use up_rust::{
2222
usubscription_uri, SubscriberInfo, SubscriptionStatus, Update,
2323
RESOURCE_ID_SUBSCRIPTION_CHANGE,
2424
},
25-
UMessageBuilder, UTransport, UUri, UUID,
25+
UMessageBuilder, UTransport, UUID,
2626
};
2727

28-
use crate::{helpers, persistency, usubscription, USubscriptionConfiguration};
28+
use crate::{
29+
helpers, persistency,
30+
usubscription::{SubscriberUUri, TopicUUri, INCLUDE_SCHEMA},
31+
USubscriptionConfiguration,
32+
};
2933

3034
// This is the core business logic for tracking and sending subscription update notifications. It is currently implemented as a single
3135
// event-consuming function `notification_engine()`, which is supposed to be spawned into a task, and process the various notification
@@ -35,27 +39,27 @@ use crate::{helpers, persistency, usubscription, USubscriptionConfiguration};
3539
#[derive(Debug)]
3640
pub(crate) enum NotificationEvent {
3741
AddNotifyee {
38-
subscriber: UUri,
39-
topic: UUri,
42+
subscriber: SubscriberUUri,
43+
topic: TopicUUri,
4044
},
4145
RemoveNotifyee {
42-
subscriber: UUri,
46+
subscriber: SubscriberUUri,
4347
},
4448
StateChange {
45-
subscriber: Option<UUri>,
46-
topic: UUri,
49+
subscriber: Option<SubscriberUUri>,
50+
topic: TopicUUri,
4751
status: SubscriptionStatus,
4852
respond_to: oneshot::Sender<()>,
4953
},
5054
// Purely for use during testing: get copy of current notifyee ledger
5155
#[cfg(test)]
5256
GetNotificationTopics {
53-
respond_to: oneshot::Sender<HashMap<UUri, UUri>>,
57+
respond_to: oneshot::Sender<HashMap<SubscriberUUri, TopicUUri>>,
5458
},
5559
// Purely for use during testing: force-set new notifyees ledger
5660
#[cfg(test)]
5761
SetNotificationTopics {
58-
notification_topics_replacement: HashMap<UUri, UUri>,
62+
notification_topics_replacement: HashMap<SubscriberUUri, TopicUUri>,
5963
respond_to: oneshot::Sender<()>,
6064
},
6165
}
@@ -148,18 +152,18 @@ pub(crate) async fn notification_engine(
148152
for topic_entry in topics {
149153
debug!(
150154
"Sending notification to ({}): topic {}, subscriber {}, status {}",
151-
topic_entry.to_uri(usubscription::INCLUDE_SCHEMA),
155+
topic_entry.to_uri(INCLUDE_SCHEMA),
152156
update
153157
.topic
154158
.as_ref()
155159
.unwrap_or_default()
156-
.to_uri(usubscription::INCLUDE_SCHEMA),
160+
.to_uri(INCLUDE_SCHEMA),
157161
update
158162
.subscriber
159163
.uri
160164
.as_ref()
161165
.unwrap_or_default()
162-
.to_uri(usubscription::INCLUDE_SCHEMA),
166+
.to_uri(INCLUDE_SCHEMA),
163167
update.status.as_ref().unwrap_or_default()
164168
);
165169

@@ -206,8 +210,8 @@ pub(crate) async fn notification_engine(
206210
// susbcriber is an Option, because in the case ob remote subscription state changes, there is no subscriber (other than local usubscription service)
207211
pub(crate) async fn notify(
208212
notification_sender: Sender<NotificationEvent>,
209-
subscriber: Option<UUri>,
210-
topic: UUri,
213+
subscriber: Option<SubscriberUUri>,
214+
topic: TopicUUri,
211215
status: SubscriptionStatus,
212216
) {
213217
let (respond_to, receive_from) = oneshot::channel::<()>();

0 commit comments

Comments
 (0)