Skip to content

Commit aa73eca

Browse files
committed
Fix worker stop behavior
1 parent 667052b commit aa73eca

File tree

8 files changed

+25
-19
lines changed

8 files changed

+25
-19
lines changed

backend/redis/redis_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ func createBackend() backend.Backend {
4343
panic(err)
4444
}
4545

46-
// Disable sticky workflow behavior for the test execution
4746
b, err := NewRedisBackend(address, user, password, 0, WithBlockTimeout(time.Millisecond*2))
4847
if err != nil {
4948
panic(err)

backend/test/e2e.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package test
22

33
import (
44
"context"
5+
"fmt"
56
"testing"
67
"time"
78

@@ -15,10 +16,8 @@ import (
1516

1617
func EndToEndBackendTest(t *testing.T, setup func() backend.Backend, teardown func(b backend.Backend)) {
1718
tests := []struct {
18-
name string
19-
workflow interface{}
20-
workflowInputs []interface{}
21-
f func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker)
19+
name string
20+
f func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker)
2221
}{
2322
{
2423
name: "SimpleWorkflow",
@@ -69,13 +68,18 @@ func EndToEndBackendTest(t *testing.T, setup func() backend.Backend, teardown fu
6968
t.Run(tt.name, func(t *testing.T) {
7069
b := setup()
7170
ctx := context.Background()
71+
ctx, cancel := context.WithCancel(ctx)
7272

7373
c := client.New(b)
7474
w := worker.New(b, &worker.DefaultWorkerOptions)
7575

7676
tt.f(t, ctx, c, w)
7777

78-
w.Stop()
78+
cancel()
79+
if err := w.WaitForCompletion(); err != nil {
80+
fmt.Println("Worker did not stop in time")
81+
t.FailNow()
82+
}
7983

8084
if teardown != nil {
8185
teardown(b)

internal/worker/activity.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616

1717
type ActivityWorker interface {
1818
Start(context.Context) error
19-
Stop() error
19+
WaitForCompletion() error
2020
}
2121

2222
type activityWorker struct {
@@ -61,7 +61,7 @@ func (aw *activityWorker) Start(ctx context.Context) error {
6161
return nil
6262
}
6363

64-
func (aw *activityWorker) Stop() error {
64+
func (aw *activityWorker) WaitForCompletion() error {
6565
aw.wg.Wait()
6666

6767
return nil

internal/worker/workflow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
type WorkflowWorker interface {
1717
Start(context.Context) error
1818

19-
Stop() error
19+
WaitForCompletion() error
2020
}
2121

2222
type workflowWorker struct {
@@ -64,7 +64,7 @@ func (ww *workflowWorker) Start(ctx context.Context) error {
6464
return nil
6565
}
6666

67-
func (ww *workflowWorker) Stop() error {
67+
func (ww *workflowWorker) WaitForCompletion() error {
6868
ww.wg.Wait()
6969

7070
return nil

samples/simple/simple.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func main() {
3434

3535
cancel()
3636

37-
if err := w.Stop(); err != nil {
37+
if err := w.WaitForCompletion(); err != nil {
3838
panic("could not stop worker" + err.Error())
3939
}
4040
}

samples/subworkflow/subworkflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func main() {
3636

3737
cancel()
3838

39-
if err := w.Stop(); err != nil {
39+
if err := w.WaitForCompletion(); err != nil {
4040
panic("could not stop worker" + err.Error())
4141
}
4242
}

samples/timer/timer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func main() {
3232
startWorkflow(ctx, c)
3333

3434
cancel()
35-
w.Stop()
35+
w.WaitForCompletion()
3636
}
3737

3838
func startWorkflow(ctx context.Context, c client.Client) {

worker/worker.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@ type Registry interface {
2727
type Worker interface {
2828
Registry
2929

30-
// Start starts the worker
30+
// Start starts the worker.
31+
//
32+
// To stop the worker, cancel the context passed to Start. To wait for completion of the active
33+
// work items, call `WaitForCompletion`.
3134
Start(ctx context.Context) error
3235

33-
// Stop stops the worker and waits for in-progress work to complete
34-
Stop() error
36+
// WaitForCompletion
37+
WaitForCompletion() error
3538
}
3639

3740
type worker struct {
@@ -83,12 +86,12 @@ func (w *worker) Start(ctx context.Context) error {
8386
return nil
8487
}
8588

86-
func (w *worker) Stop() error {
87-
if err := w.workflowWorker.Stop(); err != nil {
89+
func (w *worker) WaitForCompletion() error {
90+
if err := w.workflowWorker.WaitForCompletion(); err != nil {
8891
return err
8992
}
9093

91-
if err := w.activityWorker.Stop(); err != nil {
94+
if err := w.activityWorker.WaitForCompletion(); err != nil {
9295
return err
9396
}
9497

0 commit comments

Comments
 (0)