Skip to content

Commit 211771b

Browse files
committed
FEATURE: Add QueueStrategyPartitionedReplace
1 parent 9f09c8e commit 211771b

File tree

1 file changed

+54
-6
lines changed

1 file changed

+54
-6
lines changed

prunner.go

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package prunner
33
import (
44
"context"
55
"fmt"
6+
"github.com/Flowpack/prunner/helper/slice_utils"
67
"io"
78
"sort"
89
"sync"
@@ -227,10 +228,10 @@ var ErrShuttingDown = errors.New("runner is shutting down")
227228

228229
type QueueStrategyImpl interface {
229230
// first step in prunner.ScheduleAsync => check if we have capacity for this job
230-
canAcceptJob(def definition.PipelineDef, waitList []*PipelineJob) error
231+
canAcceptJob(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error
231232

232233
// add the job to the waitlist, uses the queue_strategy to determine how
233-
modifyWaitList(def definition.PipelineDef, waitList []*PipelineJob, job *PipelineJob) []*PipelineJob
234+
modifyWaitList(pipelineDef definition.PipelineDef, waitList []*PipelineJob, job *PipelineJob) []*PipelineJob
234235
}
235236

236237
type QueueStrategyAppendImpl struct {
@@ -248,10 +249,9 @@ func (q QueueStrategyAppendImpl) canAcceptJob(pipelineDef definition.PipelineDef
248249
return nil
249250
}
250251

251-
func (q QueueStrategyAppendImpl) modifyWaitList(def definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob {
252+
func (q QueueStrategyAppendImpl) modifyWaitList(pipelineDef definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob {
252253
log.
253254
WithField("component", "runner").
254-
WithField("strategy", "append").
255255
WithField("pipeline", job.Pipeline).
256256
WithField("jobID", job.ID).
257257
WithField("variables", job.Variables).
@@ -275,7 +275,6 @@ func (q QueueStrategyReplaceImpl) modifyWaitList(pipelineDef definition.Pipeline
275275
// waitlist nicht voll -> append
276276
log.
277277
WithField("component", "runner").
278-
WithField("strategy", "replace - waitlist not full -> no replace").
279278
WithField("pipeline", job.Pipeline).
280279
WithField("jobID", job.ID).
281280
WithField("variables", job.Variables).
@@ -298,7 +297,6 @@ func (q QueueStrategyReplaceImpl) modifyWaitList(pipelineDef definition.Pipeline
298297

299298
log.
300299
WithField("component", "runner").
301-
WithField("strategy", "replace - waitlist was full -> replaced last job<").
302300
WithField("pipeline", job.Pipeline).
303301
WithField("jobID", job.ID).
304302
WithField("variables", job.Variables).
@@ -308,6 +306,54 @@ func (q QueueStrategyReplaceImpl) modifyWaitList(pipelineDef definition.Pipeline
308306
}
309307
}
310308

309+
type QueueStrategyPartitionedReplaceImpl struct {
310+
}
311+
312+
func (q QueueStrategyPartitionedReplaceImpl) canAcceptJob(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error {
313+
if pipelineDef.QueuePartitionLimit != nil && *pipelineDef.QueuePartitionLimit == 0 {
314+
// queue limit == 0 -> error -> NOTE: might be moved to config validation
315+
return errNoQueue
316+
}
317+
return nil
318+
}
319+
320+
func (q QueueStrategyPartitionedReplaceImpl) modifyWaitList(pipelineDef definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob {
321+
queuePartition := job.queuePartition
322+
partitionedWaitList := slice_utils.Filter(previousWaitList, func(job *PipelineJob) bool {
323+
return job.queuePartition == queuePartition
324+
})
325+
326+
if len(partitionedWaitList) < *pipelineDef.QueuePartitionLimit {
327+
// partitioned wait list not full -> append job
328+
return append(previousWaitList, job)
329+
}
330+
331+
// partitioned wait list full -> replace partitioned job
332+
previousJob := partitionedWaitList[len(partitionedWaitList)-1]
333+
previousJob.Canceled = true
334+
if previousJob.startTimer != nil {
335+
log.
336+
WithField("previousJobID", previousJob.ID).
337+
Debugf("Stopped start timer of previous job")
338+
// Stop timer and unset reference for clean up
339+
previousJob.startTimer.Stop()
340+
previousJob.startTimer = nil
341+
}
342+
343+
// remove the just cancelled job from the waitlist
344+
waitList := slice_utils.Filter(previousWaitList, func(job *PipelineJob) bool {
345+
return previousJob != job
346+
})
347+
348+
log.
349+
WithField("component", "runner").
350+
WithField("pipeline", job.Pipeline).
351+
WithField("jobID", job.ID).
352+
WithField("variables", job.Variables).
353+
Debugf("Queued: partitioned replaced job on wait list")
354+
return append(waitList, job)
355+
}
356+
311357
// ScheduleAsync schedules a pipeline execution, if pipeline concurrency config allows for it.
312358
// "pipeline" is the pipeline ID from the YAML file.
313359
//
@@ -840,6 +886,8 @@ func getQueueStrategyImpl(queueStrategy definition.QueueStrategy) QueueStrategyI
840886
return QueueStrategyAppendImpl{}
841887
case definition.QueueStrategyReplace:
842888
return QueueStrategyReplaceImpl{}
889+
case definition.QueueStrategyPartitionedReplace:
890+
return QueueStrategyPartitionedReplaceImpl{}
843891
}
844892

845893
return nil

0 commit comments

Comments
 (0)