Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,021 changes: 1,021 additions & 0 deletions dbos/client_test.go

Large diffs are not rendered by default.

17 changes: 12 additions & 5 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func processConfig(inputConfig *Config) (*Config, error) {

// Load defaults
if dbosConfig.Logger == nil {
dbosConfig.Logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
dbosConfig.Logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
}

return dbosConfig, nil
Expand All @@ -76,15 +76,21 @@ type DBOSContext interface {
// Workflow operations
RunAsStep(_ DBOSContext, fn StepFunc) (any, error) // Execute a function as a durable step within a workflow
RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
Send(_ DBOSContext, input WorkflowSendInputInternal) error // Send a message to another workflow
Send(_ DBOSContext, input WorkflowSendInput) error // Send a message to another workflow
Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) // Receive a message sent to this workflow
SetEvent(_ DBOSContext, input WorkflowSetEventInput) error // Set a key-value event for this workflow
GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, error) // Get a key-value event from a target workflow
Sleep(duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
GetStepID() (int, error) // Get the current step ID (only available within workflows)

// Workflow management
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters
CancelWorkflow(workflowID string) error // Cancel a workflow by setting its status to CANCELLED
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria

// Accessors
GetApplicationVersion() string // Get the application version for this context
Expand Down Expand Up @@ -114,8 +120,9 @@ type dbosContext struct {
workflowsWg *sync.WaitGroup

// Workflow registry
workflowRegistry map[string]workflowRegistryEntry
workflowRegMutex *sync.RWMutex
workflowRegistry map[string]workflowRegistryEntry
workflowRegMutex *sync.RWMutex
workflowCustomNametoFQN sync.Map // Maps fully qualified workflow names to custom names. Usefor when client enqueues a workflow by name because registry is indexed by FQN.

// Workflow scheduler
workflowScheduler *cron.Cron
Expand Down
9 changes: 8 additions & 1 deletion dbos/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,14 @@ func (qr *queueRunner) run(ctx *dbosContext) {
}
for _, workflow := range dequeuedWorkflows {
// Find the workflow in the registry
registeredWorkflow, exists := ctx.workflowRegistry[workflow.name]

wfName, ok := ctx.workflowCustomNametoFQN.Load(workflow.name)
if !ok {
ctx.logger.Error("Workflow not found in registry", "workflow_name", workflow.name)
continue
}

registeredWorkflow, exists := ctx.workflowRegistry[wfName.(string)]
if !exists {
ctx.logger.Error("workflow function not found in registry", "workflow_name", workflow.name)
continue
Expand Down
9 changes: 8 additions & 1 deletion dbos/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
status: []WorkflowStatusType{WorkflowStatusPending},
executorIDs: executorIDs,
applicationVersion: ctx.applicationVersion,
loadInput: true,
})
if err != nil {
return nil, err
Expand All @@ -36,7 +37,13 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
continue
}

registeredWorkflow, exists := ctx.workflowRegistry[workflow.Name]
wfName, ok := ctx.workflowCustomNametoFQN.Load(workflow.Name)
if !ok {
ctx.logger.Error("Workflow not found in registry", "workflow_name", workflow.Name)
continue
}

registeredWorkflow, exists := ctx.workflowRegistry[wfName.(string)]
if !exists {
ctx.logger.Error("Workflow function not found in registry", "workflow_id", workflow.ID, "name", workflow.Name)
continue
Expand Down
16 changes: 8 additions & 8 deletions dbos/serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ func TestWorkflowEncoding(t *testing.T) {
}

// Test results from ListWorkflows
workflows, err := executor.(*dbosContext).systemDB.listWorkflows(context.Background(), listWorkflowsDBInput{
workflowIDs: []string{directHandle.GetWorkflowID()},
})
workflows, err := ListWorkflows(executor, WithWorkflowIDs(
[]string{directHandle.GetWorkflowID()},
))
if err != nil {
t.Fatalf("failed to list workflows: %v", err)
}
Expand Down Expand Up @@ -220,9 +220,9 @@ func TestWorkflowEncoding(t *testing.T) {
}

// Test results from ListWorkflows
workflows, err := executor.(*dbosContext).systemDB.listWorkflows(context.Background(), listWorkflowsDBInput{
workflowIDs: []string{directHandle.GetWorkflowID()},
})
workflows, err := ListWorkflows(executor, WithWorkflowIDs(
[]string{directHandle.GetWorkflowID()},
))
if err != nil {
t.Fatalf("failed to list workflows: %v", err)
}
Expand Down Expand Up @@ -317,7 +317,7 @@ func setEventUserDefinedTypeWorkflow(ctx DBOSContext, input string) (string, err
},
}

err := SetEvent(ctx, WorkflowSetEventInputGeneric[UserDefinedEventData]{Key: input, Message: eventData})
err := SetEvent(ctx, GenericWorkflowSetEventInput[UserDefinedEventData]{Key: input, Message: eventData})
if err != nil {
return "", err
}
Expand Down Expand Up @@ -394,7 +394,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string,

// Send should automatically register this type with gob
// Note the explicit type parameter since compiler cannot infer UserDefinedEventData from string input
err := Send(ctx, WorkflowSendInput[UserDefinedEventData]{
err := Send(ctx, GenericWorkflowSendInput[UserDefinedEventData]{
DestinationID: destinationID,
Topic: "user-defined-topic",
Message: sendData,
Expand Down
Loading
Loading