Skip to content

Commit c484edb

Browse files
authored
Workflow forked-from tracking and step timestamps (#188)
Addresses #179.

**Workflow forking:**
- Added a `forked_from` column to `workflow_status` to track the original workflow ID during fork operations.
- Added a `ForkedFrom` filter option for `ListWorkflows`.

**Step timestamps:** Added `started_at_epoch_ms` and `completed_at_epoch_ms` columns to `operation_outputs` to track step execution times. All special steps (`send`, `recv`, `sleep`, `setEvent`, `getEvent`) also capture and record timestamps.

**Conductor protocol:** Updated workflow and step responses to include the new fields (`ForkedFrom`, `WorkflowTimeoutMS`, `WorkflowDeadlineEpochMS`, `started_at_epoch_ms`, `completed_at_epoch_ms`) as strings.

**Admin server:** Updated the workflow list response to use `WorkflowDeadlineEpochMS` instead of `Deadline`.

**Database changes:** Added migrations 4 and 5.

**Tests:** Updated tests to verify that the new fields are present and that timestamps are set correctly for all special steps.
1 parent 497523d commit c484edb

File tree

11 files changed

+400
-93
lines changed

11 files changed

+400
-93
lines changed

dbos/admin_server.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,9 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
143143
}
144144

145145
if !ws.Deadline.IsZero() {
146-
result["Deadline"] = ws.Deadline.UTC().UnixMilli()
146+
result["WorkflowDeadlineEpochMS"] = ws.Deadline.UTC().UnixMilli()
147147
} else {
148-
result["Deadline"] = nil
148+
result["WorkflowDeadlineEpochMS"] = nil
149149
}
150150

151151
if !ws.StartedAt.IsZero() {
@@ -444,6 +444,14 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
444444
"child_workflow_id": step.ChildWorkflowID,
445445
}
446446

447+
// Add timestamps if present
448+
if !step.StartedAt.IsZero() {
449+
formattedStep["started_at_epoch_ms"] = step.StartedAt.UnixMilli()
450+
}
451+
if !step.CompletedAt.IsZero() {
452+
formattedStep["completed_at_epoch_ms"] = step.CompletedAt.UnixMilli()
453+
}
454+
447455
if step.Output != nil {
448456
// If there is a value, it should be a JSON string
449457
jsonOutput, ok := step.Output.(string)

dbos/admin_server_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -885,6 +885,12 @@ func TestAdminServer(t *testing.T) {
885885
functionName, ok := step["function_name"].(string)
886886
require.True(t, ok, "function_name should be a string for step %d", i)
887887

888+
// Verify timestamps are present
889+
_, hasStartedAt := step["started_at_epoch_ms"]
890+
assert.True(t, hasStartedAt, "Step %d should have started_at_epoch_ms field", i)
891+
_, hasCompletedAt := step["completed_at_epoch_ms"]
892+
assert.True(t, hasCompletedAt, "Step %d should have completed_at_epoch_ms field", i)
893+
888894
t.Logf("Step %d (%s): output=%v, error=%v", i, functionName, step["output"], step["error"])
889895

890896
switch functionName {

dbos/client_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,6 +709,11 @@ func TestForkWorkflow(t *testing.T) {
709709
forkedWorkflowID := forkedHandle.GetWorkflowID()
710710
assert.Equal(t, customForkedWorkflowID, forkedWorkflowID, "expected forked workflow ID to match")
711711

712+
// Verify forked_from is set
713+
forkedStatus, err := forkedHandle.GetStatus()
714+
require.NoError(t, err, "failed to get forked workflow status")
715+
assert.Equal(t, originalWorkflowID, forkedStatus.ForkedFrom, "expected forked_from to be set to original workflow ID")
716+
712717
forkedResult, err := forkedHandle.GetResult()
713718
require.NoError(t, err, "failed to get result from forked workflow at step %d", step)
714719

@@ -882,6 +887,17 @@ func TestListWorkflows(t *testing.T) {
882887
require.NoError(t, err, "failed to list all workflows")
883888
assert.GreaterOrEqual(t, len(allWorkflows), 10, "expected at least 10 workflows")
884889

890+
for _, wf := range allWorkflows {
891+
// These fields should exist (may be zero/empty for some workflows)
892+
// Timeout and Deadline are time.Duration and time.Time, so they're always present
893+
_ = wf.Timeout
894+
_ = wf.Deadline
895+
_ = wf.DeduplicationID
896+
_ = wf.Priority
897+
_ = wf.QueuePartitionKey
898+
_ = wf.ForkedFrom
899+
}
900+
885901
// Test 2: Filter by workflow IDs
886902
expectedIDs := workflowIDs[:3]
887903
specificWorkflows, err := client.ListWorkflows(WithWorkflowIDs(expectedIDs))
@@ -1070,6 +1086,11 @@ func TestGetWorkflowSteps(t *testing.T) {
10701086
assert.Nil(t, step.Error, "expected no error in step")
10711087
assert.Equal(t, "", step.ChildWorkflowID, "expected no child workflow ID")
10721088

1089+
// Verify timestamps are present
1090+
assert.False(t, step.StartedAt.IsZero(), "expected step to have StartedAt timestamp")
1091+
assert.False(t, step.CompletedAt.IsZero(), "expected step to have CompletedAt timestamp")
1092+
assert.True(t, step.CompletedAt.After(step.StartedAt) || step.CompletedAt.Equal(step.StartedAt), "expected CompletedAt to be after or equal to StartedAt")
1093+
10731094
// Verify the output wasn't loaded
10741095
require.Nil(t, step.Output, "expected output not to be loaded")
10751096

dbos/conductor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,9 @@ func (c *conductor) handleListWorkflowsRequest(data []byte, requestID string) er
592592
if req.Body.Status != nil {
593593
opts = append(opts, WithStatus([]WorkflowStatusType{WorkflowStatusType(*req.Body.Status)}))
594594
}
595+
if req.Body.ForkedFrom != nil {
596+
opts = append(opts, WithForkedFrom(*req.Body.ForkedFrom))
597+
}
595598

596599
workflows, err := c.dbosCtx.ListWorkflows(c.dbosCtx, opts...)
597600
if err != nil {

dbos/conductor_protocol.go

Lines changed: 57 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type listWorkflowsConductorRequestBody struct {
6363
SortDesc bool `json:"sort_desc"`
6464
LoadInput bool `json:"load_input"`
6565
LoadOutput bool `json:"load_output"`
66+
ForkedFrom *string `json:"forked_from,omitempty"`
6667
}
6768

6869
// listWorkflowsConductorRequest is sent by the conductor to list workflows
@@ -73,25 +74,28 @@ type listWorkflowsConductorRequest struct {
7374

7475
// listWorkflowsConductorResponseBody represents a single workflow in the list response
7576
type listWorkflowsConductorResponseBody struct {
76-
WorkflowUUID string `json:"WorkflowUUID"`
77-
Status *string `json:"Status,omitempty"`
78-
WorkflowName *string `json:"WorkflowName,omitempty"`
79-
WorkflowClassName *string `json:"WorkflowClassName,omitempty"`
80-
WorkflowConfigName *string `json:"WorkflowConfigName,omitempty"`
81-
AuthenticatedUser *string `json:"AuthenticatedUser,omitempty"`
82-
AssumedRole *string `json:"AssumedRole,omitempty"`
83-
AuthenticatedRoles *string `json:"AuthenticatedRoles,omitempty"`
84-
Input *string `json:"Input,omitempty"`
85-
Output *string `json:"Output,omitempty"`
86-
Error *string `json:"Error,omitempty"`
87-
CreatedAt *string `json:"CreatedAt,omitempty"`
88-
UpdatedAt *string `json:"UpdatedAt,omitempty"`
89-
QueueName *string `json:"QueueName,omitempty"`
90-
QueuePartitionKey *string `json:"QueuePartitionKey,omitempty"`
91-
DeduplicationID *string `json:"DeduplicationID,omitempty"`
92-
Priority *int `json:"Priority,omitempty"`
93-
ApplicationVersion *string `json:"ApplicationVersion,omitempty"`
94-
ExecutorID *string `json:"ExecutorID,omitempty"`
77+
WorkflowUUID string `json:"WorkflowUUID"`
78+
Status *string `json:"Status,omitempty"`
79+
WorkflowName *string `json:"WorkflowName,omitempty"`
80+
WorkflowClassName *string `json:"WorkflowClassName,omitempty"`
81+
WorkflowConfigName *string `json:"WorkflowConfigName,omitempty"`
82+
AuthenticatedUser *string `json:"AuthenticatedUser,omitempty"`
83+
AssumedRole *string `json:"AssumedRole,omitempty"`
84+
AuthenticatedRoles *string `json:"AuthenticatedRoles,omitempty"`
85+
Input *string `json:"Input,omitempty"`
86+
Output *string `json:"Output,omitempty"`
87+
Error *string `json:"Error,omitempty"`
88+
CreatedAt *string `json:"CreatedAt,omitempty"`
89+
UpdatedAt *string `json:"UpdatedAt,omitempty"`
90+
QueueName *string `json:"QueueName,omitempty"`
91+
QueuePartitionKey *string `json:"QueuePartitionKey,omitempty"`
92+
DeduplicationID *string `json:"DeduplicationID,omitempty"`
93+
Priority *int `json:"Priority,omitempty"`
94+
ApplicationVersion *string `json:"ApplicationVersion,omitempty"`
95+
ExecutorID *string `json:"ExecutorID,omitempty"`
96+
WorkflowTimeoutMS *string `json:"WorkflowTimeoutMS,omitempty"`
97+
WorkflowDeadlineEpochMS *string `json:"WorkflowDeadlineEpochMS,omitempty"`
98+
ForkedFrom *string `json:"ForkedFrom,omitempty"`
9599
}
96100

97101
// listWorkflowsConductorResponse is sent in response to list workflows requests
@@ -195,6 +199,23 @@ func formatListWorkflowsResponseBody(wf WorkflowStatus) listWorkflowsConductorRe
195199
output.ExecutorID = &wf.ExecutorID
196200
}
197201

202+
// Convert timeout to milliseconds string
203+
if wf.Timeout > 0 {
204+
timeoutStr := strconv.FormatInt(wf.Timeout.Milliseconds(), 10)
205+
output.WorkflowTimeoutMS = &timeoutStr
206+
}
207+
208+
// Convert deadline to epoch milliseconds string
209+
if !wf.Deadline.IsZero() {
210+
deadlineStr := strconv.FormatInt(wf.Deadline.UnixMilli(), 10)
211+
output.WorkflowDeadlineEpochMS = &deadlineStr
212+
}
213+
214+
// Copy forked from
215+
if wf.ForkedFrom != "" {
216+
output.ForkedFrom = &wf.ForkedFrom
217+
}
218+
198219
return output
199220
}
200221

@@ -206,11 +227,13 @@ type listStepsConductorRequest struct {
206227

207228
// workflowStepsConductorResponseBody represents a single workflow step in the list response
208229
type workflowStepsConductorResponseBody struct {
209-
FunctionID int `json:"function_id"`
210-
FunctionName string `json:"function_name"`
211-
Output *string `json:"output,omitempty"`
212-
Error *string `json:"error,omitempty"`
213-
ChildWorkflowID *string `json:"child_workflow_id,omitempty"`
230+
FunctionID int `json:"function_id"`
231+
FunctionName string `json:"function_name"`
232+
Output *string `json:"output,omitempty"`
233+
Error *string `json:"error,omitempty"`
234+
ChildWorkflowID *string `json:"child_workflow_id,omitempty"`
235+
StartedAtEpochMs *string `json:"started_at_epoch_ms,omitempty"`
236+
CompletedAtEpochMs *string `json:"completed_at_epoch_ms,omitempty"`
214237
}
215238

216239
// listStepsConductorResponse is sent in response to list steps requests
@@ -246,6 +269,16 @@ func formatWorkflowStepsResponseBody(step StepInfo) workflowStepsConductorRespon
246269
output.ChildWorkflowID = &step.ChildWorkflowID
247270
}
248271

272+
// Convert timestamps to epoch milliseconds strings
273+
if !step.StartedAt.IsZero() {
274+
startedAtStr := strconv.FormatInt(step.StartedAt.UnixMilli(), 10)
275+
output.StartedAtEpochMs = &startedAtStr
276+
}
277+
if !step.CompletedAt.IsZero() {
278+
completedAtStr := strconv.FormatInt(step.CompletedAt.UnixMilli(), 10)
279+
output.CompletedAtEpochMs = &completedAtStr
280+
}
281+
249282
return output
250283
}
251284

dbos/dbos_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func TestConfig(t *testing.T) {
251251
require.NoError(t, err)
252252
assert.True(t, exists, "dbos_migrations table should exist")
253253

254-
// Verify migration version is 3 (after initial migration, queue partition key migration, and workflow status index migration)
254+
// Verify migration version is 5 (after initial migration, queue partition key migration, workflow status index migration, forked_from migration, and step timestamps migration)
255255
var version int64
256256
var count int
257257
err = sysDB.pool.QueryRow(dbCtx, "SELECT COUNT(*) FROM dbos.dbos_migrations").Scan(&count)
@@ -260,7 +260,7 @@ func TestConfig(t *testing.T) {
260260

261261
err = sysDB.pool.QueryRow(dbCtx, "SELECT version FROM dbos.dbos_migrations").Scan(&version)
262262
require.NoError(t, err)
263-
assert.Equal(t, int64(3), version, "migration version should be 3 (after initial migration, queue partition key migration, and workflow status index migration)")
263+
assert.Equal(t, int64(5), version, "migration version should be 5 (after initial migration, queue partition key migration, workflow status index migration, forked_from migration, and step timestamps migration)")
264264

265265
// Test manual shutdown and recreate
266266
Shutdown(ctx, 1*time.Minute)
@@ -459,7 +459,7 @@ func TestCustomSystemDBSchema(t *testing.T) {
459459
require.NoError(t, err)
460460
assert.True(t, exists, "dbos_migrations table should exist in custom schema")
461461

462-
// Verify migration version is 3 (after initial migration, queue partition key migration, and workflow status index migration)
462+
// Verify migration version is 5 (after initial migration, queue partition key migration, workflow status index migration, forked_from migration, and step timestamps migration)
463463
var version int64
464464
var count int
465465
err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT COUNT(*) FROM %s.dbos_migrations", customSchema)).Scan(&count)
@@ -468,7 +468,7 @@ func TestCustomSystemDBSchema(t *testing.T) {
468468

469469
err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT version FROM %s.dbos_migrations", customSchema)).Scan(&version)
470470
require.NoError(t, err)
471-
assert.Equal(t, int64(3), version, "migration version should be 3 (after initial migration, queue partition key migration, and workflow status index migration)")
471+
assert.Equal(t, int64(5), version, "migration version should be 5 (after initial migration, queue partition key migration, workflow status index migration, forked_from migration, and step timestamps migration)")
472472
})
473473

474474
// Test workflows for exercising Send/Recv and SetEvent/GetEvent
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- Migration 4: Add forked_from column to workflow_status table
2+
-- This enables tracking workflow fork lineage
3+
4+
ALTER TABLE %s.workflow_status
5+
ADD COLUMN forked_from TEXT;
6+
7+
CREATE INDEX "idx_workflow_status_forked_from" ON %s."workflow_status" ("forked_from");
8+
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
-- Migration 5: Add started_at_epoch_ms and completed_at_epoch_ms columns to operation_outputs table
2+
-- This enables visualization of step duration
3+
4+
ALTER TABLE %s.operation_outputs
5+
ADD COLUMN started_at_epoch_ms BIGINT, ADD COLUMN completed_at_epoch_ms BIGINT;
6+

0 commit comments

Comments
 (0)