Skip to content

Commit 83b8148

Browse files
committed
test conflicting execution of workflows / tasks of the same ID
1 parent cacb25c commit 83b8148

File tree

2 files changed

+205
-7
lines changed

2 files changed

+205
-7
lines changed

dbos/queues_test.go

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"os"
8+
"strings"
89
"sync/atomic"
910
"testing"
1011
"time"
@@ -23,6 +24,7 @@ This suite tests
2324
[x] worker concurrency (2 at a time across two "workers")
2425
[x] worker concurrency X recovery
2526
[x] rate limiter
27+
[x] conflicting workflow on different queues
2628
[] queue deduplication
2729
[] queue priority
2830
[x] queued workflow times out
@@ -48,6 +50,8 @@ func TestWorkflowQueues(t *testing.T) {
4850

4951
queue := NewWorkflowQueue(dbosCtx, "test-queue")
5052
dlqEnqueueQueue := NewWorkflowQueue(dbosCtx, "test-successive-enqueue-queue")
53+
conflictQueue1 := NewWorkflowQueue(dbosCtx, "conflict-queue-1")
54+
conflictQueue2 := NewWorkflowQueue(dbosCtx, "conflict-queue-2")
5155

5256
dlqStartEvent := NewEvent()
5357
dlqCompleteEvent := NewEvent()
@@ -172,14 +176,15 @@ func TestWorkflowQueues(t *testing.T) {
172176
}
173177
})
174178

175-
/* TODO: we will move queue registry in the new interface in a subsequent PR
176179
t.Run("DynamicRegistration", func(t *testing.T) {
177-
q := NewWorkflowQueue("dynamic-queue")
178-
if len(q.name) > 0 {
179-
t.Fatalf("expected nil queue for dynamic registration after DBOS initialization, got %v", q)
180-
}
180+
// Attempting to register a queue after launch should panic
181+
defer func() {
182+
if r := recover(); r == nil {
183+
t.Fatal("expected panic from queue registration after launch but got none")
184+
}
185+
}()
186+
NewWorkflowQueue(dbosCtx, "dynamic-queue")
181187
})
182-
*/
183188

184189
t.Run("QueueWorkflowDLQ", func(t *testing.T) {
185190
workflowID := "blocking-workflow-test"
@@ -255,6 +260,52 @@ func TestWorkflowQueues(t *testing.T) {
255260
t.Fatal("expected queue entries to be cleaned up after successive enqueues test")
256261
}
257262
})
263+
264+
t.Run("ConflictingWorkflowOnDifferentQueues", func(t *testing.T) {
265+
workflowID := "conflicting-workflow-id"
266+
267+
// Enqueue the same workflow ID on the first queue
268+
handle1, err := RunAsWorkflow(dbosCtx, queueWorkflow, "test-input-1", WithQueue(conflictQueue1.Name), WithWorkflowID(workflowID))
269+
if err != nil {
270+
t.Fatalf("failed to enqueue workflow on first queue: %v", err)
271+
}
272+
273+
// Get the result from the first workflow to ensure it completes
274+
result1, err := handle1.GetResult()
275+
if err != nil {
276+
t.Fatalf("failed to get result from first workflow: %v", err)
277+
}
278+
if result1 != "test-input-1" {
279+
t.Fatalf("expected 'test-input-1', got %v", result1)
280+
}
281+
282+
// Now try to enqueue the same workflow ID on a different queue
283+
// This should trigger a ConflictingWorkflowError
284+
_, err = RunAsWorkflow(dbosCtx, queueWorkflow, "test-input-2", WithQueue(conflictQueue2.Name), WithWorkflowID(workflowID))
285+
if err == nil {
286+
t.Fatal("expected ConflictingWorkflowError when enqueueing same workflow ID on different queue, but got none")
287+
}
288+
289+
// Check that it's the correct error type
290+
dbosErr, ok := err.(*DBOSError)
291+
if !ok {
292+
t.Fatalf("expected error to be of type *DBOSError, got %T", err)
293+
}
294+
295+
if dbosErr.Code != ConflictingWorkflowError {
296+
t.Fatalf("expected error code to be ConflictingWorkflowError, got %v", dbosErr.Code)
297+
}
298+
299+
// Check that the error message contains queue information
300+
expectedMsgPart := "different queue"
301+
if !strings.Contains(err.Error(), expectedMsgPart) {
302+
t.Fatalf("expected error message to contain '%s', got '%s'", expectedMsgPart, err.Error())
303+
}
304+
305+
if !queueEntriesAreCleanedUp(dbosCtx) {
306+
t.Fatal("expected queue entries to be cleaned up after conflicting workflow test")
307+
}
308+
})
258309
}
259310

260311
func TestQueueRecovery(t *testing.T) {

dbos/workflows_test.go

Lines changed: 148 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ Test workflow and steps features
55
[x] Wrapping various golang methods in DBOS workflows
66
[x] workflow idempotency
77
[x] workflow DLQ
8-
[] workflow conflicting name
8+
[x] workflow conflicting name
99
[] workflow timeouts & deadlines (including child workflows)
1010
*/
1111

@@ -1902,6 +1902,153 @@ func getEventIdempotencyWorkflow(ctx DBOSContext, input setEventWorkflowInput) (
19021902
return result, nil
19031903
}
19041904

1905+
// Test workflows and steps for parameter mismatch validation
1906+
func conflictWorkflowA(dbosCtx DBOSContext, input string) (string, error) {
1907+
return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
1908+
return conflictStepA(ctx)
1909+
})
1910+
}
1911+
1912+
func conflictWorkflowB(dbosCtx DBOSContext, input string) (string, error) {
1913+
return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
1914+
return conflictStepB(ctx)
1915+
})
1916+
}
1917+
1918+
func conflictStepA(_ context.Context) (string, error) {
1919+
return "step-a-result", nil
1920+
}
1921+
1922+
func conflictStepB(_ context.Context) (string, error) {
1923+
return "step-b-result", nil
1924+
}
1925+
1926+
func workflowWithMultipleSteps(dbosCtx DBOSContext, input string) (string, error) {
1927+
// First step
1928+
result1, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
1929+
return conflictStepA(ctx)
1930+
})
1931+
if err != nil {
1932+
return "", err
1933+
}
1934+
1935+
// Second step - this is where we'll test step name conflicts
1936+
result2, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
1937+
return conflictStepB(ctx)
1938+
})
1939+
if err != nil {
1940+
return "", err
1941+
}
1942+
1943+
return result1 + "-" + result2, nil
1944+
}
1945+
1946+
func TestWorkflowExecutionMismatch(t *testing.T) {
1947+
dbosCtx := setupDBOS(t, true, true)
1948+
1949+
// Register workflows for testing
1950+
RegisterWorkflow(dbosCtx, conflictWorkflowA)
1951+
RegisterWorkflow(dbosCtx, conflictWorkflowB)
1952+
RegisterWorkflow(dbosCtx, workflowWithMultipleSteps)
1953+
1954+
t.Run("WorkflowNameConflict", func(t *testing.T) {
1955+
workflowID := uuid.NewString()
1956+
1957+
// First, run conflictWorkflowA with a specific workflow ID
1958+
handle1, err := RunAsWorkflow(dbosCtx, conflictWorkflowA, "test-input", WithWorkflowID(workflowID))
1959+
if err != nil {
1960+
t.Fatalf("failed to start first workflow: %v", err)
1961+
}
1962+
1963+
// Get the result to ensure it completes
1964+
result1, err := handle1.GetResult()
1965+
if err != nil {
1966+
t.Fatalf("failed to get result from first workflow: %v", err)
1967+
}
1968+
if result1 != "step-a-result" {
1969+
t.Fatalf("expected 'step-a-result', got '%s'", result1)
1970+
}
1971+
1972+
// Now try to run conflictWorkflowB with the same workflow ID
1973+
// This should return a ConflictingWorkflowError
1974+
_, err = RunAsWorkflow(dbosCtx, conflictWorkflowB, "test-input", WithWorkflowID(workflowID))
1975+
if err == nil {
1976+
t.Fatal("expected ConflictingWorkflowError when running different workflow with same ID, but got none")
1977+
}
1978+
1979+
// Check that it's the correct error type
1980+
dbosErr, ok := err.(*DBOSError)
1981+
if !ok {
1982+
t.Fatalf("expected error to be of type *DBOSError, got %T", err)
1983+
}
1984+
1985+
if dbosErr.Code != ConflictingWorkflowError {
1986+
t.Fatalf("expected error code to be ConflictingWorkflowError, got %v", dbosErr.Code)
1987+
}
1988+
1989+
// Check that the error message contains the workflow names
1990+
expectedMsgPart := "Workflow already exists with a different name"
1991+
if !strings.Contains(err.Error(), expectedMsgPart) {
1992+
t.Fatalf("expected error message to contain '%s', got '%s'", expectedMsgPart, err.Error())
1993+
}
1994+
})
1995+
1996+
t.Run("StepNameConflict", func(t *testing.T) {
1997+
// This test simulates a scenario where a workflow is recovered but
1998+
// the step implementation has changed, causing a step name mismatch
1999+
2000+
// First, start a workflow and let it complete partially
2001+
handle1, err := RunAsWorkflow(dbosCtx, workflowWithMultipleSteps, "test-input")
2002+
if err != nil {
2003+
t.Fatalf("failed to start workflow: %v", err)
2004+
}
2005+
2006+
// Complete the workflow
2007+
result, err := handle1.GetResult()
2008+
if err != nil {
2009+
t.Fatalf("failed to get result from workflow: %v", err)
2010+
}
2011+
if result != "step-a-result-step-b-result" {
2012+
t.Fatalf("expected 'step-a-result-step-b-result', got '%s'", result)
2013+
}
2014+
2015+
// Now simulate what happens if we try to check operation execution
2016+
// with a different step name for the same step ID
2017+
workflowID := handle1.GetWorkflowID()
2018+
2019+
// This directly tests the CheckOperationExecution method with mismatched step name
2020+
// We'll check step ID 0 (first step) but with wrong step name
2021+
wrongStepName := "wrong-step-name"
2022+
_, err = dbosCtx.(*dbosContext).systemDB.CheckOperationExecution(dbosCtx, checkOperationExecutionDBInput{
2023+
workflowID: workflowID,
2024+
stepID: 0,
2025+
stepName: wrongStepName,
2026+
})
2027+
2028+
if err == nil {
2029+
t.Fatal("expected UnexpectedStep error when checking operation with wrong step name, but got none")
2030+
}
2031+
2032+
// Check that it's the correct error type
2033+
dbosErr, ok := err.(*DBOSError)
2034+
if !ok {
2035+
t.Fatalf("expected error to be of type *DBOSError, got %T", err)
2036+
}
2037+
2038+
if dbosErr.Code != UnexpectedStep {
2039+
t.Fatalf("expected error code to be UnexpectedStep, got %v", dbosErr.Code)
2040+
}
2041+
2042+
// Check that the error message contains step information
2043+
if !strings.Contains(err.Error(), "Check that your workflow is deterministic") {
2044+
t.Fatalf("expected error message to contain 'Check that your workflow is deterministic', got '%s'", err.Error())
2045+
}
2046+
if !strings.Contains(err.Error(), wrongStepName) {
2047+
t.Fatalf("expected error message to contain wrong step name '%s', got '%s'", wrongStepName, err.Error())
2048+
}
2049+
})
2050+
}
2051+
19052052
func TestSetGetEvent(t *testing.T) {
19062053
dbosCtx := setupDBOS(t, true, true)
19072054

0 commit comments

Comments
 (0)