Skip to content

Commit 43e682c

Browse files
sshaderConvex, Inc.
authored andcommitted
Add OCC retries to scheduling from actions (#26935)
Pattern matching this off of `cancel_job`, since this seems like it could probably benefit from a few retries. GitOrigin-RevId: 7fe1c5b75726d7f0a018c322dad589feb5a6d3b4
1 parent b01a06e commit 43e682c

File tree

1 file changed

+30
-18
lines changed
  • crates/application/src/application_function_runner

1 file changed

+30
-18
lines changed

crates/application/src/application_function_runner/mod.rs

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2116,25 +2116,37 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
21162116
scheduled_ts: UnixTimestamp,
21172117
context: ExecutionContext,
21182118
) -> anyhow::Result<DeveloperDocumentId> {
2119-
let mut tx = self.database.begin(identity).await?;
2120-
let (path, udf_args) = validate_schedule_args(
2121-
path,
2122-
udf_args,
2123-
scheduled_ts,
2124-
// Scheduling from actions is not transaction and happens at latest
2125-
// timestamp.
2126-
self.database.runtime().unix_timestamp(),
2127-
&mut tx,
2128-
)
2129-
.await?;
2130-
2131-
let virtual_id = VirtualSchedulerModel::new(&mut tx)
2132-
.schedule(path, udf_args, scheduled_ts, context)
2133-
.await?;
2134-
self.database
2135-
.commit_with_write_source(tx, "app_funrun_schedule_job")
2119+
let (_ts, virtual_id, _stats) = self
2120+
.database
2121+
.execute_with_occ_retries(
2122+
identity,
2123+
FunctionUsageTracker::new(),
2124+
PauseClient::new(),
2125+
"app_funrun_schedule_job",
2126+
|tx| {
2127+
let path = path.clone();
2128+
let args = udf_args.clone();
2129+
let context = context.clone();
2130+
async move {
2131+
let (path, udf_args) = validate_schedule_args(
2132+
path,
2133+
args,
2134+
scheduled_ts,
2135+
// Scheduling from actions is not transaction and happens at latest
2136+
// timestamp.
2137+
self.database.runtime().unix_timestamp(),
2138+
tx,
2139+
)
2140+
.await?;
2141+
let virtual_id = VirtualSchedulerModel::new(tx)
2142+
.schedule(path, udf_args, scheduled_ts, context)
2143+
.await?;
2144+
Ok(virtual_id)
2145+
}
2146+
.into()
2147+
},
2148+
)
21362149
.await?;
2137-
21382150
Ok(virtual_id)
21392151
}
21402152

0 commit comments

Comments
 (0)