@@ -35,13 +35,17 @@ use ballista_core::serde::scheduler::ExecutorMetadata;
3535use datafusion_proto:: logical_plan:: AsLogicalPlan ;
3636use datafusion_proto:: physical_plan:: AsExecutionPlan ;
3737use log:: { debug, error, info, trace, warn} ;
38+ use std:: collections:: HashMap ;
3839use std:: net:: SocketAddr ;
3940
4041use std:: ops:: Deref ;
4142
42- use crate :: cluster:: { bind_task_bias, bind_task_round_robin} ;
43+ use crate :: cluster:: {
44+ bind_task_bias, bind_task_round_robin, unbind_prepare_failed_tasks,
45+ } ;
4346use crate :: config:: TaskDistributionPolicy ;
4447use crate :: scheduler_server:: event:: QueryStageSchedulerEvent ;
48+ use crate :: state:: execution_graph:: TaskDescription ;
4549use std:: time:: { SystemTime , UNIX_EPOCH } ;
4650use tonic:: { Request , Response , Status } ;
4751
@@ -112,10 +116,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
112116 let active_jobs = self . state . task_manager . get_running_job_cache ( ) ;
113117 let schedulable_tasks = match self . state . config . task_distribution {
114118 TaskDistributionPolicy :: Bias => {
115- bind_task_bias ( available_slots, active_jobs, |_| false ) . await
119+ bind_task_bias ( available_slots, active_jobs. clone ( ) , |_| false ) . await
116120 }
117121 TaskDistributionPolicy :: RoundRobin => {
118- bind_task_round_robin ( available_slots, active_jobs, |_| false ) . await
122+ bind_task_round_robin ( available_slots, active_jobs. clone ( ) , |_| false ) . await
119123 }
120124 TaskDistributionPolicy :: ConsistentHash { ..} => {
121125 return Err ( Status :: unimplemented (
@@ -124,14 +128,36 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
124128 } ;
125129
126130 let mut tasks = vec ! [ ] ;
131+ let mut prepare_failed_jobs = HashMap :: < String , Vec < TaskDescription > > :: new ( ) ;
127132 for ( _, task) in schedulable_tasks {
128- match self . state . task_manager . prepare_task_definition ( task) {
133+ let job_id = task. partition . job_id . clone ( ) ;
134+ if prepare_failed_jobs. contains_key ( & job_id) {
135+ prepare_failed_jobs. entry ( job_id) . or_default ( ) . push ( task) ;
136+ continue ;
137+ }
138+ match self
139+ . state
140+ . task_manager
141+ . prepare_task_definition ( task. clone ( ) )
142+ {
129143 Ok ( task_definition) => tasks. push ( task_definition) ,
130144 Err ( e) => {
131145 error ! ( "Error preparing task definition: {:?}" , e) ;
146+ prepare_failed_jobs. entry ( job_id) . or_default ( ) . push ( task) ;
132147 }
133148 }
134149 }
150+
151+ unbind_prepare_failed_tasks ( active_jobs, & prepare_failed_jobs) . await ;
152+ for job_id in prepare_failed_jobs. into_keys ( ) {
153+ info ! ( "Cancel prepare task definition failed job: {}" , job_id) ;
154+ self . cancel_job ( job_id) . await . map_err ( |e| {
155+ let msg = format ! ( "Cancel job error due to {e:?}" ) ;
156+ error ! ( "{}" , msg) ;
157+ Status :: internal ( msg)
158+ } ) ?;
159+ }
160+
135161 Ok ( Response :: new ( PollWorkResult { tasks } ) )
136162 } else {
137163 warn ! ( "Received invalid executor poll_work request" ) ;
@@ -527,21 +553,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
527553 ) -> Result < Response < CancelJobResult > , Status > {
528554 let job_id = request. into_inner ( ) . job_id ;
529555 info ! ( "Received cancellation request for job {}" , job_id) ;
530-
531- self . query_stage_event_loop
532- . get_sender ( )
533- . map_err ( |e| {
534- let msg = format ! ( "Get query stage event loop error due to {e:?}" ) ;
535- error ! ( "{}" , msg) ;
536- Status :: internal ( msg)
537- } ) ?
538- . post_event ( QueryStageSchedulerEvent :: JobCancel ( job_id) )
539- . await
540- . map_err ( |e| {
541- let msg = format ! ( "Post to query stage event loop error due to {e:?}" ) ;
542- error ! ( "{}" , msg) ;
543- Status :: internal ( msg)
544- } ) ?;
556+ self . cancel_job ( job_id) . await . map_err ( |e| {
557+ let msg = format ! ( "Cancel job error due to {e:?}" ) ;
558+ error ! ( "{}" , msg) ;
559+ Status :: internal ( msg)
560+ } ) ?;
545561 Ok ( Response :: new ( CancelJobResult { cancelled : true } ) )
546562 }
547563
0 commit comments