Skip to content

Commit 12041b7

Browse files
authored
Parallel lock nil if not set (#41)
Update the worker parallel lock to be nil if not set. This saves memory when no concurrency limit is set. Signed-off-by: joshvanl <[email protected]>
1 parent 04f65d7 commit 12041b7

File tree

2 files changed

+26
-15
lines changed

2 files changed

+26
-15
lines changed

backend/worker.go

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,31 +34,35 @@ type worker[T WorkItem] struct {
3434
type NewTaskWorkerOptions func(*WorkerOptions)
3535

3636
type WorkerOptions struct {
37-
MaxParallelWorkItems int32
37+
MaxParallelWorkItems *int32
3838
}
3939

4040
func NewWorkerOptions() *WorkerOptions {
41-
return &WorkerOptions{
42-
MaxParallelWorkItems: 1,
43-
}
41+
return &WorkerOptions{}
4442
}
4543

4644
func WithMaxParallelism(n int32) NewTaskWorkerOptions {
4745
return func(o *WorkerOptions) {
48-
o.MaxParallelWorkItems = n
46+
o.MaxParallelWorkItems = &n
4947
}
5048
}
5149

5250
func NewTaskWorker[T WorkItem](p TaskProcessor[T], logger Logger, opts ...NewTaskWorkerOptions) TaskWorker[T] {
53-
options := &WorkerOptions{MaxParallelWorkItems: 1}
51+
options := &WorkerOptions{}
5452
for _, configure := range opts {
5553
configure(options)
5654
}
55+
56+
var parallelLock chan struct{}
57+
if options.MaxParallelWorkItems != nil {
58+
parallelLock = make(chan struct{}, *options.MaxParallelWorkItems)
59+
}
60+
5761
return &worker[T]{
5862
processor: p,
5963
logger: logger,
6064
workItems: make(chan T),
61-
parallelLock: make(chan struct{}, options.MaxParallelWorkItems),
65+
parallelLock: parallelLock,
6266
closeCh: make(chan struct{}),
6367
}
6468
}
@@ -87,15 +91,20 @@ func (w *worker[T]) Start(ctx context.Context) {
8791
defer w.logger.Infof("%v: worker stopped", w.Name())
8892

8993
for {
90-
select {
91-
case w.parallelLock <- struct{}{}:
92-
case <-ctx.Done():
93-
return
94+
95+
if w.parallelLock != nil {
96+
select {
97+
case w.parallelLock <- struct{}{}:
98+
case <-ctx.Done():
99+
return
100+
}
94101
}
95102

96103
wi, err := w.processor.NextWorkItem(ctx)
97104
if err != nil {
98-
<-w.parallelLock
105+
if w.parallelLock != nil {
106+
<-w.parallelLock
107+
}
99108

100109
if ctx.Err() != nil {
101110
return
@@ -108,7 +117,9 @@ func (w *worker[T]) Start(ctx context.Context) {
108117
w.wg.Add(1)
109118
go func() {
110119
defer func() {
111-
<-w.parallelLock
120+
if w.parallelLock != nil {
121+
<-w.parallelLock
122+
}
112123
w.wg.Done()
113124
}()
114125
w.processWorkItem(ctx, wi)

tests/worker_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func Test_TaskWorker(t *testing.T) {
173173
}
174174
tp.AddWorkItems(first, second)
175175

176-
worker := backend.NewTaskWorker[*backend.ActivityWorkItem](tp, logger)
176+
worker := backend.NewTaskWorker[*backend.ActivityWorkItem](tp, logger, backend.WithMaxParallelism(1))
177177

178178
worker.Start(ctx)
179179

@@ -220,7 +220,7 @@ func Test_StartAndStop(t *testing.T) {
220220
}
221221
tp.AddWorkItems(&first, &second)
222222

223-
worker := backend.NewTaskWorker[*backend.ActivityWorkItem](tp, logger)
223+
worker := backend.NewTaskWorker[*backend.ActivityWorkItem](tp, logger, backend.WithMaxParallelism(1))
224224

225225
worker.Start(ctx)
226226

0 commit comments

Comments
 (0)