Skip to content

Commit 63e2c78

Browse files
committed
more concurrent tests
1 parent 3ebbf9f commit 63e2c78

File tree

1 file changed

+229
-1
lines changed

1 file changed

+229
-1
lines changed

dbos/workflows_test.go

Lines changed: 229 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2703,12 +2703,62 @@ func TestWorkflowTimeout(t *testing.T) {
27032703
})
27042704
}
27052705

2706+
func notificationWaiterWorkflow(ctx DBOSContext, pairID int) (string, error) {
2707+
result, err := GetEvent[string](ctx, WorkflowGetEventInput{
2708+
TargetWorkflowID: fmt.Sprintf("notification-setter-%d", pairID),
2709+
Key: "event-key",
2710+
Timeout: 10 * time.Second,
2711+
})
2712+
if err != nil {
2713+
return "", err
2714+
}
2715+
return result, nil
2716+
}
2717+
2718+
func notificationSetterWorkflow(ctx DBOSContext, pairID int) (string, error) {
2719+
err := SetEvent(ctx, WorkflowSetEventInputGeneric[string]{
2720+
Key: "event-key",
2721+
Message: fmt.Sprintf("notification-message-%d", pairID),
2722+
})
2723+
if err != nil {
2724+
return "", err
2725+
}
2726+
return "event-set", nil
2727+
}
2728+
2729+
func sendRecvReceiverWorkflow(ctx DBOSContext, pairID int) (string, error) {
2730+
result, err := Recv[string](ctx, WorkflowRecvInput{
2731+
Topic: "send-recv-topic",
2732+
Timeout: 10 * time.Second,
2733+
})
2734+
if err != nil {
2735+
return "", err
2736+
}
2737+
return result, nil
2738+
}
2739+
2740+
func sendRecvSenderWorkflow(ctx DBOSContext, pairID int) (string, error) {
2741+
err := Send(ctx, WorkflowSendInput[string]{
2742+
DestinationID: fmt.Sprintf("send-recv-receiver-%d", pairID),
2743+
Topic: "send-recv-topic",
2744+
Message: fmt.Sprintf("send-recv-message-%d", pairID),
2745+
})
2746+
if err != nil {
2747+
return "", err
2748+
}
2749+
return "message-sent", nil
2750+
}
2751+
27062752
func TestConcurrentWorkflows(t *testing.T) {
27072753
dbosCtx := setupDBOS(t, true, true)
27082754
RegisterWorkflow(dbosCtx, concurrentSimpleWorkflow)
2755+
RegisterWorkflow(dbosCtx, notificationWaiterWorkflow)
2756+
RegisterWorkflow(dbosCtx, notificationSetterWorkflow)
2757+
RegisterWorkflow(dbosCtx, sendRecvReceiverWorkflow)
2758+
RegisterWorkflow(dbosCtx, sendRecvSenderWorkflow)
27092759

27102760
t.Run("SimpleWorkflow", func(t *testing.T) {
2711-
const numGoroutines = 100
2761+
const numGoroutines = 500
27122762
var wg sync.WaitGroup
27132763
results := make(chan int, numGoroutines)
27142764
errors := make(chan error, numGoroutines)
@@ -2764,4 +2814,182 @@ func TestConcurrentWorkflows(t *testing.T) {
27642814
}
27652815
}
27662816
})
2817+
2818+
t.Run("NotificationWorkflows", func(t *testing.T) {
2819+
const numPairs = 500
2820+
var wg sync.WaitGroup
2821+
waiterResults := make(chan string, numPairs)
2822+
setterResults := make(chan string, numPairs)
2823+
errors := make(chan error, numPairs*2)
2824+
2825+
wg.Add(numPairs * 2)
2826+
2827+
for i := range numPairs {
2828+
go func(pairID int) {
2829+
defer wg.Done()
2830+
handle, err := RunAsWorkflow(dbosCtx, notificationSetterWorkflow, pairID, WithWorkflowID(fmt.Sprintf("notification-setter-%d", pairID)))
2831+
if err != nil {
2832+
errors <- fmt.Errorf("failed to start setter workflow %d: %w", pairID, err)
2833+
return
2834+
}
2835+
result, err := handle.GetResult()
2836+
if err != nil {
2837+
errors <- fmt.Errorf("failed to get result for setter workflow %d: %w", pairID, err)
2838+
return
2839+
}
2840+
setterResults <- result
2841+
}(i)
2842+
2843+
go func(pairID int) {
2844+
defer wg.Done()
2845+
handle, err := RunAsWorkflow(dbosCtx, notificationWaiterWorkflow, pairID)
2846+
if err != nil {
2847+
errors <- fmt.Errorf("failed to start waiter workflow %d: %w", pairID, err)
2848+
return
2849+
}
2850+
result, err := handle.GetResult()
2851+
if err != nil {
2852+
errors <- fmt.Errorf("failed to get result for waiter workflow %d: %w", pairID, err)
2853+
return
2854+
}
2855+
expectedMessage := fmt.Sprintf("notification-message-%d", pairID)
2856+
if result != expectedMessage {
2857+
errors <- fmt.Errorf("waiter workflow %d: expected message '%s', got '%s'", pairID, expectedMessage, result)
2858+
return
2859+
}
2860+
waiterResults <- result
2861+
}(i)
2862+
}
2863+
2864+
wg.Wait()
2865+
close(waiterResults)
2866+
close(setterResults)
2867+
close(errors)
2868+
2869+
if len(errors) > 0 {
2870+
for err := range errors {
2871+
t.Errorf("Workflow error: %v", err)
2872+
}
2873+
t.Fatalf("Expected no errors from notification workflows, got %d errors", len(errors))
2874+
}
2875+
2876+
waiterCount := 0
2877+
receivedWaiterResults := make(map[string]bool)
2878+
for result := range waiterResults {
2879+
waiterCount++
2880+
receivedWaiterResults[result] = true
2881+
}
2882+
2883+
setterCount := 0
2884+
for result := range setterResults {
2885+
setterCount++
2886+
if result != "event-set" {
2887+
t.Errorf("Expected setter result to be 'event-set', got '%s'", result)
2888+
}
2889+
}
2890+
2891+
if waiterCount != numPairs {
2892+
t.Fatalf("Expected %d waiter results, got %d", numPairs, waiterCount)
2893+
}
2894+
2895+
if setterCount != numPairs {
2896+
t.Fatalf("Expected %d setter results, got %d", numPairs, setterCount)
2897+
}
2898+
2899+
for i := range numPairs {
2900+
expectedWaiterResult := fmt.Sprintf("notification-message-%d", i)
2901+
if !receivedWaiterResults[expectedWaiterResult] {
2902+
t.Errorf("Expected waiter result '%s' not found", expectedWaiterResult)
2903+
}
2904+
}
2905+
})
2906+
2907+
t.Run("SendRecvWorkflows", func(t *testing.T) {
2908+
const numPairs = 500
2909+
var wg sync.WaitGroup
2910+
receiverResults := make(chan string, numPairs)
2911+
senderResults := make(chan string, numPairs)
2912+
errors := make(chan error, numPairs*2)
2913+
2914+
wg.Add(numPairs * 2)
2915+
2916+
for i := range numPairs {
2917+
go func(pairID int) {
2918+
defer wg.Done()
2919+
handle, err := RunAsWorkflow(dbosCtx, sendRecvReceiverWorkflow, pairID, WithWorkflowID(fmt.Sprintf("send-recv-receiver-%d", pairID)))
2920+
if err != nil {
2921+
errors <- fmt.Errorf("failed to start receiver workflow %d: %w", pairID, err)
2922+
return
2923+
}
2924+
result, err := handle.GetResult()
2925+
if err != nil {
2926+
errors <- fmt.Errorf("failed to get result for receiver workflow %d: %w", pairID, err)
2927+
return
2928+
}
2929+
expectedMessage := fmt.Sprintf("send-recv-message-%d", pairID)
2930+
if result != expectedMessage {
2931+
errors <- fmt.Errorf("receiver workflow %d: expected message '%s', got '%s'", pairID, expectedMessage, result)
2932+
return
2933+
}
2934+
receiverResults <- result
2935+
}(i)
2936+
2937+
go func(pairID int) {
2938+
defer wg.Done()
2939+
handle, err := RunAsWorkflow(dbosCtx, sendRecvSenderWorkflow, pairID)
2940+
if err != nil {
2941+
errors <- fmt.Errorf("failed to start sender workflow %d: %w", pairID, err)
2942+
return
2943+
}
2944+
result, err := handle.GetResult()
2945+
if err != nil {
2946+
errors <- fmt.Errorf("failed to get result for sender workflow %d: %w", pairID, err)
2947+
return
2948+
}
2949+
senderResults <- result
2950+
}(i)
2951+
}
2952+
2953+
wg.Wait()
2954+
close(receiverResults)
2955+
close(senderResults)
2956+
close(errors)
2957+
2958+
if len(errors) > 0 {
2959+
for err := range errors {
2960+
t.Errorf("Workflow error: %v", err)
2961+
}
2962+
t.Fatalf("Expected no errors from send/recv workflows, got %d errors", len(errors))
2963+
}
2964+
2965+
receiverCount := 0
2966+
receivedReceiverResults := make(map[string]bool)
2967+
for result := range receiverResults {
2968+
receiverCount++
2969+
receivedReceiverResults[result] = true
2970+
}
2971+
2972+
senderCount := 0
2973+
for result := range senderResults {
2974+
senderCount++
2975+
if result != "message-sent" {
2976+
t.Errorf("Expected sender result to be 'message-sent', got '%s'", result)
2977+
}
2978+
}
2979+
2980+
if receiverCount != numPairs {
2981+
t.Fatalf("Expected %d receiver results, got %d", numPairs, receiverCount)
2982+
}
2983+
2984+
if senderCount != numPairs {
2985+
t.Fatalf("Expected %d sender results, got %d", numPairs, senderCount)
2986+
}
2987+
2988+
for i := range numPairs {
2989+
expectedReceiverResult := fmt.Sprintf("send-recv-message-%d", i)
2990+
if !receivedReceiverResults[expectedReceiverResult] {
2991+
t.Errorf("Expected receiver result '%s' not found", expectedReceiverResult)
2992+
}
2993+
}
2994+
})
27672995
}

0 commit comments

Comments
 (0)