Skip to content

Commit 13a3aac

Browse files
authored
More tests (#51)
This PR adds * many tests (listed below) * ability to register a workflow with a custom name. * returns an error (instead of a emitting a warning) when a workflow is enqueued in a different queue. Custom name registration: we don't have custom interfaces / struct for workflows and at runtime have only access to the reflection-found workflow full qualified name. So we leverage the existing mechanism to retrieve registration-time workflow parameters (the registry is indexed by reflection names). This should be consistent within application versions, but remember that reflection names can change if the code changes. Added Tests: 1. Queue Conflicting Workflows: Tests that workflows with the same ID on different queues trigger ConflictingWorkflowError 2. Workflow Registration: Tests for double registration scenarios (with/without custom names) 3. Dynamic Registration: Tests that registering queues/workflows after launch panics 4. Child Workflow Recovery: Tests polling handles for recovered child workflows 5. concurrency tests: 3 new tests with 500 concurrent simple workflow, sender/receiver, setter/getter 6. Verify step IDs and names for `Send`, `Receive`, `SetEvent`, `GetEvent`
1 parent c965f36 commit 13a3aac

File tree

4 files changed

+906
-30
lines changed

4 files changed

+906
-30
lines changed

dbos/queues_test.go

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"os"
8+
"strings"
89
"sync/atomic"
910
"testing"
1011
"time"
@@ -23,6 +24,7 @@ This suite tests
2324
[x] worker concurrency (2 at a time across two "workers")
2425
[x] worker concurrency X recovery
2526
[x] rate limiter
27+
[x] conflicting workflow on different queues
2628
[] queue deduplication
2729
[] queue priority
2830
[x] queued workflow times out
@@ -48,6 +50,8 @@ func TestWorkflowQueues(t *testing.T) {
4850

4951
queue := NewWorkflowQueue(dbosCtx, "test-queue")
5052
dlqEnqueueQueue := NewWorkflowQueue(dbosCtx, "test-successive-enqueue-queue")
53+
conflictQueue1 := NewWorkflowQueue(dbosCtx, "conflict-queue-1")
54+
conflictQueue2 := NewWorkflowQueue(dbosCtx, "conflict-queue-2")
5155

5256
dlqStartEvent := NewEvent()
5357
dlqCompleteEvent := NewEvent()
@@ -172,14 +176,15 @@ func TestWorkflowQueues(t *testing.T) {
172176
}
173177
})
174178

175-
/* TODO: we will move queue registry in the new interface in a subsequent PR
176179
t.Run("DynamicRegistration", func(t *testing.T) {
177-
q := NewWorkflowQueue("dynamic-queue")
178-
if len(q.name) > 0 {
179-
t.Fatalf("expected nil queue for dynamic registration after DBOS initialization, got %v", q)
180-
}
180+
// Attempting to register a queue after launch should panic
181+
defer func() {
182+
if r := recover(); r == nil {
183+
t.Fatal("expected panic from queue registration after launch but got none")
184+
}
185+
}()
186+
NewWorkflowQueue(dbosCtx, "dynamic-queue")
181187
})
182-
*/
183188

184189
t.Run("QueueWorkflowDLQ", func(t *testing.T) {
185190
workflowID := "blocking-workflow-test"
@@ -255,6 +260,52 @@ func TestWorkflowQueues(t *testing.T) {
255260
t.Fatal("expected queue entries to be cleaned up after successive enqueues test")
256261
}
257262
})
263+
264+
t.Run("ConflictingWorkflowOnDifferentQueues", func(t *testing.T) {
265+
workflowID := "conflicting-workflow-id"
266+
267+
// Enqueue the same workflow ID on the first queue
268+
handle, err := RunAsWorkflow(dbosCtx, queueWorkflow, "test-input-1", WithQueue(conflictQueue1.Name), WithWorkflowID(workflowID))
269+
if err != nil {
270+
t.Fatalf("failed to enqueue workflow on first queue: %v", err)
271+
}
272+
273+
// Get the result from the first workflow to ensure it completes
274+
result, err := handle.GetResult()
275+
if err != nil {
276+
t.Fatalf("failed to get result from first workflow: %v", err)
277+
}
278+
if result != "test-input-1" {
279+
t.Fatalf("expected 'test-input-1', got %v", result)
280+
}
281+
282+
// Now try to enqueue the same workflow ID on a different queue
283+
// This should trigger a ConflictingWorkflowError
284+
_, err = RunAsWorkflow(dbosCtx, queueWorkflow, "test-input-2", WithQueue(conflictQueue2.Name), WithWorkflowID(workflowID))
285+
if err == nil {
286+
t.Fatal("expected ConflictingWorkflowError when enqueueing same workflow ID on different queue, but got none")
287+
}
288+
289+
// Check that it's the correct error type
290+
dbosErr, ok := err.(*DBOSError)
291+
if !ok {
292+
t.Fatalf("expected error to be of type *DBOSError, got %T", err)
293+
}
294+
295+
if dbosErr.Code != ConflictingWorkflowError {
296+
t.Fatalf("expected error code to be ConflictingWorkflowError, got %v", dbosErr.Code)
297+
}
298+
299+
// Check that the error message contains queue information
300+
expectedMsgPart := "Workflow already exists in a different queue"
301+
if !strings.Contains(err.Error(), expectedMsgPart) {
302+
t.Fatalf("expected error message to contain '%s', got '%s'", expectedMsgPart, err.Error())
303+
}
304+
305+
if !queueEntriesAreCleanedUp(dbosCtx) {
306+
t.Fatal("expected queue entries to be cleaned up after conflicting workflow test")
307+
}
308+
})
258309
}
259310

260311
func TestQueueRecovery(t *testing.T) {

dbos/system_database.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ func (s *systemDatabase) InsertWorkflowStatus(ctx context.Context, input insertW
380380
return nil, newConflictingWorkflowError(input.status.ID, fmt.Sprintf("Workflow already exists with a different name: %s, but the provided name is: %s", result.name, input.status.Name))
381381
}
382382
if len(input.status.QueueName) > 0 && result.queueName != nil && input.status.QueueName != *result.queueName {
383-
s.logger.Warn("Queue name conflict for workflow", "workflow_id", input.status.ID, "result_queue", *result.queueName, "status_queue", input.status.QueueName)
383+
return nil, newConflictingWorkflowError(input.status.ID, fmt.Sprintf("Workflow already exists in a different queue: %s, but the provided queue is: %s", *result.queueName, input.status.QueueName))
384384
}
385385

386386
// Every time we start executing a workflow (and thus attempt to insert its status), we increment `recovery_attempts` by 1.

dbos/workflow.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"math"
99
"reflect"
1010
"runtime"
11+
"sync"
1112
"time"
1213

1314
"github.com/google/uuid"
@@ -204,10 +205,11 @@ type WrappedWorkflowFunc func(ctx DBOSContext, input any, opts ...WorkflowOption
204205
type workflowRegistryEntry struct {
205206
wrappedFunction WrappedWorkflowFunc
206207
maxRetries int
208+
name string
207209
}
208210

209211
// Register adds a workflow function to the registry (thread-safe, only once per name)
210-
func registerWorkflow(ctx DBOSContext, workflowName string, fn WrappedWorkflowFunc, maxRetries int) {
212+
func registerWorkflow(ctx DBOSContext, workflowFQN string, fn WrappedWorkflowFunc, maxRetries int, customName string) {
211213
// Skip if we don't have a concrete dbosContext
212214
c, ok := ctx.(*dbosContext)
213215
if !ok {
@@ -221,14 +223,15 @@ func registerWorkflow(ctx DBOSContext, workflowName string, fn WrappedWorkflowFu
221223
c.workflowRegMutex.Lock()
222224
defer c.workflowRegMutex.Unlock()
223225

224-
if _, exists := c.workflowRegistry[workflowName]; exists {
225-
c.logger.Error("workflow function already registered", "fqn", workflowName)
226-
panic(newConflictingRegistrationError(workflowName))
226+
if _, exists := c.workflowRegistry[workflowFQN]; exists {
227+
c.logger.Error("workflow function already registered", "fqn", workflowFQN)
228+
panic(newConflictingRegistrationError(workflowFQN))
227229
}
228230

229-
c.workflowRegistry[workflowName] = workflowRegistryEntry{
231+
c.workflowRegistry[workflowFQN] = workflowRegistryEntry{
230232
wrappedFunction: fn,
231233
maxRetries: maxRetries,
234+
name: customName,
232235
}
233236
}
234237

@@ -274,6 +277,7 @@ func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn Workflow
274277
type workflowRegistrationParams struct {
275278
cronSchedule string
276279
maxRetries int
280+
name string
277281
}
278282

279283
type workflowRegistrationOption func(*workflowRegistrationParams)
@@ -294,6 +298,12 @@ func WithSchedule(schedule string) workflowRegistrationOption {
294298
}
295299
}
296300

301+
func WithWorkflowName(name string) workflowRegistrationOption {
302+
return func(p *workflowRegistrationParams) {
303+
p.name = name
304+
}
305+
}
306+
297307
// RegisterWorkflow registers the provided function as a durable workflow with the provided DBOSContext workflow registry
298308
// If the workflow is a scheduled workflow (determined by the presence of a cron schedule), it will also register a cron job to execute it
299309
// RegisterWorkflow is generically typed, providing compile-time type checking and allowing us to register the workflow input and output types for gob encoding
@@ -347,7 +357,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R
347357
}
348358
return &workflowPollingHandle[any]{workflowID: handle.GetWorkflowID(), dbosContext: ctx}, nil // this is only used by recovery and queue runner so far -- queue runner dismisses it
349359
})
350-
registerWorkflow(ctx, fqn, typeErasedWrapper, registrationParams.maxRetries)
360+
registerWorkflow(ctx, fqn, typeErasedWrapper, registrationParams.maxRetries, registrationParams.name)
351361

352362
// If this is a scheduled workflow, register a cron job
353363
if registrationParams.cronSchedule != "" {
@@ -397,6 +407,7 @@ func WithApplicationVersion(version string) WorkflowOption {
397407
}
398408
}
399409

410+
// An internal option we use to map the reflection function name to the registration options.
400411
func withWorkflowName(name string) WorkflowOption {
401412
return func(p *workflowParams) {
402413
p.workflowName = name
@@ -484,6 +495,9 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
484495
if registeredWorkflow.maxRetries > 0 {
485496
params.maxRetries = registeredWorkflow.maxRetries
486497
}
498+
if len(registeredWorkflow.name) > 0 {
499+
params.workflowName = registeredWorkflow.name
500+
}
487501

488502
// Check if we are within a workflow (and thus a child workflow)
489503
parentWorkflowState, ok := c.Value(workflowStateKey).(*workflowState)
@@ -705,7 +719,12 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams {
705719
BackoffFactor: 2.0,
706720
BaseInterval: 100 * time.Millisecond, // Default base interval
707721
MaxInterval: 5 * time.Second, // Default max interval
708-
StepName: typeErasedStepNameToStepName[stepName],
722+
StepName: func() string {
723+
if value, ok := typeErasedStepNameToStepName.Load(stepName); ok {
724+
return value.(string)
725+
}
726+
return "" // This should never happen
727+
}(),
709728
}
710729
}
711730

@@ -719,15 +738,17 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams {
719738
if params.MaxInterval == 0 {
720739
params.MaxInterval = 5 * time.Second // Default max interval
721740
}
722-
if params.StepName == "" {
741+
if len(params.StepName) == 0 {
723742
// If the step name is not provided, use the function name
724-
params.StepName = typeErasedStepNameToStepName[stepName]
743+
if value, ok := typeErasedStepNameToStepName.Load(stepName); ok {
744+
params.StepName = value.(string)
745+
}
725746
}
726747

727748
return params
728749
}
729750

730-
var typeErasedStepNameToStepName = make(map[string]string)
751+
var typeErasedStepNameToStepName sync.Map
731752

732753
func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) {
733754
if ctx == nil {
@@ -742,7 +763,8 @@ func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R]) (R, error) {
742763

743764
// Type-erase the function
744765
typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) })
745-
typeErasedStepNameToStepName[runtime.FuncForPC(reflect.ValueOf(typeErasedFn).Pointer()).Name()] = stepName
766+
typeErasedFnName := runtime.FuncForPC(reflect.ValueOf(typeErasedFn).Pointer()).Name()
767+
typeErasedStepNameToStepName.LoadOrStore(typeErasedFnName, stepName)
746768

747769
// Call the executor method and pass through the result/error
748770
result, err := ctx.RunAsStep(ctx, typeErasedFn)

0 commit comments

Comments
 (0)