@@ -575,14 +575,25 @@ impl<RT: Runtime> SubscriptionClient for ApplicationSubscriptionClient<RT> {
575575 }
576576}
577577
578+ pub enum ExtendValidityResult {
579+ /// The subscription's validity can be extended to the requested timestamp.
580+ Extended ,
581+ /// The subscription may no longer be valid at the requested timestamp.
582+ /// This result can be returned spuriously even if there were no conflicting
583+ /// writes.
584+ Invalid {
585+ /// The earliest conflicting timestamp, if known. This is not guaranteed
586+ /// to be known.
587+ invalid_ts : Option < Timestamp > ,
588+ } ,
589+ }
590+
578591#[ async_trait]
579592pub trait SubscriptionTrait : Send + Sync {
580593 fn wait_for_invalidation ( & self ) -> BoxFuture < ' static , anyhow:: Result < ( ) > > ;
581594
582- // Returns true if the subscription validity can be extended to new_ts. Note
583- // that extend_validity might return false even if the subscription can be
584- // extended, but will never return true if it can't.
585- async fn extend_validity ( & self , new_ts : Timestamp ) -> anyhow:: Result < bool > ;
595+ /// See comments on [`ExtendValidityResult`]
596+ async fn extend_validity ( & self , new_ts : Timestamp ) -> anyhow:: Result < ExtendValidityResult > ;
586597}
587598
588599struct ApplicationSubscription {
@@ -602,10 +613,10 @@ impl SubscriptionTrait for ApplicationSubscription {
602613 }
603614
604615 #[ fastrace:: trace]
605- async fn extend_validity ( & self , new_ts : Timestamp ) -> anyhow:: Result < bool > {
616+ async fn extend_validity ( & self , new_ts : Timestamp ) -> anyhow:: Result < ExtendValidityResult > {
606617 if new_ts < self . initial_ts {
607618 // new_ts is before the initial subscription timestamp.
608- return Ok ( false ) ;
619+ return Ok ( ExtendValidityResult :: Invalid { invalid_ts : None } ) ;
609620 }
610621
611622 // The inner subscription is periodically updated by the subscription
@@ -614,15 +625,19 @@ impl SubscriptionTrait for ApplicationSubscription {
614625 // Subscription is no longer valid. We could check validity from end_ts
615626 // to new_ts, but this is likely to fail and is potentially unbounded amount of
616627 // work, so we return false here. This is valid per the function contract.
617- return Ok ( false ) ;
628+ return Ok ( ExtendValidityResult :: Invalid {
629+ invalid_ts : self . inner . invalid_ts ( ) ,
630+ } ) ;
618631 } ;
619632
620633 let current_token = Token :: new ( self . reads . clone ( ) , current_ts) ;
621- let Some ( _new_token) = self . log . refresh_token ( current_token, new_ts) ? else {
622- // Subscription validity can't be extended. Note that returning false
623- // here also doesn't mean there is a conflict.
624- return Ok ( false ) ;
625- } ;
626- return Ok ( true ) ;
634+ Ok ( match self . log . refresh_token ( current_token, new_ts) ? {
635+ Ok ( _new_token) => ExtendValidityResult :: Extended ,
636+ Err ( invalid_ts) => {
637+ // Subscription validity can't be extended. Note that returning false
638+ // here also doesn't mean there is a conflict.
639+ ExtendValidityResult :: Invalid { invalid_ts }
640+ } ,
641+ } )
627642 }
628643}
0 commit comments