From fd7ff688c2c2f1f9a890e689dcaacf5e7acd7a95 Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 29 Aug 2025 17:22:12 -0700 Subject: [PATCH 1/3] public GetWorkflowSteps + create internal Q at launch --- dbos/admin_server.go | 2 +- dbos/conductor.go | 4 ++-- dbos/conductor_protocol.go | 4 ++-- dbos/dbos.go | 19 ++++++++------- dbos/queues_test.go | 10 ++++---- dbos/serialization_test.go | 4 ++-- dbos/system_database.go | 21 +++++++++-------- dbos/workflow.go | 31 +++++++++++++++++++++++- dbos/workflows_test.go | 48 +++++++++++++++++++------------------- 9 files changed, 87 insertions(+), 56 deletions(-) diff --git a/dbos/admin_server.go b/dbos/admin_server.go index 5696daed..0f7fc68b 100644 --- a/dbos/admin_server.go +++ b/dbos/admin_server.go @@ -412,7 +412,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer { mux.HandleFunc(_WORKFLOW_STEPS_PATTERN, func(w http.ResponseWriter, r *http.Request) { workflowID := r.PathValue("id") - steps, err := ctx.systemDB.getWorkflowSteps(ctx, workflowID) + steps, err := GetWorkflowSteps(ctx, workflowID) if err != nil { ctx.logger.Error("Failed to list workflow steps", "workflow_id", workflowID, "error", err) http.Error(w, fmt.Sprintf("Failed to list steps: %v", err), http.StatusInternalServerError) diff --git a/dbos/conductor.go b/dbos/conductor.go index 7e73e2fd..14c50996 100644 --- a/dbos/conductor.go +++ b/dbos/conductor.go @@ -719,8 +719,8 @@ func (c *Conductor) handleListStepsRequest(data []byte, requestID string) error } c.logger.Debug("Handling list steps request", "request", req) - // Get workflow steps using the existing systemDB method - steps, err := c.dbosCtx.systemDB.getWorkflowSteps(c.dbosCtx, req.WorkflowID) + // Get workflow steps using the public GetWorkflowSteps method + steps, err := GetWorkflowSteps(c.dbosCtx, req.WorkflowID) if err != nil { c.logger.Error("Failed to list workflow steps", "workflow_id", req.WorkflowID, "error", err) errorMsg := fmt.Sprintf("failed to list workflow steps: %v", err) diff --git a/dbos/conductor_protocol.go b/dbos/conductor_protocol.go index 7f06065b..7aecd211 100644 --- a/dbos/conductor_protocol.go +++ b/dbos/conductor_protocol.go @@ -190,8 +190,8 @@ type listStepsConductorResponse struct { Output *[]workflowStepsConductorResponseBody `json:"output,omitempty"` } -// formatWorkflowStepsResponseBody converts stepInfo to workflowStepsConductorResponseBody for the conductor protocol -func formatWorkflowStepsResponseBody(step stepInfo) workflowStepsConductorResponseBody { +// formatWorkflowStepsResponseBody converts StepInfo to workflowStepsConductorResponseBody for the conductor protocol +func formatWorkflowStepsResponseBody(step StepInfo) workflowStepsConductorResponseBody { output := workflowStepsConductorResponseBody{ FunctionID: step.StepID, FunctionName: step.StepName, diff --git a/dbos/dbos.go b/dbos/dbos.go index c9013818..726f1542 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -101,15 +101,15 @@ type DBOSContext interface { Shutdown(timeout time.Duration) // Gracefully shutdown all DBOS runtime components with ordered cleanup sequence // Workflow operations - RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow + RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution - Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow - Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow - SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow - GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow - Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery - GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows) - GetStepID() (int, error) // Get the current step ID (only available within workflows) + Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow + Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow + SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow + GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow + Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery + GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows) + GetStepID() (int, error) // Get the current step ID (only available within workflows) // Workflow management RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow @@ -118,6 +118,7 @@ type DBOSContext interface { ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria + GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow // Accessors GetApplicationVersion() string // Get the application version for this context @@ -328,7 +329,6 @@ func NewDBOSContext(inputConfig Config) (DBOSContext, error) { // Initialize the queue runner and register DBOS internal queue initExecutor.queueRunner = newQueueRunner(initExecutor.logger) - NewWorkflowQueue(initExecutor, _DBOS_INTERNAL_QUEUE_NAME) // Initialize conductor if API key is provided if config.ConductorAPIKey != "" { @@ -383,6 +383,7 @@ func (c *dbosContext) Launch() error { } // Start the queue runner in a goroutine + NewWorkflowQueue(c, _DBOS_INTERNAL_QUEUE_NAME) go func() { c.queueRunner.run(c) }() diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 1b84c1bf..3128dbea 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -173,7 +173,7 @@ func TestWorkflowQueues(t *testing.T) { assert.Equal(t, "test-input", res) // List steps: the workflow should have 1 step - steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) require.NoError(t, err) assert.Len(t, steps, 1) assert.Equal(t, 0, steps[0].StepID) @@ -207,7 +207,7 @@ func TestWorkflowQueues(t *testing.T) { assert.Equal(t, expectedResult, res) // List steps: the workflow should have 2 steps (Start the child and GetResult) - steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) require.NoError(t, err) assert.Len(t, steps, 2) assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName) @@ -230,7 +230,7 @@ func TestWorkflowQueues(t *testing.T) { assert.Equal(t, expectedResult, res) // List steps: the workflow should have 2 steps (Start the child and GetResult) - steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) require.NoError(t, err) assert.Len(t, steps, 2) assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName) @@ -253,7 +253,7 @@ func TestWorkflowQueues(t *testing.T) { assert.Equal(t, expectedResult, res) // List steps: the workflow should have 2 steps (Start the child and GetResult) - steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) require.NoError(t, err) assert.Len(t, steps, 2) assert.Equal(t, "custom-name", steps[0].StepName) @@ -279,7 +279,7 @@ func TestWorkflowQueues(t *testing.T) { // Check that the parent workflow (the one we ran directly) has 2 steps: // one for enqueueing the child and one for calling GetResult - steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) require.NoError(t, err) assert.Len(t, steps, 2) assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName) diff --git a/dbos/serialization_test.go b/dbos/serialization_test.go index 1a4e1c75..998f5dee 100644 --- a/dbos/serialization_test.go +++ b/dbos/serialization_test.go @@ -114,7 +114,7 @@ func TestWorkflowEncoding(t *testing.T) { assert.Equal(t, "workflow error: step error", workflow.Error.Error()) // Test results from GetWorkflowSteps - steps, err := executor.(*dbosContext).systemDB.getWorkflowSteps(context.Background(), directHandle.GetWorkflowID()) + steps, err := GetWorkflowSteps(executor, directHandle.GetWorkflowID()) require.NoError(t, err) require.Len(t, steps, 1) step := steps[0] @@ -175,7 +175,7 @@ func TestWorkflowEncoding(t *testing.T) { assert.Equal(t, "processed by encodingStepStruct", workflowOutput.B) // Test results from GetWorkflowSteps - steps, err := executor.(*dbosContext).systemDB.getWorkflowSteps(context.Background(), directHandle.GetWorkflowID()) + steps, err := GetWorkflowSteps(executor, directHandle.GetWorkflowID()) require.NoError(t, err) require.Len(t, steps, 1) step := steps[0] diff --git a/dbos/system_database.go b/dbos/system_database.go index 661bb909..0b4619b2 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -47,7 +47,7 @@ type systemDatabase interface { // Steps recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error checkOperationExecution(ctx context.Context, input checkOperationExecutionDBInput) (*recordedResult, error) - getWorkflowSteps(ctx context.Context, workflowID string) ([]stepInfo, error) + getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error) // Communication (special steps) send(ctx context.Context, input WorkflowSendInput) error @@ -1326,15 +1326,16 @@ func (s *sysDB) checkOperationExecution(ctx context.Context, input checkOperatio return result, nil } -type stepInfo struct { - StepID int - StepName string - Output any - Error error - ChildWorkflowID string +// StepInfo contains information about a workflow step execution. +type StepInfo struct { + StepID int // The sequential ID of the step within the workflow + StepName string // The name of the step function + Output any // The output returned by the step (if any) + Error error // The error returned by the step (if any) + ChildWorkflowID string // The ID of a child workflow spawned by this step (if applicable) } -func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]stepInfo, error) { +func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error) { query := `SELECT function_id, function_name, output, error, child_workflow_id FROM dbos.operation_outputs WHERE workflow_uuid = $1 @@ -1346,9 +1347,9 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]step } defer rows.Close() - var steps []stepInfo + var steps []StepInfo for rows.Next() { - var step stepInfo + var step StepInfo var outputString *string var errorString *string var childWorkflowID *string diff --git a/dbos/workflow.go b/dbos/workflow.go index 0b5475cc..7146e6ec 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -2015,7 +2015,7 @@ func (c *dbosContext) ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) // Call the context method to list workflows workflows, err := c.systemDB.listWorkflows(c, dbInput) if err != nil { - return nil, fmt.Errorf("failed to list workflows: %w", err) + return nil, err } return workflows, nil @@ -2062,3 +2062,32 @@ func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStat } return ctx.ListWorkflows(ctx, opts...) } + +func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) { + return c.systemDB.getWorkflowSteps(c, workflowID) +} + +// GetWorkflowSteps retrieves the execution steps of a workflow. +// Returns a list of step information including step IDs, names, outputs, errors, and child workflow IDs. +// +// Parameters: +// - ctx: DBOS context for the operation +// - workflowID: The unique identifier of the workflow +// +// Returns a slice of StepInfo structs containing information about each executed step. +// +// Example: +// +// steps, err := dbos.GetWorkflowSteps(ctx, "workflow-id") +// if err != nil { +// log.Fatal(err) +// } +// for _, step := range steps { +// log.Printf("Step %d: %s", step.StepID, step.StepName) +// } +func GetWorkflowSteps(ctx DBOSContext, workflowID string) ([]StepInfo, error) { + if ctx == nil { + return nil, errors.New("ctx cannot be nil") + } + return ctx.GetWorkflowSteps(ctx, workflowID) +} diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 7ad31c4a..c4795624 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -439,7 +439,7 @@ func TestSteps(t *testing.T) { require.NoError(t, err, "failed to get result from step within a step") assert.Equal(t, "from step", result) - steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) require.NoError(t, err, "failed to list steps") require.Len(t, steps, 1, "expected 1 step, got %d", len(steps)) }) @@ -476,7 +476,7 @@ func TestSteps(t *testing.T) { } // Verify that the failed step was still recorded in the database - steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps") require.Len(t, steps, 2, "expected 2 recorded steps") @@ -505,7 +505,7 @@ func TestSteps(t *testing.T) { require.NoError(t, err, "failed to get result from testStepWf2") // Get workflow steps for first workflow and check step name - steps1, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle1.GetWorkflowID()) + steps1, err := GetWorkflowSteps(dbosCtx, handle1.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps for testStepWf1") require.Len(t, steps1, 1, "expected 1 step in testStepWf1") s1 := steps1[0] @@ -513,7 +513,7 @@ func TestSteps(t *testing.T) { assert.Equal(t, expectedStepName1, s1.StepName, "expected step name to match runtime function name") // Get workflow steps for second workflow and check step name - steps2, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle2.GetWorkflowID()) + steps2, err := GetWorkflowSteps(dbosCtx, handle2.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps for testStepWf2") require.Len(t, steps2, 1, "expected 1 step in testStepWf2") s2 := steps2[0] @@ -554,7 +554,7 @@ func TestSteps(t *testing.T) { assert.Equal(t, "custom-step-1-result-custom-step-2-result", result) // Verify the custom step names were recorded - steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps") require.Len(t, steps, 2, "expected 2 steps") @@ -615,7 +615,7 @@ func TestChildWorkflow(t *testing.T) { } // Check the steps from this workflow - steps, err := ctx.(*dbosContext).systemDB.getWorkflowSteps(ctx, workflowID) + steps, err := GetWorkflowSteps(ctx, workflowID) if err != nil { return "", fmt.Errorf("failed to get workflow steps: %w", err) } @@ -692,7 +692,7 @@ func TestChildWorkflow(t *testing.T) { } // Check the steps from this workflow - steps, err := ctx.(*dbosContext).systemDB.getWorkflowSteps(ctx, workflowID) + steps, err := GetWorkflowSteps(ctx, workflowID) if err != nil { return "", fmt.Errorf("failed to get workflow steps: %w", err) } @@ -789,7 +789,7 @@ func TestChildWorkflow(t *testing.T) { require.Equal(t, "from step", result) // Verify the child workflow was recorded as step 0 - steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, parentHandle.GetWorkflowID()) + steps, err := GetWorkflowSteps(dbosCtx, parentHandle.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps") require.Len(t, steps, 2, "expected 2 recorded steps, got %d", len(steps)) @@ -1044,7 +1044,7 @@ func TestWorkflowRecovery(t *testing.T) { // Verify step states before recovery for i := range numWorkflows { - steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handles[i].GetWorkflowID()) + steps, err := GetWorkflowSteps(dbosCtx, handles[i].GetWorkflowID()) require.NoError(t, err, "failed to get steps for workflow %d", i) require.Len(t, steps, 1, "expected 1 completed step for workflow %d before recovery", i) @@ -1113,7 +1113,7 @@ func TestWorkflowRecovery(t *testing.T) { // Final verification of step states for i := range numWorkflows { - steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handles[i].GetWorkflowID()) + steps, err := GetWorkflowSteps(dbosCtx, handles[i].GetWorkflowID()) require.NoError(t, err, "failed to get final steps for workflow %d", i) require.Len(t, steps, 2, "expected 2 steps for workflow %d", i) @@ -1486,7 +1486,7 @@ func TestSendRecv(t *testing.T) { require.Equal(t, "message1-message2-message3", result) // Verify step counting for send workflow (sendWorkflow calls Send 3 times) - sendSteps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + sendSteps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps for send workflow") require.Len(t, sendSteps, 3, "expected 3 steps in send workflow (3 Send calls), got %d", len(sendSteps)) for i, step := range sendSteps { @@ -1495,7 +1495,7 @@ func TestSendRecv(t *testing.T) { } // Verify step counting for receive workflow (receiveWorkflow calls Recv 3 times) - receiveSteps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) + receiveSteps, err := GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps for receive workflow") require.Len(t, receiveSteps, 3, "expected 3 steps in receive workflow (3 Recv calls), got %d", len(receiveSteps)) for i, step := range receiveSteps { @@ -1527,14 +1527,14 @@ func TestSendRecv(t *testing.T) { require.Equal(t, "test-struct-value", result.Value) // Verify step counting for sendStructWorkflow (calls Send 1 time) - sendSteps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, sendHandle.GetWorkflowID()) + sendSteps, err := GetWorkflowSteps(dbosCtx, sendHandle.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps for send struct workflow") require.Len(t, sendSteps, 1, "expected 1 step in send struct workflow (1 Send call), got %d", len(sendSteps)) require.Equal(t, 0, sendSteps[0].StepID) require.Equal(t, "DBOS.send", sendSteps[0].StepName) // Verify step counting for receiveStructWorkflow (calls Recv 1 time) - receiveSteps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) + receiveSteps, err := GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps for receive struct workflow") require.Len(t, receiveSteps, 1, "expected 1 step in receive struct workflow (1 Recv call), got %d", len(receiveSteps)) require.Equal(t, 0, receiveSteps[0].StepID) @@ -1604,7 +1604,7 @@ func TestSendRecv(t *testing.T) { assert.Equal(t, "message1-message2-message3", result, "expected correct result from receive workflow") // Verify step counting for receive workflow (calls Recv 3 times) - receiveSteps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) + receiveSteps, err := GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps for receive workflow") require.Len(t, receiveSteps, 3, "expected 3 steps in receive workflow (3 Recv calls), got %d", len(receiveSteps)) for i, step := range receiveSteps { @@ -1631,13 +1631,13 @@ func TestSendRecv(t *testing.T) { recoveredHandles, err := recoverPendingWorkflows(dbosCtx.(*dbosContext), []string{"local"}) require.NoError(t, err, "failed to recover pending workflows") require.Len(t, recoveredHandles, 2, "expected 2 recovered handles, got %d", len(recoveredHandles)) - steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, sendHandle.GetWorkflowID()) + steps, err := GetWorkflowSteps(dbosCtx, sendHandle.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps") require.Len(t, steps, 1, "expected 1 step in send idempotency workflow, got %d", len(steps)) assert.Equal(t, 0, steps[0].StepID, "expected send idempotency step to have StepID 0") assert.Equal(t, "DBOS.send", steps[0].StepName, "expected send idempotency step to have StepName 'DBOS.send'") - steps, err = dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) + steps, err = GetWorkflowSteps(dbosCtx, receiveHandle.GetWorkflowID()) require.NoError(t, err, "failed to get steps for receive idempotency workflow") require.Len(t, steps, 1, "expected 1 step in receive idempotency workflow, got %d", len(steps)) assert.Equal(t, 0, steps[0].StepID, "expected receive idempotency step to have StepID 0") @@ -2001,7 +2001,7 @@ func TestSetGetEvent(t *testing.T) { assert.Equal(t, "two-events-set", result, "expected result to be 'two-events-set'") // Verify step counting for setTwoEventsWorkflow (calls SetEvent 2 times) - setSteps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, setHandle.GetWorkflowID()) + setSteps, err := GetWorkflowSteps(dbosCtx, setHandle.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps for set two events workflow") require.Len(t, setSteps, 2, "expected 2 steps in set two events workflow (2 SetEvent calls), got %d", len(setSteps)) for i, step := range setSteps { @@ -2010,14 +2010,14 @@ func TestSetGetEvent(t *testing.T) { } // Verify step counting for getFirstEventHandle (calls GetEvent 1 time) - getFirstSteps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, getFirstEventHandle.GetWorkflowID()) + getFirstSteps, err := GetWorkflowSteps(dbosCtx, getFirstEventHandle.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps for get first event workflow") require.Len(t, getFirstSteps, 1, "expected 1 step in get first event workflow (1 GetEvent call), got %d", len(getFirstSteps)) assert.Equal(t, 0, getFirstSteps[0].StepID, "expected step to have StepID 0") assert.Equal(t, "DBOS.getEvent", getFirstSteps[0].StepName, "expected step to have StepName 'DBOS.getEvent'") // Verify step counting for getSecondEventHandle (calls GetEvent 1 time) - getSecondSteps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, getSecondEventHandle.GetWorkflowID()) + getSecondSteps, err := GetWorkflowSteps(dbosCtx, getSecondEventHandle.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps for get second event workflow") require.Len(t, getSecondSteps, 1, "expected 1 step in get second event workflow (1 GetEvent call), got %d", len(getSecondSteps)) assert.Equal(t, 0, getSecondSteps[0].StepID, "expected step to have StepID 0") @@ -2050,7 +2050,7 @@ func TestSetGetEvent(t *testing.T) { } // Verify step counting for setEventWorkflow (calls SetEvent 1 time) - setSteps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, setHandle.GetWorkflowID()) + setSteps, err := GetWorkflowSteps(dbosCtx, setHandle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get workflow steps for set event workflow: %v", err) } @@ -2136,7 +2136,7 @@ func TestSetGetEvent(t *testing.T) { setEventStartIdempotencyEvent.Wait() // Verify step counts - setSteps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, setHandle.GetWorkflowID()) + setSteps, err := GetWorkflowSteps(dbosCtx, setHandle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get steps for set event idempotency workflow: %v", err) } @@ -2148,7 +2148,7 @@ func TestSetGetEvent(t *testing.T) { t.Fatalf("expected set event idempotency step to have StepName 'DBOS.setEvent', got '%s'", setSteps[0].StepName) } - getSteps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, getHandle.GetWorkflowID()) + getSteps, err := GetWorkflowSteps(dbosCtx, getHandle.GetWorkflowID()) if err != nil { t.Fatalf("failed to get steps for get event idempotency workflow: %v", err) } @@ -2292,7 +2292,7 @@ func TestSleep(t *testing.T) { assert.Less(t, elapsed, sleepDuration, "expected elapsed time to be less than sleep duration") // Verify the sleep step was recorded correctly - steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) require.NoError(t, err, "failed to get workflow steps") require.Len(t, steps, 1, "expected 1 step (the sleep), got %d", len(steps)) From c89baea31c3d6ef3831bb3d79edfd7805127894d Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 29 Aug 2025 17:23:14 -0700 Subject: [PATCH 2/3] exit early if maxTasks <= 0 --- dbos/system_database.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dbos/system_database.go b/dbos/system_database.go index 0b4619b2..7c27d6a3 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -2079,6 +2079,10 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu } } + if maxTasks <= 0 { + return nil, nil + } + // Build the query to select workflows for dequeueing // Use SKIP LOCKED when no global concurrency is set to avoid blocking, // otherwise use NOWAIT to ensure consistent view across processes From cfca508bc9c68b5d365bcd8a97588d23d7c8add3 Mon Sep 17 00:00:00 2001 From: maxdml Date: Fri, 29 Aug 2025 17:29:59 -0700 Subject: [PATCH 3/3] make db migrations idempotent --- .../000001_initial_dbos_schema.up.sql | 64 ++++++++++++++----- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/dbos/migrations/000001_initial_dbos_schema.up.sql b/dbos/migrations/000001_initial_dbos_schema.up.sql index b2e6142a..8d6199ae 100644 --- a/dbos/migrations/000001_initial_dbos_schema.up.sql +++ b/dbos/migrations/000001_initial_dbos_schema.up.sql @@ -7,7 +7,7 @@ CREATE SCHEMA IF NOT EXISTS dbos; CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -- Create workflow_status table -CREATE TABLE dbos.workflow_status ( +CREATE TABLE IF NOT EXISTS dbos.workflow_status ( workflow_uuid TEXT PRIMARY KEY, status TEXT, name TEXT, @@ -34,17 +34,27 @@ CREATE TABLE dbos.workflow_status ( ); -- Create indexes for workflow_status -CREATE INDEX workflow_status_created_at_index ON dbos.workflow_status (created_at); -CREATE INDEX workflow_status_executor_id_index ON dbos.workflow_status (executor_id); -CREATE INDEX workflow_status_status_index ON dbos.workflow_status (status); +CREATE INDEX IF NOT EXISTS workflow_status_created_at_index ON dbos.workflow_status (created_at); +CREATE INDEX IF NOT EXISTS workflow_status_executor_id_index ON dbos.workflow_status (executor_id); +CREATE INDEX IF NOT EXISTS workflow_status_status_index ON dbos.workflow_status (status); -- Create unique constraint for queue_name and deduplication_id -ALTER TABLE dbos.workflow_status -ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id -UNIQUE (queue_name, deduplication_id); +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE constraint_name = 'uq_workflow_status_queue_name_dedup_id' + AND table_name = 'workflow_status' + AND table_schema = 'dbos' + ) THEN + ALTER TABLE dbos.workflow_status + ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id + UNIQUE (queue_name, deduplication_id); + END IF; +END $$; -- Create operation_outputs table -CREATE TABLE dbos.operation_outputs ( +CREATE TABLE IF NOT EXISTS dbos.operation_outputs ( workflow_uuid TEXT NOT NULL, function_id INTEGER NOT NULL, function_name TEXT NOT NULL DEFAULT '', @@ -56,7 +66,7 @@ CREATE TABLE dbos.operation_outputs ( ON UPDATE CASCADE ON DELETE CASCADE ); -CREATE TABLE dbos.notifications ( +CREATE TABLE IF NOT EXISTS dbos.notifications ( destination_uuid TEXT NOT NULL, topic TEXT, message TEXT NOT NULL, @@ -66,7 +76,7 @@ CREATE TABLE dbos.notifications ( ON UPDATE CASCADE ON DELETE CASCADE ); -- Create index for notifications -CREATE INDEX idx_workflow_topic ON dbos.notifications (destination_uuid, topic); +CREATE INDEX IF NOT EXISTS idx_workflow_topic ON dbos.notifications (destination_uuid, topic); -- Create notification function CREATE OR REPLACE FUNCTION dbos.notifications_function() RETURNS TRIGGER AS $$ @@ -79,12 +89,22 @@ END; $$ LANGUAGE plpgsql; -- Create notification trigger -CREATE TRIGGER dbos_notifications_trigger -AFTER INSERT ON dbos.notifications -FOR EACH ROW EXECUTE FUNCTION dbos.notifications_function(); +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.triggers + WHERE trigger_name = 'dbos_notifications_trigger' + AND event_object_table = 'notifications' + AND event_object_schema = 'dbos' + ) THEN + CREATE TRIGGER dbos_notifications_trigger + AFTER INSERT ON dbos.notifications + FOR EACH ROW EXECUTE FUNCTION dbos.notifications_function(); + END IF; +END $$; -- Create workflow_events table -CREATE TABLE dbos.workflow_events ( +CREATE TABLE IF NOT EXISTS dbos.workflow_events ( workflow_uuid TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, @@ -104,6 +124,16 @@ END; $$ LANGUAGE plpgsql; -- Create events trigger -CREATE TRIGGER dbos_workflow_events_trigger -AFTER INSERT ON dbos.workflow_events -FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function(); +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.triggers + WHERE trigger_name = 'dbos_workflow_events_trigger' + AND event_object_table = 'workflow_events' + AND event_object_schema = 'dbos' + ) THEN + CREATE TRIGGER dbos_workflow_events_trigger + AFTER INSERT ON dbos.workflow_events + FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function(); + END IF; +END $$;