Skip to content

Commit 4e8fd60

Browse files
committed
add tests
1 parent e526401 commit 4e8fd60

File tree

1 file changed

+204
-0
lines changed

1 file changed

+204
-0
lines changed

dbos/queues_test.go

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ func TestWorkflowQueues(t *testing.T) {
159159
}
160160
RegisterWorkflow(dbosCtx, workflowEnqueuesAnother)
161161

162+
// Simple workflow for NonExistingQueue test
163+
simpleWorkflow := func(ctx DBOSContext, input string) (string, error) {
164+
return input, nil
165+
}
166+
RegisterWorkflow(dbosCtx, simpleWorkflow)
167+
162168
err := Launch(dbosCtx)
163169
require.NoError(t, err)
164170

@@ -463,6 +469,26 @@ func TestWorkflowQueues(t *testing.T) {
463469

464470
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after deduplication test")
465471
})
472+
473+
t.Run("NonExistingQueue", func(t *testing.T) {
474+
// Attempt to enqueue to a non-existing queue
475+
// This should return an error
476+
_, err := RunWorkflow(dbosCtx, simpleWorkflow, "test-input", WithQueue("non-existing-queue"))
477+
require.Error(t, err, "expected error when enqueueing to non-existing queue")
478+
479+
// Check that it's the correct error type
480+
var dbosErr *DBOSError
481+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
482+
483+
// Verify the error is wrapped by newWorkflowExecutionError with WorkflowExecutionError code
484+
assert.True(t, errors.Is(err, &DBOSError{Code: WorkflowExecutionError}), "expected error to be WorkflowExecutionError")
485+
486+
// Verify the unwrapped error contains the validation message
487+
unwrappedErr := errors.Unwrap(dbosErr)
488+
require.NotNil(t, unwrappedErr, "expected error to have an unwrapped error")
489+
expectedMsgPart := "does not exist"
490+
assert.Contains(t, unwrappedErr.Error(), expectedMsgPart, "expected unwrapped error message to contain expected part")
491+
})
466492
}
467493

468494
func TestQueueRecovery(t *testing.T) {
@@ -1341,3 +1367,181 @@ func TestListQueuedWorkflows(t *testing.T) {
13411367
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "queue entries should be cleaned up")
13421368
})
13431369
}
1370+
1371+
func TestPartitionedQueues(t *testing.T) {
1372+
t.Run("PartitionKeyWithoutQueue", func(t *testing.T) {
1373+
dbosCtx := setupDBOS(t, true, true)
1374+
1375+
// Register a simple workflow
1376+
simpleWorkflow := func(ctx DBOSContext, input string) (string, error) {
1377+
return input, nil
1378+
}
1379+
RegisterWorkflow(dbosCtx, simpleWorkflow)
1380+
1381+
err := Launch(dbosCtx)
1382+
require.NoError(t, err, "failed to launch DBOS instance")
1383+
1384+
// Attempt to enqueue with a partition key but no queue name
1385+
// This should return an error
1386+
_, err = RunWorkflow(dbosCtx, simpleWorkflow, "test-input", WithQueuePartitionKey("partition-1"))
1387+
require.Error(t, err, "expected error when enqueueing with partition key but no queue name")
1388+
1389+
// Check that it's the correct error type
1390+
var dbosErr *DBOSError
1391+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
1392+
1393+
// Verify the error is wrapped by newWorkflowExecutionError with WorkflowExecutionError code
1394+
assert.True(t, errors.Is(err, &DBOSError{Code: WorkflowExecutionError}), "expected error to be WorkflowExecutionError")
1395+
1396+
// Verify the unwrapped error contains the validation message
1397+
unwrappedErr := errors.Unwrap(dbosErr)
1398+
require.NotNil(t, unwrappedErr, "expected error to have an unwrapped error")
1399+
expectedMsgPart := "partition key provided but queue name is missing"
1400+
assert.Contains(t, unwrappedErr.Error(), expectedMsgPart, "expected unwrapped error message to contain expected part")
1401+
})
1402+
1403+
t.Run("PartitionKeyOnNonPartitionedQueue", func(t *testing.T) {
1404+
dbosCtx := setupDBOS(t, true, true)
1405+
1406+
// Create a non-partitioned queue
1407+
nonPartitionedQueue := NewWorkflowQueue(dbosCtx, "non-partitioned-queue")
1408+
1409+
// Register a simple workflow
1410+
simpleWorkflow := func(ctx DBOSContext, input string) (string, error) {
1411+
return input, nil
1412+
}
1413+
RegisterWorkflow(dbosCtx, simpleWorkflow)
1414+
1415+
err := Launch(dbosCtx)
1416+
require.NoError(t, err, "failed to launch DBOS instance")
1417+
1418+
// Attempt to enqueue with a partition key on a non-partitioned queue
1419+
// This should return an error
1420+
_, err = RunWorkflow(dbosCtx, simpleWorkflow, "test-input", WithQueue(nonPartitionedQueue.Name), WithQueuePartitionKey("partition-1"))
1421+
require.Error(t, err, "expected error when enqueueing with partition key on non-partitioned queue")
1422+
1423+
// Check that it's the correct error type
1424+
var dbosErr *DBOSError
1425+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
1426+
1427+
// Verify the error is wrapped by newWorkflowExecutionError with WorkflowExecutionError code
1428+
assert.True(t, errors.Is(err, &DBOSError{Code: WorkflowExecutionError}), "expected error to be WorkflowExecutionError")
1429+
1430+
// Verify the unwrapped error contains the validation message
1431+
unwrappedErr := errors.Unwrap(dbosErr)
1432+
require.NotNil(t, unwrappedErr, "expected error to have an unwrapped error")
1433+
expectedMsgPart := "is not a partitioned queue, but a partition key was provided"
1434+
assert.Contains(t, unwrappedErr.Error(), expectedMsgPart, "expected unwrapped error message to contain expected part")
1435+
})
1436+
1437+
t.Run("PartitionKeyWithDeduplicationID", func(t *testing.T) {
1438+
dbosCtx := setupDBOS(t, true, true)
1439+
1440+
// Create a partitioned queue
1441+
partitionedQueue := NewWorkflowQueue(dbosCtx, "partitioned-queue-test", WithPartitionQueue())
1442+
1443+
// Register a simple workflow
1444+
simpleWorkflow := func(ctx DBOSContext, input string) (string, error) {
1445+
return input, nil
1446+
}
1447+
RegisterWorkflow(dbosCtx, simpleWorkflow)
1448+
1449+
err := Launch(dbosCtx)
1450+
require.NoError(t, err, "failed to launch DBOS instance")
1451+
1452+
// Attempt to enqueue with both partition key and deduplication ID
1453+
// This should return an error
1454+
_, err = RunWorkflow(dbosCtx, simpleWorkflow, "test-input", WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-1"), WithDeduplicationID("dedup-id"))
1455+
require.Error(t, err, "expected error when enqueueing with both partition key and deduplication ID")
1456+
1457+
// Check that it's the correct error type
1458+
var dbosErr *DBOSError
1459+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
1460+
1461+
// Verify the error is wrapped by newWorkflowExecutionError with WorkflowExecutionError code
1462+
assert.True(t, errors.Is(err, &DBOSError{Code: WorkflowExecutionError}), "expected error to be WorkflowExecutionError")
1463+
1464+
// Verify the unwrapped error contains the validation message
1465+
unwrappedErr := errors.Unwrap(dbosErr)
1466+
require.NotNil(t, unwrappedErr, "expected error to have an unwrapped error")
1467+
expectedMsgPart := "partition key and deduplication ID cannot be used together"
1468+
assert.Contains(t, unwrappedErr.Error(), expectedMsgPart, "expected unwrapped error message to contain expected part")
1469+
})
1470+
1471+
t.Run("Dequeue", func(t *testing.T) {
1472+
dbosCtx := setupDBOS(t, true, true)
1473+
1474+
// Create a partitioned queue with concurrency limit of 1 per partition
1475+
partitionedQueue := NewWorkflowQueue(dbosCtx, "partitioned-queue", WithPartitionQueue(), WithGlobalConcurrency(1))
1476+
1477+
// Create events for blocking workflow on partition 1
1478+
partition1StartEvent := NewEvent()
1479+
partition1BlockEvent := NewEvent()
1480+
1481+
// Create blocking workflow for partition 1
1482+
blockingWorkflowP1 := func(ctx DBOSContext, input string) (string, error) {
1483+
partition1StartEvent.Set()
1484+
partition1BlockEvent.Wait()
1485+
return "p1-" + input, nil
1486+
}
1487+
1488+
// Create non-blocking workflow (used for both partitions)
1489+
nonBlockingWorkflow := func(ctx DBOSContext, input string) (string, error) {
1490+
return input, nil
1491+
}
1492+
1493+
RegisterWorkflow(dbosCtx, blockingWorkflowP1)
1494+
RegisterWorkflow(dbosCtx, nonBlockingWorkflow)
1495+
1496+
err := Launch(dbosCtx)
1497+
require.NoError(t, err, "failed to launch DBOS instance")
1498+
1499+
// Enqueue a blocking workflow on partition 1
1500+
handleP1Blocked, err := RunWorkflow(dbosCtx, blockingWorkflowP1, "blocked", WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-1"))
1501+
require.NoError(t, err, "failed to enqueue blocking workflow on partition 1")
1502+
1503+
// Wait for the blocking workflow on partition 1 to start
1504+
partition1StartEvent.Wait()
1505+
1506+
// Enqueue a non-blocking workflow on partition 1 - this should be blocked behind the blocking one
1507+
handleP1Normal, err := RunWorkflow(dbosCtx, nonBlockingWorkflow, "p1-normal", WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-1"))
1508+
require.NoError(t, err, "failed to enqueue normal workflow on partition 1")
1509+
1510+
// Verify the normal workflow is blocked (ENQUEUED status) behind the blocking one
1511+
statusP1Normal, err := handleP1Normal.GetStatus()
1512+
require.NoError(t, err, "failed to get status of normal workflow on partition 1")
1513+
assert.Equal(t, WorkflowStatusEnqueued, statusP1Normal.Status, "expected normal workflow on partition 1 to be ENQUEUED behind the blocking one")
1514+
1515+
// Enqueue multiple non-blocking workflows on partition 2 - these should all complete
1516+
// even though partition 1 is blocked, demonstrating partition independence
1517+
numP2Workflows := 3
1518+
handlesP2 := make([]WorkflowHandle[string], numP2Workflows)
1519+
for i := range numP2Workflows {
1520+
handle, err := RunWorkflow(dbosCtx, nonBlockingWorkflow, fmt.Sprintf("p2-workflow-%d", i), WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-2"))
1521+
require.NoError(t, err, "failed to enqueue workflow %d on partition 2", i)
1522+
handlesP2[i] = handle
1523+
}
1524+
1525+
// Wait for all partition 2 workflows to complete
1526+
for i, handle := range handlesP2 {
1527+
result, err := handle.GetResult()
1528+
require.NoError(t, err, "failed to get result from partition 2 workflow %d", i)
1529+
expectedResult := fmt.Sprintf("p2-workflow-%d", i)
1530+
assert.Equal(t, expectedResult, result, "expected result from partition 2 workflow %d", i)
1531+
}
1532+
1533+
// Verify partition 1 blocking workflow is still pending
1534+
statusP1Blocked, err := handleP1Blocked.GetStatus()
1535+
require.NoError(t, err, "failed to get status of blocking workflow on partition 1")
1536+
assert.Equal(t, WorkflowStatusPending, statusP1Blocked.Status, "expected blocking workflow on partition 1 to still be pending")
1537+
1538+
// Verify the normal workflow on partition 1 is still enqueued
1539+
statusP1Normal, err = handleP1Normal.GetStatus()
1540+
require.NoError(t, err, "failed to get status of normal workflow on partition 1")
1541+
assert.Equal(t, WorkflowStatusEnqueued, statusP1Normal.Status, "expected normal workflow on partition 1 to still be ENQUEUED")
1542+
1543+
// Now unblock partition 1 blocking workflow
1544+
partition1BlockEvent.Set()
1545+
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after partitioned queue test")
1546+
})
1547+
}

0 commit comments

Comments
 (0)