Skip to content

Commit 787a233

Browse files
committed
further simplify notifying of workers
1 parent 7db8649 commit 787a233

File tree

2 files changed

+11
-28
lines changed

2 files changed

+11
-28
lines changed

backend/monoprocess/monoprocess.go

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,11 @@ type monoprocessBackend struct {
2929
// worked on. Note that only one worker will be notified.
3030
// IMPORTANT: Only use this backend if the backend and worker are running in the
3131
// same process.
32-
func NewMonoprocessBackend(b backend.Backend, signalBufferSize int, signalTimeout time.Duration) *monoprocessBackend {
33-
if signalTimeout <= 0 {
34-
signalTimeout = time.Second // default
35-
}
32+
func NewMonoprocessBackend(b backend.Backend) *monoprocessBackend {
3633
mb := &monoprocessBackend{
3734
Backend: b,
38-
workflowSignal: make(chan struct{}, signalBufferSize),
39-
activitySignal: make(chan struct{}, signalBufferSize),
40-
signalTimeout: signalTimeout,
35+
workflowSignal: make(chan struct{}, 1),
36+
activitySignal: make(chan struct{}, 1),
4137
logger: b.Logger(),
4238
}
4339
return mb
@@ -165,32 +161,20 @@ func (b *monoprocessBackend) SignalWorkflow(ctx context.Context, instanceID stri
165161
return nil
166162
}
167163

168-
func (b *monoprocessBackend) notifyActivityWorker(ctx context.Context) bool {
169-
ctx, cancel := context.WithTimeout(ctx, b.signalTimeout)
170-
defer cancel()
164+
func (b *monoprocessBackend) notifyActivityWorker(ctx context.Context) {
171165
select {
172-
case <-ctx.Done():
173-
// we didn't manage to notify the worker that there is a new task, it
174-
// will pick it up after the poll timeout
175-
b.logger.DebugContext(ctx, "failed to signal activity task to worker", "reason", ctx.Err())
176-
return false
177166
case b.activitySignal <- struct{}{}:
178167
b.logger.DebugContext(ctx, "signalled a new activity task to worker")
179-
return true
168+
default:
169+
// the signal channel already contains a signal, no need to add another
180170
}
181171
}
182172

183-
func (b *monoprocessBackend) notifyWorkflowWorker(ctx context.Context) bool {
184-
ctx, cancel := context.WithTimeout(ctx, b.signalTimeout)
185-
defer cancel()
173+
func (b *monoprocessBackend) notifyWorkflowWorker(ctx context.Context) {
186174
select {
187-
case <-ctx.Done():
188-
// we didn't manage to notify the worker that there is a new task, it
189-
// will pick it up after the poll timeout
190-
b.logger.DebugContext(ctx, "failed to signal workflow task to worker", "reason", ctx.Err())
191-
return false
192175
case b.workflowSignal <- struct{}{}:
193176
b.logger.DebugContext(ctx, "signalled a new workflow task to worker")
194-
return true
177+
default:
178+
// the signal channel already contains a signal, no need to add another
195179
}
196180
}

backend/monoprocess/monoprocess_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"errors"
66
"testing"
7-
"time"
87

98
"github.com/cschleiden/go-workflows/backend"
109
"github.com/cschleiden/go-workflows/backend/sqlite"
@@ -21,7 +20,7 @@ func Test_MonoprocessBackend(t *testing.T) {
2120
// Disable sticky workflow behavior for the test execution
2221
options = append(options, backend.WithStickyTimeout(0))
2322

24-
return NewMonoprocessBackend(sqlite.NewInMemoryBackend(options...), 0, time.Millisecond)
23+
return NewMonoprocessBackend(sqlite.NewInMemoryBackend(options...))
2524
}, nil)
2625
}
2726

@@ -34,7 +33,7 @@ func Test_EndToEndMonoprocessBackend(t *testing.T) {
3433
// Disable sticky workflow behavior for the test execution
3534
options = append(options, backend.WithStickyTimeout(0))
3635

37-
return NewMonoprocessBackend(sqlite.NewInMemoryBackend(options...), 0, 0)
36+
return NewMonoprocessBackend(sqlite.NewInMemoryBackend(options...))
3837
}, nil)
3938
}
4039

0 commit comments

Comments
 (0)