Skip to content

Commit 9f09c8e

Browse files
committed
BUGFIX: proper check if waitlist in replace case is reached
1 parent 37c871d commit 9f09c8e

File tree

2 files changed

+11
-27
lines changed

2 files changed

+11
-27
lines changed

prunner.go

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -219,41 +219,24 @@ type jobTask struct {
219219

220220
type jobTasks []jobTask
221221

222-
type scheduleAction int
223-
224-
const (
225-
// scheduleActionStart directly starts a job via PipelineRunner.startJob()
226-
scheduleActionStart scheduleAction = iota
227-
228-
// scheduleActionQueue enqueues the job to the pipeline's waitlist
229-
scheduleActionQueue
230-
231-
// scheduleActionReplace: replace the last job on the waitlist with this one
232-
scheduleActionReplace
233-
234-
// scheduleActionErrNoQueue: error case, if queueing is not allowed (queue_limit=0) and a job is running
235-
scheduleActionErrNoQueue
236-
237-
// scheduleActionErrQueueFull: error case, if queue_limit is reached (with append queue strategy)
238-
scheduleActionErrQueueFull
239-
)
240-
241222
var errNoQueue = errors.New("concurrency exceeded and queueing disabled for pipeline")
242223
var errQueueFull = errors.New("concurrency exceeded and queue limit reached for pipeline")
243224
var ErrJobNotFound = errors.New("job not found")
244225
var errJobAlreadyCompleted = errors.New("job is already completed")
245226
var ErrShuttingDown = errors.New("runner is shutting down")
246227

247228
type QueueStrategyImpl interface {
248-
kannEntgegengenommenWerden(def definition.PipelineDef, waitList []*PipelineJob) error
229+
// first step in prunner.ScheduleAsync => check if we have capacity for this job
230+
canAcceptJob(def definition.PipelineDef, waitList []*PipelineJob) error
249231

232+
// add the job to the waitlist, uses the queue_strategy to determine how
250233
modifyWaitList(def definition.PipelineDef, waitList []*PipelineJob, job *PipelineJob) []*PipelineJob
251234
}
252235

253236
type QueueStrategyAppendImpl struct {
254237
}
255238

256-
func (q QueueStrategyAppendImpl) kannEntgegengenommenWerden(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error {
239+
func (q QueueStrategyAppendImpl) canAcceptJob(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error {
257240
if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 {
258241
// queue limit == 0 -> error -> NOTE: might be moved to config validation
259242
return errNoQueue
@@ -268,6 +251,7 @@ func (q QueueStrategyAppendImpl) kannEntgegengenommenWerden(pipelineDef definiti
268251
func (q QueueStrategyAppendImpl) modifyWaitList(def definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob {
269252
log.
270253
WithField("component", "runner").
254+
WithField("strategy", "append").
271255
WithField("pipeline", job.Pipeline).
272256
WithField("jobID", job.ID).
273257
WithField("variables", job.Variables).
@@ -278,7 +262,7 @@ func (q QueueStrategyAppendImpl) modifyWaitList(def definition.PipelineDef, prev
278262
type QueueStrategyReplaceImpl struct {
279263
}
280264

281-
func (q QueueStrategyReplaceImpl) kannEntgegengenommenWerden(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error {
265+
func (q QueueStrategyReplaceImpl) canAcceptJob(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error {
282266
if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 {
283267
// queue limit == 0 -> error -> NOTE: might be moved to config validation
284268
return errNoQueue
@@ -287,10 +271,11 @@ func (q QueueStrategyReplaceImpl) kannEntgegengenommenWerden(pipelineDef definit
287271
}
288272

289273
func (q QueueStrategyReplaceImpl) modifyWaitList(pipelineDef definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob {
290-
if len(previousWaitList) <= *pipelineDef.QueueLimit {
274+
if len(previousWaitList) < *pipelineDef.QueueLimit {
291275
// waitlist nicht voll -> append
292276
log.
293277
WithField("component", "runner").
278+
WithField("strategy", "replace - waitlist not full -> no replace").
294279
WithField("pipeline", job.Pipeline).
295280
WithField("jobID", job.ID).
296281
WithField("variables", job.Variables).
@@ -313,6 +298,7 @@ func (q QueueStrategyReplaceImpl) modifyWaitList(pipelineDef definition.Pipeline
313298

314299
log.
315300
WithField("component", "runner").
301+
WithField("strategy", "replace - waitlist was full -> replaced last job<").
316302
WithField("pipeline", job.Pipeline).
317303
WithField("jobID", job.ID).
318304
WithField("variables", job.Variables).
@@ -340,7 +326,7 @@ func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*Pip
340326
}
341327

342328
queueStrategyImpl := getQueueStrategyImpl(pipelineDef.QueueStrategy)
343-
err := queueStrategyImpl.kannEntgegengenommenWerden(pipelineDef, r.waitListByPipeline[pipeline])
329+
err := queueStrategyImpl.canAcceptJob(pipelineDef, r.waitListByPipeline[pipeline])
344330
if err != nil {
345331
return nil, err
346332
}
@@ -371,8 +357,6 @@ func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*Pip
371357
job.startTimer = time.AfterFunc(job.StartDelay, func() {
372358
r.StartDelayedJob(id)
373359
})
374-
375-
return job, nil
376360
} else {
377361
// no delayed job
378362
runningJobsCount := r.runningJobsCount(pipeline)

prunner_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -914,7 +914,7 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillQueueSi
914914
Pipelines: map[string]definition.PipelineDef{
915915
"jobWithStartDelay": {
916916
Concurrency: 1,
917-
StartDelay: 50 * time.Millisecond,
917+
StartDelay: 100 * time.Millisecond,
918918
QueueLimit: intPtr(1),
919919
QueueStrategy: definition.QueueStrategyReplace,
920920
Tasks: map[string]definition.TaskDef{

0 commit comments

Comments
 (0)