Skip to content

Commit 503d46c

Browse files
committed
add a test for steps cancellation status after a wf timeout
1 parent 168f469 commit 503d46c

File tree

2 files changed

+102
-15
lines changed

2 files changed

+102
-15
lines changed

dbos/utils_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,22 @@ func queueEntriesAreCleanedUp(ctx DBOSContext) bool {
168168

169169
return success
170170
}
171+
172+
func checkWfStatus(ctx DBOSContext, expectedStatus WorkflowStatusType) (bool, error) {
173+
wfid, err := GetWorkflowID(ctx)
174+
if err != nil {
175+
return false, err
176+
}
177+
me, err := RetrieveWorkflow[string](ctx, wfid)
178+
if err != nil {
179+
return false, err
180+
}
181+
meStatus, err := me.GetStatus()
182+
if err != nil {
183+
return false, err
184+
}
185+
if meStatus.Status == expectedStatus {
186+
return true, nil
187+
}
188+
return false, nil
189+
}

dbos/workflows_test.go

Lines changed: 83 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,15 +1308,15 @@ func sendWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) {
13081308
}
13091309

13101310
func receiveWorkflow(ctx DBOSContext, topic string) (string, error) {
1311-
msg1, err := Recv[string](ctx, topic, 10 * time.Second)
1311+
msg1, err := Recv[string](ctx, topic, 10*time.Second)
13121312
if err != nil {
13131313
return "", err
13141314
}
1315-
msg2, err := Recv[string](ctx, topic, 10 * time.Second)
1315+
msg2, err := Recv[string](ctx, topic, 10*time.Second)
13161316
if err != nil {
13171317
return "", err
13181318
}
1319-
msg3, err := Recv[string](ctx, topic, 10 * time.Second)
1319+
msg3, err := Recv[string](ctx, topic, 10*time.Second)
13201320
if err != nil {
13211321
return "", err
13221322
}
@@ -1335,7 +1335,7 @@ func receiveWorkflowCoordinated(ctx DBOSContext, input struct {
13351335
concurrentRecvStartEvent.Wait()
13361336

13371337
// Do a single Recv call with timeout
1338-
msg, err := Recv[string](ctx, input.Topic, 3 * time.Second)
1338+
msg, err := Recv[string](ctx, input.Topic, 3*time.Second)
13391339
if err != nil {
13401340
return "", err
13411341
}
@@ -1349,7 +1349,7 @@ func sendStructWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error
13491349
}
13501350

13511351
func receiveStructWorkflow(ctx DBOSContext, topic string) (sendRecvType, error) {
1352-
return Recv[sendRecvType](ctx, topic, 3 * time.Second)
1352+
return Recv[sendRecvType](ctx, topic, 3*time.Second)
13531353
}
13541354

13551355
func sendIdempotencyWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) {
@@ -1362,7 +1362,7 @@ func sendIdempotencyWorkflow(ctx DBOSContext, input sendWorkflowInput) (string,
13621362
}
13631363

13641364
func receiveIdempotencyWorkflow(ctx DBOSContext, topic string) (string, error) {
1365-
msg, err := Recv[string](ctx, topic, 3 * time.Second)
1365+
msg, err := Recv[string](ctx, topic, 3*time.Second)
13661366
if err != nil {
13671367
// Unlock the test in this case
13681368
receiveIdempotencyStartEvent.Set()
@@ -1511,7 +1511,7 @@ func TestSendRecv(t *testing.T) {
15111511

15121512
t.Run("RecvMustRunInsideWorkflows", func(t *testing.T) {
15131513
// Attempt to run Recv outside of a workflow context
1514-
_, err := Recv[string](dbosCtx, "test-topic", 1 * time.Second)
1514+
_, err := Recv[string](dbosCtx, "test-topic", 1*time.Second)
15151515
require.Error(t, err, "expected error when running Recv outside of workflow context, but got none")
15161516

15171517
// Check the error type
@@ -1722,7 +1722,7 @@ func setEventWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, err
17221722
}
17231723

17241724
func getEventWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) {
1725-
result, err := GetEvent[string](ctx, input.Key, input.Message, 3 * time.Second)
1725+
result, err := GetEvent[string](ctx, input.Key, input.Message, 3*time.Second)
17261726
if err != nil {
17271727
return "", err
17281728
}
@@ -1759,7 +1759,7 @@ func setEventIdempotencyWorkflow(ctx DBOSContext, input setEventWorkflowInput) (
17591759
}
17601760

17611761
func getEventIdempotencyWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) {
1762-
result, err := GetEvent[string](ctx, input.Key, input.Message, 3 * time.Second)
1762+
result, err := GetEvent[string](ctx, input.Key, input.Message, 3*time.Second)
17631763
if err != nil {
17641764
return "", err
17651765
}
@@ -1969,7 +1969,7 @@ func TestSetGetEvent(t *testing.T) {
19691969
}
19701970

19711971
// Start a workflow that gets the event from outside the original workflow
1972-
message, err := GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "test-key", 3 * time.Second)
1972+
message, err := GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "test-key", 3*time.Second)
19731973
if err != nil {
19741974
t.Fatalf("failed to get event from outside workflow: %v", err)
19751975
}
@@ -1994,7 +1994,7 @@ func TestSetGetEvent(t *testing.T) {
19941994
t.Run("GetEventTimeout", func(t *testing.T) {
19951995
// Try to get an event from a non-existent workflow
19961996
nonExistentID := uuid.NewString()
1997-
message, err := GetEvent[string](dbosCtx, nonExistentID, "test-key", 3 * time.Second)
1997+
message, err := GetEvent[string](dbosCtx, nonExistentID, "test-key", 3*time.Second)
19981998
require.NoError(t, err, "failed to get event from non-existent workflow")
19991999
if message != "" {
20002000
t.Fatalf("expected empty result on timeout, got '%s'", message)
@@ -2008,7 +2008,7 @@ func TestSetGetEvent(t *testing.T) {
20082008
require.NoError(t, err, "failed to set event")
20092009
_, err = setHandle.GetResult()
20102010
require.NoError(t, err, "failed to get result from set event workflow")
2011-
message, err = GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "non-existent-key", 3 * time.Second)
2011+
message, err = GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "non-existent-key", 3*time.Second)
20122012
require.NoError(t, err, "failed to get event with non-existent key")
20132013
if message != "" {
20142014
t.Fatalf("expected empty result on timeout with non-existent key, got '%s'", message)
@@ -2155,7 +2155,7 @@ func TestSetGetEvent(t *testing.T) {
21552155
for range numGoroutines {
21562156
go func() {
21572157
defer wg.Done()
2158-
res, err := GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "concurrent-event-key", 10 * time.Second)
2158+
res, err := GetEvent[string](dbosCtx, setHandle.GetWorkflowID(), "concurrent-event-key", 10*time.Second)
21592159
if err != nil {
21602160
errors <- fmt.Errorf("failed to get event in goroutine: %v", err)
21612161
return
@@ -2260,6 +2260,18 @@ func TestWorkflowTimeout(t *testing.T) {
22602260
<-ctx.Done()
22612261
assert.True(t, errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded),
22622262
"workflow was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err())
2263+
// The status of this workflow should transition to cancelled
2264+
maxtries := 10
2265+
for range maxtries {
2266+
isCancelled, err := checkWfStatus(ctx, WorkflowStatusCancelled)
2267+
if err != nil {
2268+
return "", err
2269+
}
2270+
if isCancelled {
2271+
break
2272+
}
2273+
time.Sleep(500 * time.Millisecond)
2274+
}
22632275
return "", ctx.Err()
22642276
}
22652277
RegisterWorkflow(dbosCtx, waitForCancelWorkflow)
@@ -2334,6 +2346,62 @@ func TestWorkflowTimeout(t *testing.T) {
23342346
assert.Equal(t, WorkflowStatusCancelled, status.Status, "expected workflow status to be WorkflowStatusCancelled")
23352347
})
23362348

2349+
waitForCancelWorkflowWithStepAfterCancel := func(ctx DBOSContext, _ string) (string, error) {
2350+
// Wait for cancellation
2351+
<-ctx.Done()
2352+
// Check that we have the correct cancellation error
2353+
if !errors.Is(ctx.Err(), context.Canceled) && !errors.Is(ctx.Err(), context.DeadlineExceeded) {
2354+
return "", fmt.Errorf("workflow was cancelled, but context error is not context.Canceled nor context.DeadlineExceeded: %v", ctx.Err())
2355+
}
2356+
// The status of this workflow should transition to cancelled
2357+
maxtries := 10
2358+
for range maxtries {
2359+
isCancelled, err := checkWfStatus(ctx, WorkflowStatusCancelled)
2360+
if err != nil {
2361+
return "", err
2362+
}
2363+
if isCancelled {
2364+
break
2365+
}
2366+
time.Sleep(500 * time.Millisecond)
2367+
}
2368+
2369+
// After cancellation, try to run a simple step
2370+
// This should return a WorkflowCancelled error
2371+
return RunAsStep(ctx, simpleStep)
2372+
}
2373+
RegisterWorkflow(dbosCtx, waitForCancelWorkflowWithStepAfterCancel)
2374+
2375+
t.Run("WorkflowWithStepAfterTimeout", func(t *testing.T) {
2376+
// Start a workflow that waits for cancellation then tries to run a step
2377+
cancelCtx, cancelFunc := WithTimeout(dbosCtx, 1*time.Millisecond)
2378+
defer cancelFunc() // Ensure we clean up the context
2379+
handle, err := RunAsWorkflow(cancelCtx, waitForCancelWorkflowWithStepAfterCancel, "wf-with-step-after-timeout")
2380+
require.NoError(t, err, "failed to start workflow with step after timeout")
2381+
2382+
// Wait for the workflow to complete and get the result
2383+
result, err := handle.GetResult()
2384+
fmt.Println(result)
2385+
// The workflow should return a WorkflowCancelled error from the step
2386+
require.Error(t, err, "expected error from workflow")
2387+
2388+
// Check if the error is a DBOSError with WorkflowCancelled code
2389+
var dbosErr *DBOSError
2390+
if errors.As(err, &dbosErr) {
2391+
assert.Equal(t, WorkflowCancelled, dbosErr.Code, "expected WorkflowCancelled error code, got: %v", dbosErr.Code)
2392+
} else {
2393+
// If not a DBOSError, check if it's a context error
2394+
assert.True(t, errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded),
2395+
"expected context.Canceled or context.DeadlineExceeded error, got: %v", err)
2396+
}
2397+
assert.Equal(t, "", result, "expected result to be an empty string")
2398+
2399+
// Check the workflow status: should be cancelled
2400+
status, err := handle.GetStatus()
2401+
require.NoError(t, err, "failed to get workflow status")
2402+
assert.Equal(t, WorkflowStatusCancelled, status.Status, "expected workflow status to be WorkflowStatusCancelled")
2403+
})
2404+
23372405
shorterStepTimeoutWorkflow := func(ctx DBOSContext, _ string) (string, error) {
23382406
// This workflow will run a step that has a shorter timeout than the workflow itself
23392407
// The timeout will trigger a step error, the workflow can do whatever it wants with that error
@@ -2519,7 +2587,7 @@ func TestWorkflowTimeout(t *testing.T) {
25192587
}
25202588

25212589
func notificationWaiterWorkflow(ctx DBOSContext, pairID int) (string, error) {
2522-
result, err := GetEvent[string](ctx, fmt.Sprintf("notification-setter-%d", pairID), "event-key", 10 * time.Second)
2590+
result, err := GetEvent[string](ctx, fmt.Sprintf("notification-setter-%d", pairID), "event-key", 10*time.Second)
25232591
if err != nil {
25242592
return "", err
25252593
}
@@ -2535,7 +2603,7 @@ func notificationSetterWorkflow(ctx DBOSContext, pairID int) (string, error) {
25352603
}
25362604

25372605
func sendRecvReceiverWorkflow(ctx DBOSContext, pairID int) (string, error) {
2538-
result, err := Recv[string](ctx, "send-recv-topic", 10 * time.Second)
2606+
result, err := Recv[string](ctx, "send-recv-topic", 10*time.Second)
25392607
if err != nil {
25402608
return "", err
25412609
}

0 commit comments

Comments
 (0)