Skip to content

Commit fe33239

Browse files
ldanilekConvex, Inc.
authored andcommitted
[cluster migration] run retention during migration (#24504)
GitOrigin-RevId: 9146f965fb2009492d0e06b04e2fcc105e6e840e
1 parent b2f1ecd commit fe33239

File tree

4 files changed

+75
-27
lines changed

4 files changed

+75
-27
lines changed

crates/database/src/index_worker.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -925,18 +925,15 @@ impl<RT: Runtime> IndexWriter<RT> {
925925
let min_snapshot_ts = self.retention_validator.min_snapshot_ts().await?;
926926
let all_indexes = btreemap! { index_id => (index_name, indexed_fields) };
927927
// TODO(lee) add checkpointing.
928-
let mut cursor_ts = backfill_begin_ts;
929-
while cursor_ts.succ()? < min_snapshot_ts {
930-
(cursor_ts, _) = LeaderRetentionManager::delete(
931-
min_snapshot_ts,
932-
self.persistence.clone(),
933-
&self.runtime,
934-
cursor_ts,
935-
&all_indexes,
936-
self.retention_validator.clone(),
937-
)
938-
.await?;
939-
}
928+
LeaderRetentionManager::delete_all_no_checkpoint(
929+
backfill_begin_ts,
930+
min_snapshot_ts,
931+
self.persistence.clone(),
932+
&self.runtime,
933+
&all_indexes,
934+
self.retention_validator.clone(),
935+
)
936+
.await?;
940937
Ok(())
941938
}
942939
}

crates/database/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ pub use self::{
153153
retention::{
154154
latest_retention_min_snapshot_ts,
155155
FollowerRetentionManager,
156+
LeaderRetentionManager,
157+
RetentionType,
156158
},
157159
snapshot_manager::{
158160
Snapshot,

crates/database/src/retention.rs

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
623623
/// entries is the number of index entries we found were expired, not
624624
/// necessarily the total we deleted or wanted to delete, though they're
625625
/// correlated.
626-
pub(crate) async fn delete(
626+
async fn delete(
627627
min_snapshot_ts: Timestamp,
628628
persistence: Arc<dyn Persistence>,
629629
rt: &RT,
@@ -690,6 +690,32 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
690690
.map(|timestamp| (timestamp, total_expired_entries))
691691
}
692692

693+
pub async fn delete_all_no_checkpoint(
694+
mut cursor_ts: Timestamp,
695+
min_snapshot_ts: Timestamp,
696+
persistence: Arc<dyn Persistence>,
697+
rt: &RT,
698+
all_indexes: &BTreeMap<IndexId, (GenericIndexName<TableId>, IndexedFields)>,
699+
retention_validator: Arc<dyn RetentionValidator>,
700+
) -> anyhow::Result<()> {
701+
while cursor_ts.succ()? < min_snapshot_ts {
702+
let (new_cursor_ts, _) = Self::delete(
703+
min_snapshot_ts,
704+
persistence.clone(),
705+
rt,
706+
cursor_ts,
707+
all_indexes,
708+
retention_validator.clone(),
709+
)
710+
.await?;
711+
tracing::info!(
712+
"custom index retention completed between ts {cursor_ts} and {new_cursor_ts}"
713+
);
714+
cursor_ts = new_cursor_ts;
715+
}
716+
Ok(())
717+
}
718+
693719
#[try_stream(ok = (Timestamp, InternalDocumentId), error = anyhow::Error)]
694720
async fn expired_documents(
695721
rt: &RT,
@@ -1222,9 +1248,8 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
12221248
Ok(())
12231249
}
12241250

1225-
async fn get_checkpoint(
1251+
pub async fn get_checkpoint_no_logging(
12261252
persistence: &dyn PersistenceReader,
1227-
snapshot_reader: Reader<SnapshotManager>,
12281253
retention_type: RetentionType,
12291254
) -> anyhow::Result<Timestamp> {
12301255
let key = match retention_type {
@@ -1239,24 +1264,30 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
12391264
.map(ConvexValue::try_from)
12401265
.transpose()?;
12411266
let checkpoint = match checkpoint_value {
1242-
Some(ConvexValue::Int64(ts)) => {
1243-
let checkpoint = Timestamp::try_from(ts)?;
1244-
match retention_type {
1245-
RetentionType::Document => log_document_retention_cursor_age(
1246-
(*snapshot_reader.lock().latest_ts()).secs_since_f64(checkpoint),
1247-
),
1248-
RetentionType::Index => log_retention_cursor_age(
1249-
(*snapshot_reader.lock().latest_ts()).secs_since_f64(checkpoint),
1250-
),
1251-
}
1252-
checkpoint
1253-
},
1267+
Some(ConvexValue::Int64(ts)) => Timestamp::try_from(ts)?,
12541268
None => Timestamp::MIN,
12551269
_ => anyhow::bail!("invalid retention checkpoint {checkpoint_value:?}"),
12561270
};
12571271
Ok(checkpoint)
12581272
}
12591273

1274+
async fn get_checkpoint(
1275+
persistence: &dyn PersistenceReader,
1276+
snapshot_reader: Reader<SnapshotManager>,
1277+
retention_type: RetentionType,
1278+
) -> anyhow::Result<Timestamp> {
1279+
let checkpoint = Self::get_checkpoint_no_logging(persistence, retention_type).await?;
1280+
match retention_type {
1281+
RetentionType::Document => log_document_retention_cursor_age(
1282+
(*snapshot_reader.lock().latest_ts()).secs_since_f64(checkpoint),
1283+
),
1284+
RetentionType::Index => log_retention_cursor_age(
1285+
(*snapshot_reader.lock().latest_ts()).secs_since_f64(checkpoint),
1286+
),
1287+
}
1288+
Ok(checkpoint)
1289+
}
1290+
12601291
fn accumulate_index_document(
12611292
maybe_doc: Option<ResolvedDocument>,
12621293
all_indexes: &mut BTreeMap<IndexId, (GenericIndexName<TableId>, IndexedFields)>,

crates/indexing/src/index_registry.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use common::{
1212
database_index::{
1313
DatabaseIndexState,
1414
DeveloperDatabaseIndexConfig,
15+
IndexedFields,
1516
},
1617
DeveloperIndexConfig,
1718
IndexConfig,
@@ -405,6 +406,23 @@ impl IndexRegistry {
405406
.map(|index| index.metadata())
406407
}
407408

409+
pub fn all_database_index_configs(
410+
&self,
411+
) -> BTreeMap<IndexId, (TabletIndexName, IndexedFields)> {
412+
self.all_indexes()
413+
.filter_map(|index| {
414+
let index_id = index.id().internal_id();
415+
let index_name = index.name.clone();
416+
match &index.config {
417+
IndexConfig::Database {
418+
developer_config, ..
419+
} => Some((index_id, (index_name, developer_config.fields.clone()))),
420+
IndexConfig::Search { .. } | IndexConfig::Vector { .. } => None,
421+
}
422+
})
423+
.collect()
424+
}
425+
408426
pub fn all_enabled_indexes(&self) -> Vec<ParsedDocument<TabletIndexMetadata>> {
409427
self.enabled_indexes
410428
.values()

0 commit comments

Comments
 (0)