Skip to content

Commit 575b6cb

Browse files
Preslav LeConvex, Inc.
authored andcommitted
Add end_ts to ApplicationSubscription implementation. (#26395)
This is an RFC to illustrate how adding end_ts in ApplicationSubscription would look like. The tldr is that it doesn't affect behavior and only performance. Having end_ts allows optimizing calling extend_validity multiple times. In practice, this optimization doesn't matter as long as the subscription worker can keep up but send this since there is nothing better than explaining trade-offs than code. Proposal: Do not land this but open to clean up and land if @sujayakar thinks it is cleaner. GitOrigin-RevId: f992a8e119880e6299643c753ec65435e85e1e92
1 parent 9b6a38c commit 575b6cb

File tree

1 file changed

+25
-4
lines changed

1 file changed

+25
-4
lines changed

crates/application/src/api.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
207207
let inner = self.subscribe(token.clone()).await?;
208208
Ok(Box::new(ApplicationSubscription {
209209
initial_ts: token.ts(),
210+
end_ts: token.ts(),
210211
reads: token.into_reads(),
211212
inner,
212213
log: self.database.log().clone(),
@@ -218,7 +219,9 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
218219
pub trait SubscriptionTrait: Send + Sync {
219220
fn wait_for_invalidation(&self) -> BoxFuture<'static, anyhow::Result<()>>;
220221

221-
// Returns true if the subscription validity can be extended to new_ts.
222+
// Returns true if the subscription validity can be extended to new_ts. Note
223+
// that extend_validity might return false even if the subscription can be
224+
// extended, but will never return true if it can't.
222225
async fn extend_validity(&mut self, new_ts: Timestamp) -> anyhow::Result<bool>;
223226
}
224227

@@ -227,7 +230,13 @@ struct ApplicationSubscription {
227230
log: LogReader,
228231

229232
reads: ReadSet,
233+
// The initial timestamp the subscription was created at. This is known
234+
// to be valid.
230235
initial_ts: Timestamp,
236+
// The last timestamp the subscription is known to be valid for.
237+
// NOTE that the inner subscription might be valid to a higher timestamp,
238+
// but end_ts is not automatically updated.
239+
end_ts: Timestamp,
231240
}
232241

233242
#[async_trait]
@@ -242,16 +251,28 @@ impl SubscriptionTrait for ApplicationSubscription {
242251
return Ok(false);
243252
}
244253

254+
if new_ts <= self.end_ts {
255+
// We have already validated the subscription past new_ts.
256+
return Ok(true);
257+
}
258+
259+
// The inner subscription is periodically updated by the subscription
260+
// worker.
245261
let Some(current_ts) = self.inner.current_ts() else {
246-
// Subscription no longer valid.
262+
// Subscription is no longer valid. We could check validity from end_ts
263+
// to new_ts, but this is likely to fail and is potentially unbounded amount of
264+
// work, so we return false here. This is valid per the function contract.
247265
return Ok(false);
248266
};
267+
self.end_ts = self.end_ts.max(current_ts);
249268

250-
let current_token = Token::new(self.reads.clone(), current_ts);
269+
let current_token = Token::new(self.reads.clone(), self.end_ts);
251270
let Some(_new_token) = self.log.refresh_token(current_token, new_ts)? else {
252-
// Subscription validity can't be extended.
271+
// Subscription validity can't be extended. Note that returning false
272+
// here also doesn't mean there is a conflict.
253273
return Ok(false);
254274
};
275+
self.end_ts = self.end_ts.max(new_ts);
255276

256277
return Ok(true);
257278
}

0 commit comments

Comments
 (0)