@@ -8,18 +8,20 @@ use crate::client::commands::submit::defs::{PinMode as PinModeDef, TaskConfigDef
88use crate :: client:: globalsettings:: GlobalSettings ;
99use crate :: common:: arraydef:: IntArray ;
1010use crate :: common:: utils:: fs:: get_current_dir;
11+ use crate :: rpc_call;
1112use crate :: transfer:: connection:: ClientSession ;
1213use crate :: transfer:: messages:: {
13- JobDescription , JobSubmitDescription , JobTaskDescription , PinMode , SubmitRequest ,
14- TaskDescription , TaskKind , TaskKindProgram , TaskWithDependencies ,
14+ FromClientMessage , IdSelector , JobDescription , JobDetailRequest , JobSubmitDescription ,
15+ JobTaskDescription , PinMode , SubmitRequest , TaskDescription , TaskIdSelector , TaskKind ,
16+ TaskKindProgram , TaskSelector , TaskStatusSelector , TaskWithDependencies , ToClientMessage ,
1517} ;
1618use clap:: Parser ;
1719use smallvec:: smallvec;
1820use std:: path:: PathBuf ;
19- use tako:: Map ;
2021use tako:: gateway:: { EntryType , ResourceRequest , ResourceRequestVariants , TaskDataFlags } ;
2122use tako:: program:: { FileOnCloseBehavior , ProgramDefinition , StdioDef } ;
2223use tako:: { JobId , JobTaskCount , JobTaskId } ;
24+ use tako:: { Map , Set } ;
2325
2426#[ derive( Parser ) ]
2527pub struct JobSubmitFileOpts {
@@ -130,14 +132,18 @@ fn build_job_desc_individual_tasks(
130132 tasks : Vec < TaskDef > ,
131133 data_flags : TaskDataFlags ,
132134 has_streaming : bool ,
135+ existing_tasks : & [ JobTaskId ] ,
133136) -> crate :: Result < JobTaskDescription > {
134137 let mut max_id: JobTaskId = tasks
135138 . iter ( )
136139 . map ( |t| t. id )
140+ . chain ( existing_tasks. iter ( ) . copied ( ) . map ( Some ) )
137141 . max ( )
138142 . flatten ( )
139143 . unwrap_or ( JobTaskId :: new ( 0 ) ) ;
140144
145+ let existing_tasks: Set < JobTaskId > = existing_tasks. iter ( ) . copied ( ) . collect ( ) ;
146+
141147 /* Topological sort */
142148 let original_len = tasks. len ( ) ;
143149 let mut new_tasks = Vec :: with_capacity ( original_len) ;
@@ -146,22 +152,40 @@ fn build_job_desc_individual_tasks(
146152 let mut consumers: Map < JobTaskId , Vec < _ > > = Map :: new ( ) ;
147153 for task in tasks {
148154 let t = build_task ( task, & mut max_id, data_flags, has_streaming) ;
149- if in_degrees. insert ( t. id , t. task_deps . len ( ) ) . is_some ( ) {
155+ if existing_tasks. contains ( & t. id ) {
156+ return Err ( crate :: Error :: GenericError ( format ! (
157+ "Task {} has already been defined in this job" ,
158+ t. id
159+ ) ) ) ;
160+ }
161+
162+ let task_deps_from_this_submit: Vec < JobTaskId > = t
163+ . task_deps
164+ . clone ( )
165+ . into_iter ( )
166+ . filter ( |task| !existing_tasks. contains ( task) )
167+ . collect ( ) ;
168+
169+ if in_degrees
170+ . insert ( t. id , task_deps_from_this_submit. len ( ) )
171+ . is_some ( )
172+ {
150173 return Err ( crate :: Error :: GenericError ( format ! (
151174 "Task {} is defined multiple times" ,
152175 t. id
153176 ) ) ) ;
154177 }
155- let is_empty = t . task_deps . is_empty ( ) ;
178+ let is_empty = task_deps_from_this_submit . is_empty ( ) ;
156179 if is_empty {
157180 new_tasks. push ( t) ;
158181 } else {
159- for dep in & t . task_deps {
182+ for dep in & task_deps_from_this_submit {
160183 consumers. entry ( * dep) . or_default ( ) . push ( t. id ) ;
161184 }
162185 unprocessed_tasks. insert ( t. id , t) ;
163186 }
164187 }
188+
165189 let mut idx = 0 ;
166190 while idx < new_tasks. len ( ) {
167191 if let Some ( consumers) = consumers. get ( & new_tasks[ idx] . id ) {
@@ -182,23 +206,35 @@ fn build_job_desc_individual_tasks(
182206 } else {
183207 let t = unprocessed_tasks. values ( ) . next ( ) . unwrap ( ) ;
184208 return Err ( crate :: Error :: GenericError ( format ! (
185- "Task {} is part of dependency cycle or has an invalid dependencies" ,
209+ "Task {} is part of dependency cycle or has invalid dependencies" ,
186210 t. id
187211 ) ) ) ;
188212 }
189213
190214 Ok ( JobTaskDescription :: Graph { tasks : new_tasks } )
191215}
192216
193- fn build_job_submit ( jdef : JobDef , job_id : Option < JobId > ) -> crate :: Result < SubmitRequest > {
217+ fn build_job_submit (
218+ jdef : JobDef ,
219+ job_info : Option < ( JobId , Vec < JobTaskId > ) > ,
220+ ) -> crate :: Result < SubmitRequest > {
194221 let task_desc = if let Some ( array) = jdef. array {
195222 build_job_desc_array ( array, jdef. stream . is_some ( ) )
196223 } else {
197224 let mut data_flags = TaskDataFlags :: empty ( ) ;
198225 if jdef. data_layer {
199226 data_flags. insert ( TaskDataFlags :: ENABLE_DATA_LAYER ) ;
200227 }
201- build_job_desc_individual_tasks ( jdef. tasks , data_flags, jdef. stream . is_some ( ) ) ?
228+ let existing_tasks = job_info
229+ . as_ref ( )
230+ . map ( |( _, tasks) | tasks. as_slice ( ) )
231+ . unwrap_or_default ( ) ;
232+ build_job_desc_individual_tasks (
233+ jdef. tasks ,
234+ data_flags,
235+ jdef. stream . is_some ( ) ,
236+ existing_tasks,
237+ ) ?
202238 } ;
203239 Ok ( SubmitRequest {
204240 job_desc : JobDescription {
@@ -210,7 +246,7 @@ fn build_job_submit(jdef: JobDef, job_id: Option<JobId>) -> crate::Result<Submit
210246 submit_dir : get_current_dir ( ) ,
211247 stream_path : jdef. stream ,
212248 } ,
213- job_id,
249+ job_id : job_info . map ( |j| j . 0 ) ,
214250 } )
215251}
216252
@@ -225,6 +261,25 @@ pub async fn submit_computation_from_job_file(
225261 anyhow:: anyhow!( format!( "Cannot read {}: {}" , opts. path. display( ) , e) )
226262 } ) ?) ?
227263 } ;
228- let request = build_job_submit ( jdef, opts. job ) ?;
264+
265+ let job_info = if let Some ( job_id) = opts. job {
266+ let mut response =
267+ rpc_call ! ( session. connection( ) , FromClientMessage :: JobDetail ( JobDetailRequest {
268+ job_id_selector: IdSelector :: Specific ( IntArray :: from_id( job_id. as_num( ) ) ) ,
269+ task_selector: Some ( TaskSelector {
270+ id_selector: TaskIdSelector :: All ,
271+ status_selector: TaskStatusSelector :: All
272+ } )
273+ } ) , ToClientMessage :: JobDetailResponse ( r) => r)
274+ . await ?;
275+ let Some ( job) = response. details . pop ( ) . and_then ( |( _, detail) | detail) else {
276+ return Err ( anyhow:: anyhow!( "Job {job_id} not found" ) ) ;
277+ } ;
278+ Some ( ( job_id, job. tasks . into_iter ( ) . map ( |( id, _) | id) . collect ( ) ) )
279+ } else {
280+ None
281+ } ;
282+
283+ let request = build_job_submit ( jdef, job_info) ?;
229284 send_submit_request ( gsettings, session, request, false , false , None ) . await
230285}
0 commit comments