-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathorchestrator.go
More file actions
113 lines (96 loc) · 2.27 KB
/
orchestrator.go
File metadata and controls
113 lines (96 loc) · 2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package patron
import (
"context"
"sync"
)
type WorkerOrchestrator interface {
GetQueueLength() int
AddJobToQueue(job *Job)
Start(context.Context) []WorkerResult
}
type Config struct {
WorkerCount int
WorkerFunc func(job *Job) error
}
type workerOrchestrator struct {
workerCount int
workerFunc func(job *Job) error
jobs []*Job
mu sync.Mutex
}
// New creates a new worker orchestrator.
func New(conf Config) WorkerOrchestrator {
return &workerOrchestrator{
workerCount: conf.WorkerCount,
workerFunc: conf.WorkerFunc,
jobs: make([]*Job, 0),
}
}
// AddJobToQueue adds a new job to the queue.
func (wo *workerOrchestrator) AddJobToQueue(job *Job) {
wo.mu.Lock()
defer wo.mu.Unlock()
wo.jobs = append(wo.jobs, job)
}
// GetQueueLength returns the length of the job queue.
func (wo *workerOrchestrator) GetQueueLength() int {
wo.mu.Lock()
defer wo.mu.Unlock()
return len(wo.jobs)
}
// Start starts all the jobs in the queue using a worker pool.
func (wo *workerOrchestrator) Start(ctx context.Context) []WorkerResult {
// If no jobs, return empty
wo.mu.Lock()
jobCount := len(wo.jobs)
jobs := wo.jobs
wo.mu.Unlock()
if jobCount == 0 {
return nil
}
jobsCh := make(chan *Job, jobCount)
resultsCh := make(chan WorkerResult, jobCount)
var wg sync.WaitGroup
// Push all jobs to channel
for _, job := range jobs {
jobsCh <- job
}
close(jobsCh)
// Start workers
// If jobCount < workerCount, we only need jobCount workers (optimization)
workersToStart := wo.workerCount
if jobsChLen := len(jobs); jobsChLen < workersToStart {
// workersToStart = jobsChLen // Optional optimization, but maybe overhead to logic
}
for i := 0; i < workersToStart; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for job := range jobsCh {
select {
case <-ctx.Done():
return
default:
err := wo.workerFunc(job)
resultsCh <- WorkerResult{
WorkerID: workerID,
JobID: job.ID,
Error: err,
}
}
}
}(i)
}
wg.Wait()
close(resultsCh)
var results []WorkerResult
for res := range resultsCh {
results = append(results, res)
}
// Clear the queue after processing?
// The original implementation seemed to consume them.
wo.mu.Lock()
wo.jobs = nil
wo.mu.Unlock()
return results
}