Skip to content

Commit 583733c

Browse files
authored
Query forked_from (#502)
Instead of including `forked_to` in the workflow status (which is awkward because it is not a database column), allow querying workflows by `forked_from` to trace fork lineage.
1 parent c1df9c3 commit 583733c

File tree

9 files changed: 40 additions and 40 deletions

dbos/_admin_server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ def _handle_workflows(self, filters: Dict[str, Any]) -> None:
338338
end_time=filters.get("end_time"),
339339
status=filters.get("status"),
340340
app_version=filters.get("application_version"),
341+
forked_from=filters.get("forked_from"),
341342
name=filters.get("workflow_name"),
342343
limit=filters.get("limit"),
343344
offset=filters.get("offset"),
@@ -364,6 +365,7 @@ def _handle_queued_workflows(self, filters: Dict[str, Any]) -> None:
364365
start_time=filters.get("start_time"),
365366
end_time=filters.get("end_time"),
366367
status=filters.get("status"),
368+
forked_from=filters.get("forked_from"),
367369
name=filters.get("workflow_name"),
368370
limit=filters.get("limit"),
369371
offset=filters.get("offset"),

dbos/_conductor/conductor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ def run(self) -> None:
233233
end_time=body["end_time"],
234234
status=body["status"],
235235
app_version=body["application_version"],
236+
forked_from=body["forked_from"],
236237
name=body["workflow_name"],
237238
limit=body["limit"],
238239
offset=body["offset"],
@@ -267,6 +268,7 @@ def run(self) -> None:
267268
start_time=q_body["start_time"],
268269
end_time=q_body["end_time"],
269270
status=q_body["status"],
271+
forked_from=q_body["forked_from"],
270272
name=q_body["workflow_name"],
271273
limit=q_body["limit"],
272274
offset=q_body["offset"],

dbos/_conductor/protocol.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ class ListWorkflowsBody(TypedDict, total=False):
118118
end_time: Optional[str]
119119
status: Optional[str]
120120
application_version: Optional[str]
121+
forked_from: Optional[str]
121122
limit: Optional[int]
122123
offset: Optional[int]
123124
sort_desc: bool
@@ -149,7 +150,6 @@ class WorkflowsOutput:
149150
Priority: Optional[str]
150151
QueuePartitionKey: Optional[str]
151152
ForkedFrom: Optional[str]
152-
ForkedTo: Optional[list[str]]
153153

154154
@classmethod
155155
def from_workflow_information(cls, info: WorkflowStatus) -> "WorkflowsOutput":
@@ -199,7 +199,6 @@ def from_workflow_information(cls, info: WorkflowStatus) -> "WorkflowsOutput":
199199
Priority=priority_str,
200200
QueuePartitionKey=info.queue_partition_key,
201201
ForkedFrom=info.forked_from,
202-
ForkedTo=info.forked_to,
203202
)
204203

205204

@@ -254,6 +253,7 @@ class ListQueuedWorkflowsBody(TypedDict, total=False):
254253
start_time: Optional[str]
255254
end_time: Optional[str]
256255
status: Optional[str]
256+
forked_from: Optional[str]
257257
queue_name: Optional[str]
258258
limit: Optional[int]
259259
offset: Optional[int]

dbos/_dbos.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,6 +1127,7 @@ def list_workflows(
11271127
end_time: Optional[str] = None,
11281128
name: Optional[str] = None,
11291129
app_version: Optional[str] = None,
1130+
forked_from: Optional[str] = None,
11301131
user: Optional[str] = None,
11311132
queue_name: Optional[str] = None,
11321133
limit: Optional[int] = None,
@@ -1145,6 +1146,7 @@ def fn() -> List[WorkflowStatus]:
11451146
end_time=end_time,
11461147
name=name,
11471148
app_version=app_version,
1149+
forked_from=forked_from,
11481150
user=user,
11491151
limit=limit,
11501152
offset=offset,
@@ -1169,6 +1171,7 @@ async def list_workflows_async(
11691171
end_time: Optional[str] = None,
11701172
name: Optional[str] = None,
11711173
app_version: Optional[str] = None,
1174+
forked_from: Optional[str] = None,
11721175
user: Optional[str] = None,
11731176
limit: Optional[int] = None,
11741177
offset: Optional[int] = None,
@@ -1186,6 +1189,7 @@ async def list_workflows_async(
11861189
end_time=end_time,
11871190
name=name,
11881191
app_version=app_version,
1192+
forked_from=forked_from,
11891193
user=user,
11901194
limit=limit,
11911195
offset=offset,
@@ -1201,6 +1205,7 @@ def list_queued_workflows(
12011205
*,
12021206
queue_name: Optional[str] = None,
12031207
status: Optional[Union[str, List[str]]] = None,
1208+
forked_from: Optional[str] = None,
12041209
start_time: Optional[str] = None,
12051210
end_time: Optional[str] = None,
12061211
name: Optional[str] = None,
@@ -1214,6 +1219,7 @@ def fn() -> List[WorkflowStatus]:
12141219
_get_dbos_instance()._sys_db,
12151220
queue_name=queue_name,
12161221
status=status,
1222+
forked_from=forked_from,
12171223
start_time=start_time,
12181224
end_time=end_time,
12191225
name=name,
@@ -1233,6 +1239,7 @@ async def list_queued_workflows_async(
12331239
*,
12341240
queue_name: Optional[str] = None,
12351241
status: Optional[Union[str, List[str]]] = None,
1242+
forked_from: Optional[str] = None,
12361243
start_time: Optional[str] = None,
12371244
end_time: Optional[str] = None,
12381245
name: Optional[str] = None,
@@ -1246,6 +1253,7 @@ async def list_queued_workflows_async(
12461253
cls.list_queued_workflows,
12471254
queue_name=queue_name,
12481255
status=status,
1256+
forked_from=forked_from,
12491257
start_time=start_time,
12501258
end_time=end_time,
12511259
name=name,

dbos/_migration.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ def get_dbos_migration_three(schema: str) -> str:
218218
def get_dbos_migration_four(schema: str) -> str:
219219
return f"""
220220
ALTER TABLE \"{schema}\".workflow_status ADD COLUMN forked_from TEXT;
221+
CREATE INDEX "idx_workflow_status_forked_from" ON \"{schema}\"."workflow_status" ("forked_from")
221222
"""
222223

223224

@@ -334,6 +335,7 @@ def get_sqlite_timestamp_expr() -> str:
334335

335336
sqlite_migration_four = """
336337
ALTER TABLE workflow_status ADD COLUMN forked_from TEXT;
338+
CREATE INDEX "idx_workflow_status_forked_from" ON "workflow_status" ("forked_from")
337339
"""
338340

339341
sqlite_migration_five = """

dbos/_sys_db.py

Lines changed: 9 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class WorkflowStatus:
106106
updated_at: Optional[int]
107107
# If this workflow was enqueued, on which queue
108108
queue_name: Optional[str]
109-
# The executor to most recently executed this workflow
109+
# The executor to most recently execute this workflow
110110
executor_id: Optional[str]
111111
# The application version on which this workflow was started
112112
app_version: Optional[str]
@@ -122,8 +122,6 @@ class WorkflowStatus:
122122
queue_partition_key: Optional[str]
123123
# If this workflow was forked from another, that workflow's ID.
124124
forked_from: Optional[str]
125-
# If this workflow was forked to others, those workflows' IDs
126-
forked_to: Optional[list[str]]
127125

128126
# INTERNAL FIELDS
129127

@@ -213,6 +211,8 @@ def __init__(self) -> None:
213211
self.status: Optional[List[str]] = None
214212
# The application version that ran this workflow.
215213
self.application_version: Optional[str] = None
214+
# Only return workflows that were forked from the workflow with this ID.
215+
self.forked_from: Optional[str] = None
216216
# Return up to this many workflow IDs. IDs are ordered by workflow creation time.
217217
self.limit: Optional[int] = None
218218
# Offset into the matching records for pagination
@@ -244,9 +244,9 @@ class StepInfo(TypedDict):
244244
error: Optional[Exception]
245245
# If the step starts or retrieves the result of a workflow, its ID
246246
child_workflow_id: Optional[str]
247-
# The UNIX epoch timestamp at which this step started
247+
# The Unix epoch timestamp at which this step started
248248
started_at_epoch_ms: Optional[int]
249-
# The UNIX epoch timestamp at which this step completed
249+
# The Unix epoch timestamp at which this step completed
250250
completed_at_epoch_ms: Optional[int]
251251

252252

@@ -931,6 +931,10 @@ def get_workflows(
931931
SystemSchema.workflow_status.c.application_version
932932
== input.application_version
933933
)
934+
if input.forked_from:
935+
query = query.where(
936+
SystemSchema.workflow_status.c.forked_from == input.forked_from
937+
)
934938
if input.workflow_ids:
935939
query = query.where(
936940
SystemSchema.workflow_status.c.workflow_uuid.in_(input.workflow_ids)
@@ -997,29 +1001,6 @@ def get_workflows(
9971001

9981002
workflow_ids.append(info.workflow_id)
9991003
infos.append(info)
1000-
1001-
# Calculate forked_to relationships
1002-
if workflow_ids:
1003-
with self.engine.begin() as c:
1004-
forked_to_query = sa.select(
1005-
SystemSchema.workflow_status.c.forked_from,
1006-
SystemSchema.workflow_status.c.workflow_uuid,
1007-
).where(SystemSchema.workflow_status.c.forked_from.in_(workflow_ids))
1008-
forked_to_rows = c.execute(forked_to_query).fetchall()
1009-
1010-
# Build a mapping of fork-parent workflow ID to list of fork-child workflow IDs
1011-
forked_to_map: Dict[str, List[str]] = {}
1012-
for row in forked_to_rows:
1013-
parent_id = row[0]
1014-
child_id = row[1]
1015-
if parent_id not in forked_to_map:
1016-
forked_to_map[parent_id] = []
1017-
forked_to_map[parent_id].append(child_id)
1018-
1019-
# Populate the forked_to field for each workflow
1020-
for info in infos:
1021-
info.forked_to = forked_to_map.get(info.workflow_id, None)
1022-
10231004
return infos
10241005

10251006
def get_pending_workflows(

dbos/_workflow_commands.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ def list_workflows(
2626
end_time: Optional[str] = None,
2727
name: Optional[str] = None,
2828
app_version: Optional[str] = None,
29+
forked_from: Optional[str] = None,
2930
user: Optional[str] = None,
3031
queue_name: Optional[str] = None,
3132
limit: Optional[int] = None,
@@ -42,6 +43,7 @@ def list_workflows(
4243
input.end_time = end_time
4344
input.status = status if status is None or isinstance(status, list) else [status]
4445
input.application_version = app_version
46+
input.forked_from = forked_from
4547
input.queue_name = queue_name
4648
input.limit = limit
4749
input.name = name
@@ -61,6 +63,7 @@ def list_queued_workflows(
6163
*,
6264
queue_name: Optional[str] = None,
6365
status: Optional[Union[str, List[str]]] = None,
66+
forked_from: Optional[str] = None,
6467
start_time: Optional[str] = None,
6568
end_time: Optional[str] = None,
6669
name: Optional[str] = None,
@@ -73,6 +76,7 @@ def list_queued_workflows(
7376
input.start_time = start_time
7477
input.end_time = end_time
7578
input.status = status if status is None or isinstance(status, list) else [status]
79+
input.forked_from = forked_from
7680
input.limit = limit
7781
input.name = name
7882
input.offset = offset

tests/test_admin_server.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,6 @@ def test_workflow_2(my_time: datetime) -> str:
527527
assert workflows[0]["Priority"] == "0"
528528
assert workflows[0]["QueuePartitionKey"] is None
529529
assert workflows[0]["ForkedFrom"] is None
530-
assert workflows[0]["ForkedTo"] is None
531530

532531
# Only load input and output as requested
533532
filters = {
@@ -600,10 +599,8 @@ def test_workflow_2(my_time: datetime) -> str:
600599
assert len(workflows) == 2
601600
assert workflows[0]["WorkflowUUID"] == handle_1.workflow_id
602601
assert workflows[0]["ForkedFrom"] == None
603-
assert workflows[0]["ForkedTo"] == [handle_3.workflow_id]
604602
assert workflows[1]["WorkflowUUID"] == handle_3.workflow_id
605603
assert workflows[1]["ForkedFrom"] == handle_1.workflow_id
606-
assert workflows[1]["ForkedTo"] == None
607604

608605
filters = {
609606
"end_time": (datetime.now(timezone.utc) - timedelta(minutes=10)).isoformat()
@@ -621,6 +618,14 @@ def test_workflow_2(my_time: datetime) -> str:
621618
workflows = response.json()
622619
assert len(workflows) == 0
623620

621+
filters = {
622+
"forked_from": "not-an-id",
623+
}
624+
response = requests.post("http://localhost:3001/workflows", json=filters, timeout=5)
625+
assert response.status_code == 200
626+
workflows = response.json()
627+
assert len(workflows) == 0
628+
624629
filters = {
625630
"status": ["SUCCESS", "CANCELLED"],
626631
}

tests/test_workflow_management.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,9 @@ def stepFive(x: int) -> int:
262262
assert stepFourCount == 3
263263
assert stepFiveCount == 4
264264

265-
handle: WorkflowHandle[int] = DBOS.retrieve_workflow(wfid)
266-
assert handle.get_status().forked_to == [fork_id, fork_id_2, fork_id_3]
265+
forks = DBOS.list_workflows(forked_from=wfid)
266+
assert len(forks) == 3
267+
assert [f.workflow_id for f in forks] == [fork_id, fork_id_2, fork_id_3]
267268

268269

269270
def test_restart_fromsteps_transactionsonly(
@@ -357,11 +358,6 @@ def trFive() -> None:
357358
assert trThreeCount == 3
358359
assert trFourCount == 4
359360
assert trFiveCount == 4
360-
assert DBOS.retrieve_workflow(wfid).get_status().forked_to == [
361-
fork_id_one,
362-
fork_id_two,
363-
fork_id_three,
364-
]
365361

366362

367363
def test_restart_fromsteps_steps_tr(

0 commit comments

Comments
 (0)