Skip to content

Commit 8902470

Browse files
committed
client tests
1 parent 77d1586 commit 8902470

File tree

2 files changed

+84
-3
lines changed

2 files changed

+84
-3
lines changed

dbos/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,12 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
146146

147147
// Validate partition key is not provided without queue name
148148
if len(params.queuePartitionKey) > 0 && len(queueName) == 0 {
149-
return nil, newWorkflowExecutionError("", fmt.Errorf("partition key provided but queue name is missing"))
149+
return nil, fmt.Errorf("partition key provided but queue name is missing")
150150
}
151151

152152
// Validate partition key and deduplication ID are not both provided (they are incompatible)
153153
if len(params.queuePartitionKey) > 0 && len(params.deduplicationID) > 0 {
154-
return nil, newWorkflowExecutionError("", fmt.Errorf("partition key and deduplication ID cannot be used together"))
154+
return nil, fmt.Errorf("partition key and deduplication ID cannot be used together")
155155
}
156156

157157
workflowID := params.workflowID

dbos/client_test.go

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/stretchr/testify/require"
1313
)
1414

15-
func TestEnqueue(t *testing.T) {
15+
func TestClientEnqueue(t *testing.T) {
1616
// Setup server context - this will process tasks
1717
serverCtx := setupDBOS(t, true, true)
1818

@@ -23,6 +23,10 @@ func TestEnqueue(t *testing.T) {
2323
// Must be created before Launch()
2424
priorityQueue := NewWorkflowQueue(serverCtx, "priority-test-queue", WithGlobalConcurrency(1), WithPriorityEnabled())
2525

26+
// Create a partitioned queue for partition key test
27+
// Must be created before Launch()
28+
partitionedQueue := NewWorkflowQueue(serverCtx, "client-partitioned-queue", WithPartitionQueue())
29+
2630
// Track execution order for priority test
2731
var executionOrder []string
2832
var mu sync.Mutex
@@ -59,6 +63,12 @@ func TestEnqueue(t *testing.T) {
5963
}
6064
RegisterWorkflow(serverCtx, priorityWorkflow, WithWorkflowName("PriorityWorkflow"))
6165

66+
// Simple workflow for partitioned queue test
67+
partitionedWorkflow := func(ctx DBOSContext, input string) (string, error) {
68+
return "partitioned: " + input, nil
69+
}
70+
RegisterWorkflow(serverCtx, partitionedWorkflow, WithWorkflowName("PartitionedWorkflow"))
71+
6272
// Launch the server context to start processing tasks
6373
err := Launch(serverCtx)
6474
require.NoError(t, err)
@@ -257,6 +267,77 @@ func TestEnqueue(t *testing.T) {
257267
assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after deduplication test")
258268
})
259269

270+
t.Run("EnqueueToPartitionedQueue", func(t *testing.T) {
271+
// Enqueue a workflow to a partitioned queue with a partition key
272+
handle, err := Enqueue[string, string](client, partitionedQueue.Name, "PartitionedWorkflow", "test-input",
273+
WithEnqueueQueuePartitionKey("partition-1"),
274+
WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion()))
275+
require.NoError(t, err, "failed to enqueue workflow to partitioned queue")
276+
277+
// Verify we got a polling handle
278+
_, ok := handle.(*workflowPollingHandle[string])
279+
require.True(t, ok, "expected handle to be of type workflowPollingHandle, got %T", handle)
280+
281+
// Get the result
282+
result, err := handle.GetResult()
283+
require.NoError(t, err, "failed to get result from partitioned queue workflow")
284+
285+
expectedResult := "partitioned: test-input"
286+
assert.Equal(t, expectedResult, result, "expected result to match")
287+
288+
// Verify the workflow status
289+
status, err := handle.GetStatus()
290+
require.NoError(t, err, "failed to get workflow status")
291+
292+
assert.Equal(t, WorkflowStatusSuccess, status.Status, "expected workflow status to be SUCCESS")
293+
assert.Equal(t, "PartitionedWorkflow", status.Name, "expected workflow name to match")
294+
assert.Equal(t, partitionedQueue.Name, status.QueueName, "expected queue name to match")
295+
296+
assert.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after partitioned queue test")
297+
})
298+
299+
t.Run("EnqueueWithPartitionKeyWithoutQueue", func(t *testing.T) {
300+
// Attempt to enqueue with a partition key but no queue name
301+
_, err := Enqueue[string, string](client, "", "PartitionedWorkflow", "test-input",
302+
WithEnqueueQueuePartitionKey("partition-1"))
303+
require.Error(t, err, "expected error when enqueueing with partition key but no queue name")
304+
305+
// Verify the error message contains the expected text
306+
assert.Contains(t, err.Error(), "queue name is required", "expected error message to contain 'queue name is required'")
307+
})
308+
309+
t.Run("EnqueueWithPartitionKeyAndDeduplicationID", func(t *testing.T) {
310+
// Attempt to enqueue with both partition key and deduplication ID
311+
// This should return an error
312+
_, err := Enqueue[string, string](client, partitionedQueue.Name, "PartitionedWorkflow", "test-input",
313+
WithEnqueueQueuePartitionKey("partition-1"),
314+
WithEnqueueDeduplicationID("dedup-id"))
315+
require.Error(t, err, "expected error when enqueueing with both partition key and deduplication ID")
316+
317+
// Verify the error message contains the expected text
318+
assert.Contains(t, err.Error(), "partition key and deduplication ID cannot be used together", "expected error message to contain validation message")
319+
})
320+
321+
t.Run("EnqueueWithEmptyQueueName", func(t *testing.T) {
322+
// Attempt to enqueue with empty queue name
323+
// This should return an error
324+
_, err := Enqueue[wfInput, string](client, "", "ServerWorkflow", wfInput{Input: "test-input"})
325+
require.Error(t, err, "expected error when enqueueing with empty queue name")
326+
327+
// Verify the error message contains the expected text
328+
assert.Contains(t, err.Error(), "queue name is required", "expected error message to contain 'queue name is required'")
329+
})
330+
331+
t.Run("EnqueueWithEmptyWorkflowName", func(t *testing.T) {
332+
// Attempt to enqueue with empty workflow name
333+
// This should return an error
334+
_, err := Enqueue[wfInput, string](client, queue.Name, "", wfInput{Input: "test-input"})
335+
require.Error(t, err, "expected error when enqueueing with empty workflow name")
336+
337+
// Verify the error message contains the expected text
338+
assert.Contains(t, err.Error(), "workflow name is required", "expected error message to contain 'workflow name is required'")
339+
})
340+
260341
// Verify all queue entries are cleaned up
261342
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after client tests")
262343
}

0 commit comments

Comments
 (0)