@@ -175,10 +175,12 @@ func (d *jobDriver) claimAndProcessOneJob(ctx context.Context) error {
175175 jobctx , cancel := context .WithTimeout (ctx , d .jobTimeout )
176176 defer cancel () // Ensure resources are released when the function returns
177177
178+ recorder := newJobProgressRecorder (d .onProgress (job ))
179+
178180 // Process the job.
179181 start := time .Now ()
180182 job .Status .Started = start .UnixMilli ()
181- err = d .processJob (jobctx , job ) // NOTE: We pass in a pointer here such that the job status can be kept in Complete without re-fetching.
183+ err = d .processJob (jobctx , job , recorder ) // NOTE: We pass in a pointer here such that the job status can be kept in Complete without re-fetching.
182184 end := time .Now ()
183185 logger .Debug ("job processed" , "duration" , end .Sub (start ), "error" , err )
184186
@@ -187,17 +189,7 @@ func (d *jobDriver) claimAndProcessOneJob(ctx context.Context) error {
187189 err = jobctx .Err ()
188190 }
189191
190- // Mark the job as failed and remove from queue
191- if err != nil {
192- job .Status .State = provisioning .JobStateError
193- job .Status .Errors = append (job .Status .Errors , err .Error ())
194- }
195-
196- job .Status .Progress = 0 // clear progressbar
197- job .Status .Finished = end .UnixMilli ()
198- if ! job .Status .State .Finished () {
199- job .Status .State = provisioning .JobStateSuccess // no error
200- }
192+ job .Status = recorder .Complete (ctx , err )
201193
202194 // Save the finished job
203195 err = d .historicJobs .WriteJob (ctx , job .DeepCopy ())
@@ -217,7 +209,7 @@ func (d *jobDriver) claimAndProcessOneJob(ctx context.Context) error {
217209 return nil
218210}
219211
220- func (d * jobDriver ) processJob (ctx context.Context , job * provisioning.Job ) error {
212+ func (d * jobDriver ) processJob (ctx context.Context , job * provisioning.Job , recorder JobProgressRecorder ) error {
221213 for _ , worker := range d .workers {
222214 if ! worker .IsSupported (ctx , * job ) {
223215 continue
@@ -228,16 +220,7 @@ func (d *jobDriver) processJob(ctx context.Context, job *provisioning.Job) error
228220 return apifmt .Errorf ("failed to get repository '%s': %w" , job .Spec .Repository , err )
229221 }
230222
231- recorder := newJobProgressRecorder (d .onProgress (job ))
232-
233- err = worker .Process (ctx , repo , * job , recorder )
234- if err != nil {
235- return apifmt .Errorf ("worker failed to process job: %w" , err )
236- }
237-
238- job .Status = recorder .Complete (ctx , err )
239-
240- return nil
223+ return worker .Process (ctx , repo , * job , recorder )
241224 }
242225
243226 return apifmt .Errorf ("no workers were registered to handle the job" )
0 commit comments