Skip to content

Commit 37c871d

Browse files
committed
WIP
1 parent 56ef182 commit 37c871d

File tree

3 files changed

+230
-105
lines changed

3 files changed

+230
-105
lines changed

helper/slice_utils/sliceUtils.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package slice_utils
2+
3+
func Filter[T any](s []T, p func(i T) bool) []T {
4+
var result []T
5+
for _, i := range s {
6+
if p(i) {
7+
result = append(result, i)
8+
}
9+
}
10+
return result
11+
}

prunner.go

Lines changed: 132 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,13 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat
115115
type PipelineJob struct {
116116
ID uuid.UUID
117117
// Identifier of the pipeline (from the YAML file)
118-
Pipeline string
119-
Env map[string]string
120-
Variables map[string]interface{}
121-
StartDelay time.Duration
118+
Pipeline string
119+
// Identifier of the queue partition, if any (from definition)
120+
queuePartition string
121+
Env map[string]string
122+
Variables map[string]interface{}
123+
User string
124+
StartDelay time.Duration
122125

123126
Completed bool
124127
Canceled bool
@@ -127,8 +130,7 @@ type PipelineJob struct {
127130
// Start is the actual start time of the job. Could be nil if not yet started.
128131
Start *time.Time
129132
// End is the actual end time of the job (can be nil if incomplete)
130-
End *time.Time
131-
User string
133+
End *time.Time
132134
// Tasks is an in-memory representation with state of tasks, sorted by dependencies
133135
Tasks jobTasks
134136
LastError error
@@ -242,6 +244,84 @@ var ErrJobNotFound = errors.New("job not found")
242244
var errJobAlreadyCompleted = errors.New("job is already completed")
243245
var ErrShuttingDown = errors.New("runner is shutting down")
244246

247+
type QueueStrategyImpl interface {
248+
kannEntgegengenommenWerden(def definition.PipelineDef, waitList []*PipelineJob) error
249+
250+
modifyWaitList(def definition.PipelineDef, waitList []*PipelineJob, job *PipelineJob) []*PipelineJob
251+
}
252+
253+
type QueueStrategyAppendImpl struct {
254+
}
255+
256+
func (q QueueStrategyAppendImpl) kannEntgegengenommenWerden(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error {
257+
if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 {
258+
// queue limit == 0 -> error -> NOTE: might be moved to config validation
259+
return errNoQueue
260+
}
261+
if pipelineDef.QueueLimit != nil && len(waitList) >= *pipelineDef.QueueLimit {
262+
// queue full
263+
return errQueueFull
264+
}
265+
return nil
266+
}
267+
268+
func (q QueueStrategyAppendImpl) modifyWaitList(def definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob {
269+
log.
270+
WithField("component", "runner").
271+
WithField("pipeline", job.Pipeline).
272+
WithField("jobID", job.ID).
273+
WithField("variables", job.Variables).
274+
Debugf("Queued: added job to wait list")
275+
return append(previousWaitList, job)
276+
}
277+
278+
type QueueStrategyReplaceImpl struct {
279+
}
280+
281+
func (q QueueStrategyReplaceImpl) kannEntgegengenommenWerden(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error {
282+
if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 {
283+
// queue limit == 0 -> error -> NOTE: might be moved to config validation
284+
return errNoQueue
285+
}
286+
return nil
287+
}
288+
289+
func (q QueueStrategyReplaceImpl) modifyWaitList(pipelineDef definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob {
290+
if len(previousWaitList) <= *pipelineDef.QueueLimit {
291+
// waitlist nicht voll -> append
292+
log.
293+
WithField("component", "runner").
294+
WithField("pipeline", job.Pipeline).
295+
WithField("jobID", job.ID).
296+
WithField("variables", job.Variables).
297+
Debugf("Queued: added job to wait list")
298+
return append(previousWaitList, job)
299+
} else {
300+
// waitlist voll -> replace
301+
previousJob := previousWaitList[len(previousWaitList)-1]
302+
previousJob.Canceled = true
303+
if previousJob.startTimer != nil {
304+
log.
305+
WithField("previousJobID", previousJob.ID).
306+
Debugf("Stopped start timer of previous job")
307+
// Stop timer and unset reference for clean up
308+
previousJob.startTimer.Stop()
309+
previousJob.startTimer = nil
310+
}
311+
waitList := previousWaitList[:]
312+
waitList[len(waitList)-1] = job
313+
314+
log.
315+
WithField("component", "runner").
316+
WithField("pipeline", job.Pipeline).
317+
WithField("jobID", job.ID).
318+
WithField("variables", job.Variables).
319+
Debugf("Queued: replaced job on wait list")
320+
321+
return waitList
322+
}
323+
}
324+
245325
// ScheduleAsync schedules a pipeline execution, if pipeline concurrency config allows for it.
246326
// "pipeline" is the pipeline ID from the YAML file.
247327
//
@@ -259,13 +339,10 @@ func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*Pip
259339
return nil, errors.Errorf("pipeline %q is not defined", pipeline)
260340
}
261341

262-
action := r.resolveScheduleAction(pipeline)
263-
264-
switch action {
265-
case scheduleActionErrNoQueue:
266-
return nil, errNoQueue
267-
case scheduleActionErrQueueFull:
268-
return nil, errQueueFull
342+
queueStrategyImpl := getQueueStrategyImpl(pipelineDef.QueueStrategy)
343+
err := queueStrategyImpl.kannEntgegengenommenWerden(pipelineDef, r.waitListByPipeline[pipeline])
344+
if err != nil {
345+
return nil, err
269346
}
270347

271348
id, err := uuid.NewV4()
@@ -276,70 +353,46 @@ func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*Pip
276353
defer r.requestPersist()
277354

278355
job := &PipelineJob{
279-
ID: id,
280-
Pipeline: pipeline,
281-
Created: time.Now(),
282-
Tasks: buildJobTasks(pipelineDef.Tasks),
283-
Env: pipelineDef.Env,
284-
Variables: opts.Variables,
285-
User: opts.User,
286-
StartDelay: pipelineDef.StartDelay,
356+
ID: id,
357+
Pipeline: pipeline,
358+
Created: time.Now(),
359+
Tasks: buildJobTasks(pipelineDef.Tasks),
360+
Env: pipelineDef.Env,
361+
Variables: opts.Variables,
362+
User: opts.User,
363+
queuePartition: opts.QueuePartition,
364+
StartDelay: pipelineDef.StartDelay,
287365
}
288-
289366
r.jobsByID[id] = job
290367
r.jobsByPipeline[pipeline] = append(r.jobsByPipeline[pipeline], job)
291368

292-
if job.StartDelay > 0 {
369+
if pipelineDef.StartDelay > 0 {
293370
// A delayed job is a job on the wait list that is started by a function after a delay
294371
job.startTimer = time.AfterFunc(job.StartDelay, func() {
295372
r.StartDelayedJob(id)
296373
})
297-
}
298-
299-
switch action {
300-
case scheduleActionQueue:
301-
r.waitListByPipeline[pipeline] = append(r.waitListByPipeline[pipeline], job)
302-
303-
log.
304-
WithField("component", "runner").
305-
WithField("pipeline", job.Pipeline).
306-
WithField("jobID", job.ID).
307-
WithField("variables", job.Variables).
308-
Debugf("Queued: added job to wait list")
309374

310375
return job, nil
311-
case scheduleActionReplace:
312-
waitList := r.waitListByPipeline[pipeline]
313-
previousJob := waitList[len(waitList)-1]
314-
previousJob.Canceled = true
315-
if previousJob.startTimer != nil {
316-
log.
317-
WithField("previousJobID", previousJob.ID).
318-
Debugf("Stopped start timer of previous job")
319-
// Stop timer and unset reference for clean up
320-
previousJob.startTimer.Stop()
321-
previousJob.startTimer = nil
322-
}
323-
waitList[len(waitList)-1] = job
376+
} else {
377+
// no delayed job
378+
runningJobsCount := r.runningJobsCount(pipeline)
379+
if runningJobsCount < pipelineDef.Concurrency {
380+
// free capacity -> start job right now and finish.
381+
r.startJob(job)
324382

325-
log.
326-
WithField("component", "runner").
327-
WithField("pipeline", job.Pipeline).
328-
WithField("jobID", job.ID).
329-
WithField("variables", job.Variables).
330-
Debugf("Queued: replaced job on wait list")
383+
log.
384+
WithField("component", "runner").
385+
WithField("pipeline", job.Pipeline).
386+
WithField("jobID", job.ID).
387+
WithField("variables", job.Variables).
388+
Debugf("Started: scheduled job execution")
331389

332-
return job, nil
390+
return job, nil
391+
}
333392
}
334393

335-
r.startJob(job)
336-
337-
log.
338-
WithField("component", "runner").
339-
WithField("pipeline", job.Pipeline).
340-
WithField("jobID", job.ID).
341-
WithField("variables", job.Variables).
342-
Debugf("Started: scheduled job execution")
394+
// modify the waitlist
395+
r.waitListByPipeline[pipeline] = queueStrategyImpl.modifyWaitList(pipelineDef, r.waitListByPipeline[pipeline], job)
343396

344397
return job, nil
345398
}
@@ -790,56 +843,28 @@ func (r *PipelineRunner) runningJobsCount(pipeline string) int {
790843
return running
791844
}
792845

793-
func (r *PipelineRunner) resolveScheduleAction(pipeline string) scheduleAction {
794-
pipelineDef := r.defs.Pipelines[pipeline]
795-
796-
// If a start delay is set, we will always queue the job, otherwise we check if the number of running jobs
797-
// exceed the maximum concurrency
798-
runningJobsCount := r.runningJobsCount(pipeline)
799-
if runningJobsCount < pipelineDef.Concurrency && pipelineDef.StartDelay == 0 {
800-
// job can be started right now, because pipeline is not running at full capacity, and there is no start delay
801-
// (no queue handling)
802-
return scheduleActionStart
803-
}
846+
// waitlistModifierFn modifies the wait list, if the job cannot be executed right away.
847+
type waitlistModifierFn func(previousWaitlist []*PipelineJob, job *PipelineJob) []*PipelineJob
804848

805-
// here, we start with waitlist logic.
806-
waitList := r.waitListByPipeline[pipeline]
849+
func waitlistAppendToQueue(previousWaitlist []*PipelineJob, job *PipelineJob) []*PipelineJob {
850+
return append(previousWaitlist, job)
851+
}
807852

808-
switch pipelineDef.QueueStrategy {
853+
func getQueueStrategyImpl(queueStrategy definition.QueueStrategy) QueueStrategyImpl {
854+
switch queueStrategy {
809855
case definition.QueueStrategyAppend:
810-
if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 {
811-
// queue limit == 0 -> error -> NOTE: might be moved to config validation
812-
return scheduleActionErrNoQueue
813-
}
814-
if pipelineDef.QueueLimit != nil && len(waitList) >= *pipelineDef.QueueLimit {
815-
// queue full
816-
return scheduleActionErrQueueFull
817-
}
818-
819-
// queue not full -> append to queue
820-
return scheduleActionQueue
821-
856+
return QueueStrategyAppendImpl{}
822857
case definition.QueueStrategyReplace:
823-
if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 {
824-
// queue limit == 0 -> error -> NOTE: might be moved to config validation
825-
return scheduleActionErrNoQueue
826-
}
827-
828-
if len(waitList) >= *pipelineDef.QueueLimit {
829-
// queue full -> replace last element
830-
return scheduleActionReplace
831-
}
832-
833-
// queue not full -> append to queue
834-
return scheduleActionQueue
858+
return QueueStrategyReplaceImpl{}
835859
}
836860

837-
// TODO: THIS CASE SHOULD NEVER HAPPEN !!!
838-
return scheduleActionErrQueueFull
861+
return nil
839862
}
840863

841864
func (r *PipelineRunner) isSchedulable(pipeline string) bool {
842-
action := r.resolveScheduleAction(pipeline)
865+
// TODO REPLACE ME!!!
866+
return true
867+
/*action := r.resolveScheduleAction(pipeline)
843868
switch action {
844869
case scheduleActionReplace:
845870
fallthrough
@@ -848,12 +873,14 @@ func (r *PipelineRunner) isSchedulable(pipeline string) bool {
848873
case scheduleActionStart:
849874
return true
850875
}
851-
return false
876+
return false*/
852877
}
853878

854879
type ScheduleOpts struct {
855880
Variables map[string]interface{}
856881
User string
882+
// for queue_strategy=partitioned_replace, the queue partition to use
883+
QueuePartition string
857884
}
858885

859886
func (r *PipelineRunner) initialLoadFromStore() error {

0 commit comments

Comments
 (0)