Skip to content

Commit 3ebbf9f

Browse files
committed
add test for running workflows in goroutines
1 parent 6d38d3f commit 3ebbf9f

File tree

1 file changed

+69
-0
lines changed

1 file changed

+69
-0
lines changed

dbos/workflows_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ func simpleStep(_ context.Context) (string, error) {
4444
return "from step", nil
4545
}
4646

47+
func concurrentSimpleWorkflow(dbosCtx DBOSContext, input int) (int, error) {
48+
return RunAsStep(dbosCtx, func(ctx context.Context) (int, error) {
49+
return input * 2, nil
50+
})
51+
}
52+
4753
func simpleStepError(_ context.Context) (string, error) {
4854
return "", fmt.Errorf("step failure")
4955
}
@@ -2696,3 +2702,66 @@ func TestWorkflowTimeout(t *testing.T) {
26962702
}
26972703
})
26982704
}
2705+
2706+
func TestConcurrentWorkflows(t *testing.T) {
2707+
dbosCtx := setupDBOS(t, true, true)
2708+
RegisterWorkflow(dbosCtx, concurrentSimpleWorkflow)
2709+
2710+
t.Run("SimpleWorkflow", func(t *testing.T) {
2711+
const numGoroutines = 100
2712+
var wg sync.WaitGroup
2713+
results := make(chan int, numGoroutines)
2714+
errors := make(chan error, numGoroutines)
2715+
2716+
wg.Add(numGoroutines)
2717+
for i := range numGoroutines {
2718+
go func(input int) {
2719+
defer wg.Done()
2720+
handle, err := RunAsWorkflow(dbosCtx, concurrentSimpleWorkflow, input)
2721+
if err != nil {
2722+
errors <- fmt.Errorf("failed to start workflow %d: %w", input, err)
2723+
return
2724+
}
2725+
result, err := handle.GetResult()
2726+
if err != nil {
2727+
errors <- fmt.Errorf("failed to get result for workflow %d: %w", input, err)
2728+
return
2729+
}
2730+
results <- result
2731+
}(i)
2732+
}
2733+
2734+
wg.Wait()
2735+
close(results)
2736+
close(errors)
2737+
2738+
if len(errors) > 0 {
2739+
for err := range errors {
2740+
t.Errorf("Workflow error: %v", err)
2741+
}
2742+
t.Fatalf("Expected no errors from concurrent workflows, got %d errors", len(errors))
2743+
}
2744+
2745+
resultCount := 0
2746+
receivedResults := make(map[int]bool)
2747+
for result := range results {
2748+
resultCount++
2749+
if result < 0 || result >= numGoroutines*2 || result%2 != 0 {
2750+
t.Errorf("Unexpected result %d", result)
2751+
} else {
2752+
receivedResults[result] = true
2753+
}
2754+
}
2755+
2756+
if resultCount != numGoroutines {
2757+
t.Fatalf("Expected %d results, got %d", numGoroutines, resultCount)
2758+
}
2759+
2760+
for i := range numGoroutines {
2761+
expectedResult := i * 2
2762+
if !receivedResults[expectedResult] {
2763+
t.Errorf("Expected result %d not found", expectedResult)
2764+
}
2765+
}
2766+
})
2767+
}

0 commit comments

Comments
 (0)