Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions dbos/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"

Expand All @@ -18,6 +19,14 @@ func TestEnqueue(t *testing.T) {
// Create queue for communication between client and server
queue := NewWorkflowQueue(serverCtx, "client-enqueue-queue")

// Create a priority-enabled queue with max concurrency of 1 to ensure ordering
// Must be created before Launch()
priorityQueue := NewWorkflowQueue(serverCtx, "priority-test-queue", WithGlobalConcurrency(1), WithPriorityEnabled(true))

// Track execution order for priority test
var executionOrder []string
var mu sync.Mutex

// Register workflows with custom names so client can reference them
type wfInput struct {
Input string
Expand All @@ -41,6 +50,15 @@ func TestEnqueue(t *testing.T) {
}
RegisterWorkflow(serverCtx, blockingWorkflow, WithWorkflowName("BlockingWorkflow"))

// Register a workflow that records its execution order (for priority test)
priorityWorkflow := func(ctx DBOSContext, input string) (string, error) {
mu.Lock()
executionOrder = append(executionOrder, input)
mu.Unlock()
return input, nil
}
RegisterWorkflow(serverCtx, priorityWorkflow, WithWorkflowName("PriorityWorkflow"))

// Launch the server context to start processing tasks
err := serverCtx.Launch()
require.NoError(t, err)
Expand Down Expand Up @@ -129,6 +147,150 @@ func TestEnqueue(t *testing.T) {
assert.Equal(t, WorkflowStatusCancelled, status.Status)
})

t.Run("EnqueueWithPriority", func(t *testing.T) {
// Reset execution order for this test
mu.Lock()
executionOrder = []string{}
mu.Unlock()

// Enqueue workflow without priority (will use default priority of 0)
handle1, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{
WorkflowName: "PriorityWorkflow",
QueueName: priorityQueue.Name,
WorkflowInput: "abc",
ApplicationVersion: serverCtx.GetApplicationVersion(),
})
require.NoError(t, err, "failed to enqueue workflow without priority")

// Enqueue with a lower priority (higher number = lower priority)
handle2, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{
WorkflowName: "PriorityWorkflow",
QueueName: priorityQueue.Name,
WorkflowInput: "def",
Priority: 5,
ApplicationVersion: serverCtx.GetApplicationVersion(),
})
require.NoError(t, err, "failed to enqueue workflow with priority 5")

// Enqueue with a higher priority (lower number = higher priority)
handle3, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{
WorkflowName: "PriorityWorkflow",
QueueName: priorityQueue.Name,
WorkflowInput: "ghi",
Priority: 1,
ApplicationVersion: serverCtx.GetApplicationVersion(),
})
require.NoError(t, err, "failed to enqueue workflow with priority 1")

// Get results
result1, err := handle1.GetResult()
require.NoError(t, err, "failed to get result from first workflow")
assert.Equal(t, "abc", result1)

result3, err := handle3.GetResult()
require.NoError(t, err, "failed to get result from third workflow")
assert.Equal(t, "ghi", result3)

result2, err := handle2.GetResult()
require.NoError(t, err, "failed to get result from second workflow")
assert.Equal(t, "def", result2)

// Verify execution order: workflows should execute in priority order
// Priority 0 (abc) executes first (already running when others are enqueued)
// Priority 1 (ghi) executes second (higher priority than def)
// Priority 5 (def) executes last (lowest priority)
expectedOrder := []string{"abc", "ghi", "def"}
assert.Equal(t, expectedOrder, executionOrder, "workflows should execute in priority order")

// Verify queue entries are cleaned up
assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after priority test")
})

t.Run("EnqueueWithDedupID", func(t *testing.T) {
dedupID := "my-client-dedup-id"
wfid1 := "client-dedup-wf1"
wfid2 := "client-dedup-wf2"

// First workflow with deduplication ID - should succeed
handle1, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{
WorkflowName: "ServerWorkflow",
QueueName: queue.Name,
WorkflowID: wfid1,
DeduplicationID: dedupID,
WorkflowInput: wfInput{Input: "test-input"},
ApplicationVersion: serverCtx.GetApplicationVersion(),
})
require.NoError(t, err, "failed to enqueue first workflow with deduplication ID")

// Second workflow with same deduplication ID but different workflow ID - should fail
_, err = Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{
WorkflowName: "ServerWorkflow",
QueueName: queue.Name,
WorkflowID: wfid2,
DeduplicationID: dedupID,
WorkflowInput: wfInput{Input: "test-input"},
ApplicationVersion: serverCtx.GetApplicationVersion(),
})
require.Error(t, err, "expected error when enqueueing workflow with same deduplication ID")

// Check that it's the correct error type and message
dbosErr, ok := err.(*DBOSError)
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
assert.Equal(t, QueueDeduplicated, dbosErr.Code, "expected error code to be QueueDeduplicated")

expectedMsgPart := fmt.Sprintf("Workflow %s was deduplicated due to an existing workflow in queue %s with deduplication ID %s", wfid2, queue.Name, dedupID)
assert.Contains(t, err.Error(), expectedMsgPart, "expected error message to contain deduplication information")

// Third workflow with different deduplication ID - should succeed
handle3, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{
WorkflowName: "ServerWorkflow",
QueueName: queue.Name,
DeduplicationID: "different-dedup-id",
WorkflowInput: wfInput{Input: "test-input"},
ApplicationVersion: serverCtx.GetApplicationVersion(),
})
require.NoError(t, err, "failed to enqueue workflow with different deduplication ID")

// Fourth workflow without deduplication ID - should succeed
handle4, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{
WorkflowName: "ServerWorkflow",
QueueName: queue.Name,
WorkflowInput: wfInput{Input: "test-input"},
ApplicationVersion: serverCtx.GetApplicationVersion(),
})
require.NoError(t, err, "failed to enqueue workflow without deduplication ID")

// Wait for all successful workflows to complete
result1, err := handle1.GetResult()
require.NoError(t, err, "failed to get result from first workflow")
assert.Equal(t, "processed: test-input", result1)

result3, err := handle3.GetResult()
require.NoError(t, err, "failed to get result from third workflow")
assert.Equal(t, "processed: test-input", result3)

result4, err := handle4.GetResult()
require.NoError(t, err, "failed to get result from fourth workflow")
assert.Equal(t, "processed: test-input", result4)

// After first workflow completes, we should be able to enqueue with same deduplication ID
handle5, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{
WorkflowName: "ServerWorkflow",
QueueName: queue.Name,
WorkflowID: wfid2, // Reuse the workflow ID that failed before
DeduplicationID: dedupID, // Same deduplication ID as first workflow
WorkflowInput: wfInput{Input: "test-input"},
ApplicationVersion: serverCtx.GetApplicationVersion(),
})
require.NoError(t, err, "failed to enqueue workflow with same dedup ID after completion")

result5, err := handle5.GetResult()
require.NoError(t, err, "failed to get result from fifth workflow")
assert.Equal(t, "processed: test-input", result5)

assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after deduplication test")
})

// Verify all queue entries are cleaned up
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after client tests")
}
Expand Down
11 changes: 11 additions & 0 deletions dbos/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
StepExecutionError // General step execution error
DeadLetterQueueError // Workflow moved to dead letter queue after max retries
MaxStepRetriesExceeded // Step exceeded maximum retry attempts
QueueDeduplicated // Workflow was deduplicated in the queue
)

// DBOSError is the unified error type for all DBOS operations.
Expand Down Expand Up @@ -186,3 +187,13 @@ func newMaxStepRetriesExceededError(workflowID, stepName string, maxRetries int,
IsBase: true,
}
}

func newQueueDeduplicatedError(workflowID, queueName, deduplicationID string) *DBOSError {
return &DBOSError{
Message: fmt.Sprintf("Workflow %s was deduplicated due to an existing workflow in queue %s with deduplication ID %s", workflowID, queueName, deduplicationID),
Code: QueueDeduplicated,
WorkflowID: workflowID,
QueueName: queueName,
DeduplicationID: deduplicationID,
}
}
Loading
Loading