Skip to content

Commit 2a42c1f

Browse files
authored
Merge pull request #178 from cschleiden/cschleiden/fix-worker-stop
Change how workers wait for completion
2 parents d2b4631 + 2fde408 commit 2a42c1f

File tree

5 files changed

+31
-21
lines changed

5 files changed

+31
-21
lines changed

backend/sqlite/sqlite_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,21 @@ import (
1111
)
1212

1313
func Test_SqliteBackend(t *testing.T) {
14+
if testing.Short() {
15+
t.Skip()
16+
}
17+
1418
test.BackendTest(t, func() test.TestBackend {
1519
// Disable sticky workflow behavior for the test execution
1620
return NewInMemoryBackend(backend.WithStickyTimeout(0))
1721
}, nil)
1822
}
1923

2024
func Test_EndToEndSqliteBackend(t *testing.T) {
25+
if testing.Short() {
26+
t.Skip()
27+
}
28+
2129
test.EndToEndBackendTest(t, func() test.TestBackend {
2230
// Disable sticky workflow behavior for the test execution
2331
return NewInMemoryBackend(backend.WithStickyTimeout(0))

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ require (
6767
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
6868
github.com/esimonov/ifshort v1.0.4 // indirect
6969
github.com/ettle/strcase v0.1.1 // indirect
70-
github.com/fatih/color v1.14.1 // indirect
70+
github.com/fatih/color v1.14.1
7171
github.com/fatih/structtag v1.2.0 // indirect
7272
github.com/fsnotify/fsnotify v1.5.4 // indirect
7373
github.com/fzipp/gocyclo v0.6.0 // indirect

go.sum

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,6 @@ github.com/esimonov/ifshort v1.0.4 h1:6SID4yGWfRae/M7hkVDVVyppy8q/v9OuxNdmjLQStB
140140
github.com/esimonov/ifshort v1.0.4/go.mod h1:Pe8zjlRrJ80+q2CxHLfEOfTwxCZ4O+MuhcHcfgNWTk0=
141141
github.com/ettle/strcase v0.1.1 h1:htFueZyVeE1XNnMEfbqp5r67qAN/4r6ya1ysq8Q+Zcw=
142142
github.com/ettle/strcase v0.1.1/go.mod h1:hzDLsPC7/lwKyBOywSHEP89nt2pDgdy+No1NBA9o9VY=
143-
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
144-
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
145143
github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w=
146144
github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg=
147145
github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4=
@@ -390,12 +388,8 @@ github.com/matoous/godox v0.0.0-20210227103229-6504466cf951 h1:pWxk9e//NbPwfxat7
390388
github.com/matoous/godox v0.0.0-20210227103229-6504466cf951/go.mod h1:1BELzlh859Sh1c6+90blK8lbYy0kwQf1bYlBhBysy1s=
391389
github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE=
392390
github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
393-
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
394391
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
395392
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
396-
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
397-
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
398-
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
399393
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
400394
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
401395
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
@@ -757,7 +751,6 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w
757751
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
758752
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
759753
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
760-
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
761754
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
762755
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
763756
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -786,7 +779,6 @@ golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7w
786779
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
787780
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
788781
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
789-
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
790782
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
791783
golang.org/x/sys v0.0.0-20211105183446-c75c47738b0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
792784
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -796,8 +788,6 @@ golang.org/x/sys v0.0.0-20220702020025-31831981b65f/go.mod h1:oPkhp1MJrh7nUepCBc
796788
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
797789
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
798790
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
799-
golang.org/x/sys v0.0.0-20220915200043-7b5979e65e41 h1:ohgcoMbSofXygzo6AD2I1kz3BFmW1QArPYTtwEM3UXc=
800-
golang.org/x/sys v0.0.0-20220915200043-7b5979e65e41/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
801791
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
802792
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
803793
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=

internal/worker/activity.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ type ActivityWorker struct {
2525
activityTaskQueue chan *task.Activity
2626
activityTaskExecutor activity.Executor
2727

28-
wg *sync.WaitGroup
28+
wg sync.WaitGroup
29+
pollersWg sync.WaitGroup
2930

3031
clock clock.Clock
3132
}
@@ -39,14 +40,14 @@ func NewActivityWorker(backend backend.Backend, registry *workflow.Registry, clo
3940
activityTaskQueue: make(chan *task.Activity),
4041
activityTaskExecutor: activity.NewExecutor(backend.Logger(), backend.Tracer(), backend.Converter(), registry),
4142

42-
wg: &sync.WaitGroup{},
43-
4443
clock: clock,
4544
}
4645
}
4746

4847
func (aw *ActivityWorker) Start(ctx context.Context) error {
49-
for i := 0; i <= aw.options.ActivityPollers; i++ {
48+
aw.pollersWg.Add(aw.options.ActivityPollers)
49+
50+
for i := 0; i < aw.options.ActivityPollers; i++ {
5051
go aw.runPoll(ctx)
5152
}
5253

@@ -56,14 +57,19 @@ func (aw *ActivityWorker) Start(ctx context.Context) error {
5657
}
5758

5859
func (aw *ActivityWorker) WaitForCompletion() error {
59-
close(aw.activityTaskQueue)
60+
// Wait for task pollers to finish
61+
aw.pollersWg.Wait()
6062

63+
// Wait for tasks to finish
6164
aw.wg.Wait()
65+
close(aw.activityTaskQueue)
6266

6367
return nil
6468
}
6569

6670
func (aw *ActivityWorker) runPoll(ctx context.Context) {
71+
defer aw.pollersWg.Done()
72+
6773
for {
6874
select {
6975
case <-ctx.Done():

internal/worker/workflow.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ type WorkflowWorker struct {
3131

3232
logger log.Logger
3333

34-
wg *sync.WaitGroup
34+
pollersWg sync.WaitGroup
35+
wg sync.WaitGroup
3536
}
3637

3738
func NewWorkflowWorker(backend backend.Backend, registry *workflow.Registry, options *Options) *WorkflowWorker {
@@ -53,13 +54,13 @@ func NewWorkflowWorker(backend backend.Backend, registry *workflow.Registry, opt
5354
cache: c,
5455

5556
logger: backend.Logger(),
56-
57-
wg: &sync.WaitGroup{},
5857
}
5958
}
6059

6160
func (ww *WorkflowWorker) Start(ctx context.Context) error {
62-
for i := 0; i <= ww.options.WorkflowPollers; i++ {
61+
ww.pollersWg.Add(ww.options.WorkflowPollers)
62+
63+
for i := 0; i < ww.options.WorkflowPollers; i++ {
6364
go ww.runPoll(ctx)
6465
}
6566

@@ -69,14 +70,19 @@ func (ww *WorkflowWorker) Start(ctx context.Context) error {
6970
}
7071

7172
func (ww *WorkflowWorker) WaitForCompletion() error {
72-
close(ww.workflowTaskQueue)
73+
// Wait for task pollers to finish
74+
ww.pollersWg.Wait()
7375

76+
// Wait for tasks to finish
7477
ww.wg.Wait()
78+
close(ww.workflowTaskQueue)
7579

7680
return nil
7781
}
7882

7983
func (ww *WorkflowWorker) runPoll(ctx context.Context) {
84+
defer ww.pollersWg.Done()
85+
8086
for {
8187
select {
8288
case <-ctx.Done():

0 commit comments

Comments
 (0)