Skip to content

Commit a54f04d

Browse files
jordanhunt22 authored and Convex, Inc. committed
[Bugfix] Propagate lease lost errors from TableSummaryWorker (#38952)
Propagate the lease lost error and properly shut down when we hit the error in `TableSummaryWorker`. We were seeing instances repeatedly hit this error and not shut down. GitOrigin-RevId: 0043d518a03616a7d08d6d072eac358a550692df
1 parent 87ee630 commit a54f04d

File tree

4 files changed

+25
-6
lines changed

4 files changed

+25
-6
lines changed

crates/application/src/lib.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ use common::{
108108
DatabaseSchema,
109109
TableDefinition,
110110
},
111+
shutdown::ShutdownSignal,
111112
types::{
112113
env_var_limit_met,
113114
env_var_name_not_unique,
@@ -684,6 +685,7 @@ impl<RT: Runtime> Application<RT> {
684685
cache: QueryCache,
685686
fetch_client: Arc<dyn FetchClient>,
686687
local_log_sink: Option<String>,
688+
lease_lost_shutdown: ShutdownSignal,
687689
) -> anyhow::Result<Self> {
688690
let module_cache =
689691
ModuleCache::new(runtime.clone(), application_storage.modules_storage.clone()).await;
@@ -717,8 +719,12 @@ impl<RT: Runtime> Application<RT> {
717719
let search_worker = Arc::new(Mutex::new(search_worker));
718720
let search_and_vector_bootstrap_worker =
719721
Arc::new(Mutex::new(database.start_search_and_vector_bootstrap()));
720-
let table_summary_worker =
721-
TableSummaryWorker::start(runtime.clone(), database.clone(), persistence.clone());
722+
let table_summary_worker = TableSummaryWorker::start(
723+
runtime.clone(),
724+
database.clone(),
725+
persistence.clone(),
726+
lease_lost_shutdown,
727+
);
722728
let schema_worker = Arc::new(Mutex::new(runtime.spawn(
723729
"schema_worker",
724730
SchemaWorker::start(runtime.clone(), database.clone()),

crates/application/src/table_summary_worker.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ use std::{
44
};
55

66
use common::{
7-
errors::report_error,
7+
errors::{
8+
lease_lost_error,
9+
report_error,
10+
LeaseLostError,
11+
},
812
knobs::{
913
DATABASE_WORKERS_MAX_CHECKPOINT_AGE,
1014
DATABASE_WORKERS_MIN_COMMITS,
@@ -17,6 +21,7 @@ use common::{
1721
SpawnHandle,
1822
UnixTimestamp,
1923
},
24+
shutdown::ShutdownSignal,
2025
};
2126
use database::{
2227
table_summary::write_snapshot,
@@ -64,6 +69,7 @@ impl<RT: Runtime> TableSummaryWorker<RT> {
6469
runtime: RT,
6570
database: Database<RT>,
6671
persistence: Arc<dyn Persistence>,
72+
lease_lost_shutdown: ShutdownSignal,
6773
) -> TableSummaryClient {
6874
let table_summary_worker = Self {
6975
runtime: runtime.clone(),
@@ -73,7 +79,7 @@ impl<RT: Runtime> TableSummaryWorker<RT> {
7379
let (cancel_sender, cancel_receiver) = oneshot::channel();
7480
let handle = runtime.spawn(
7581
"table_summary_worker",
76-
table_summary_worker.go(cancel_receiver),
82+
table_summary_worker.go(cancel_receiver, lease_lost_shutdown),
7783
);
7884
let inner = Inner {
7985
handle,
@@ -127,7 +133,7 @@ impl<RT: Runtime> TableSummaryWorker<RT> {
127133
Ok(())
128134
}
129135

130-
async fn go(self, cancel_receiver: oneshot::Receiver<()>) {
136+
async fn go(self, cancel_receiver: oneshot::Receiver<()>, lease_lost_shutdown: ShutdownSignal) {
131137
tracing::info!("Starting background table summary worker");
132138
let mut timer = Some(table_summary_bootstrap_timer());
133139
let cancel_fut = cancel_receiver.fuse();
@@ -166,6 +172,11 @@ impl<RT: Runtime> TableSummaryWorker<RT> {
166172
}
167173
if let Err(mut err) = result {
168174
report_error(&mut err).await;
175+
if let Some(LeaseLostError) = err.downcast_ref() {
176+
lease_lost_shutdown.signal(
177+
lease_lost_error().context("Failed to write table summary checkpoint"),
178+
);
179+
}
169180
}
170181
let wait_fut = self.runtime.wait(Duration::from_secs(10)).fuse();
171182
pin_mut!(wait_fut);

crates/application/src/test_helpers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ impl<RT: Runtime> ApplicationTestExt<RT> for Application<RT> {
273273
QueryCache::new(*UDF_CACHE_MAX_SIZE),
274274
fetch_client,
275275
None, // local_log_sink
276+
ShutdownSignal::panic(),
276277
)
277278
.await?;
278279

crates/local_backend/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ pub async fn make_app(
149149
persistence.clone(),
150150
runtime.clone(),
151151
searcher.clone(),
152-
preempt_tx,
152+
preempt_tx.clone(),
153153
virtual_system_mapping().clone(),
154154
Arc::new(NoOpUsageEventLogger),
155155
)
@@ -231,6 +231,7 @@ pub async fn make_app(
231231
QueryCache::new(*UDF_CACHE_MAX_SIZE),
232232
fetch_client,
233233
config.local_log_sink.clone(),
234+
preempt_tx.clone(),
234235
)
235236
.await?;
236237

0 commit comments

Comments
 (0)