1313
1414use async_trait:: async_trait;
1515use log:: * ;
16+ use std:: time:: { SystemTime , UNIX_EPOCH } ;
1617use tokio:: { sync:: mpsc:: Sender , sync:: oneshot} ;
1718
1819use up_rust:: {
@@ -24,7 +25,7 @@ use up_rust::{
2425} ;
2526
2627use crate :: {
27- helpers,
28+ helpers, usubscription ,
2829 { notification_manager:: NotificationEvent , subscription_manager:: SubscriptionEvent } ,
2930} ;
3031
@@ -66,11 +67,32 @@ impl RequestHandler for SubscriptionRequestHandler {
6667 ) ) ;
6768 } ;
6869
70+ // Provisionally compute milliseconds to subscription expiry, from protobuf.google.Timestamp input in second granularity (we ignore the nanos).
71+ // Likely to change in the future, when we get rid of the protobuf.google.Timestamp type and track in milliseconds throughought.
72+ let expiry: Option < usubscription:: ExpiryTimestamp > =
73+ match subscription_request. attributes . expire . seconds . try_into ( ) {
74+ Ok ( 0 ) => None ,
75+ Ok ( seconds) => Some ( seconds * 1000 ) ,
76+ Err ( _) => None ,
77+ } ;
78+ // Check if the expiry timestamp is in the future
79+ if let Some ( expiry_ms) = expiry {
80+ let now_ms = SystemTime :: now ( )
81+ . duration_since ( UNIX_EPOCH )
82+ . expect ( "Time went backwards" )
83+ . as_millis ( ) ;
84+ if now_ms > expiry_ms {
85+ return Err ( ServiceInvocationError :: InvalidArgument (
86+ "Subscription Expiry time already passed" . to_string ( ) ,
87+ ) ) ;
88+ }
89+ }
6990 // Interact with subscription manager backend
7091 let ( respond_to, receive_from) = oneshot:: channel :: < SubscriptionStatus > ( ) ;
7192 let se = SubscriptionEvent :: AddSubscription {
7293 subscriber : source. clone ( ) ,
7394 topic : topic. clone ( ) ,
95+ expiry,
7496 respond_to,
7597 } ;
7698
@@ -133,7 +155,7 @@ mod tests {
133155
134156 // create request and other required object(s)
135157 let subscribe_request =
136- test_lib:: helpers:: subscription_request ( test_lib:: helpers:: local_topic1_uri ( ) ) ;
158+ test_lib:: helpers:: subscription_request ( test_lib:: helpers:: local_topic1_uri ( ) , None ) ;
137159 let request_payload = UPayload :: try_from_protobuf ( subscribe_request. clone ( ) ) . unwrap ( ) ;
138160 let message_attributes = UAttributes {
139161 source : Some ( test_lib:: helpers:: subscriber_uri1 ( ) ) . into ( ) ,
@@ -172,6 +194,7 @@ mod tests {
172194 SubscriptionEvent :: AddSubscription {
173195 subscriber,
174196 topic,
197+ expiry : None ,
175198 respond_to,
176199 } => {
177200 assert_eq ! ( subscriber, test_lib:: helpers:: subscriber_uri1( ) ) ;
@@ -214,7 +237,7 @@ mod tests {
214237
215238 // create request and other required object(s)
216239 let subscribe_request =
217- test_lib:: helpers:: subscription_request ( test_lib:: helpers:: local_topic1_uri ( ) ) ;
240+ test_lib:: helpers:: subscription_request ( test_lib:: helpers:: local_topic1_uri ( ) , None ) ;
218241 let request_payload = UPayload :: try_from_protobuf ( subscribe_request. clone ( ) ) . unwrap ( ) ;
219242 let message_attributes = UAttributes {
220243 source : Some ( test_lib:: helpers:: subscriber_uri1 ( ) ) . into ( ) ,
@@ -249,7 +272,7 @@ mod tests {
249272
250273 // create request and other required object(s)
251274 let subscribe_request =
252- test_lib:: helpers:: subscription_request ( test_lib:: helpers:: local_topic1_uri ( ) ) ;
275+ test_lib:: helpers:: subscription_request ( test_lib:: helpers:: local_topic1_uri ( ) , None ) ;
253276 let request_payload = UPayload :: try_from_protobuf ( subscribe_request. clone ( ) ) . unwrap ( ) ;
254277
255278 let ( subscription_sender, _) = mpsc:: channel :: < SubscriptionEvent > ( 1 ) ;
@@ -336,4 +359,111 @@ mod tests {
336359 _ => panic ! ( "Wrong error type" ) ,
337360 }
338361 }
362+
363+ #[ tokio:: test]
364+ async fn test_future_subscription ( ) {
365+ helpers:: init_once ( ) ;
366+
367+ let future_secs = SystemTime :: now ( )
368+ . duration_since ( UNIX_EPOCH )
369+ . unwrap ( )
370+ . as_secs ( )
371+ + 600 ;
372+
373+ // create request and other required object(s)
374+ // SubscriptionRequest currently uses protobuf.google.Timestamp for expiry attribute, which tracks timestamp in second granularity (we ignore the nanos)
375+ // Also, protobuf can only do signed ints, and uses an i64 for the seconds... so we have to force our timestamp into that.
376+ let subscribe_request = test_lib:: helpers:: subscription_request (
377+ test_lib:: helpers:: local_topic1_uri ( ) ,
378+ Some ( u32:: try_from ( future_secs) . unwrap ( ) ) ,
379+ ) ;
380+ let request_payload = UPayload :: try_from_protobuf ( subscribe_request. clone ( ) ) . unwrap ( ) ;
381+ let message_attributes = UAttributes {
382+ source : Some ( test_lib:: helpers:: subscriber_uri1 ( ) ) . into ( ) ,
383+ ..Default :: default ( )
384+ } ;
385+
386+ let ( subscription_sender, mut subscription_receiver) =
387+ mpsc:: channel :: < SubscriptionEvent > ( 1 ) ;
388+ let ( notification_sender, _) = mpsc:: channel :: < NotificationEvent > ( 1 ) ;
389+
390+ // create and spawn off handler, to make all the asnync goodness work
391+ let request_handler =
392+ SubscriptionRequestHandler :: new ( subscription_sender, notification_sender) ;
393+ tokio:: spawn ( async move {
394+ let result = request_handler
395+ . handle_request (
396+ RESOURCE_ID_SUBSCRIBE ,
397+ & message_attributes,
398+ Some ( request_payload) ,
399+ )
400+ . await
401+ . unwrap ( ) ;
402+
403+ let response: SubscriptionResponse = result. unwrap ( ) . extract_protobuf ( ) . unwrap ( ) ;
404+ assert_eq ! (
405+ response. topic. unwrap_or_default( ) ,
406+ test_lib:: helpers:: local_topic1_uri( )
407+ ) ;
408+ assert_eq ! ( response. status. unwrap( ) . state, State :: SUBSCRIBED . into( ) ) ;
409+ } ) ;
410+
411+ // validate subscription manager interaction
412+ let subscription_event = subscription_receiver. recv ( ) . await . unwrap ( ) ;
413+ match subscription_event {
414+ SubscriptionEvent :: AddSubscription {
415+ subscriber,
416+ topic,
417+ expiry,
418+ respond_to,
419+ } => {
420+ assert_eq ! ( subscriber, test_lib:: helpers:: subscriber_uri1( ) ) ;
421+ assert_eq ! ( topic, test_lib:: helpers:: local_topic1_uri( ) ) ;
422+ // we're passing in seconds above, because of how SubscriptionRequest is defined - but internally we are handling milliseconds; therefore *1000
423+ assert_eq ! (
424+ expiry,
425+ Some ( ( future_secs as usubscription:: ExpiryTimestamp ) * 1000 )
426+ ) ;
427+
428+ let _ = respond_to. send ( SubscriptionStatus {
429+ state : State :: SUBSCRIBED . into ( ) ,
430+ ..Default :: default ( )
431+ } ) ;
432+ }
433+ _ => panic ! ( "Wrong event type" ) ,
434+ }
435+ }
436+
437+ #[ tokio:: test]
438+ async fn test_expired_subscription ( ) {
439+ helpers:: init_once ( ) ;
440+
441+ // create request and other required object(s)
442+ let subscribe_request = test_lib:: helpers:: subscription_request (
443+ test_lib:: helpers:: local_topic1_uri ( ) ,
444+ Some ( 10 ) ,
445+ ) ;
446+ let request_payload = UPayload :: try_from_protobuf ( subscribe_request. clone ( ) ) . unwrap ( ) ;
447+ let message_attributes = UAttributes {
448+ source : Some ( test_lib:: helpers:: subscriber_uri1 ( ) ) . into ( ) ,
449+ ..Default :: default ( )
450+ } ;
451+
452+ let ( subscription_sender, _) = mpsc:: channel :: < SubscriptionEvent > ( 1 ) ;
453+ let ( notification_sender, _) = mpsc:: channel :: < NotificationEvent > ( 1 ) ;
454+
455+ // create handler and perform tested operation
456+ let request_handler =
457+ SubscriptionRequestHandler :: new ( subscription_sender, notification_sender) ;
458+
459+ let result = request_handler
460+ . handle_request (
461+ up_rust:: core:: usubscription:: RESOURCE_ID_SUBSCRIBE ,
462+ & message_attributes,
463+ Some ( request_payload) ,
464+ )
465+ . await ;
466+
467+ assert ! ( result. is_err_and( |e| matches!( e, ServiceInvocationError :: InvalidArgument ( _) ) ) ) ;
468+ }
339469}
0 commit comments