Skip to content

Commit fd7ff68

Browse files
committed
public GetWorkflowSteps + create internal Q at launch
1 parent f3c10f3 commit fd7ff68

File tree

9 files changed

+87
-56
lines changed

9 files changed

+87
-56
lines changed

dbos/admin_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
412412
mux.HandleFunc(_WORKFLOW_STEPS_PATTERN, func(w http.ResponseWriter, r *http.Request) {
413413
workflowID := r.PathValue("id")
414414

415-
steps, err := ctx.systemDB.getWorkflowSteps(ctx, workflowID)
415+
steps, err := GetWorkflowSteps(ctx, workflowID)
416416
if err != nil {
417417
ctx.logger.Error("Failed to list workflow steps", "workflow_id", workflowID, "error", err)
418418
http.Error(w, fmt.Sprintf("Failed to list steps: %v", err), http.StatusInternalServerError)

dbos/conductor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -719,8 +719,8 @@ func (c *Conductor) handleListStepsRequest(data []byte, requestID string) error
719719
}
720720
c.logger.Debug("Handling list steps request", "request", req)
721721

722-
// Get workflow steps using the existing systemDB method
723-
steps, err := c.dbosCtx.systemDB.getWorkflowSteps(c.dbosCtx, req.WorkflowID)
722+
// Get workflow steps using the public GetWorkflowSteps method
723+
steps, err := GetWorkflowSteps(c.dbosCtx, req.WorkflowID)
724724
if err != nil {
725725
c.logger.Error("Failed to list workflow steps", "workflow_id", req.WorkflowID, "error", err)
726726
errorMsg := fmt.Sprintf("failed to list workflow steps: %v", err)

dbos/conductor_protocol.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,8 @@ type listStepsConductorResponse struct {
190190
Output *[]workflowStepsConductorResponseBody `json:"output,omitempty"`
191191
}
192192

193-
// formatWorkflowStepsResponseBody converts stepInfo to workflowStepsConductorResponseBody for the conductor protocol
194-
func formatWorkflowStepsResponseBody(step stepInfo) workflowStepsConductorResponseBody {
193+
// formatWorkflowStepsResponseBody converts StepInfo to workflowStepsConductorResponseBody for the conductor protocol
194+
func formatWorkflowStepsResponseBody(step StepInfo) workflowStepsConductorResponseBody {
195195
output := workflowStepsConductorResponseBody{
196196
FunctionID: step.StepID,
197197
FunctionName: step.StepName,

dbos/dbos.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,15 @@ type DBOSContext interface {
101101
Shutdown(timeout time.Duration) // Gracefully shutdown all DBOS runtime components with ordered cleanup sequence
102102

103103
// Workflow operations
104-
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
104+
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
105105
RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
106-
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
107-
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
108-
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
109-
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
110-
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
111-
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
112-
GetStepID() (int, error) // Get the current step ID (only available within workflows)
106+
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
107+
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
108+
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
109+
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
110+
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
111+
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
112+
GetStepID() (int, error) // Get the current step ID (only available within workflows)
113113

114114
// Workflow management
115115
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
@@ -118,6 +118,7 @@ type DBOSContext interface {
118118
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
119119
ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
120120
ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
121+
GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow
121122

122123
// Accessors
123124
GetApplicationVersion() string // Get the application version for this context
@@ -328,7 +329,6 @@ func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
328329

329330
// Initialize the queue runner and register DBOS internal queue
330331
initExecutor.queueRunner = newQueueRunner(initExecutor.logger)
331-
NewWorkflowQueue(initExecutor, _DBOS_INTERNAL_QUEUE_NAME)
332332

333333
// Initialize conductor if API key is provided
334334
if config.ConductorAPIKey != "" {
@@ -383,6 +383,7 @@ func (c *dbosContext) Launch() error {
383383
}
384384

385385
// Start the queue runner in a goroutine
386+
NewWorkflowQueue(c, _DBOS_INTERNAL_QUEUE_NAME)
386387
go func() {
387388
c.queueRunner.run(c)
388389
}()

dbos/queues_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func TestWorkflowQueues(t *testing.T) {
173173
assert.Equal(t, "test-input", res)
174174

175175
// List steps: the workflow should have 1 step
176-
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
176+
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
177177
require.NoError(t, err)
178178
assert.Len(t, steps, 1)
179179
assert.Equal(t, 0, steps[0].StepID)
@@ -207,7 +207,7 @@ func TestWorkflowQueues(t *testing.T) {
207207
assert.Equal(t, expectedResult, res)
208208

209209
// List steps: the workflow should have 2 steps (Start the child and GetResult)
210-
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
210+
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
211211
require.NoError(t, err)
212212
assert.Len(t, steps, 2)
213213
assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName)
@@ -230,7 +230,7 @@ func TestWorkflowQueues(t *testing.T) {
230230
assert.Equal(t, expectedResult, res)
231231

232232
// List steps: the workflow should have 2 steps (Start the child and GetResult)
233-
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
233+
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
234234
require.NoError(t, err)
235235
assert.Len(t, steps, 2)
236236
assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName)
@@ -253,7 +253,7 @@ func TestWorkflowQueues(t *testing.T) {
253253
assert.Equal(t, expectedResult, res)
254254

255255
// List steps: the workflow should have 2 steps (Start the child and GetResult)
256-
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
256+
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
257257
require.NoError(t, err)
258258
assert.Len(t, steps, 2)
259259
assert.Equal(t, "custom-name", steps[0].StepName)
@@ -279,7 +279,7 @@ func TestWorkflowQueues(t *testing.T) {
279279

280280
// Check that the parent workflow (the one we ran directly) has 2 steps:
281281
// one for enqueueing the child and one for calling GetResult
282-
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
282+
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
283283
require.NoError(t, err)
284284
assert.Len(t, steps, 2)
285285
assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName)

dbos/serialization_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func TestWorkflowEncoding(t *testing.T) {
114114
assert.Equal(t, "workflow error: step error", workflow.Error.Error())
115115

116116
// Test results from GetWorkflowSteps
117-
steps, err := executor.(*dbosContext).systemDB.getWorkflowSteps(context.Background(), directHandle.GetWorkflowID())
117+
steps, err := GetWorkflowSteps(executor, directHandle.GetWorkflowID())
118118
require.NoError(t, err)
119119
require.Len(t, steps, 1)
120120
step := steps[0]
@@ -175,7 +175,7 @@ func TestWorkflowEncoding(t *testing.T) {
175175
assert.Equal(t, "processed by encodingStepStruct", workflowOutput.B)
176176

177177
// Test results from GetWorkflowSteps
178-
steps, err := executor.(*dbosContext).systemDB.getWorkflowSteps(context.Background(), directHandle.GetWorkflowID())
178+
steps, err := GetWorkflowSteps(executor, directHandle.GetWorkflowID())
179179
require.NoError(t, err)
180180
require.Len(t, steps, 1)
181181
step := steps[0]

dbos/system_database.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type systemDatabase interface {
4747
// Steps
4848
recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error
4949
checkOperationExecution(ctx context.Context, input checkOperationExecutionDBInput) (*recordedResult, error)
50-
getWorkflowSteps(ctx context.Context, workflowID string) ([]stepInfo, error)
50+
getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error)
5151

5252
// Communication (special steps)
5353
send(ctx context.Context, input WorkflowSendInput) error
@@ -1326,15 +1326,16 @@ func (s *sysDB) checkOperationExecution(ctx context.Context, input checkOperatio
13261326
return result, nil
13271327
}
13281328

1329-
type stepInfo struct {
1330-
StepID int
1331-
StepName string
1332-
Output any
1333-
Error error
1334-
ChildWorkflowID string
1329+
// StepInfo contains information about a workflow step execution.
1330+
type StepInfo struct {
1331+
StepID int // The sequential ID of the step within the workflow
1332+
StepName string // The name of the step function
1333+
Output any // The output returned by the step (if any)
1334+
Error error // The error returned by the step (if any)
1335+
ChildWorkflowID string // The ID of a child workflow spawned by this step (if applicable)
13351336
}
13361337

1337-
func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]stepInfo, error) {
1338+
func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error) {
13381339
query := `SELECT function_id, function_name, output, error, child_workflow_id
13391340
FROM dbos.operation_outputs
13401341
WHERE workflow_uuid = $1
@@ -1346,9 +1347,9 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]step
13461347
}
13471348
defer rows.Close()
13481349

1349-
var steps []stepInfo
1350+
var steps []StepInfo
13501351
for rows.Next() {
1351-
var step stepInfo
1352+
var step StepInfo
13521353
var outputString *string
13531354
var errorString *string
13541355
var childWorkflowID *string

dbos/workflow.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2015,7 +2015,7 @@ func (c *dbosContext) ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption)
20152015
// Call the context method to list workflows
20162016
workflows, err := c.systemDB.listWorkflows(c, dbInput)
20172017
if err != nil {
2018-
return nil, fmt.Errorf("failed to list workflows: %w", err)
2018+
return nil, err
20192019
}
20202020

20212021
return workflows, nil
@@ -2062,3 +2062,32 @@ func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStat
20622062
}
20632063
return ctx.ListWorkflows(ctx, opts...)
20642064
}
2065+
2066+
func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) {
2067+
return c.systemDB.getWorkflowSteps(c, workflowID)
2068+
}
2069+
2070+
// GetWorkflowSteps retrieves the execution steps of a workflow.
2071+
// Returns a list of step information including step IDs, names, outputs, errors, and child workflow IDs.
2072+
//
2073+
// Parameters:
2074+
// - ctx: DBOS context for the operation
2075+
// - workflowID: The unique identifier of the workflow
2076+
//
2077+
// Returns a slice of StepInfo structs containing information about each executed step.
2078+
//
2079+
// Example:
2080+
//
2081+
// steps, err := dbos.GetWorkflowSteps(ctx, "workflow-id")
2082+
// if err != nil {
2083+
// log.Fatal(err)
2084+
// }
2085+
// for _, step := range steps {
2086+
// log.Printf("Step %d: %s", step.StepID, step.StepName)
2087+
// }
2088+
func GetWorkflowSteps(ctx DBOSContext, workflowID string) ([]StepInfo, error) {
2089+
if ctx == nil {
2090+
return nil, errors.New("ctx cannot be nil")
2091+
}
2092+
return ctx.GetWorkflowSteps(ctx, workflowID)
2093+
}

0 commit comments

Comments
 (0)