Skip to content

Commit dba5fa8

Browse files
emmaling27Convex, Inc.
authored andcommitted
delete_scheduled_functions_table endpoint (#43075)
Adds `/delete_scheduled_jobs_table` endpoint for fast O(1) deletion of all scheduled jobs. This helps when the scheduler gets in a loop, scheduling the same function recursively at such a high rate that the cancel all button doesn't work. This is destructive (removes all scheduled job state, unlike writing the scheduled job state as "canceled" in `/cancel_scheduled_jobs`). GitOrigin-RevId: c1f0e99e57a30081fe5f821d7e35149930800a2e
1 parent bf998db commit dba5fa8

File tree

7 files changed

+179
-8
lines changed

7 files changed

+179
-8
lines changed

crates/application/src/lib.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,10 @@ use model::{
271271
},
272272
ModuleModel,
273273
},
274-
scheduled_jobs::SchedulerModel,
274+
scheduled_jobs::{
275+
ScheduledJobsTable,
276+
SchedulerModel,
277+
},
275278
session_requests::types::SessionRequestIdentifier,
276279
snapshot_imports::types::{
277280
ImportFormat,
@@ -2953,6 +2956,30 @@ impl<RT: Runtime> Application<RT> {
29532956
self.function_log.scheduled_job_lag(window)
29542957
}
29552958

2959+
pub async fn delete_scheduled_jobs_table(
2960+
&self,
2961+
identity: Identity,
2962+
component_id: ComponentId,
2963+
) -> anyhow::Result<()> {
2964+
anyhow::ensure!(
2965+
identity.is_admin() || identity.is_system(),
2966+
unauthorized_error("delete_scheduled_jobs_table")
2967+
);
2968+
let mut tx = self.begin(identity).await?;
2969+
let mut model = TableModel::new(&mut tx);
2970+
model
2971+
.replace_with_empty_table(ScheduledJobsTable, component_id.into())
2972+
.await?;
2973+
let component = tx.must_component_path(component_id)?;
2974+
self.commit_with_audit_log_events(
2975+
tx,
2976+
vec![DeploymentAuditLogEvent::DeleteScheduledJobsTable { component }],
2977+
"delete_scheduled_jobs_table",
2978+
)
2979+
.await?;
2980+
Ok(())
2981+
}
2982+
29562983
pub async fn cancel_all_jobs(
29572984
&self,
29582985
component_id: ComponentId,

crates/application/src/tests/scheduled_jobs.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,3 +401,26 @@ async fn test_scheduled_job_retry(
401401
assert_eq!(state, ScheduledJobState::Success);
402402
Ok(())
403403
}
404+
405+
#[convex_macro::test_runtime]
406+
async fn test_delete_scheduled_jobs_table(rt: TestRuntime) -> anyhow::Result<()> {
407+
let application = Application::new_for_tests(&rt).await?;
408+
let mut tx = application.begin(Identity::system()).await?;
409+
create_scheduled_job(&rt, &mut tx, insert_object_path()).await?;
410+
application.commit_test(tx).await?;
411+
412+
let mut tx = application.begin(Identity::system()).await?;
413+
let mut model = SchedulerModel::new(&mut tx, TableNamespace::test_user());
414+
let scheduled_jobs = model.list().await?;
415+
assert_eq!(scheduled_jobs.len(), 1);
416+
417+
application
418+
.delete_scheduled_jobs_table(Identity::system(), ComponentId::Root)
419+
.await?;
420+
let mut tx = application.begin(Identity::system()).await?;
421+
let mut model = SchedulerModel::new(&mut tx, TableNamespace::test_user());
422+
let scheduled_jobs = model.list().await?;
423+
assert!(scheduled_jobs.is_empty());
424+
425+
Ok(())
426+
}

crates/database/src/bootstrap_model/table.rs

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -217,26 +217,28 @@ impl<'a, RT: Runtime> TableModel<'a, RT> {
217217
.namespace(namespace)
218218
.id(&table_name)?;
219219
self.delete_table_by_id_bypassing_schema_enforcement(table_id_and_number.tablet_id)
220-
.await
220+
.await?;
221+
Ok(())
221222
}
222223

223224
pub async fn delete_hidden_table(&mut self, tablet_id: TabletId) -> anyhow::Result<()> {
224225
let table_metadata = self.get_table_metadata(tablet_id).await?;
225226
// We don't need to validate hidden table with the schema.
226227
anyhow::ensure!(table_metadata.state == TableState::Hidden);
227228
self.delete_table_by_id_bypassing_schema_enforcement(tablet_id)
228-
.await
229+
.await?;
230+
Ok(())
229231
}
230232

231-
pub async fn delete_table(&mut self, tablet_id: TabletId) -> anyhow::Result<()> {
233+
pub async fn delete_table(&mut self, tablet_id: TabletId) -> anyhow::Result<TableNumber> {
232234
self.delete_table_by_id_bypassing_schema_enforcement(tablet_id)
233235
.await
234236
}
235237

236238
async fn delete_table_by_id_bypassing_schema_enforcement(
237239
&mut self,
238240
tablet_id: TabletId,
239-
) -> anyhow::Result<()> {
241+
) -> anyhow::Result<TableNumber> {
240242
for index in IndexModel::new(self.tx)
241243
.all_indexes_on_table(tablet_id)
242244
.await?
@@ -258,7 +260,7 @@ impl<'a, RT: Runtime> TableModel<'a, RT> {
258260
SystemMetadataModel::new_global(self.tx)
259261
.replace(table_doc_id, updated_table_metadata.try_into()?)
260262
.await?;
261-
Ok(())
263+
Ok(table_metadata.number)
262264
}
263265

264266
pub async fn get_table_metadata(
@@ -418,7 +420,7 @@ impl<'a, RT: Runtime> TableModel<'a, RT> {
418420
self.delete_table_by_id_bypassing_schema_enforcement(
419421
existing_table_by_name.tablet_id,
420422
)
421-
.await?
423+
.await?;
422424
}
423425
table_metadatas.push(table_metadata);
424426
}
@@ -453,6 +455,40 @@ impl<'a, RT: Runtime> TableModel<'a, RT> {
453455
Ok(())
454456
}
455457

458+
pub async fn replace_with_empty_table<S: SystemTable>(
459+
&mut self,
460+
_system_table: S,
461+
namespace: TableNamespace,
462+
) -> anyhow::Result<()> {
463+
let tablet_id = self
464+
.tx
465+
.table_mapping()
466+
.namespace(namespace)
467+
.name_to_tablet()(S::table_name().clone())?;
468+
469+
let table_number = self.delete_table(tablet_id).await?;
470+
self._insert_table_metadata(
471+
namespace,
472+
S::table_name(),
473+
Some(table_number),
474+
TableState::Active,
475+
)
476+
.await?;
477+
let mut index_model = IndexModel::new(self.tx);
478+
for index in S::indexes() {
479+
let index_metadata = IndexMetadata::new_enabled(
480+
index
481+
.name
482+
.map_table(&|_| anyhow::Ok(S::table_name().clone()))?,
483+
index.fields,
484+
);
485+
index_model
486+
.add_system_index(namespace, index_metadata)
487+
.await?;
488+
}
489+
Ok(())
490+
}
491+
456492
pub async fn insert_table_for_import(
457493
&mut self,
458494
namespace: TableNamespace,

crates/local_backend/src/dashboard.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ use crate::{
5959
export_value,
6060
UdfResponse,
6161
},
62+
scheduling::{
63+
__path_delete_scheduled_functions_table,
64+
delete_scheduled_functions_table,
65+
},
6266
schema::IndexMetadataResponse,
6367
LocalAppState,
6468
};
@@ -354,4 +358,5 @@ where
354358
.routes(utoipa_axum::routes!(delete_tables))
355359
.routes(utoipa_axum::routes!(delete_component))
356360
.routes(utoipa_axum::routes!(get_source_code))
361+
.routes(utoipa_axum::routes!(delete_scheduled_functions_table))
357362
}

crates/local_backend/src/scheduling.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use common::{
1111
ComponentPath,
1212
},
1313
http::{
14-
extract::Json,
14+
extract::{
15+
Json,
16+
MtState,
17+
},
1518
HttpResponseError,
1619
},
1720
};
@@ -26,6 +29,7 @@ use serde::{
2629
Serialize,
2730
};
2831
use sync_types::Timestamp;
32+
use utoipa::ToSchema;
2933
use value::TableNamespace;
3034

3135
use crate::{
@@ -140,3 +144,30 @@ pub async fn cancel_job(
140144

141145
Ok(StatusCode::OK)
142146
}
147+
148+
#[derive(Deserialize, ToSchema)]
149+
#[serde(rename_all = "camelCase")]
150+
pub struct DeleteScheduledFunctionsTableRequest {
151+
pub component_id: Option<String>,
152+
}
153+
154+
#[utoipa::path(
155+
post,
156+
path = "/delete_scheduled_functions_table",
157+
request_body = DeleteScheduledFunctionsTableRequest,
158+
responses((status = 200))
159+
)]
160+
pub async fn delete_scheduled_functions_table(
161+
MtState(st): MtState<LocalAppState>,
162+
ExtractIdentity(identity): ExtractIdentity,
163+
Json(DeleteScheduledFunctionsTableRequest { component_id }): Json<
164+
DeleteScheduledFunctionsTableRequest,
165+
>,
166+
) -> Result<impl IntoResponse, HttpResponseError> {
167+
must_be_admin_with_write_access(&identity)?;
168+
let component_id = ComponentId::deserialize_from_string(component_id.as_deref())?;
169+
st.application
170+
.delete_scheduled_jobs_table(identity, component_id)
171+
.await?;
172+
Ok(StatusCode::OK)
173+
}

crates/model/src/deployment_audit_log/types.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,9 @@ pub enum DeploymentAuditLogEvent {
194194
table_names_deleted: BTreeMap<ComponentPath, Vec<TableName>>,
195195
table_count_deleted: u64,
196196
},
197+
DeleteScheduledJobsTable {
198+
component: ComponentPath,
199+
},
197200
}
198201

199202
impl From<IndexDiff> for DeploymentAuditLogEvent {
@@ -247,6 +250,9 @@ impl DeploymentAuditLogEvent {
247250
DeploymentAuditLogEvent::ChangeDeploymentState { .. } => "change_deployment_state",
248251
DeploymentAuditLogEvent::SnapshotImport { .. } => "snapshot_import",
249252
DeploymentAuditLogEvent::ClearTables => "clear_tables",
253+
DeploymentAuditLogEvent::DeleteScheduledJobsTable { .. } => {
254+
"delete_scheduled_jobs_table"
255+
},
250256
}
251257
}
252258

@@ -375,6 +381,12 @@ impl DeploymentAuditLogEvent {
375381
)
376382
},
377383
DeploymentAuditLogEvent::ClearTables => obj!(),
384+
DeploymentAuditLogEvent::DeleteScheduledJobsTable { component } => {
385+
let component = component.serialize();
386+
obj!(
387+
"component" => component
388+
)
389+
},
378390
}
379391
}
380392

@@ -513,6 +525,12 @@ impl TryFrom<ConvexObject> for DeploymentAuditLogEvent {
513525
table_count_deleted: remove_int64(&mut fields, "table_count_deleted")? as u64,
514526
}
515527
},
528+
"delete_scheduled_jobs_table" => {
529+
let component = ComponentPath::deserialize(
530+
remove_nullable_string(&mut fields, "component")?.as_deref(),
531+
)?;
532+
DeploymentAuditLogEvent::DeleteScheduledJobsTable { component }
533+
},
516534
_ => anyhow::bail!("action {action} unrecognized"),
517535
};
518536
Ok(event)

npm-packages/dashboard/dashboard-deployment-openapi.json

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,26 @@
157157
}
158158
}
159159
},
160+
"/delete_scheduled_functions_table": {
161+
"post": {
162+
"operationId": "delete_scheduled_functions_table",
163+
"requestBody": {
164+
"content": {
165+
"application/json": {
166+
"schema": {
167+
"$ref": "#/components/schemas/DeleteScheduledFunctionsTableRequest"
168+
}
169+
}
170+
},
171+
"required": true
172+
},
173+
"responses": {
174+
"200": {
175+
"description": ""
176+
}
177+
}
178+
}
179+
},
160180
"/check_admin_key": {
161181
"get": {
162182
"tags": [
@@ -191,6 +211,17 @@
191211
}
192212
}
193213
},
214+
"DeleteScheduledFunctionsTableRequest": {
215+
"type": "object",
216+
"properties": {
217+
"componentId": {
218+
"type": [
219+
"string",
220+
"null"
221+
]
222+
}
223+
}
224+
},
194225
"DeleteTableArgs": {
195226
"type": "object",
196227
"required": [

0 commit comments

Comments
 (0)