@@ -44,6 +44,7 @@ use common::{
4444 RequestId ,
4545} ;
4646use database:: {
47+ BootstrapComponentsModel ,
4748 Database ,
4849 ResolvedQuery ,
4950 Transaction ,
@@ -54,7 +55,9 @@ use futures::{
5455 select_biased,
5556 Future ,
5657 FutureExt ,
58+ TryStreamExt ,
5759} ;
60+ use futures_async_stream:: try_stream;
5861use isolate:: JsonPackedValue ;
5962use keybroker:: Identity ;
6063use minitrace:: future:: FutureExt as _;
@@ -71,6 +74,7 @@ use model::{
7174 } ,
7275 CronModel ,
7376 CRON_JOBS_INDEX_BY_NEXT_TS ,
77+ CRON_JOBS_TABLE ,
7478 } ,
7579 modules:: ModuleModel ,
7680} ;
@@ -210,16 +214,8 @@ impl<RT: Runtime> CronJobExecutor<RT> {
210214 job_finished_tx : & mpsc:: Sender < ResolvedDocumentId > ,
211215 ) -> anyhow:: Result < Option < Timestamp > > {
212216 let now = self . rt . generate_timestamp ( ) ?;
213- let index_query = Query :: index_range ( IndexRange {
214- index_name : CRON_JOBS_INDEX_BY_NEXT_TS . clone ( ) ,
215- range : vec ! [ ] ,
216- order : Order :: Asc ,
217- } ) ;
218- let mut query_stream =
219- ResolvedQuery :: new ( tx, TableNamespace :: by_component_TODO ( ) , index_query) ?;
220-
221- while let Some ( doc) = query_stream. next ( tx, None ) . await ? {
222- let job: ParsedDocument < CronJob > = doc. try_into ( ) ?;
217+ let mut job_stream = self . stream_jobs_to_run ( tx) ;
218+ while let Some ( job) = job_stream. try_next ( ) . await ? {
223219 let ( job_id, job) = job. clone ( ) . into_id_and_value ( ) ;
224220 if running_job_ids. contains ( & job_id) {
225221 continue ;
@@ -250,6 +246,42 @@ impl<RT: Runtime> CronJobExecutor<RT> {
250246 Ok ( None )
251247 }
252248
249+ #[ try_stream( boxed, ok = ParsedDocument <CronJob >, error = anyhow:: Error ) ]
250+ async fn stream_jobs_to_run < ' a > ( & ' a self , tx : & ' a mut Transaction < RT > ) {
251+ let namespaces: Vec < _ > = tx
252+ . table_mapping ( )
253+ . iter ( )
254+ . filter ( |( _, _, _, name) | * * name == * CRON_JOBS_TABLE )
255+ . map ( |( _, namespace, ..) | namespace)
256+ . collect ( ) ;
257+ let index_query = Query :: index_range ( IndexRange {
258+ index_name : CRON_JOBS_INDEX_BY_NEXT_TS . clone ( ) ,
259+ range : vec ! [ ] ,
260+ order : Order :: Asc ,
261+ } ) ;
262+ // Key is (next_ts, namespace), where next_ts is for sorting and namespace
263+ // is for deduping.
264+ // Value is (job, query) where job is the job to run and query will get
265+ // the next job to run in that namespace.
266+ let mut queries = BTreeMap :: new ( ) ;
267+ for namespace in namespaces {
268+ let mut query = ResolvedQuery :: new ( tx, namespace, index_query. clone ( ) ) ?;
269+ if let Some ( doc) = query. next ( tx, None ) . await ? {
270+ let job: ParsedDocument < CronJob > = doc. try_into ( ) ?;
271+ let next_ts = job. next_ts ;
272+ queries. insert ( ( next_ts, namespace) , ( job, query) ) ;
273+ }
274+ }
275+ while let Some ( ( ( _min_next_ts, namespace) , ( min_job, mut query) ) ) = queries. pop_first ( ) {
276+ yield min_job;
277+ if let Some ( doc) = query. next ( tx, None ) . await ? {
278+ let job: ParsedDocument < CronJob > = doc. try_into ( ) ?;
279+ let next_ts = job. next_ts ;
280+ queries. insert ( ( next_ts, namespace) , ( job, query) ) ;
281+ }
282+ }
283+ }
284+
253285 // This handles re-running the cron job on transient errors. It
254286 // guarantees that the job was successfully run or the job state changed.
255287 pub async fn execute_job (
@@ -292,12 +324,13 @@ impl<RT: Runtime> CronJobExecutor<RT> {
292324 // Continue without running function since the job state has changed
293325 return Ok ( job_id) ;
294326 } ;
327+ let ( _, component_path) = self . get_job_component ( & mut tx, job_id) . await ?;
295328 tracing:: info!( "Executing {:?}!" , job. cron_spec. udf_path) ;
296329
297330 // Since we don't specify the function type in the cron, we have to use
298331 // the analyzed result.
299332 let path = CanonicalizedComponentFunctionPath {
300- component : ComponentPath :: root ( ) ,
333+ component : component_path ,
301334 udf_path : job. cron_spec . udf_path . clone ( ) ,
302335 } ;
303336 let udf_type = ModuleModel :: new ( & mut tx)
@@ -366,20 +399,37 @@ impl<RT: Runtime> CronJobExecutor<RT> {
366399 }
367400 }
368401
402+ async fn get_job_component (
403+ & self ,
404+ tx : & mut Transaction < RT > ,
405+ job_id : ResolvedDocumentId ,
406+ ) -> anyhow:: Result < ( ComponentId , ComponentPath ) > {
407+ let namespace = tx. table_mapping ( ) . tablet_namespace ( job_id. tablet_id ) ?;
408+ let component = match namespace {
409+ TableNamespace :: Global => ComponentId :: Root ,
410+ TableNamespace :: ByComponent ( id) => ComponentId :: Child ( id) ,
411+ } ;
412+ let component_path = BootstrapComponentsModel :: new ( tx)
413+ . get_component_path ( component)
414+ . await ?;
415+ Ok ( ( component, component_path) )
416+ }
417+
369418 async fn handle_mutation (
370419 & self ,
371420 request_id : RequestId ,
372- tx : Transaction < RT > ,
421+ mut tx : Transaction < RT > ,
373422 job : CronJob ,
374423 job_id : ResolvedDocumentId ,
375424 usage_tracker : FunctionUsageTracker ,
376425 ) -> anyhow:: Result < ( ) > {
377426 let start = self . rt . monotonic_now ( ) ;
378427 let identity = tx. inert_identity ( ) ;
379428 let caller = FunctionCaller :: Cron ;
429+ let ( component, component_path) = self . get_job_component ( & mut tx, job_id) . await ?;
380430 let context = ExecutionContext :: new ( request_id, & caller) ;
381431 let path = CanonicalizedComponentFunctionPath {
382- component : ComponentPath :: root ( ) ,
432+ component : component_path ,
383433 udf_path : job. cron_spec . udf_path . clone ( ) ,
384434 } ;
385435 let mutation_result = self
@@ -412,7 +462,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
412462 let execution_time_f64 = execution_time. as_secs_f64 ( ) ;
413463 let truncated_log_lines = self . truncate_log_lines ( outcome. log_lines . clone ( ) ) ;
414464
415- let mut model = CronModel :: new ( & mut tx, ComponentId :: TODO ( ) ) ;
465+ let mut model = CronModel :: new ( & mut tx, component ) ;
416466
417467 if let Ok ( ref result) = outcome. result {
418468 let truncated_result = self . truncate_result ( result. clone ( ) ) ;
@@ -427,7 +477,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
427477 . await ?;
428478 self . complete_job_run (
429479 identity. clone ( ) ,
430- & mut model ,
480+ & mut tx ,
431481 job_id,
432482 & job,
433483 UdfType :: Mutation ,
@@ -457,14 +507,14 @@ impl<RT: Runtime> CronJobExecutor<RT> {
457507 // Continue without updating since the job state has changed
458508 return Ok ( ( ) ) ;
459509 } ;
460- let mut model = CronModel :: new ( & mut tx, ComponentId :: TODO ( ) ) ;
510+ let mut model = CronModel :: new ( & mut tx, component ) ;
461511 let status = CronJobStatus :: Err ( e. to_string ( ) ) ;
462512 model
463513 . insert_cron_job_log ( & job, status, truncated_log_lines, execution_time_f64)
464514 . await ?;
465515 self . complete_job_run (
466516 identity,
467- & mut model ,
517+ & mut tx ,
468518 job_id,
469519 & job,
470520 UdfType :: Mutation ,
@@ -497,14 +547,20 @@ impl<RT: Runtime> CronJobExecutor<RT> {
497547 job_id : ResolvedDocumentId ,
498548 usage_tracker : FunctionUsageTracker ,
499549 ) -> anyhow:: Result < ( ) > {
550+ let namespace = tx. table_mapping ( ) . tablet_namespace ( job_id. tablet_id ) ?;
551+ let component = match namespace {
552+ TableNamespace :: Global => ComponentId :: Root ,
553+ TableNamespace :: ByComponent ( id) => ComponentId :: Child ( id) ,
554+ } ;
500555 let identity = tx. identity ( ) . clone ( ) ;
556+ let ( _, component_path) = self . get_job_component ( & mut tx, job_id) . await ?;
501557 let caller = FunctionCaller :: Cron ;
502558 match job. state {
503559 CronJobState :: Pending => {
504560 // Set state to in progress
505561 let mut updated_job = job. clone ( ) ;
506562 updated_job. state = CronJobState :: InProgress ;
507- CronModel :: new ( & mut tx, ComponentId :: TODO ( ) )
563+ CronModel :: new ( & mut tx, component )
508564 . update_job_state ( job_id, updated_job. clone ( ) )
509565 . await ?;
510566 self . database
@@ -514,7 +570,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
514570 // Execute the action
515571 let context = ExecutionContext :: new ( request_id, & caller) ;
516572 let path = CanonicalizedComponentFunctionPath {
517- component : ComponentPath :: root ( ) ,
573+ component : component_path ,
518574 udf_path : job. cron_spec . udf_path . clone ( ) ,
519575 } ;
520576 let completion = self
@@ -582,14 +638,14 @@ impl<RT: Runtime> CronJobExecutor<RT> {
582638 // guess the correct behavior here is to store the executionId in the state so
583639 // we can log correctly here.
584640 let context = ExecutionContext :: new ( request_id, & caller) ;
585- let mut model = CronModel :: new ( & mut tx, ComponentId :: TODO ( ) ) ;
641+ let mut model = CronModel :: new ( & mut tx, component ) ;
586642 model
587643 . insert_cron_job_log ( & job, status, log_lines, 0.0 )
588644 . await ?;
589645 let identity: InertIdentity = identity. into ( ) ;
590646 self . complete_job_run (
591647 identity. clone ( ) ,
592- & mut model ,
648+ & mut tx ,
593649 job_id,
594650 & job,
595651 UdfType :: Action ,
@@ -601,7 +657,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
601657 . await ?;
602658
603659 let path = CanonicalizedComponentFunctionPath {
604- component : ComponentPath :: root ( ) ,
660+ component : component_path ,
605661 udf_path : job. cron_spec . udf_path ,
606662 } ;
607663 self . function_log . log_action_system_error (
@@ -660,13 +716,18 @@ impl<RT: Runtime> CronJobExecutor<RT> {
660716 // Continue without updating since the job state has changed
661717 return Ok ( ( ) ) ;
662718 } ;
663- let mut model = CronModel :: new ( & mut tx, ComponentId :: TODO ( ) ) ;
719+ let namespace = tx. table_mapping ( ) . tablet_namespace ( job_id. tablet_id ) ?;
720+ let component = match namespace {
721+ TableNamespace :: Global => ComponentId :: Root ,
722+ TableNamespace :: ByComponent ( id) => ComponentId :: Child ( id) ,
723+ } ;
724+ let mut model = CronModel :: new ( & mut tx, component) ;
664725 model
665726 . insert_cron_job_log ( expected_state, status, log_lines, execution_time)
666727 . await ?;
667728 self . complete_job_run (
668729 identity,
669- & mut model ,
730+ & mut tx ,
670731 job_id,
671732 expected_state,
672733 UdfType :: Action ,
@@ -682,7 +743,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
682743 async fn complete_job_run (
683744 & self ,
684745 identity : InertIdentity ,
685- model : & mut CronModel < ' _ , RT > ,
746+ tx : & mut Transaction < RT > ,
686747 job_id : ResolvedDocumentId ,
687748 job : & CronJob ,
688749 udf_type : UdfType ,
@@ -693,6 +754,8 @@ impl<RT: Runtime> CronJobExecutor<RT> {
693754 let mut next_ts = compute_next_ts ( & job. cron_spec , Some ( prev_ts) , now) ?;
694755 let mut num_skipped = 0 ;
695756 let first_skipped_ts = next_ts;
757+ let ( component, component_path) = self . get_job_component ( tx, job_id) . await ?;
758+ let mut model = CronModel :: new ( tx, component) ;
696759 while next_ts < now {
697760 num_skipped += 1 ;
698761 next_ts = compute_next_ts ( & job. cron_spec , Some ( next_ts) , now) ?;
@@ -715,7 +778,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
715778 runs are in the past"
716779 ) ,
717780 CanonicalizedComponentFunctionPath {
718- component : ComponentPath :: root ( ) ,
781+ component : component_path ,
719782 udf_path : job. cron_spec . udf_path . clone ( ) ,
720783 } ,
721784 job. cron_spec . udf_args . clone ( ) ,
@@ -732,7 +795,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
732795 runs are in the past"
733796 ) ,
734797 CanonicalizedComponentFunctionPath {
735- component : ComponentPath :: root ( ) ,
798+ component : component_path ,
736799 udf_path : job. cron_spec . udf_path . clone ( ) ,
737800 } ,
738801 job. cron_spec . udf_args . clone ( ) ,
0 commit comments