Skip to content

Commit 7a5ff0b

Browse files
Merge branch 'main' into 93
2 parents eac94b2 + 68cdafb commit 7a5ff0b

File tree

10 files changed

+147
-75
lines changed

10 files changed

+147
-75
lines changed

dbos/admin_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
412412
mux.HandleFunc(_WORKFLOW_STEPS_PATTERN, func(w http.ResponseWriter, r *http.Request) {
413413
workflowID := r.PathValue("id")
414414

415-
steps, err := ctx.systemDB.getWorkflowSteps(ctx, workflowID)
415+
steps, err := GetWorkflowSteps(ctx, workflowID)
416416
if err != nil {
417417
ctx.logger.Error("Failed to list workflow steps", "workflow_id", workflowID, "error", err)
418418
http.Error(w, fmt.Sprintf("Failed to list steps: %v", err), http.StatusInternalServerError)

dbos/conductor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -719,8 +719,8 @@ func (c *Conductor) handleListStepsRequest(data []byte, requestID string) error
719719
}
720720
c.logger.Debug("Handling list steps request", "request", req)
721721

722-
// Get workflow steps using the existing systemDB method
723-
steps, err := c.dbosCtx.systemDB.getWorkflowSteps(c.dbosCtx, req.WorkflowID)
722+
// Get workflow steps using the public GetWorkflowSteps method
723+
steps, err := GetWorkflowSteps(c.dbosCtx, req.WorkflowID)
724724
if err != nil {
725725
c.logger.Error("Failed to list workflow steps", "workflow_id", req.WorkflowID, "error", err)
726726
errorMsg := fmt.Sprintf("failed to list workflow steps: %v", err)

dbos/conductor_protocol.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,8 @@ type listStepsConductorResponse struct {
190190
Output *[]workflowStepsConductorResponseBody `json:"output,omitempty"`
191191
}
192192

193-
// formatWorkflowStepsResponseBody converts stepInfo to workflowStepsConductorResponseBody for the conductor protocol
194-
func formatWorkflowStepsResponseBody(step stepInfo) workflowStepsConductorResponseBody {
193+
// formatWorkflowStepsResponseBody converts StepInfo to workflowStepsConductorResponseBody for the conductor protocol
194+
func formatWorkflowStepsResponseBody(step StepInfo) workflowStepsConductorResponseBody {
195195
output := workflowStepsConductorResponseBody{
196196
FunctionID: step.StepID,
197197
FunctionName: step.StepName,

dbos/dbos.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ type DBOSContext interface {
123123
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
124124
ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
125125
ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
126+
GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow
126127

127128
// Accessors
128129
GetApplicationVersion() string // Get the application version for this context
@@ -349,7 +350,6 @@ func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
349350

350351
// Initialize the queue runner and register DBOS internal queue
351352
initExecutor.queueRunner = newQueueRunner(initExecutor.logger)
352-
NewWorkflowQueue(initExecutor, _DBOS_INTERNAL_QUEUE_NAME)
353353

354354
// Initialize conductor if API key is provided
355355
if config.ConductorAPIKey != "" {
@@ -404,6 +404,7 @@ func (c *dbosContext) Launch() error {
404404
}
405405

406406
// Start the queue runner in a goroutine
407+
NewWorkflowQueue(c, _DBOS_INTERNAL_QUEUE_NAME)
407408
go func() {
408409
c.queueRunner.run(c)
409410
}()

dbos/migrations/000001_initial_dbos_schema.up.sql

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ CREATE SCHEMA IF NOT EXISTS dbos;
77
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
88

99
-- Create workflow_status table
10-
CREATE TABLE dbos.workflow_status (
10+
CREATE TABLE IF NOT EXISTS dbos.workflow_status (
1111
workflow_uuid TEXT PRIMARY KEY,
1212
status TEXT,
1313
name TEXT,
@@ -34,17 +34,27 @@ CREATE TABLE dbos.workflow_status (
3434
);
3535

3636
-- Create indexes for workflow_status
37-
CREATE INDEX workflow_status_created_at_index ON dbos.workflow_status (created_at);
38-
CREATE INDEX workflow_status_executor_id_index ON dbos.workflow_status (executor_id);
39-
CREATE INDEX workflow_status_status_index ON dbos.workflow_status (status);
37+
CREATE INDEX IF NOT EXISTS workflow_status_created_at_index ON dbos.workflow_status (created_at);
38+
CREATE INDEX IF NOT EXISTS workflow_status_executor_id_index ON dbos.workflow_status (executor_id);
39+
CREATE INDEX IF NOT EXISTS workflow_status_status_index ON dbos.workflow_status (status);
4040

4141
-- Create unique constraint for queue_name and deduplication_id
42-
ALTER TABLE dbos.workflow_status
43-
ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id
44-
UNIQUE (queue_name, deduplication_id);
42+
DO $$
43+
BEGIN
44+
IF NOT EXISTS (
45+
SELECT 1 FROM information_schema.table_constraints
46+
WHERE constraint_name = 'uq_workflow_status_queue_name_dedup_id'
47+
AND table_name = 'workflow_status'
48+
AND table_schema = 'dbos'
49+
) THEN
50+
ALTER TABLE dbos.workflow_status
51+
ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id
52+
UNIQUE (queue_name, deduplication_id);
53+
END IF;
54+
END $$;
4555

4656
-- Create operation_outputs table
47-
CREATE TABLE dbos.operation_outputs (
57+
CREATE TABLE IF NOT EXISTS dbos.operation_outputs (
4858
workflow_uuid TEXT NOT NULL,
4959
function_id INTEGER NOT NULL,
5060
function_name TEXT NOT NULL DEFAULT '',
@@ -56,7 +66,7 @@ CREATE TABLE dbos.operation_outputs (
5666
ON UPDATE CASCADE ON DELETE CASCADE
5767
);
5868

59-
CREATE TABLE dbos.notifications (
69+
CREATE TABLE IF NOT EXISTS dbos.notifications (
6070
destination_uuid TEXT NOT NULL,
6171
topic TEXT,
6272
message TEXT NOT NULL,
@@ -66,7 +76,7 @@ CREATE TABLE dbos.notifications (
6676
ON UPDATE CASCADE ON DELETE CASCADE
6777
);
6878
-- Create index for notifications
69-
CREATE INDEX idx_workflow_topic ON dbos.notifications (destination_uuid, topic);
79+
CREATE INDEX IF NOT EXISTS idx_workflow_topic ON dbos.notifications (destination_uuid, topic);
7080

7181
-- Create notification function
7282
CREATE OR REPLACE FUNCTION dbos.notifications_function() RETURNS TRIGGER AS $$
@@ -79,12 +89,22 @@ END;
7989
$$ LANGUAGE plpgsql;
8090

8191
-- Create notification trigger
82-
CREATE TRIGGER dbos_notifications_trigger
83-
AFTER INSERT ON dbos.notifications
84-
FOR EACH ROW EXECUTE FUNCTION dbos.notifications_function();
92+
DO $$
93+
BEGIN
94+
IF NOT EXISTS (
95+
SELECT 1 FROM information_schema.triggers
96+
WHERE trigger_name = 'dbos_notifications_trigger'
97+
AND event_object_table = 'notifications'
98+
AND event_object_schema = 'dbos'
99+
) THEN
100+
CREATE TRIGGER dbos_notifications_trigger
101+
AFTER INSERT ON dbos.notifications
102+
FOR EACH ROW EXECUTE FUNCTION dbos.notifications_function();
103+
END IF;
104+
END $$;
85105

86106
-- Create workflow_events table
87-
CREATE TABLE dbos.workflow_events (
107+
CREATE TABLE IF NOT EXISTS dbos.workflow_events (
88108
workflow_uuid TEXT NOT NULL,
89109
key TEXT NOT NULL,
90110
value TEXT NOT NULL,
@@ -104,6 +124,16 @@ END;
104124
$$ LANGUAGE plpgsql;
105125

106126
-- Create events trigger
107-
CREATE TRIGGER dbos_workflow_events_trigger
108-
AFTER INSERT ON dbos.workflow_events
109-
FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function();
127+
DO $$
128+
BEGIN
129+
IF NOT EXISTS (
130+
SELECT 1 FROM information_schema.triggers
131+
WHERE trigger_name = 'dbos_workflow_events_trigger'
132+
AND event_object_table = 'workflow_events'
133+
AND event_object_schema = 'dbos'
134+
) THEN
135+
CREATE TRIGGER dbos_workflow_events_trigger
136+
AFTER INSERT ON dbos.workflow_events
137+
FOR EACH ROW EXECUTE FUNCTION dbos.workflow_events_function();
138+
END IF;
139+
END $$;

dbos/queues_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func TestWorkflowQueues(t *testing.T) {
173173
assert.Equal(t, "test-input", res)
174174

175175
// List steps: the workflow should have 1 step
176-
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
176+
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
177177
require.NoError(t, err)
178178
assert.Len(t, steps, 1)
179179
assert.Equal(t, 0, steps[0].StepID)
@@ -207,7 +207,7 @@ func TestWorkflowQueues(t *testing.T) {
207207
assert.Equal(t, expectedResult, res)
208208

209209
// List steps: the workflow should have 2 steps (Start the child and GetResult)
210-
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
210+
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
211211
require.NoError(t, err)
212212
assert.Len(t, steps, 2)
213213
assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName)
@@ -230,7 +230,7 @@ func TestWorkflowQueues(t *testing.T) {
230230
assert.Equal(t, expectedResult, res)
231231

232232
// List steps: the workflow should have 2 steps (Start the child and GetResult)
233-
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
233+
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
234234
require.NoError(t, err)
235235
assert.Len(t, steps, 2)
236236
assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName)
@@ -253,7 +253,7 @@ func TestWorkflowQueues(t *testing.T) {
253253
assert.Equal(t, expectedResult, res)
254254

255255
// List steps: the workflow should have 2 steps (Start the child and GetResult)
256-
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
256+
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
257257
require.NoError(t, err)
258258
assert.Len(t, steps, 2)
259259
assert.Equal(t, "custom-name", steps[0].StepName)
@@ -279,7 +279,7 @@ func TestWorkflowQueues(t *testing.T) {
279279

280280
// Check that the parent workflow (the one we ran directly) has 2 steps:
281281
// one for enqueueing the child and one for calling GetResult
282-
steps, err := dbosCtx.(*dbosContext).systemDB.getWorkflowSteps(dbosCtx, handle.GetWorkflowID())
282+
steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
283283
require.NoError(t, err)
284284
assert.Len(t, steps, 2)
285285
assert.Equal(t, runtime.FuncForPC(reflect.ValueOf(queueWorkflow).Pointer()).Name(), steps[0].StepName)

dbos/serialization_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func TestWorkflowEncoding(t *testing.T) {
114114
assert.Equal(t, "workflow error: step error", workflow.Error.Error())
115115

116116
// Test results from GetWorkflowSteps
117-
steps, err := executor.(*dbosContext).systemDB.getWorkflowSteps(context.Background(), directHandle.GetWorkflowID())
117+
steps, err := GetWorkflowSteps(executor, directHandle.GetWorkflowID())
118118
require.NoError(t, err)
119119
require.Len(t, steps, 1)
120120
step := steps[0]
@@ -175,7 +175,7 @@ func TestWorkflowEncoding(t *testing.T) {
175175
assert.Equal(t, "processed by encodingStepStruct", workflowOutput.B)
176176

177177
// Test results from GetWorkflowSteps
178-
steps, err := executor.(*dbosContext).systemDB.getWorkflowSteps(context.Background(), directHandle.GetWorkflowID())
178+
steps, err := GetWorkflowSteps(executor, directHandle.GetWorkflowID())
179179
require.NoError(t, err)
180180
require.Len(t, steps, 1)
181181
step := steps[0]

dbos/system_database.go

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type systemDatabase interface {
4747
// Steps
4848
recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error
4949
checkOperationExecution(ctx context.Context, input checkOperationExecutionDBInput) (*recordedResult, error)
50-
getWorkflowSteps(ctx context.Context, workflowID string) ([]stepInfo, error)
50+
getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error)
5151

5252
// Communication (special steps)
5353
send(ctx context.Context, input WorkflowSendInput) error
@@ -140,28 +140,35 @@ const (
140140

141141
func runMigrations(databaseURL string) error {
142142
// Change the driver to pgx5
143-
databaseURL = "pgx5://" + strings.TrimPrefix(databaseURL, "postgres://")
144-
145-
// Create migration source from embedded files
146-
d, err := iofs.New(migrationFiles, "migrations")
147-
if err != nil {
148-
return newInitializationError(fmt.Sprintf("failed to create migration source: %v", err))
149-
}
150-
151-
// Add custom migration table name to avoid conflicts with user migrations
152-
// Parse the URL to properly determine where to add the query parameter
153143
parsedURL, err := url.Parse(databaseURL)
154144
if err != nil {
155145
return newInitializationError(fmt.Sprintf("failed to parse database URL: %v", err))
156146
}
147+
// Handle various PostgreSQL URL schemes
148+
switch parsedURL.Scheme {
149+
case "postgres", "postgresql":
150+
parsedURL.Scheme = "pgx5"
151+
case "pgx5":
152+
// Already in correct format
153+
default:
154+
return newInitializationError(fmt.Sprintf("unsupported database URL scheme: %s", parsedURL.Scheme))
155+
}
156+
databaseURL = parsedURL.String()
157157

158+
// Add custom migration table name to avoid conflicts with user migrations
158159
// Check if query parameters already exist
159160
separator := "?"
160161
if parsedURL.RawQuery != "" {
161162
separator = "&"
162163
}
163164
databaseURL += separator + "x-migrations-table=" + _DBOS_MIGRATION_TABLE
164165

166+
// Create migration source from embedded files
167+
d, err := iofs.New(migrationFiles, "migrations")
168+
if err != nil {
169+
return newInitializationError(fmt.Sprintf("failed to create migration source: %v", err))
170+
}
171+
165172
// Create migrator
166173
m, err := migrate.NewWithSourceInstance("iofs", d, databaseURL)
167174
if err != nil {
@@ -1326,15 +1333,16 @@ func (s *sysDB) checkOperationExecution(ctx context.Context, input checkOperatio
13261333
return result, nil
13271334
}
13281335

1329-
type stepInfo struct {
1330-
StepID int
1331-
StepName string
1332-
Output any
1333-
Error error
1334-
ChildWorkflowID string
1336+
// StepInfo contains information about a workflow step execution.
1337+
type StepInfo struct {
1338+
StepID int // The sequential ID of the step within the workflow
1339+
StepName string // The name of the step function
1340+
Output any // The output returned by the step (if any)
1341+
Error error // The error returned by the step (if any)
1342+
ChildWorkflowID string // The ID of a child workflow spawned by this step (if applicable)
13351343
}
13361344

1337-
func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]stepInfo, error) {
1345+
func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error) {
13381346
query := `SELECT function_id, function_name, output, error, child_workflow_id
13391347
FROM dbos.operation_outputs
13401348
WHERE workflow_uuid = $1
@@ -1346,9 +1354,9 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]step
13461354
}
13471355
defer rows.Close()
13481356

1349-
var steps []stepInfo
1357+
var steps []StepInfo
13501358
for rows.Next() {
1351-
var step stepInfo
1359+
var step StepInfo
13521360
var outputString *string
13531361
var errorString *string
13541362
var childWorkflowID *string
@@ -2078,6 +2086,10 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu
20782086
}
20792087
}
20802088

2089+
if maxTasks <= 0 {
2090+
return nil, nil
2091+
}
2092+
20812093
// Build the query to select workflows for dequeueing
20822094
// Use SKIP LOCKED when no global concurrency is set to avoid blocking,
20832095
// otherwise use NOWAIT to ensure consistent view across processes

dbos/workflow.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2015,7 +2015,7 @@ func (c *dbosContext) ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption)
20152015
// Call the context method to list workflows
20162016
workflows, err := c.systemDB.listWorkflows(c, dbInput)
20172017
if err != nil {
2018-
return nil, fmt.Errorf("failed to list workflows: %w", err)
2018+
return nil, err
20192019
}
20202020

20212021
return workflows, nil
@@ -2062,3 +2062,32 @@ func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStat
20622062
}
20632063
return ctx.ListWorkflows(ctx, opts...)
20642064
}
2065+
2066+
func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) {
2067+
return c.systemDB.getWorkflowSteps(c, workflowID)
2068+
}
2069+
2070+
// GetWorkflowSteps retrieves the execution steps of a workflow.
2071+
// Returns a list of step information including step IDs, names, outputs, errors, and child workflow IDs.
2072+
//
2073+
// Parameters:
2074+
// - ctx: DBOS context for the operation
2075+
// - workflowID: The unique identifier of the workflow
2076+
//
2077+
// Returns a slice of StepInfo structs containing information about each executed step.
2078+
//
2079+
// Example:
2080+
//
2081+
// steps, err := dbos.GetWorkflowSteps(ctx, "workflow-id")
2082+
// if err != nil {
2083+
// log.Fatal(err)
2084+
// }
2085+
// for _, step := range steps {
2086+
// log.Printf("Step %d: %s", step.StepID, step.StepName)
2087+
// }
2088+
func GetWorkflowSteps(ctx DBOSContext, workflowID string) ([]StepInfo, error) {
2089+
if ctx == nil {
2090+
return nil, errors.New("ctx cannot be nil")
2091+
}
2092+
return ctx.GetWorkflowSteps(ctx, workflowID)
2093+
}

0 commit comments

Comments
 (0)