Skip to content

Commit 4f285a9

Browse files
goffrieConvex, Inc.
authored andcommitted
Simplify LeaderRetentionManager::new (#42288)
GitOrigin-RevId: c25ec8f7c1143b06d2926c69597c1d5c609c0d83
1 parent 21fffc8 commit 4f285a9

File tree

2 files changed

+23
-40
lines changed

2 files changed

+23
-40
lines changed

crates/database/src/database.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,7 @@ impl<RT: Runtime> Database<RT> {
995995
let retention_manager = LeaderRetentionManager::new(
996996
runtime.clone(),
997997
persistence.clone(),
998+
bootstrap_metadata.clone(),
998999
snapshot_reader.clone(),
9991000
follower_retention_manager,
10001001
shutdown.clone(),

crates/database/src/retention.rs

Lines changed: 22 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ use common::{
9696
IndexId,
9797
PersistenceVersion,
9898
RepeatableTimestamp,
99-
TabletIndexName,
10099
Timestamp,
101100
},
102101
value::{
@@ -152,6 +151,7 @@ use crate::{
152151
retention_delete_timer,
153152
},
154153
snapshot_manager::SnapshotManager,
154+
BootstrapMetadata,
155155
};
156156

157157
#[derive(Debug, Clone, Copy)]
@@ -201,7 +201,6 @@ impl Checkpoint {
201201
pub struct LeaderRetentionManager<RT: Runtime> {
202202
rt: RT,
203203
bounds_reader: Reader<SnapshotBounds>,
204-
index_table_id: TabletId,
205204
checkpoint_reader: Reader<Checkpoint>,
206205
document_checkpoint_reader: Reader<Checkpoint>,
207206
handles: Arc<Mutex<Vec<Box<dyn SpawnHandle>>>>,
@@ -212,7 +211,6 @@ impl<RT: Runtime> Clone for LeaderRetentionManager<RT> {
212211
Self {
213212
rt: self.rt.clone(),
214213
bounds_reader: self.bounds_reader.clone(),
215-
index_table_id: self.index_table_id,
216214
checkpoint_reader: self.checkpoint_reader.clone(),
217215
document_checkpoint_reader: self.document_checkpoint_reader.clone(),
218216
handles: self.handles.clone(),
@@ -251,21 +249,22 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
251249
pub async fn new(
252250
rt: RT,
253251
persistence: Arc<dyn Persistence>,
252+
bootstrap_metadata: BootstrapMetadata,
254253
snapshot_reader: Reader<SnapshotManager>,
255254
follower_retention_manager: FollowerRetentionManager<RT>,
256255
lease_lost_shutdown: ShutdownSignal,
257256
retention_rate_limiter: Arc<RateLimiter<RT>>,
258257
) -> anyhow::Result<LeaderRetentionManager<RT>> {
259258
let reader = persistence.reader();
260-
let snapshot_ts = snapshot_reader.lock().persisted_max_repeatable_ts();
261-
let min_snapshot_ts = snapshot_ts.prior_ts(
259+
let latest_ts = snapshot_reader.lock().latest_ts();
260+
let min_index_snapshot_ts = latest_ts.prior_ts(
262261
latest_retention_min_snapshot_ts(reader.as_ref(), RetentionType::Index).await?,
263262
)?;
264-
let min_document_snapshot_ts = snapshot_ts.prior_ts(
263+
let min_document_snapshot_ts = latest_ts.prior_ts(
265264
latest_retention_min_snapshot_ts(reader.as_ref(), RetentionType::Document).await?,
266265
)?;
267266
let bounds = SnapshotBounds {
268-
min_index_snapshot_ts: min_snapshot_ts,
267+
min_index_snapshot_ts,
269268
min_document_snapshot_ts,
270269
};
271270
let (bounds_reader, bounds_writer) = new_split_rw_lock(bounds);
@@ -275,43 +274,29 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
275274
let (document_checkpoint_reader, document_checkpoint_writer) =
276275
new_split_rw_lock(document_checkpoint);
277276

278-
let snapshot = snapshot_reader.lock().latest_snapshot();
279-
let index_registry = snapshot.index_registry;
280-
let meta_index_id = index_registry
281-
.enabled_index_metadata(&TabletIndexName::by_id(index_registry.index_table()))
282-
.expect("meta index id must exist")
283-
.id()
284-
.internal_id();
277+
let index_table_id = bootstrap_metadata.index_tablet_id;
285278
let follower_retention_manager = Arc::new(follower_retention_manager);
286-
let mut index_table_id = None;
287279
// We need to delete from all indexes that might be queried.
288-
// Therefore we scan _index.by_id at min_snapshot_ts before min_snapshot_ts
289-
// starts moving, and update the map before confirming any deletes.
280+
// Therefore we scan _index.by_id at min_index_snapshot_ts before
281+
// min_index_snapshot_ts starts moving, and update the map before
282+
// confirming any deletes.
290283
let mut all_indexes = {
291-
let reader = persistence.reader();
292-
let snapshot_ts = min_snapshot_ts;
293-
let reader =
294-
RepeatablePersistence::new(reader, snapshot_ts, follower_retention_manager.clone());
295-
let reader = reader.read_snapshot(snapshot_ts)?;
296284
let mut meta_index_scan = reader.index_scan(
297-
meta_index_id,
298-
index_registry.index_table(),
285+
bootstrap_metadata.index_by_id,
286+
bootstrap_metadata.index_tablet_id,
287+
*min_index_snapshot_ts,
299288
&Interval::all(),
300289
Order::Asc,
301290
usize::MAX,
291+
follower_retention_manager.clone(),
302292
);
303293
let mut indexes = BTreeMap::new();
304294
while let Some((_, rev)) = meta_index_scan.try_next().await? {
305-
let table_id = rev.value.id().tablet_id;
306-
index_table_id = Some(table_id);
307-
Self::accumulate_index_document(Some(rev.value), &mut indexes, table_id)?;
295+
Self::accumulate_index_document(rev.value, &mut indexes)?;
308296
}
309297
indexes
310298
};
311-
let index_table_id =
312-
index_table_id.ok_or_else(|| anyhow::anyhow!("there must be at least one index"))?;
313-
let mut index_cursor = min_snapshot_ts;
314-
let latest_ts = snapshot_reader.lock().persisted_max_repeatable_ts();
299+
let mut index_cursor = min_index_snapshot_ts;
315300
// Also update the set of indexes up to the current timestamp before document
316301
// retention starts moving.
317302
Self::accumulate_indexes(
@@ -324,7 +309,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
324309
)
325310
.await?;
326311

327-
let (send_min_snapshot, receive_min_snapshot) = watch::channel(min_snapshot_ts);
312+
let (send_min_snapshot, receive_min_snapshot) = watch::channel(min_index_snapshot_ts);
328313
let (send_min_document_snapshot, receive_min_document_snapshot) =
329314
watch::channel(min_document_snapshot_ts);
330315
let advance_min_snapshot_handle = rt.spawn(
@@ -370,7 +355,6 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
370355
Ok(Self {
371356
rt,
372357
bounds_reader,
373-
index_table_id,
374358
checkpoint_reader,
375359
document_checkpoint_reader,
376360
handles: Arc::new(Mutex::new(vec![
@@ -1375,14 +1359,9 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
13751359
}
13761360

13771361
fn accumulate_index_document(
1378-
maybe_doc: Option<ResolvedDocument>,
1362+
doc: ResolvedDocument,
13791363
all_indexes: &mut BTreeMap<IndexId, (GenericIndexName<TabletId>, IndexedFields)>,
1380-
index_tablet_id: TabletId,
13811364
) -> anyhow::Result<()> {
1382-
let Some(doc) = maybe_doc else {
1383-
return Ok(());
1384-
};
1385-
anyhow::ensure!(doc.id().tablet_id == index_tablet_id);
13861365
let index_id = doc.id().internal_id();
13871366
let index: ParsedDocument<IndexMetadata<TabletId>> = doc.parse()?;
13881367
let index = index.into_value();
@@ -1427,7 +1406,10 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
14271406
retention_validator,
14281407
);
14291408
while let Some(entry) = document_stream.try_next().await? {
1430-
Self::accumulate_index_document(entry.value, all_indexes, index_table_id)?;
1409+
if let Some(doc) = entry.value {
1410+
anyhow::ensure!(doc.id().tablet_id == index_table_id);
1411+
Self::accumulate_index_document(doc, all_indexes)?;
1412+
}
14311413
}
14321414
*cursor = latest_ts;
14331415
Ok(())

0 commit comments

Comments
 (0)