Skip to content

Commit 27c6130

Browse files
Remote pagination and obsolete sorting requirements (eclipse-uprotocol#24)
- Pull in latest up-spec - Remove pagination code, to follow up-spec/#283 - Remove obsolete sorting requirement links
1 parent 848af63 commit 27c6130

File tree

6 files changed

+50
-145
lines changed

6 files changed

+50
-145
lines changed

up-spec

up-subscription/src/handlers/fetch_subscribers.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@ use up_rust::{
2424
UAttributes,
2525
};
2626

27-
use crate::{
28-
helpers,
29-
subscription_manager::{SubscribersResponse, SubscriptionEvent},
30-
};
27+
use crate::{helpers, subscription_manager::SubscriptionEvent, usubscription::SubscriberUUri};
3128

3229
pub(crate) struct FetchSubscribersRequestHandler {
3330
subscription_sender: Sender<SubscriptionEvent>,
@@ -57,7 +54,7 @@ impl RequestHandler for FetchSubscribersRequestHandler {
5754
&request_payload,
5855
message_attributes,
5956
)?;
60-
let FetchSubscribersRequest { topic, offset, .. } = fetch_subscribers_request;
57+
let FetchSubscribersRequest { topic, .. } = fetch_subscribers_request;
6158
let Some(topic) = topic.into_option() else {
6259
return Err(ServiceInvocationError::InvalidArgument(
6360
"No topic defined in request".to_string(),
@@ -69,12 +66,8 @@ impl RequestHandler for FetchSubscribersRequestHandler {
6966
helpers::validate_uri(&topic)?;
7067

7168
// Interact with subscription manager backend
72-
let (respond_to, receive_from) = oneshot::channel::<SubscribersResponse>();
73-
let se = SubscriptionEvent::FetchSubscribers {
74-
topic,
75-
offset,
76-
respond_to,
77-
};
69+
let (respond_to, receive_from) = oneshot::channel::<Vec<SubscriberUUri>>();
70+
let se = SubscriptionEvent::FetchSubscribers { topic, respond_to };
7871

7972
if let Err(e) = self.subscription_sender.send(se).await {
8073
error!("Error communicating with subscription manager: {e}");
@@ -89,8 +82,7 @@ impl RequestHandler for FetchSubscribersRequestHandler {
8982
};
9083

9184
// Build and return result
92-
let (subscribers, has_more) = fetch_subscribers_response;
93-
let subscriber_infos = subscribers
85+
let subscriber_infos = fetch_subscribers_response
9486
.iter()
9587
.map(|subscriber| SubscriberInfo {
9688
uri: Some(subscriber.clone()).into(),
@@ -99,7 +91,6 @@ impl RequestHandler for FetchSubscribersRequestHandler {
9991
.collect();
10092
let fetch_subscribers_response = FetchSubscribersResponse {
10193
subscribers: subscriber_infos,
102-
has_more_records: Some(has_more),
10394
..Default::default()
10495
};
10596

@@ -165,7 +156,7 @@ mod tests {
165156
topic, respond_to, ..
166157
} => {
167158
assert_eq!(topic, test_lib::helpers::local_topic1_uri());
168-
let _ = respond_to.send(SubscribersResponse::default());
159+
let _ = respond_to.send(Vec::default());
169160
}
170161
_ => panic!("Wrong event type"),
171162
}

up-subscription/src/handlers/fetch_subscriptions.rs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@ use up_rust::{
2626

2727
use crate::{
2828
helpers,
29-
subscription_manager::{
30-
RequestKind, SubscriptionEntry, SubscriptionEvent, SubscriptionsResponse,
31-
},
29+
subscription_manager::{RequestKind, SubscriptionEntry, SubscriptionEvent},
3230
};
3331

3432
pub(crate) struct FetchSubscriptionsRequestHandler {
@@ -60,9 +58,7 @@ impl RequestHandler for FetchSubscriptionsRequestHandler {
6058
message_attributes,
6159
)?;
6260

63-
let FetchSubscriptionsRequest {
64-
request, offset, ..
65-
} = fetch_subscriptions_request;
61+
let FetchSubscriptionsRequest { request, .. } = fetch_subscriptions_request;
6662
let request_kind = match request {
6763
Some(Request::Topic(topic)) => {
6864
if !topic.is_empty() {
@@ -92,10 +88,9 @@ impl RequestHandler for FetchSubscriptionsRequestHandler {
9288
};
9389

9490
// Interact with subscription manager backend
95-
let (respond_to, receive_from) = oneshot::channel::<SubscriptionsResponse>();
91+
let (respond_to, receive_from) = oneshot::channel::<Vec<SubscriptionEntry>>();
9692
let se = SubscriptionEvent::FetchSubscriptions {
9793
request: request_kind,
98-
offset,
9994
respond_to,
10095
};
10196

@@ -112,8 +107,7 @@ impl RequestHandler for FetchSubscriptionsRequestHandler {
112107
};
113108

114109
// Build and return result
115-
let (subscriptions, has_more) = fetch_subscriptions_response;
116-
let subscription_list: Vec<Subscription> = subscriptions
110+
let subscription_list: Vec<Subscription> = fetch_subscriptions_response
117111
.iter()
118112
.map(
119113
|SubscriptionEntry {
@@ -135,7 +129,6 @@ impl RequestHandler for FetchSubscriptionsRequestHandler {
135129

136130
let fetch_subscriptions_response = FetchSubscriptionsResponse {
137131
subscriptions: subscription_list,
138-
has_more_records: Some(has_more),
139132
..Default::default()
140133
};
141134

@@ -202,7 +195,6 @@ mod tests {
202195
match subscription_event {
203196
SubscriptionEvent::FetchSubscriptions {
204197
request,
205-
offset,
206198
respond_to,
207199
} => {
208200
match request {
@@ -216,11 +208,8 @@ mod tests {
216208
}
217209
_ => panic!("Wrong request details"),
218210
}
219-
assert_eq!(offset.unwrap_or_default(), 42);
220-
221-
let _ = respond_to.send(SubscriptionsResponse::default());
211+
let _ = respond_to.send(Vec::default());
222212
}
223-
224213
_ => panic!("Wrong event type"),
225214
}
226215
}

up-subscription/src/persistency.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,6 @@ impl SubscriptionsStore {
184184
/// Return a list of all subscribers of given topic
185185
/// * returns `Vec<SubscriberUUri>` that contains all subscriber UUris registered for the topic
186186
/// * returns a `PersistencyError` in case of problems with serialization of data or manipulation of persist storage
187-
// [impl->req~usubscription-fetch-subscribers-stable-sorting~1]
188-
// [impl->req~usubscription-fetch-subscriptions-stable-sorting~1]
189187
pub(crate) fn get_topic_subscribers(
190188
&self,
191189
topic: &TopicUUri,
@@ -214,7 +212,6 @@ impl SubscriptionsStore {
214212
/// Return a list of all topics subscribed to by given subscriber
215213
/// * returns `Vec<TopicUUri>` that contains all topics subscribed to by subscriber
216214
/// * returns a `PersistencyError` in case of problems with serialization of data or manipulation of persist storage
217-
// [impl->req~usubscription-fetch-subscriptions-stable-sorting~1]
218215
pub(crate) fn get_subscriber_topics(
219216
&self,
220217
subscriber: &SubscriberUUri,

up-subscription/src/subscription_manager.rs

Lines changed: 19 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,6 @@ use crate::{
4343
// via tokio mpsc channel. This design allows to forgo the use of any synhronization primitives on the subscription-tracking container
4444
// data types, as any access is coordinated/serialized via the Event selection loop.
4545

46-
// Maximum number of `Subscriber` entries to be returned in a `FetchSusbcriptions´ operation.
47-
const UP_MAX_FETCH_SUBSCRIBERS_LEN: usize = 100;
48-
// Maximum number of `Subscriber` entries to be returned in a `FetchSusbcriptions´ operation.
49-
const UP_MAX_FETCH_SUBSCRIPTIONS_LEN: usize = 100;
50-
5146
// Queue size of message channel for internal commands - like subscription status change messages or subscription expiration commands.
5247
const INTERNAL_COMMAND_BUFFER_SIZE: usize = 128;
5348

@@ -68,9 +63,6 @@ pub(crate) struct SubscriptionEntry {
6863
pub(crate) status: SubscriptionStatus,
6964
}
7065

71-
pub(crate) type SubscribersResponse = (Vec<SubscriberUUri>, bool); // List of subscribers, boolean flag stating if there exist more entries than contained in list
72-
pub(crate) type SubscriptionsResponse = (Vec<SubscriptionEntry>, bool); // List of subscriber entries, boolean flag stating if there exist more entries than contained in list
73-
7466
// This is the 'outside API' of subscription manager, it includes some events that are only to be used in (and only enabled for) testing.
7567
#[derive(Debug)]
7668
pub(crate) enum SubscriptionEvent {
@@ -87,13 +79,11 @@ pub(crate) enum SubscriptionEvent {
8779
},
8880
FetchSubscribers {
8981
topic: TopicUUri,
90-
offset: Option<u32>,
91-
respond_to: oneshot::Sender<SubscribersResponse>, // return list of subscribers and flag indicating whether there are more
82+
respond_to: oneshot::Sender<Vec<SubscriberUUri>>,
9283
},
9384
FetchSubscriptions {
9485
request: RequestKind,
95-
offset: Option<u32>,
96-
respond_to: oneshot::Sender<SubscriptionsResponse>, // return list of Subscriptions and flag indicating whether there are more
86+
respond_to: oneshot::Sender<Vec<SubscriptionEntry>>,
9787
},
9888
// Purely for use during testing: get copy of current topic-subscriper ledger
9989
#[cfg(test)]
@@ -281,26 +271,23 @@ pub(crate) async fn handle_message(
281271
}
282272
}
283273
}
284-
SubscriptionEvent::FetchSubscribers {
285-
topic,
286-
offset,
287-
respond_to,
288-
} => match fetch_subscribers(&subscriptions, topic, offset) {
289-
// [impl->req~usubscription-fetch-subscribers~1]
290-
Ok(result) => {
291-
if respond_to.send(result).is_err() {
292-
error!("Problem with internal communication");
274+
SubscriptionEvent::FetchSubscribers { topic, respond_to } => {
275+
match fetch_subscribers(&subscriptions, topic) {
276+
// [impl->req~usubscription-fetch-subscribers~1]
277+
Ok(result) => {
278+
if respond_to.send(result).is_err() {
279+
error!("Problem with internal communication");
280+
}
281+
}
282+
Err(e) => {
283+
panic!("Persistency failure {e}")
293284
}
294285
}
295-
Err(e) => {
296-
panic!("Persistency failure {e}")
297-
}
298-
},
286+
}
299287
SubscriptionEvent::FetchSubscriptions {
300288
request,
301-
offset,
302289
respond_to,
303-
} => match fetch_subscriptions(&subscriptions, &remote_topics, request, offset) {
290+
} => match fetch_subscriptions(&subscriptions, &remote_topics, request) {
304291
Ok(result) => {
305292
if respond_to.send(result).is_err() {
306293
error!("Problem with internal communication");
@@ -541,37 +528,19 @@ fn remove_subscription(
541528
fn fetch_subscribers(
542529
topic_subscribers: &persistency::SubscriptionsStore,
543530
topic: TopicUUri,
544-
offset: Option<u32>,
545-
) -> Result<SubscribersResponse, persistency::PersistencyError> {
531+
) -> Result<Vec<SubscriberUUri>, persistency::PersistencyError> {
546532
// This will get *every* client that subscribed to `topic` - no matter whether (in the case of remote subscriptions)
547533
// the remote topic is already fully SUBSCRIBED, of still SUSBCRIBED_PENDING
548-
let mut subscribers = topic_subscribers.get_topic_subscribers(&topic)?;
549-
550-
// [impl->req~usubscription-fetch-subscribers-offset~1]
551-
if let Some(offset) = offset {
552-
subscribers.drain(..offset as usize);
553-
}
554-
555-
// split up result list, to make sense of has_more_records field
556-
// [impl->req~usubscription-fetch-subscribers-has-more-records~1]
557-
let has_more = if subscribers.len() > UP_MAX_FETCH_SUBSCRIBERS_LEN {
558-
subscribers.truncate(UP_MAX_FETCH_SUBSCRIBERS_LEN);
559-
true
560-
} else {
561-
false
562-
};
563-
564-
Ok((subscribers, has_more))
534+
topic_subscribers.get_topic_subscribers(&topic)
565535
}
566536

567537
// Fetch all subscriptions of a topic or subscribers
568538
fn fetch_subscriptions(
569539
topic_subscribers: &persistency::SubscriptionsStore,
570540
remote_topics: &persistency::RemoteTopicsStore,
571541
request: RequestKind,
572-
offset: Option<u32>,
573-
) -> Result<SubscriptionsResponse, persistency::PersistencyError> {
574-
let mut results: Vec<SubscriptionEntry> = match request {
542+
) -> Result<Vec<SubscriptionEntry>, persistency::PersistencyError> {
543+
let results: Vec<SubscriptionEntry> = match request {
575544
// [impl->req~usubscription-fetch-subscriptions-by-subscriber~1]
576545
RequestKind::Subscriber(subscriber) => topic_subscribers
577546
.get_subscriber_topics(&subscriber)?
@@ -609,20 +578,7 @@ fn fetch_subscriptions(
609578
.collect(),
610579
};
611580

612-
// [impl->req~usubscription-fetch-subscriptions-offset~1]
613-
if let Some(offset) = offset {
614-
results.drain(..offset as usize);
615-
}
616-
617-
// split up result list, to make sense of has_more_records field
618-
// [impl->req~usubscription-fetch-subscriptions-has-more-records~1]
619-
let mut has_more = false;
620-
if results.len() > UP_MAX_FETCH_SUBSCRIPTIONS_LEN {
621-
results.truncate(UP_MAX_FETCH_SUBSCRIPTIONS_LEN);
622-
has_more = true;
623-
}
624-
625-
Ok((results, has_more))
581+
Ok(results)
626582
}
627583

628584
// Perform remote topic subscription

0 commit comments

Comments
 (0)