Commit 84c60d8

Close the test dispatcher when completing (#1117)
Currently, a test workflow like this will never run the code after the goroutine's sleep (correct), nor the code in the defer (incorrect; production would run it):

```
func(ctx workflow.Context) error {
	workflow.Go(ctx, func(ctx workflow.Context) {
		defer func() {
			fmt.Println("in defer")
		}()
		workflow.Sleep(ctx, time.Hour) // wait longer than the workflow lives
		fmt.Println("after sleep")
	})
	workflow.Sleep(ctx, time.Minute) // just to make sure the goroutine starts
	return nil
}
```

The workflow will correctly end, but since the dispatcher was never closed, any not-yet-complete goroutines would never exit, and we'd leak goroutines.

Semantically this should be a safe change:

- Any post-complete decisions would not be executed or recorded, and this retains that.
- When panicking, anything that would be *recorded* in a defer will not be recorded, so no replay-state-aware user code should be affected. And any code that ignores replay state will now execute like it should, where before it would not.

So safe / correct code should be unaffected, leaks should be reduced, and latent mistakes should now cause errors. AFAICT. I'm not sure how complete our tests are here :)

There's some room for in-defer code to have been semantically incorrect in tests without this fix (e.g. testing custom logger/metric impls in defers), though I expect those cases to be very rare, bordering on nonexistent.

For the most part I expect that people will not notice this change; they'll just have fewer goroutine leaks during tests (so e.g. https://github.com/uber-go/goleak users will be happy).

---

Prior to this fix, the added test fails with:

```
=== RUN   TestWorkflowUnitTest/Test_StaleGoroutinesAreShutDown
    internal_workflow_test.go:1210:
        	Error Trace:	internal_workflow_test.go:1210
        	Error:      	deferred func should have been called within 1 second
        	Test:       	TestWorkflowUnitTest/Test_StaleGoroutinesAreShutDown
    internal_workflow_test.go:1216: code after sleep correctly not executed
```

Now it passes with this, which also shows it's not slowing tests down in any meaningful way:

```
=== RUN   TestWorkflowUnitTest/Test_StaleGoroutinesAreShutDown
    internal_workflow_test.go:1210: deferred callback executed after 9.177µs
    internal_workflow_test.go:1217: code after sleep correctly not executed
```
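
The behaviour described in the commit message (deferred code runs, code after the blocked call does not) can be illustrated without the workflow library at all. The following is a minimal stdlib-only sketch, not the library's dispatcher: the `coroutine` type and the `start`, `block`, `shutdown`, and `runDemo` names are hypothetical, and using `runtime.Goexit` to unwind a parked goroutine is an assumption chosen for illustration.

```go
package main

import (
	"fmt"
	"runtime"
)

// coroutine is a hypothetical stand-in for a dispatcher-managed workflow goroutine.
type coroutine struct {
	resume chan bool     // true: keep running; false: the dispatcher was closed
	done   chan struct{} // closed once the goroutine has fully exited
}

// start runs body on its own goroutine and tracks when it exits.
func start(body func(c *coroutine)) *coroutine {
	c := &coroutine{resume: make(chan bool), done: make(chan struct{})}
	go func() {
		defer close(c.done)
		body(c)
	}()
	return c
}

// block parks the coroutine until it is resumed or shut down.
func (c *coroutine) block() {
	if keepGoing := <-c.resume; !keepGoing {
		// Goexit unwinds this goroutine: deferred calls run,
		// statements after block() never do.
		runtime.Goexit()
	}
}

// shutdown asks a parked coroutine to exit and waits until it has.
func (c *coroutine) shutdown() {
	select {
	case c.resume <- false:
	case <-c.done: // body already returned on its own
	}
	<-c.done
}

// runDemo mirrors the commit message's workflow and returns what was recorded.
func runDemo() (events []string) {
	c := start(func(c *coroutine) {
		defer func() { events = append(events, "in defer") }()
		c.block() // stands in for workflow.Sleep(ctx, time.Hour)
		events = append(events, "after sleep")
	})
	c.shutdown() // stands in for closing the dispatcher on completion
	return events
}

func main() {
	fmt.Println(runDemo()) // prints: [in defer]
}
```

Without the `shutdown` call the goroutine would stay parked in `block` forever, which is the leak this commit removes for test workflows.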
1 parent 9f2fda0 commit 84c60d8

2 files changed: +38 -0 lines changed

internal/internal_workflow_test.go

Lines changed: 37 additions & 0 deletions
@@ -1181,6 +1181,43 @@ func (s *WorkflowUnitTest) Test_WaitGroupWorkflowTest() {
 	s.Equal(n, total)
 }
 
+func (s *WorkflowUnitTest) Test_StaleGoroutinesAreShutDown() {
+	env := s.NewTestWorkflowEnvironment()
+	deferred := make(chan struct{})
+	after := make(chan struct{})
+	wf := func(ctx Context) error {
+		Go(ctx, func(ctx Context) {
+			defer func() { close(deferred) }()
+			_ = Sleep(ctx, time.Hour) // outlive the workflow
+			close(after)
+		})
+		_ = Sleep(ctx, time.Minute)
+		return nil
+	}
+	env.RegisterWorkflow(wf)
+
+	env.ExecuteWorkflow(wf)
+	s.True(env.IsWorkflowCompleted())
+	s.NoError(env.GetWorkflowError())
+
+	// goroutines are shut down async at the moment, so wait with a timeout.
+	// give it up to 1s total.
+
+	started := time.Now()
+	maxWait := time.NewTimer(time.Second)
+	defer maxWait.Stop()
+	select {
+	case <-deferred: s.T().Logf("deferred callback executed after %v", time.Now().Sub(started))
+	case <-maxWait.C: s.Fail("deferred func should have been called within 1 second")
+	}
+	// if deferred code has run, this has already occurred-or-not.
+	// if it timed out waiting for the deferred code, it has waited long enough, and this is mostly a curiosity.
+	select {
+	case <-after: s.Fail("code after sleep should not have run")
+	default: s.T().Log("code after sleep correctly not executed")
+	}
+}
+
 var _ WorkflowInterceptorFactory = (*tracingInterceptorFactory)(nil)
 
 type tracingInterceptorFactory struct {
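
The added test relies on two channel idioms: a bounded wait (a `select` against a timer) for the deferred callback, and a non-blocking check (a `select` with `default`) for the code after the sleep. As a rough stdlib-only sketch, these could be factored into helpers; `waitClosed` and `isClosed` are hypothetical names and are not part of the commit.

```go
package main

import (
	"fmt"
	"time"
)

// waitClosed reports whether ch became readable (e.g. was closed) within timeout.
func waitClosed(ch <-chan struct{}, timeout time.Duration) bool {
	t := time.NewTimer(timeout)
	defer t.Stop()
	select {
	case <-ch:
		return true
	case <-t.C:
		return false
	}
}

// isClosed checks ch without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	deferred := make(chan struct{}) // closed asynchronously, like the test's defer
	after := make(chan struct{})    // never closed, like the code after the sleep

	go func() {
		time.Sleep(10 * time.Millisecond)
		close(deferred)
	}()

	fmt.Println(waitClosed(deferred, time.Second), isClosed(after)) // prints: true false
}
```

The bounded wait is needed because, as the test's comment notes, the goroutines are shut down asynchronously; the non-blocking check is enough afterwards because the deferred callback has already run or timed out by then.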

internal/internal_workflow_testsuite.go

Lines changed: 1 addition & 0 deletions
@@ -818,6 +818,7 @@ func (env *testWorkflowEnvironmentImpl) Complete(result []byte, err error) {
 		env.logger.Debug("Workflow already completed.")
 		return
 	}
+	env.workflowDef.Close()
 	if _, ok := err.(*CanceledError); ok && env.workflowCancelHandler != nil {
 		env.workflowCancelHandler()
 	}
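
To get a feel for why this one-line `Close()` call matters to leak detectors, here is a rough stdlib-only sketch that counts goroutines before and after a simulated completion. It is a count-based approximation for illustration only, not how goleak or the test environment works; `leakCheck` and `settlesTo` are hypothetical names, and the `stop` channel merely stands in for closing the dispatcher.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// settlesTo polls until the goroutine count is at most want, or timeout elapses.
func settlesTo(want int, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for runtime.NumGoroutine() > want {
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(time.Millisecond)
	}
	return true
}

// leakCheck parks n goroutines and reports whether the goroutine count
// returns to its baseline. closeOnComplete simulates closing the dispatcher.
func leakCheck(n int, closeOnComplete bool) (leakFree bool) {
	baseline := runtime.NumGoroutine()
	stop := make(chan struct{})
	for i := 0; i < n; i++ {
		go func() { <-stop }() // parked, like a workflow goroutine in a long Sleep
	}
	if closeOnComplete {
		close(stop)
	}
	leakFree = settlesTo(baseline, 200*time.Millisecond)
	if !closeOnComplete {
		// Clean up so the demo itself does not leave goroutines behind.
		close(stop)
		settlesTo(baseline, time.Second)
	}
	return leakFree
}

func main() {
	fmt.Println("with close:", leakCheck(3, true))
	fmt.Println("without close:", leakCheck(3, false))
}
```

A count-based check like this can be confused by unrelated background goroutines, so it is only meant to show the before/after difference, not to replace a real leak detector.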
