Skip to content

Commit 5d3a217

Browse files
goffrieConvex, Inc.
authored andcommitted
Split DATABASE_WORKERS_MAX_CHECKPOINT_AGE into search and table-summary knobs (#42873)
GitOrigin-RevId: c3e2b97e406cd365e717524150cea40a15157364
1 parent 0e3b229 commit 5d3a217

File tree

7 files changed

+23
-20
lines changed

7 files changed

+23
-20
lines changed

crates/application/src/table_summary_worker.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ use common::{
1010
LeaseLostError,
1111
},
1212
knobs::{
13-
DATABASE_WORKERS_MAX_CHECKPOINT_AGE,
1413
DATABASE_WORKERS_MIN_COMMITS,
1514
TABLE_SUMMARY_AGE_JITTER_SECONDS,
1615
TABLE_SUMMARY_BOOTSTRAP_RECENT_THRESHOLD,
16+
TABLE_SUMMARY_MAX_CHECKPOINT_AGE,
1717
},
1818
persistence::Persistence,
1919
runtime::{
@@ -136,12 +136,10 @@ impl<RT: Runtime> TableSummaryWorker<RT> {
136136

137137
fn jittered_max_age(&self) -> Duration {
138138
let max_age_jitter = (*TABLE_SUMMARY_AGE_JITTER_SECONDS)
139-
.min(DATABASE_WORKERS_MAX_CHECKPOINT_AGE.as_secs_f32() / 2.0)
139+
.min(TABLE_SUMMARY_MAX_CHECKPOINT_AGE.as_secs_f32() / 2.0)
140140
* self.runtime.rng().random_range(-1.0..=1.0);
141-
Duration::try_from_secs_f32(
142-
DATABASE_WORKERS_MAX_CHECKPOINT_AGE.as_secs_f32() + max_age_jitter,
143-
)
144-
.unwrap_or_default()
141+
Duration::try_from_secs_f32(TABLE_SUMMARY_MAX_CHECKPOINT_AGE.as_secs_f32() + max_age_jitter)
142+
.unwrap_or_default()
145143
}
146144

147145
async fn go(self, cancel_receiver: oneshot::Receiver<()>, lease_lost_shutdown: ShutdownSignal) {

crates/common/src/knobs.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ pub static SEARCH_INDEX_WORKER_PAGES_PER_SECOND: LazyLock<NonZeroU32> = LazyLock
524524
)
525525
});
526526

527-
/// Don't allow database workers to have more than an hour of uncheckpointed
527+
/// Don't allow search index workers to have more than an hour of uncheckpointed
528528
/// data.
529529
///
530530
/// For search/vector index workers - Note that fast-forwarding will keep the
@@ -535,8 +535,8 @@ pub static SEARCH_INDEX_WORKER_PAGES_PER_SECOND: LazyLock<NonZeroU32> = LazyLock
535535
/// DocumentRevisionStream to build new segments, so this value needs to be low
536536
/// enough to not block the search index flushers for too long, or else writes
537537
/// will start failing. This is why we set this value lower for pro users (10m).
538-
pub static DATABASE_WORKERS_MAX_CHECKPOINT_AGE: LazyLock<Duration> =
539-
LazyLock::new(|| Duration::from_secs(env_config("DATABASE_WORKERS_MAX_CHECKPOINT_AGE", 3600)));
538+
pub static SEARCH_WORKERS_MAX_CHECKPOINT_AGE: LazyLock<Duration> =
539+
LazyLock::new(|| Duration::from_secs(env_config("SEARCH_WORKERS_MAX_CHECKPOINT_AGE", 3600)));
540540

541541
/// Don't fast-forward an index less than ten seconds forward so we don't
542542
/// amplify every commit into another write when the system is under heavy load.
@@ -1237,19 +1237,24 @@ pub static MAX_SEARCHLIGHT_REQUEST_SIZE: LazyLock<usize> =
12371237
/// other than the other workers happens.
12381238
///
12391239
/// We must also ensure that workers advance periodically to ensure that we can
1240-
/// run document retention in the future. Database times are bumped periodically
1240+
/// run document retention. Database times are bumped periodically
12411241
/// even if no writes occur. So any worker that checks this should always have
12421242
/// some maximum period of time after which they checkpoint unconditionally.
12431243
pub static DATABASE_WORKERS_MIN_COMMITS: LazyLock<usize> =
12441244
LazyLock::new(|| env_config("DATABASE_WORKERS_MIN_COMMITS", 500));
12451245

1246+
/// Update table summaries for idle instances once every 4 hours.
1247+
pub static TABLE_SUMMARY_MAX_CHECKPOINT_AGE: LazyLock<Duration> = LazyLock::new(|| {
1248+
Duration::from_secs(env_config("TABLE_SUMMARY_MAX_CHECKPOINT_AGE", 4 * 60 * 60))
1249+
});
1250+
12461251
/// The TableSummaryWorker must checkpoint every
1247-
/// [`DATABASE_WORKERS_MAX_CHECKPOINT_AGE`] seconds even if nothing has changed.
1252+
/// [`TABLE_SUMMARY_MAX_CHECKPOINT_AGE`] seconds even if nothing has changed.
12481253
/// However, to prevent all instances from checkpointing at the same time, we'll
12491254
/// add a jitter of up to ±TABLE_SUMMARY_AGE_JITTER_SECONDS.
12501255
///
12511256
/// Note: the configured value is capped at
1252-
/// `DATABASE_WORKERS_MAX_CHECKPOINT_AGE/2`.
1257+
/// `TABLE_SUMMARY_MAX_CHECKPOINT_AGE/2`.
12531258
pub static TABLE_SUMMARY_AGE_JITTER_SECONDS: LazyLock<f32> =
12541259
LazyLock::new(|| env_config("TABLE_SUMMARY_AGE_JITTER_SECONDS", 900.0));
12551260

crates/database/src/search_index_workers/fast_forward.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ use common::{
2020
Interval,
2121
},
2222
knobs::{
23-
DATABASE_WORKERS_MAX_CHECKPOINT_AGE,
2423
DATABASE_WORKERS_MIN_COMMITS,
2524
DATABASE_WORKERS_POLL_INTERVAL,
2625
INDEX_WORKERS_INITIAL_BACKOFF,
26+
SEARCH_WORKERS_MAX_CHECKPOINT_AGE,
2727
},
2828
persistence::PersistenceSnapshot,
2929
query::Order,
@@ -177,7 +177,7 @@ impl FastForwardIndexWorker {
177177
if let Some(last_fast_forward_info) = last_fast_forward_info
178178
&& commits_since_load - last_fast_forward_info.observed_commits
179179
< *DATABASE_WORKERS_MIN_COMMITS
180-
&& now - last_fast_forward_info.ts < *DATABASE_WORKERS_MAX_CHECKPOINT_AGE
180+
&& now - last_fast_forward_info.ts < *SEARCH_WORKERS_MAX_CHECKPOINT_AGE
181181
{
182182
tracing::debug!(
183183
"{log_name} not enough commits and too recent to fast forward: {}, {:?}",

crates/database/src/search_index_workers/search_flusher.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ use anyhow::Context;
1616
use common::{
1717
bootstrap_model::index::IndexMetadata,
1818
knobs::{
19-
DATABASE_WORKERS_MAX_CHECKPOINT_AGE,
2019
DEFAULT_DOCUMENTS_PAGE_SIZE,
20+
SEARCH_WORKERS_MAX_CHECKPOINT_AGE,
2121
VECTOR_INDEX_WORKER_PAGE_SIZE,
2222
},
2323
persistence::{
@@ -297,7 +297,7 @@ impl<RT: Runtime, T: SearchIndex + 'static> SearchFlusher<RT, T> {
297297
anyhow::ensure!(ts <= *step_ts);
298298

299299
let index_age = *step_ts - ts;
300-
let too_old = (index_age >= *DATABASE_WORKERS_MAX_CHECKPOINT_AGE
300+
let too_old = (index_age >= *SEARCH_WORKERS_MAX_CHECKPOINT_AGE
301301
&& index_size > 0)
302302
.then_some(BuildReason::TooOld);
303303
if too_old.is_some() {

crates/database/src/tests/randomized_search_tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use common::{
2121
IndexMetadata,
2222
},
2323
floating_point::assert_approx_equal,
24-
knobs::DATABASE_WORKERS_MAX_CHECKPOINT_AGE,
24+
knobs::SEARCH_WORKERS_MAX_CHECKPOINT_AGE,
2525
pause::PauseController,
2626
persistence::Persistence,
2727
query::{
@@ -1435,7 +1435,7 @@ async fn test_flushing_does_not_invalidate_subscriptions(rt: TestRuntime) -> any
14351435
// Force a checkpoint by advancing time
14361436
scenario
14371437
.rt
1438-
.advance_time(*DATABASE_WORKERS_MAX_CHECKPOINT_AGE * 2)
1438+
.advance_time(*SEARCH_WORKERS_MAX_CHECKPOINT_AGE * 2)
14391439
.await;
14401440
scenario.insert("new text", "b").await?;
14411441
scenario.backfill().await?;

crates/database/src/text_index_worker/fast_forward.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ pub mod tests {
171171
);
172172

173173
// Check that we fast-forward if we advance time sufficiently far forward past
174-
// DATABASE_WORKERS_MAX_CHECKPOINT_AGE even with no writes.
174+
// SEARCH_WORKERS_MAX_CHECKPOINT_AGE even with no writes.
175175
rt.advance_time(Duration::from_secs(7200)).await;
176176
database.bump_max_repeatable_ts().await?;
177177
let metrics = fast_forward(&rt, database, &mut last_fast_forward_info).await?;

crates/database/src/vector_index_worker/fast_forward.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ mod tests {
176176
);
177177

178178
// Check that we fast-forward if we advance time sufficiently far forward past
179-
// DATABASE_WORKERS_MAX_CHECKPOINT_AGE.
179+
// SEARCH_WORKERS_MAX_CHECKPOINT_AGE.
180180
rt.advance_time(Duration::from_secs(7200)).await;
181181
database.bump_max_repeatable_ts().await?;
182182
let metrics = fast_forward(&rt, &database, &mut last_fast_forward_info).await?;

0 commit comments

Comments
 (0)