From 619f4687c209da3aca147c9002b42004fb88b3fd Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 13 Aug 2025 16:25:21 -0700 Subject: [PATCH 1/6] accept dedup ID and priority --- dbos/errors.go | 11 +++++++++++ dbos/system_database.go | 8 ++++++++ dbos/workflow.go | 22 ++++++++++++++++++++++ 3 files changed, 41 insertions(+) diff --git a/dbos/errors.go b/dbos/errors.go index f392b33a..b71c3780 100644 --- a/dbos/errors.go +++ b/dbos/errors.go @@ -20,6 +20,7 @@ const ( StepExecutionError // General step execution error DeadLetterQueueError // Workflow moved to dead letter queue after max retries MaxStepRetriesExceeded // Step exceeded maximum retry attempts + QueueDeduplicated // Workflow was deduplicated in the queue ) // DBOSError is the unified error type for all DBOS operations. @@ -186,3 +187,13 @@ func newMaxStepRetriesExceededError(workflowID, stepName string, maxRetries int, IsBase: true, } } + +func newQueueDeduplicatedError(workflowID, queueName, deduplicationID string) *DBOSError { + return &DBOSError{ + Message: fmt.Sprintf("Workflow %s was deduplicated due to an existing workflow in queue %s with deduplication ID %s", workflowID, queueName, deduplicationID), + Code: QueueDeduplicated, + WorkflowID: workflowID, + QueueName: queueName, + DeduplicationID: deduplicationID, + } +} diff --git a/dbos/system_database.go b/dbos/system_database.go index 8de034ab..01e59ce8 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -376,6 +376,14 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt &workflowDeadlineEpochMS, ) if err != nil { + // Handle unique constraint violation for the deduplication ID (this should be the only case for a 23505) + if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "23505" { + return nil, newQueueDeduplicatedError( + input.status.ID, + input.status.QueueName, + input.status.DeduplicationID, + ) + } return nil, fmt.Errorf("failed to insert workflow status: %w", err) } diff --git a/dbos/workflow.go b/dbos/workflow.go index 83840fad..7cbe455f 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -436,6 +436,8 @@ type workflowParams struct { queueName string applicationVersion string maxRetries int + deduplicationID string + priority uint } // WorkflowOption is a functional option for configuring workflow execution parameters. @@ -465,6 +467,20 @@ func WithApplicationVersion(version string) WorkflowOption { } } +// WithDeduplicationID sets a deduplication ID for the workflow. +func WithDeduplicationID(id string) WorkflowOption { + return func(p *workflowParams) { + p.deduplicationID = id + } +} + +// WithPriority sets the execution priority for the workflow. +func WithPriority(priority uint) WorkflowOption { + return func(p *workflowParams) { + p.priority = priority + } +} + // An internal option we use to map the reflection function name to the registration options. func withWorkflowName(name string) WorkflowOption { return func(p *workflowParams) { @@ -650,6 +666,8 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o Input: input, ApplicationID: c.GetApplicationID(), QueueName: params.queueName, + DeduplicationID: params.deduplicationID, + Priority: int(params.priority), } // Init status and record child workflow relationship in a single transaction @@ -1321,6 +1339,7 @@ type EnqueueOptions struct { WorkflowID string ApplicationVersion string DeduplicationID string + Priority uint WorkflowTimeout time.Duration WorkflowInput any } @@ -1347,6 +1366,7 @@ func (c *dbosContext) Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHan Input: params.WorkflowInput, QueueName: params.QueueName, DeduplicationID: params.DeduplicationID, + Priority: int(params.Priority), } uncancellableCtx := WithoutCancel(c) @@ -1384,6 +1404,7 @@ type GenericEnqueueOptions[P any] struct { WorkflowID string ApplicationVersion string DeduplicationID string + Priority uint WorkflowTimeout time.Duration WorkflowInput P } @@ -1461,6 +1482,7 @@ func Enqueue[P any, R any](ctx DBOSContext, params GenericEnqueueOptions[P]) (Wo WorkflowID: params.WorkflowID, ApplicationVersion: params.ApplicationVersion, DeduplicationID: params.DeduplicationID, + Priority: params.Priority, WorkflowInput: params.WorkflowInput, WorkflowTimeout: params.WorkflowTimeout, }) From 192045baa684537541f24d251ed7a6948c00260d Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 13 Aug 2025 16:27:02 -0700 Subject: [PATCH 2/6] fix --- dbos/workflow.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 7cbe455f..d8de1d2b 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1486,11 +1486,14 @@ func Enqueue[P any, R any](ctx DBOSContext, params GenericEnqueueOptions[P]) (Wo WorkflowInput: params.WorkflowInput, WorkflowTimeout: params.WorkflowTimeout, }) + if err != nil { + return nil, err + } return &workflowPollingHandle[R]{ workflowID: handle.GetWorkflowID(), dbosContext: ctx, - }, err + }, nil } // CancelWorkflow cancels a running or enqueued workflow by setting its status to CANCELLED. From 8141109041519a054171dd0f1843eb528c56f3f7 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 13 Aug 2025 16:34:23 -0700 Subject: [PATCH 3/6] tests --- dbos/client_test.go | 162 +++++++++++++++++++++++++++++++++++++++ dbos/queues_test.go | 179 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 339 insertions(+), 2 deletions(-) diff --git a/dbos/client_test.go b/dbos/client_test.go index f0ae5017..733c4937 100644 --- a/dbos/client_test.go +++ b/dbos/client_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "sync" "testing" "time" @@ -18,6 +19,14 @@ func TestEnqueue(t *testing.T) { // Create queue for communication between client and server queue := NewWorkflowQueue(serverCtx, "client-enqueue-queue") + // Create a priority-enabled queue with max concurrency of 1 to ensure ordering + // Must be created before Launch() + priorityQueue := NewWorkflowQueue(serverCtx, "priority-test-queue", WithGlobalConcurrency(1), WithPriorityEnabled(true)) + + // Track execution order for priority test + var executionOrder []string + var mu sync.Mutex + // Register workflows with custom names so client can reference them type wfInput struct { Input string @@ -41,6 +50,15 @@ func TestEnqueue(t *testing.T) { } RegisterWorkflow(serverCtx, blockingWorkflow, WithWorkflowName("BlockingWorkflow")) + // Register a workflow that records its execution order (for priority test) + priorityWorkflow := func(ctx DBOSContext, input string) (string, error) { + mu.Lock() + executionOrder = append(executionOrder, input) + mu.Unlock() + return input, nil + } + RegisterWorkflow(serverCtx, priorityWorkflow, WithWorkflowName("PriorityWorkflow")) + // Launch the server context to start processing tasks err := serverCtx.Launch() require.NoError(t, err) @@ -129,6 +147,150 @@ func TestEnqueue(t *testing.T) { assert.Equal(t, WorkflowStatusCancelled, status.Status) }) + t.Run("EnqueueWithPriority", func(t *testing.T) { + // Reset execution order for this test + mu.Lock() + executionOrder = []string{} + mu.Unlock() + + // Enqueue workflow without priority (will use default priority of 0) + handle1, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ + WorkflowName: "PriorityWorkflow", + QueueName: priorityQueue.Name, + WorkflowInput: "abc", + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + require.NoError(t, err, "failed to enqueue workflow without priority") + + // Enqueue with a lower priority (higher number = lower priority) + handle2, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ + WorkflowName: "PriorityWorkflow", + QueueName: priorityQueue.Name, + WorkflowInput: "def", + Priority: 5, + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + require.NoError(t, err, "failed to enqueue workflow with priority 5") + + // Enqueue with a higher priority (lower number = higher priority) + handle3, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{ + WorkflowName: "PriorityWorkflow", + QueueName: priorityQueue.Name, + WorkflowInput: "ghi", + Priority: 1, + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + require.NoError(t, err, "failed to enqueue workflow with priority 1") + + // Get results + result1, err := handle1.GetResult() + require.NoError(t, err, "failed to get result from first workflow") + assert.Equal(t, "abc", result1) + + result3, err := handle3.GetResult() + require.NoError(t, err, "failed to get result from third workflow") + assert.Equal(t, "ghi", result3) + + result2, err := handle2.GetResult() + require.NoError(t, err, "failed to get result from second workflow") + assert.Equal(t, "def", result2) + + // Verify execution order: workflows should execute in priority order + // Priority 0 (abc) executes first (already running when others are enqueued) + // Priority 1 (ghi) executes second (higher priority than def) + // Priority 5 (def) executes last (lowest priority) + expectedOrder := []string{"abc", "ghi", "def"} + assert.Equal(t, expectedOrder, executionOrder, "workflows should execute in priority order") + + // Verify queue entries are cleaned up + assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after priority test") + }) + + t.Run("EnqueueWithDedupID", func(t *testing.T) { + dedupID := "my-client-dedup-id" + wfid1 := "client-dedup-wf1" + wfid2 := "client-dedup-wf2" + + // First workflow with deduplication ID - should succeed + handle1, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ + WorkflowName: "ServerWorkflow", + QueueName: queue.Name, + WorkflowID: wfid1, + DeduplicationID: dedupID, + WorkflowInput: wfInput{Input: "test-input"}, + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + require.NoError(t, err, "failed to enqueue first workflow with deduplication ID") + + // Second workflow with same deduplication ID but different workflow ID - should fail + _, err = Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ + WorkflowName: "ServerWorkflow", + QueueName: queue.Name, + WorkflowID: wfid2, + DeduplicationID: dedupID, + WorkflowInput: wfInput{Input: "test-input"}, + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + require.Error(t, err, "expected error when enqueueing workflow with same deduplication ID") + + // Check that it's the correct error type and message + dbosErr, ok := err.(*DBOSError) + require.True(t, ok, "expected error to be of type *DBOSError, got %T", err) + assert.Equal(t, QueueDeduplicated, dbosErr.Code, "expected error code to be QueueDeduplicated") + + expectedMsgPart := fmt.Sprintf("Workflow %s was deduplicated due to an existing workflow in queue %s with deduplication ID %s", wfid2, queue.Name, dedupID) + assert.Contains(t, err.Error(), expectedMsgPart, "expected error message to contain deduplication information") + + // Third workflow with different deduplication ID - should succeed + handle3, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ + WorkflowName: "ServerWorkflow", + QueueName: queue.Name, + DeduplicationID: "different-dedup-id", + WorkflowInput: wfInput{Input: "test-input"}, + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + require.NoError(t, err, "failed to enqueue workflow with different deduplication ID") + + // Fourth workflow without deduplication ID - should succeed + handle4, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ + WorkflowName: "ServerWorkflow", + QueueName: queue.Name, + WorkflowInput: wfInput{Input: "test-input"}, + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + require.NoError(t, err, "failed to enqueue workflow without deduplication ID") + + // Wait for all successful workflows to complete + result1, err := handle1.GetResult() + require.NoError(t, err, "failed to get result from first workflow") + assert.Equal(t, "processed: test-input", result1) + + result3, err := handle3.GetResult() + require.NoError(t, err, "failed to get result from third workflow") + assert.Equal(t, "processed: test-input", result3) + + result4, err := handle4.GetResult() + require.NoError(t, err, "failed to get result from fourth workflow") + assert.Equal(t, "processed: test-input", result4) + + // After first workflow completes, we should be able to enqueue with same deduplication ID + handle5, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{ + WorkflowName: "ServerWorkflow", + QueueName: queue.Name, + WorkflowID: wfid2, // Reuse the workflow ID that failed before + DeduplicationID: dedupID, // Same deduplication ID as first workflow + WorkflowInput: wfInput{Input: "test-input"}, + ApplicationVersion: serverCtx.GetApplicationVersion(), + }) + require.NoError(t, err, "failed to enqueue workflow with same dedup ID after completion") + + result5, err := handle5.GetResult() + require.NoError(t, err, "failed to get result from fifth workflow") + assert.Equal(t, "processed: test-input", result5) + + assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after deduplication test") + }) + // Verify all queue entries are cleaned up require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after client tests") } diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 3acc5053..e0a7c985 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "sync" "sync/atomic" "testing" "time" @@ -26,8 +27,8 @@ This suite tests [x] worker concurrency X recovery [x] rate limiter [x] conflicting workflow on different queues -[] queue deduplication -[] queue priority +[x] queue deduplication +[x] queue priority [x] queued workflow times out [] scheduled workflow enqueues another workflow */ @@ -53,6 +54,7 @@ func TestWorkflowQueues(t *testing.T) { dlqEnqueueQueue := NewWorkflowQueue(dbosCtx, "test-successive-enqueue-queue") conflictQueue1 := NewWorkflowQueue(dbosCtx, "conflict-queue-1") conflictQueue2 := NewWorkflowQueue(dbosCtx, "conflict-queue-2") + dedupQueue := NewWorkflowQueue(dbosCtx, "test-dedup-queue") dlqStartEvent := NewEvent() dlqCompleteEvent := NewEvent() @@ -61,6 +63,33 @@ func TestWorkflowQueues(t *testing.T) { // Register workflows with dbosContext RegisterWorkflow(dbosCtx, queueWorkflow) + // Queue deduplication test workflows + var dedupWorkflowEvent *Event + childWorkflow := func(ctx DBOSContext, var1 string) (string, error) { + if dedupWorkflowEvent != nil { + dedupWorkflowEvent.Wait() + } + return var1 + "-c", nil + } + RegisterWorkflow(dbosCtx, childWorkflow) + + testWorkflow := func(ctx DBOSContext, var1 string) (string, error) { + // Make sure the child workflow is not blocked by the same deduplication ID + childHandle, err := RunAsWorkflow(ctx, childWorkflow, var1, WithQueue(dedupQueue.Name)) + if err != nil { + return "", fmt.Errorf("failed to enqueue child workflow: %v", err) + } + if dedupWorkflowEvent != nil { + dedupWorkflowEvent.Wait() + } + result, err := childHandle.GetResult() + if err != nil { + return "", fmt.Errorf("failed to get child result: %v", err) + } + return result + "-p", nil + } + RegisterWorkflow(dbosCtx, testWorkflow) + // Create workflow with child that can call the main workflow queueWorkflowWithChild := func(ctx DBOSContext, input string) (string, error) { // Start a child workflow @@ -240,6 +269,72 @@ func TestWorkflowQueues(t *testing.T) { require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after conflicting workflow test") }) + + t.Run("QueueDeduplication", func(t *testing.T) { + workflowEvent := NewEvent() + dedupWorkflowEvent = workflowEvent + defer func() { + dedupWorkflowEvent = nil + }() + + // Make sure only one workflow is running at a time + wfid := uuid.NewString() + dedupID := "my_dedup_id" + handle1, err := RunAsWorkflow(dbosCtx, testWorkflow, "abc", WithQueue(dedupQueue.Name), WithWorkflowID(wfid), WithDeduplicationID(dedupID)) + require.NoError(t, err, "failed to enqueue first workflow with deduplication ID") + + // Enqueue the same workflow with a different deduplication ID should be fine + anotherHandle, err := RunAsWorkflow(dbosCtx, testWorkflow, "ghi", WithQueue(dedupQueue.Name), WithDeduplicationID("my_other_dedup_id")) + require.NoError(t, err, "failed to enqueue workflow with different deduplication ID") + + // Enqueue a workflow without deduplication ID should be fine + nodedupHandle1, err := RunAsWorkflow(dbosCtx, testWorkflow, "jkl", WithQueue(dedupQueue.Name)) + require.NoError(t, err, "failed to enqueue workflow without deduplication ID") + + // Enqueued multiple times without deduplication ID but with different inputs should be fine, but get the result of the first one + nodedupHandle2, err := RunAsWorkflow(dbosCtx, testWorkflow, "mno", WithQueue(dedupQueue.Name), WithWorkflowID(wfid)) + require.NoError(t, err, "failed to enqueue workflow with same workflow ID") + + // Enqueue the same workflow with the same deduplication ID should raise an exception + wfid2 := uuid.NewString() + _, err = RunAsWorkflow(dbosCtx, testWorkflow, "def", WithQueue(dedupQueue.Name), WithWorkflowID(wfid2), WithDeduplicationID(dedupID)) + require.Error(t, err, "expected error when enqueueing workflow with same deduplication ID") + + // Check that it's the correct error type and message + dbosErr, ok := err.(*DBOSError) + require.True(t, ok, "expected error to be of type *DBOSError, got %T", err) + assert.Equal(t, QueueDeduplicated, dbosErr.Code, "expected error code to be QueueDeduplicated") + + expectedMsgPart := fmt.Sprintf("Workflow %s was deduplicated due to an existing workflow in queue %s with deduplication ID %s", wfid2, dedupQueue.Name, dedupID) + assert.Contains(t, err.Error(), expectedMsgPart, "expected error message to contain deduplication information") + + // Now unblock the first two workflows and wait for them to finish + workflowEvent.Set() + result1, err := handle1.GetResult() + require.NoError(t, err, "failed to get result from first workflow") + assert.Equal(t, "abc-c-p", result1, "expected first workflow result to be 'abc-c-p'") + + resultAnother, err := anotherHandle.GetResult() + require.NoError(t, err, "failed to get result from workflow with different dedup ID") + assert.Equal(t, "ghi-c-p", resultAnother, "expected another workflow result to be 'ghi-c-p'") + + resultNodedup1, err := nodedupHandle1.GetResult() + require.NoError(t, err, "failed to get result from workflow without dedup ID") + assert.Equal(t, "jkl-c-p", resultNodedup1, "expected nodedup1 workflow result to be 'jkl-c-p'") + + resultNodedup2, err := nodedupHandle2.GetResult() + require.NoError(t, err, "failed to get result from reused workflow ID") + assert.Equal(t, "abc-c-p", resultNodedup2, "expected nodedup2 workflow result to be 'abc-c-p'") + + // Invoke the workflow again with the same deduplication ID now should be fine because it's no longer in the queue + handle2, err := RunAsWorkflow(dbosCtx, testWorkflow, "def", WithQueue(dedupQueue.Name), WithWorkflowID(wfid2), WithDeduplicationID(dedupID)) + require.NoError(t, err, "failed to enqueue workflow with same dedup ID after completion") + result2, err := handle2.GetResult() + require.NoError(t, err, "failed to get result from second workflow with same dedup ID") + assert.Equal(t, "def-c-p", result2, "expected second workflow result to be 'def-c-p'") + + require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after deduplication test") + }) } func TestQueueRecovery(t *testing.T) { @@ -842,3 +937,83 @@ func TestQueueTimeouts(t *testing.T) { require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after workflow cancellation, but they are not") }) } + +func TestPriorityQueue(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + + // Create priority-enabled queue with max concurrency of 1 + priorityQueue := NewWorkflowQueue(dbosCtx, "test_queue_priority", WithGlobalConcurrency(1), WithPriorityEnabled(true)) + childQueue := NewWorkflowQueue(dbosCtx, "test_queue_child") + + workflowEvent := NewEvent() + var wfPriorityList []int + var mu sync.Mutex + + childWorkflow := func(ctx DBOSContext, p int) (int, error) { + workflowEvent.Wait() + return p, nil + } + RegisterWorkflow(dbosCtx, childWorkflow) + + testWorkflow := func(ctx DBOSContext, priority int) (int, error) { + mu.Lock() + wfPriorityList = append(wfPriorityList, priority) + mu.Unlock() + + // Make sure the priority is not propagated + childHandle, err := RunAsWorkflow(ctx, childWorkflow, priority, WithQueue(childQueue.Name)) + if err != nil { + return 0, fmt.Errorf("failed to enqueue child workflow: %v", err) + } + workflowEvent.Wait() + result, err := childHandle.GetResult() + if err != nil { + return 0, fmt.Errorf("failed to get child result: %v", err) + } + return result + priority, nil + } + RegisterWorkflow(dbosCtx, testWorkflow) + + err := dbosCtx.Launch() + require.NoError(t, err) + + var wfHandles []WorkflowHandle[int] + + // First, enqueue a workflow without priority (default to priority 0) + handle, err := RunAsWorkflow(dbosCtx, testWorkflow, 0, WithQueue(priorityQueue.Name)) + require.NoError(t, err) + wfHandles = append(wfHandles, handle) + + // Then, enqueue workflows with priority 1 to 5 + for i := 1; i <= 5; i++ { + handle, err := RunAsWorkflow(dbosCtx, testWorkflow, i, WithQueue(priorityQueue.Name), WithPriority(uint(i))) + require.NoError(t, err) + wfHandles = append(wfHandles, handle) + } + + // Finally, enqueue two workflows without priority again (default priority 0) + handle6, err := RunAsWorkflow(dbosCtx, testWorkflow, 6, WithQueue(priorityQueue.Name)) + require.NoError(t, err) + wfHandles = append(wfHandles, handle6) + + handle7, err := RunAsWorkflow(dbosCtx, testWorkflow, 7, WithQueue(priorityQueue.Name)) + require.NoError(t, err) + wfHandles = append(wfHandles, handle7) + + // The finish sequence should be 0, 6, 7, 1, 2, 3, 4, 5 + // (lower priority numbers execute first, same priority follows FIFO) + workflowEvent.Set() + + for i, handle := range wfHandles { + result, err := handle.GetResult() + require.NoError(t, err, "failed to get result from workflow %d", i) + assert.Equal(t, i*2, result, "expected result %d for workflow %d", i*2, i) + } + + mu.Lock() + expectedOrder := []int{0, 6, 7, 1, 2, 3, 4, 5} + assert.Equal(t, expectedOrder, wfPriorityList, "expected workflow execution order %v, got %v", expectedOrder, wfPriorityList) + mu.Unlock() + + require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after priority queue test") +} From aad0db7c4c355fcf2700077092a3c04b5cdab357 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 13 Aug 2025 16:58:47 -0700 Subject: [PATCH 4/6] style nits --- dbos/workflows_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index fb4dd1c7..29b6fff2 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -359,14 +359,14 @@ func stepWithinAStepWorkflow(dbosCtx DBOSContext, input string) (string, error) // Global counter for retry testing var stepRetryAttemptCount int -func stepRetryAlwaysFailsStep(ctx context.Context) (string, error) { +func stepRetryAlwaysFailsStep(_ context.Context) (string, error) { stepRetryAttemptCount++ return "", fmt.Errorf("always fails - attempt %d", stepRetryAttemptCount) } var stepIdempotencyCounter int -func stepIdempotencyTest(ctx context.Context) (string, error) { +func stepIdempotencyTest(_ context.Context) (string, error) { stepIdempotencyCounter++ return "", nil } @@ -1858,7 +1858,7 @@ type setEventWorkflowInput struct { } func setEventWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) { - err := SetEvent(ctx, GenericWorkflowSetEventInput[string]{Key: input.Key, Message: input.Message}) + err := SetEvent(ctx, GenericWorkflowSetEventInput[string](input)) if err != nil { return "", err } @@ -1897,7 +1897,7 @@ func setTwoEventsWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, } func setEventIdempotencyWorkflow(ctx DBOSContext, input setEventWorkflowInput) (string, error) { - err := SetEvent(ctx, GenericWorkflowSetEventInput[string]{Key: input.Key, Message: input.Message}) + err := SetEvent(ctx, GenericWorkflowSetEventInput[string](input)) if err != nil { return "", err } From fefeaab7124eddb057fcef421d355fd9426f0aa6 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 13 Aug 2025 17:16:44 -0700 Subject: [PATCH 5/6] nits --- dbos/queues_test.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/dbos/queues_test.go b/dbos/queues_test.go index e0a7c985..f49fd82e 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -308,7 +308,7 @@ func TestWorkflowQueues(t *testing.T) { expectedMsgPart := fmt.Sprintf("Workflow %s was deduplicated due to an existing workflow in queue %s with deduplication ID %s", wfid2, dedupQueue.Name, dedupID) assert.Contains(t, err.Error(), expectedMsgPart, "expected error message to contain deduplication information") - // Now unblock the first two workflows and wait for them to finish + // Now unblock the workflows and wait for them to finish workflowEvent.Set() result1, err := handle1.GetResult() require.NoError(t, err, "failed to get result from first workflow") @@ -960,7 +960,6 @@ func TestPriorityQueue(t *testing.T) { wfPriorityList = append(wfPriorityList, priority) mu.Unlock() - // Make sure the priority is not propagated childHandle, err := RunAsWorkflow(ctx, childWorkflow, priority, WithQueue(childQueue.Name)) if err != nil { return 0, fmt.Errorf("failed to enqueue child workflow: %v", err) @@ -984,11 +983,15 @@ func TestPriorityQueue(t *testing.T) { require.NoError(t, err) wfHandles = append(wfHandles, handle) - // Then, enqueue workflows with priority 1 to 5 - for i := 1; i <= 5; i++ { + // Then, enqueue workflows with priority 5 to 1 + reversedPriorityHandles := make([]WorkflowHandle[int], 0, 5) + for i := 5; i > 0; i-- { handle, err := RunAsWorkflow(dbosCtx, testWorkflow, i, WithQueue(priorityQueue.Name), WithPriority(uint(i))) require.NoError(t, err) - wfHandles = append(wfHandles, handle) + reversedPriorityHandles = append(reversedPriorityHandles, handle) + } + for i := 0; i < len(reversedPriorityHandles); i++ { + wfHandles = append(wfHandles, reversedPriorityHandles[len(reversedPriorityHandles)-i-1]) } // Finally, enqueue two workflows without priority again (default priority 0) From e040dad5e8ede84548a91d9ba8ad60cb6ac80f8a Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 13 Aug 2025 17:33:04 -0700 Subject: [PATCH 6/6] increase test timeout --- dbos/workflows_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 29b6fff2..73fbf868 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -1295,15 +1295,15 @@ func sendWorkflow(ctx DBOSContext, input sendWorkflowInput) (string, error) { } func receiveWorkflow(ctx DBOSContext, topic string) (string, error) { - msg1, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second}) + msg1, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 10 * time.Second}) if err != nil { return "", err } - msg2, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second}) + msg2, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 10 * time.Second}) if err != nil { return "", err } - msg3, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 3 * time.Second}) + msg3, err := Recv[string](ctx, WorkflowRecvInput{Topic: topic, Timeout: 10 * time.Second}) if err != nil { return "", err } @@ -1601,7 +1601,7 @@ func TestSendRecv(t *testing.T) { t.Fatalf("failed to start receive workflow: %v", err) } - // Send messages from outside a workflow context (should work now) + // Send messages from outside a workflow context for i := range 3 { err = Send(dbosCtx, GenericWorkflowSendInput[string]{ DestinationID: receiveHandle.GetWorkflowID(),