Skip to content

Commit c99cf28

Browse files
committed
prevent cancel to success trnsitions
1 parent 9ea19da commit c99cf28

File tree

2 files changed

+58
-5
lines changed

2 files changed

+58
-5
lines changed

dbos/system_database.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -697,11 +697,12 @@ type updateWorkflowOutcomeDBInput struct {
697697
tx pgx.Tx
698698
}
699699

700-
// Will evolve as we serialize all output and error types
700+
// updateWorkflowOutcome updates the status, output, and error of a workflow
701+
// Note that transitions from CANCELLED to SUCCESS or ERROR are forbidden
701702
func (s *sysDB) updateWorkflowOutcome(ctx context.Context, input updateWorkflowOutcomeDBInput) error {
702703
query := `UPDATE dbos.workflow_status
703704
SET status = $1, output = $2, error = $3, updated_at = $4, deduplication_id = NULL
704-
WHERE workflow_uuid = $5 AND NOT (status = $6 AND $1 = $7)`
705+
WHERE workflow_uuid = $5 AND NOT (status = $6 AND $1 in ($7, $8))`
705706

706707
outputString, err := serialize(input.output)
707708
if err != nil {
@@ -714,9 +715,9 @@ func (s *sysDB) updateWorkflowOutcome(ctx context.Context, input updateWorkflowO
714715
}
715716

716717
if input.tx != nil {
717-
_, err = input.tx.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusError)
718+
_, err = input.tx.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusSuccess, WorkflowStatusError)
718719
} else {
719-
_, err = s.pool.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusError)
720+
_, err = s.pool.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusSuccess, WorkflowStatusError)
720721
}
721722

722723
if err != nil {

dbos/workflows_test.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2942,7 +2942,7 @@ func TestWorkflowCancel(t *testing.T) {
29422942
}
29432943
RegisterWorkflow(dbosCtx, blockingWorkflow)
29442944

2945-
t.Run("TestWorkflowCancel", func(t *testing.T) {
2945+
t.Run("TestWorkflowCancelWithRecvError", func(t *testing.T) {
29462946
topic := "cancel-test-topic"
29472947

29482948
// Start the blocking workflow
@@ -2971,6 +2971,58 @@ func TestWorkflowCancel(t *testing.T) {
29712971
require.NoError(t, err, "failed to get workflow status")
29722972
assert.Equal(t, WorkflowStatusCancelled, status.Status, "expected workflow status to be WorkflowStatusCancelled")
29732973
})
2974+
2975+
t.Run("TestWorkflowCancelWithSuccess", func(t *testing.T) {
2976+
blockingEventNoError := NewEvent()
2977+
2978+
// Workflow that waits for an event, then calls Recv(). Does NOT return error when Recv times out
2979+
blockingWorkflowNoError := func(ctx DBOSContext, topic string) (string, error) {
2980+
// Wait for the event
2981+
blockingEventNoError.Wait()
2982+
Recv[string](ctx, topic, 5*time.Second)
2983+
// Ignore the error
2984+
return "", nil
2985+
}
2986+
RegisterWorkflow(dbosCtx, blockingWorkflowNoError)
2987+
2988+
topic := "cancel-no-error-test-topic"
2989+
2990+
// Start the blocking workflow
2991+
handle, err := RunWorkflow(dbosCtx, blockingWorkflowNoError, topic)
2992+
require.NoError(t, err, "failed to start blocking workflow")
2993+
2994+
// Cancel the workflow using DBOS.CancelWorkflow
2995+
err = CancelWorkflow(dbosCtx, handle.GetWorkflowID())
2996+
require.NoError(t, err, "failed to cancel workflow")
2997+
2998+
// Signal the event so the workflow can move on to Recv()
2999+
blockingEventNoError.Set()
3000+
3001+
// Check the return values of the workflow
3002+
// Because this is a direct handle it'll not return an error
3003+
result, err := handle.GetResult()
3004+
require.NoError(t, err, "expected no error from direct handle")
3005+
assert.Equal(t, "", result, "expected empty result from cancelled workflow")
3006+
3007+
// Now use a polling handle to get result -- observe the error
3008+
pollingHandle, err := RetrieveWorkflow[string](dbosCtx, handle.GetWorkflowID())
3009+
require.NoError(t, err, "failed to retrieve workflow with polling handle")
3010+
3011+
result, err = pollingHandle.GetResult()
3012+
require.Error(t, err, "expected error from cancelled workflow even when workflow returns success")
3013+
assert.Equal(t, "", result, "expected empty result from cancelled workflow")
3014+
3015+
// Check that we still get a DBOSError with AwaitedWorkflowCancelled code
3016+
// The gate prevents CANCELLED -> SUCCESS transition
3017+
var dbosErr *DBOSError
3018+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
3019+
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected AwaitedWorkflowCancelled error code, got: %v", dbosErr.Code)
3020+
3021+
// Ensure the workflow status remains CANCELLED
3022+
status, err := handle.GetStatus()
3023+
require.NoError(t, err, "failed to get workflow status")
3024+
assert.Equal(t, WorkflowStatusCancelled, status.Status, "expected workflow status to remain WorkflowStatusCancelled due to gate")
3025+
})
29743026
}
29753027

29763028
var cancelAllBeforeBlockEvent = NewEvent()

0 commit comments

Comments
 (0)