Skip to content

Commit dc612b5

Browse files
committed
wait for workflows to complete when shutting down the instance + fix bug in test
1 parent 0653a1e commit dc612b5

File tree

3 files changed

+10
-1
lines changed

3 files changed

+10
-1
lines changed

dbos/dbos.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"log/slog"
1111
"net/url"
1212
"os"
13+
"sync"
1314
"time"
1415

1516
"github.com/robfig/cron/v3"
@@ -93,6 +94,7 @@ type executor struct {
9394
applicationVersion string
9495
applicationID string
9596
executorID string
97+
workflowsWg *sync.WaitGroup
9698
}
9799

98100
func Initialize(inputConfig Config) error {
@@ -101,7 +103,9 @@ func Initialize(inputConfig Config) error {
101103
return newInitializationError("DBOS already initialized")
102104
}
103105

104-
initExecutor := &executor{}
106+
initExecutor := &executor{
107+
workflowsWg: &sync.WaitGroup{},
108+
}
105109

106110
// Load & process the configuration
107111
config, err := processConfig(&inputConfig)
@@ -204,6 +208,7 @@ func Shutdown() {
204208
}
205209

206210
// XXX is there a way to ensure all workflows goroutine are done before closing?
211+
dbos.workflowsWg.Wait()
207212

208213
// Cancel the context to stop the queue runner
209214
if dbos.queueRunnerCancelFunc != nil {

dbos/workflow.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,9 @@ func runAsWorkflow[P any, R any](ctx context.Context, fn WorkflowFunc[P, R], inp
517517

518518
// Run the function in a goroutine
519519
augmentUserContext := context.WithValue(ctx, workflowStateKey, wfState)
520+
dbos.workflowsWg.Add(1)
520521
go func() {
522+
defer dbos.workflowsWg.Done()
521523
result, err := fn(augmentUserContext, input)
522524
status := WorkflowStatusSuccess
523525
if err != nil {

dbos/workflows_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,8 @@ func TestWorkflowDeadLetterQueue(t *testing.T) {
788788
t.Fatalf("expected DeadLetterQueueError, got %v", dbosErr.Code)
789789
}
790790

791+
// Unlock the workflow to allow it to complete
792+
deadLetterQueueEvent.Set()
791793
/*
792794
// TODO: test resume when implemented
793795
resumedHandle, err := ...

0 commit comments

Comments
 (0)