Skip to content

Commit de4366c

Browse files
authored
DBOS "client context" (#53)
This PR adds workflow management features usable by a "client context", that is, a DBOSContext object that is initialized but not launched. The benefits of doing so is being able to interact with DBOS workflows without having the process dequeue tasks, recover workflows, or run an admin server. Features added: - Resume workflow - Fork workflow - Enqueue workflow by name - Add ability to dismiss output/input from list workflows Because the registry is indexed by workflow FQN, we need to keep another map from workflow "custom" name to workflow FQN. This should have been tested in PR that added custom workflow names.
1 parent e63f890 commit de4366c

File tree

8 files changed

+2056
-83
lines changed

8 files changed

+2056
-83
lines changed

dbos/client_test.go

Lines changed: 1021 additions & 0 deletions
Large diffs are not rendered by default.

dbos/dbos.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func processConfig(inputConfig *Config) (*Config, error) {
5454

5555
// Load defaults
5656
if dbosConfig.Logger == nil {
57-
dbosConfig.Logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
57+
dbosConfig.Logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
5858
}
5959

6060
return dbosConfig, nil
@@ -76,15 +76,21 @@ type DBOSContext interface {
7676
// Workflow operations
7777
RunAsStep(_ DBOSContext, fn StepFunc) (any, error) // Execute a function as a durable step within a workflow
7878
RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
79-
Send(_ DBOSContext, input WorkflowSendInputInternal) error // Send a message to another workflow
79+
Send(_ DBOSContext, input WorkflowSendInput) error // Send a message to another workflow
8080
Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) // Receive a message sent to this workflow
8181
SetEvent(_ DBOSContext, input WorkflowSetEventInput) error // Set a key-value event for this workflow
8282
GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, error) // Get a key-value event from a target workflow
8383
Sleep(duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
8484
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
85+
GetStepID() (int, error) // Get the current step ID (only available within workflows)
8586

8687
// Workflow management
87-
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
88+
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
89+
Enqueue(_ DBOSContext, params EnqueueOptions) (WorkflowHandle[any], error) // Enqueue a new workflow with parameters
90+
CancelWorkflow(workflowID string) error // Cancel a workflow by setting its status to CANCELLED
91+
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
92+
ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
93+
ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
8894

8995
// Accessors
9096
GetApplicationVersion() string // Get the application version for this context
@@ -114,8 +120,9 @@ type dbosContext struct {
114120
workflowsWg *sync.WaitGroup
115121

116122
// Workflow registry
117-
workflowRegistry map[string]workflowRegistryEntry
118-
workflowRegMutex *sync.RWMutex
123+
workflowRegistry map[string]workflowRegistryEntry
124+
workflowRegMutex *sync.RWMutex
125+
workflowCustomNametoFQN sync.Map // Maps fully qualified workflow names to custom names. Usefor when client enqueues a workflow by name because registry is indexed by FQN.
119126

120127
// Workflow scheduler
121128
workflowScheduler *cron.Cron

dbos/queue.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,14 @@ func (qr *queueRunner) run(ctx *dbosContext) {
199199
}
200200
for _, workflow := range dequeuedWorkflows {
201201
// Find the workflow in the registry
202-
registeredWorkflow, exists := ctx.workflowRegistry[workflow.name]
202+
203+
wfName, ok := ctx.workflowCustomNametoFQN.Load(workflow.name)
204+
if !ok {
205+
ctx.logger.Error("Workflow not found in registry", "workflow_name", workflow.name)
206+
continue
207+
}
208+
209+
registeredWorkflow, exists := ctx.workflowRegistry[wfName.(string)]
203210
if !exists {
204211
ctx.logger.Error("workflow function not found in registry", "workflow_name", workflow.name)
205212
continue

dbos/recovery.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
1111
status: []WorkflowStatusType{WorkflowStatusPending},
1212
executorIDs: executorIDs,
1313
applicationVersion: ctx.applicationVersion,
14+
loadInput: true,
1415
})
1516
if err != nil {
1617
return nil, err
@@ -36,7 +37,13 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
3637
continue
3738
}
3839

39-
registeredWorkflow, exists := ctx.workflowRegistry[workflow.Name]
40+
wfName, ok := ctx.workflowCustomNametoFQN.Load(workflow.Name)
41+
if !ok {
42+
ctx.logger.Error("Workflow not found in registry", "workflow_name", workflow.Name)
43+
continue
44+
}
45+
46+
registeredWorkflow, exists := ctx.workflowRegistry[wfName.(string)]
4047
if !exists {
4148
ctx.logger.Error("Workflow function not found in registry", "workflow_id", workflow.ID, "name", workflow.Name)
4249
continue

dbos/serialization_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ func TestWorkflowEncoding(t *testing.T) {
103103
}
104104

105105
// Test results from ListWorkflows
106-
workflows, err := executor.(*dbosContext).systemDB.listWorkflows(context.Background(), listWorkflowsDBInput{
107-
workflowIDs: []string{directHandle.GetWorkflowID()},
108-
})
106+
workflows, err := ListWorkflows(executor, WithWorkflowIDs(
107+
[]string{directHandle.GetWorkflowID()},
108+
))
109109
if err != nil {
110110
t.Fatalf("failed to list workflows: %v", err)
111111
}
@@ -220,9 +220,9 @@ func TestWorkflowEncoding(t *testing.T) {
220220
}
221221

222222
// Test results from ListWorkflows
223-
workflows, err := executor.(*dbosContext).systemDB.listWorkflows(context.Background(), listWorkflowsDBInput{
224-
workflowIDs: []string{directHandle.GetWorkflowID()},
225-
})
223+
workflows, err := ListWorkflows(executor, WithWorkflowIDs(
224+
[]string{directHandle.GetWorkflowID()},
225+
))
226226
if err != nil {
227227
t.Fatalf("failed to list workflows: %v", err)
228228
}
@@ -317,7 +317,7 @@ func setEventUserDefinedTypeWorkflow(ctx DBOSContext, input string) (string, err
317317
},
318318
}
319319

320-
err := SetEvent(ctx, WorkflowSetEventInputGeneric[UserDefinedEventData]{Key: input, Message: eventData})
320+
err := SetEvent(ctx, GenericWorkflowSetEventInput[UserDefinedEventData]{Key: input, Message: eventData})
321321
if err != nil {
322322
return "", err
323323
}
@@ -394,7 +394,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string,
394394

395395
// Send should automatically register this type with gob
396396
// Note the explicit type parameter since compiler cannot infer UserDefinedEventData from string input
397-
err := Send(ctx, WorkflowSendInput[UserDefinedEventData]{
397+
err := Send(ctx, GenericWorkflowSendInput[UserDefinedEventData]{
398398
DestinationID: destinationID,
399399
Topic: "user-defined-topic",
400400
Message: sendData,

0 commit comments

Comments
 (0)