@@ -20,6 +20,10 @@ use common::{
2020 knobs:: {
2121 SCHEDULED_JOB_EXECUTION_PARALLELISM ,
2222 SCHEDULED_JOB_GARBAGE_COLLECTION_BATCH_SIZE ,
23+ SCHEDULED_JOB_GARBAGE_COLLECTION_INITIAL_BACKOFF ,
24+ SCHEDULED_JOB_GARBAGE_COLLECTION_MAX_BACKOFF ,
25+ SCHEDULED_JOB_INITIAL_BACKOFF ,
26+ SCHEDULED_JOB_MAX_BACKOFF ,
2327 SCHEDULED_JOB_RETENTION ,
2428 UDF_EXECUTOR_OCC_MAX_RETRIES ,
2529 } ,
@@ -147,9 +151,6 @@ impl<RT: Runtime> ScheduledJobRunner<RT> {
147151 }
148152}
149153
150- const INITIAL_BACKOFF : Duration = Duration :: from_millis ( 10 ) ;
151- const MAX_BACKOFF : Duration = Duration :: from_secs ( 5 ) ;
152-
153154pub struct ScheduledJobExecutor < RT : Runtime > {
154155 context : ScheduledJobContext < RT > ,
155156 pause_client : PauseClient ,
@@ -195,7 +196,8 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
195196 pause_client,
196197 } ;
197198 async move {
198- let mut backoff = Backoff :: new ( INITIAL_BACKOFF , MAX_BACKOFF ) ;
199+ let mut backoff =
200+ Backoff :: new ( * SCHEDULED_JOB_INITIAL_BACKOFF , * SCHEDULED_JOB_MAX_BACKOFF ) ;
199201 while let Err ( mut e) = executor. run ( & mut backoff) . await {
200202 let delay = executor. rt . with_rng ( |rng| backoff. fail ( rng) ) ;
201203 tracing:: error!( "Scheduled job executor failed, sleeping {delay:?}" ) ;
@@ -417,7 +419,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
417419 job : ScheduledJob ,
418420 job_id : ResolvedDocumentId ,
419421 ) -> ResolvedDocumentId {
420- let mut backoff = Backoff :: new ( INITIAL_BACKOFF , MAX_BACKOFF ) ;
422+ let mut backoff = Backoff :: new ( * SCHEDULED_JOB_INITIAL_BACKOFF , * SCHEDULED_JOB_MAX_BACKOFF ) ;
421423 loop {
422424 // Generate a new request_id for every schedule job execution attempt.
423425 let request_id = RequestId :: new ( ) ;
@@ -731,7 +733,8 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
731733 // Mark the job as completed. Keep trying until we succeed (or
732734 // detect the job state has changed). Don't bubble up the error
733735 // since otherwise we will lose the original execution logs.
734- let mut backoff = Backoff :: new ( INITIAL_BACKOFF , MAX_BACKOFF ) ;
736+ let mut backoff =
737+ Backoff :: new ( * SCHEDULED_JOB_INITIAL_BACKOFF , * SCHEDULED_JOB_MAX_BACKOFF ) ;
735738 while let Err ( mut err) = self
736739 . complete_action ( job_id, & updated_job, usage_tracker. clone ( ) , state. clone ( ) )
737740 . await
@@ -844,7 +847,10 @@ impl<RT: Runtime> ScheduledJobGarbageCollector<RT> {
844847 pub fn start ( rt : RT , database : Database < RT > ) -> impl Future < Output = ( ) > + Send {
845848 let garbage_collector = Self { rt, database } ;
846849 async move {
847- let mut backoff = Backoff :: new ( INITIAL_BACKOFF , MAX_BACKOFF ) ;
850+ let mut backoff = Backoff :: new (
851+ * SCHEDULED_JOB_GARBAGE_COLLECTION_INITIAL_BACKOFF ,
852+ * SCHEDULED_JOB_GARBAGE_COLLECTION_MAX_BACKOFF ,
853+ ) ;
848854 while let Err ( mut e) = garbage_collector. run ( & mut backoff) . await {
849855 let delay = garbage_collector. rt . with_rng ( |rng| backoff. fail ( rng) ) ;
850856 tracing:: error!( "Scheduled job garbage collector failed, sleeping {delay:?}" ) ;
0 commit comments