Skip to content

Commit 584e8b2

Browse files
authored
Q dedup and prio (#55)
1 parent 28b823f commit 584e8b2

File tree

6 files changed

+395
-11
lines changed

6 files changed

+395
-11
lines changed

dbos/client_test.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"strings"
7+
"sync"
78
"testing"
89
"time"
910

@@ -18,6 +19,14 @@ func TestEnqueue(t *testing.T) {
1819
// Create queue for communication between client and server
1920
queue := NewWorkflowQueue(serverCtx, "client-enqueue-queue")
2021

22+
// Create a priority-enabled queue with max concurrency of 1 to ensure ordering
23+
// Must be created before Launch()
24+
priorityQueue := NewWorkflowQueue(serverCtx, "priority-test-queue", WithGlobalConcurrency(1), WithPriorityEnabled(true))
25+
26+
// Track execution order for priority test
27+
var executionOrder []string
28+
var mu sync.Mutex
29+
2130
// Register workflows with custom names so client can reference them
2231
type wfInput struct {
2332
Input string
@@ -41,6 +50,15 @@ func TestEnqueue(t *testing.T) {
4150
}
4251
RegisterWorkflow(serverCtx, blockingWorkflow, WithWorkflowName("BlockingWorkflow"))
4352

53+
// Register a workflow that records its execution order (for priority test)
54+
priorityWorkflow := func(ctx DBOSContext, input string) (string, error) {
55+
mu.Lock()
56+
executionOrder = append(executionOrder, input)
57+
mu.Unlock()
58+
return input, nil
59+
}
60+
RegisterWorkflow(serverCtx, priorityWorkflow, WithWorkflowName("PriorityWorkflow"))
61+
4462
// Launch the server context to start processing tasks
4563
err := serverCtx.Launch()
4664
require.NoError(t, err)
@@ -129,6 +147,150 @@ func TestEnqueue(t *testing.T) {
129147
assert.Equal(t, WorkflowStatusCancelled, status.Status)
130148
})
131149

150+
t.Run("EnqueueWithPriority", func(t *testing.T) {
151+
// Reset execution order for this test
152+
mu.Lock()
153+
executionOrder = []string{}
154+
mu.Unlock()
155+
156+
// Enqueue workflow without priority (will use default priority of 0)
157+
handle1, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{
158+
WorkflowName: "PriorityWorkflow",
159+
QueueName: priorityQueue.Name,
160+
WorkflowInput: "abc",
161+
ApplicationVersion: serverCtx.GetApplicationVersion(),
162+
})
163+
require.NoError(t, err, "failed to enqueue workflow without priority")
164+
165+
// Enqueue with a lower priority (higher number = lower priority)
166+
handle2, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{
167+
WorkflowName: "PriorityWorkflow",
168+
QueueName: priorityQueue.Name,
169+
WorkflowInput: "def",
170+
Priority: 5,
171+
ApplicationVersion: serverCtx.GetApplicationVersion(),
172+
})
173+
require.NoError(t, err, "failed to enqueue workflow with priority 5")
174+
175+
// Enqueue with a higher priority (lower number = higher priority)
176+
handle3, err := Enqueue[string, string](clientCtx, GenericEnqueueOptions[string]{
177+
WorkflowName: "PriorityWorkflow",
178+
QueueName: priorityQueue.Name,
179+
WorkflowInput: "ghi",
180+
Priority: 1,
181+
ApplicationVersion: serverCtx.GetApplicationVersion(),
182+
})
183+
require.NoError(t, err, "failed to enqueue workflow with priority 1")
184+
185+
// Get results
186+
result1, err := handle1.GetResult()
187+
require.NoError(t, err, "failed to get result from first workflow")
188+
assert.Equal(t, "abc", result1)
189+
190+
result3, err := handle3.GetResult()
191+
require.NoError(t, err, "failed to get result from third workflow")
192+
assert.Equal(t, "ghi", result3)
193+
194+
result2, err := handle2.GetResult()
195+
require.NoError(t, err, "failed to get result from second workflow")
196+
assert.Equal(t, "def", result2)
197+
198+
// Verify execution order: workflows should execute in priority order
199+
// Priority 0 (abc) executes first (already running when others are enqueued)
200+
// Priority 1 (ghi) executes second (higher priority than def)
201+
// Priority 5 (def) executes last (lowest priority)
202+
expectedOrder := []string{"abc", "ghi", "def"}
203+
assert.Equal(t, expectedOrder, executionOrder, "workflows should execute in priority order")
204+
205+
// Verify queue entries are cleaned up
206+
assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after priority test")
207+
})
208+
209+
t.Run("EnqueueWithDedupID", func(t *testing.T) {
210+
dedupID := "my-client-dedup-id"
211+
wfid1 := "client-dedup-wf1"
212+
wfid2 := "client-dedup-wf2"
213+
214+
// First workflow with deduplication ID - should succeed
215+
handle1, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{
216+
WorkflowName: "ServerWorkflow",
217+
QueueName: queue.Name,
218+
WorkflowID: wfid1,
219+
DeduplicationID: dedupID,
220+
WorkflowInput: wfInput{Input: "test-input"},
221+
ApplicationVersion: serverCtx.GetApplicationVersion(),
222+
})
223+
require.NoError(t, err, "failed to enqueue first workflow with deduplication ID")
224+
225+
// Second workflow with same deduplication ID but different workflow ID - should fail
226+
_, err = Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{
227+
WorkflowName: "ServerWorkflow",
228+
QueueName: queue.Name,
229+
WorkflowID: wfid2,
230+
DeduplicationID: dedupID,
231+
WorkflowInput: wfInput{Input: "test-input"},
232+
ApplicationVersion: serverCtx.GetApplicationVersion(),
233+
})
234+
require.Error(t, err, "expected error when enqueueing workflow with same deduplication ID")
235+
236+
// Check that it's the correct error type and message
237+
dbosErr, ok := err.(*DBOSError)
238+
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
239+
assert.Equal(t, QueueDeduplicated, dbosErr.Code, "expected error code to be QueueDeduplicated")
240+
241+
expectedMsgPart := fmt.Sprintf("Workflow %s was deduplicated due to an existing workflow in queue %s with deduplication ID %s", wfid2, queue.Name, dedupID)
242+
assert.Contains(t, err.Error(), expectedMsgPart, "expected error message to contain deduplication information")
243+
244+
// Third workflow with different deduplication ID - should succeed
245+
handle3, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{
246+
WorkflowName: "ServerWorkflow",
247+
QueueName: queue.Name,
248+
DeduplicationID: "different-dedup-id",
249+
WorkflowInput: wfInput{Input: "test-input"},
250+
ApplicationVersion: serverCtx.GetApplicationVersion(),
251+
})
252+
require.NoError(t, err, "failed to enqueue workflow with different deduplication ID")
253+
254+
// Fourth workflow without deduplication ID - should succeed
255+
handle4, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{
256+
WorkflowName: "ServerWorkflow",
257+
QueueName: queue.Name,
258+
WorkflowInput: wfInput{Input: "test-input"},
259+
ApplicationVersion: serverCtx.GetApplicationVersion(),
260+
})
261+
require.NoError(t, err, "failed to enqueue workflow without deduplication ID")
262+
263+
// Wait for all successful workflows to complete
264+
result1, err := handle1.GetResult()
265+
require.NoError(t, err, "failed to get result from first workflow")
266+
assert.Equal(t, "processed: test-input", result1)
267+
268+
result3, err := handle3.GetResult()
269+
require.NoError(t, err, "failed to get result from third workflow")
270+
assert.Equal(t, "processed: test-input", result3)
271+
272+
result4, err := handle4.GetResult()
273+
require.NoError(t, err, "failed to get result from fourth workflow")
274+
assert.Equal(t, "processed: test-input", result4)
275+
276+
// After first workflow completes, we should be able to enqueue with same deduplication ID
277+
handle5, err := Enqueue[wfInput, string](clientCtx, GenericEnqueueOptions[wfInput]{
278+
WorkflowName: "ServerWorkflow",
279+
QueueName: queue.Name,
280+
WorkflowID: wfid2, // Reuse the workflow ID that failed before
281+
DeduplicationID: dedupID, // Same deduplication ID as first workflow
282+
WorkflowInput: wfInput{Input: "test-input"},
283+
ApplicationVersion: serverCtx.GetApplicationVersion(),
284+
})
285+
require.NoError(t, err, "failed to enqueue workflow with same dedup ID after completion")
286+
287+
result5, err := handle5.GetResult()
288+
require.NoError(t, err, "failed to get result from fifth workflow")
289+
assert.Equal(t, "processed: test-input", result5)
290+
291+
assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after deduplication test")
292+
})
293+
132294
// Verify all queue entries are cleaned up
133295
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after client tests")
134296
}

dbos/errors.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const (
2020
StepExecutionError // General step execution error
2121
DeadLetterQueueError // Workflow moved to dead letter queue after max retries
2222
MaxStepRetriesExceeded // Step exceeded maximum retry attempts
23+
QueueDeduplicated // Workflow was deduplicated in the queue
2324
)
2425

2526
// DBOSError is the unified error type for all DBOS operations.
@@ -186,3 +187,13 @@ func newMaxStepRetriesExceededError(workflowID, stepName string, maxRetries int,
186187
IsBase: true,
187188
}
188189
}
190+
191+
func newQueueDeduplicatedError(workflowID, queueName, deduplicationID string) *DBOSError {
192+
return &DBOSError{
193+
Message: fmt.Sprintf("Workflow %s was deduplicated due to an existing workflow in queue %s with deduplication ID %s", workflowID, queueName, deduplicationID),
194+
Code: QueueDeduplicated,
195+
WorkflowID: workflowID,
196+
QueueName: queueName,
197+
DeduplicationID: deduplicationID,
198+
}
199+
}

0 commit comments

Comments
 (0)