Skip to content

Commit c1df9c3

Browse files
authored
Better Introspection (#501)
- Add all missing database fields to `WorkflowStatus`
- Add `forked_from` and `forked_to` fields to `WorkflowStatus`, tracing workflow fork lineage and closing #498
- Use `pool_pre_ping` in the client, closing #500
- Record step start and end times, enabling visualization of step duration
1 parent 58801ac commit c1df9c3

File tree

14 files changed

+340
-194
lines changed

14 files changed

+340
-194
lines changed

dbos/_app_db.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@ def get_transactions(self, workflow_uuid: str) -> List[StepInfo]:
201201
else row[3]
202202
),
203203
child_workflow_id=None,
204+
started_at_epoch_ms=None,
205+
completed_at_epoch_ms=None,
204206
)
205207
for row in rows
206208
]

dbos/_client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,11 @@ def __init__(
149149
self._sys_db = SystemDatabase.create(
150150
system_database_url=system_database_url,
151151
engine_kwargs={
152+
"connect_args": {"application_name": "dbos_transact_client"},
152153
"pool_timeout": 30,
153154
"max_overflow": 0,
154155
"pool_size": 2,
156+
"pool_pre_ping": True,
155157
},
156158
engine=system_database_engine,
157159
schema=dbos_system_schema,
@@ -162,9 +164,11 @@ def __init__(
162164
self._app_db = ApplicationDatabase.create(
163165
database_url=application_database_url,
164166
engine_kwargs={
167+
"connect_args": {"application_name": "dbos_transact_client"},
165168
"pool_timeout": 30,
166169
"max_overflow": 0,
167170
"pool_size": 2,
171+
"pool_pre_ping": True,
168172
},
169173
schema=dbos_system_schema,
170174
serializer=serializer,
@@ -234,6 +238,7 @@ def _enqueue(self, options: EnqueueOptions, *args: Any, **kwargs: Any) -> str:
234238
),
235239
"inputs": self._serializer.serialize(inputs),
236240
"queue_partition_key": enqueue_options_internal["queue_partition_key"],
241+
"forked_from": None,
237242
}
238243

239244
self._sys_db.init_workflow(
@@ -300,6 +305,7 @@ def send(
300305
"priority": 0,
301306
"inputs": self._serializer.serialize({"args": (), "kwargs": {}}),
302307
"queue_partition_key": None,
308+
"forked_from": None,
303309
}
304310
with self._sys_db.engine.begin() as conn:
305311
self._sys_db._insert_workflow_status(

dbos/_conductor/protocol.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,13 @@ class WorkflowsOutput:
143143
QueueName: Optional[str]
144144
ApplicationVersion: Optional[str]
145145
ExecutorID: Optional[str]
146+
WorkflowTimeoutMS: Optional[str]
147+
WorkflowDeadlineEpochMS: Optional[str]
148+
DeduplicationID: Optional[str]
149+
Priority: Optional[str]
150+
QueuePartitionKey: Optional[str]
151+
ForkedFrom: Optional[str]
152+
ForkedTo: Optional[list[str]]
146153

147154
@classmethod
148155
def from_workflow_information(cls, info: WorkflowStatus) -> "WorkflowsOutput":
@@ -152,12 +159,22 @@ def from_workflow_information(cls, info: WorkflowStatus) -> "WorkflowsOutput":
152159
inputs_str = str(info.input) if info.input is not None else None
153160
outputs_str = str(info.output) if info.output is not None else None
154161
error_str = str(info.error) if info.error is not None else None
155-
request_str = None
156162
roles_str = (
157163
str(info.authenticated_roles)
158164
if info.authenticated_roles is not None
159165
else None
160166
)
167+
workflow_timeout_ms_str = (
168+
str(info.workflow_timeout_ms)
169+
if info.workflow_timeout_ms is not None
170+
else None
171+
)
172+
workflow_deadline_epoch_ms_str = (
173+
str(info.workflow_deadline_epoch_ms)
174+
if info.workflow_deadline_epoch_ms is not None
175+
else None
176+
)
177+
priority_str = str(info.priority) if info.priority is not None else None
161178

162179
return cls(
163180
WorkflowUUID=info.workflow_id,
@@ -176,6 +193,13 @@ def from_workflow_information(cls, info: WorkflowStatus) -> "WorkflowsOutput":
176193
QueueName=info.queue_name,
177194
ApplicationVersion=info.app_version,
178195
ExecutorID=info.executor_id,
196+
WorkflowTimeoutMS=workflow_timeout_ms_str,
197+
WorkflowDeadlineEpochMS=workflow_deadline_epoch_ms_str,
198+
DeduplicationID=info.deduplication_id,
199+
Priority=priority_str,
200+
QueuePartitionKey=info.queue_partition_key,
201+
ForkedFrom=info.forked_from,
202+
ForkedTo=info.forked_to,
179203
)
180204

181205

@@ -186,14 +210,28 @@ class WorkflowSteps:
186210
output: Optional[str]
187211
error: Optional[str]
188212
child_workflow_id: Optional[str]
213+
started_at_epoch_ms: Optional[str]
214+
completed_at_epoch_ms: Optional[str]
189215

190216
@classmethod
191217
def from_step_info(cls, info: StepInfo) -> "WorkflowSteps":
192218
output_str = str(info["output"]) if info["output"] is not None else None
193219
error_str = str(info["error"]) if info["error"] is not None else None
220+
started_at_str = (
221+
str(info["started_at_epoch_ms"])
222+
if info["started_at_epoch_ms"] is not None
223+
else None
224+
)
225+
completed_at_str = (
226+
str(info["completed_at_epoch_ms"])
227+
if info["completed_at_epoch_ms"] is not None
228+
else None
229+
)
194230
return cls(
195231
function_id=info["function_id"],
196232
function_name=info["function_name"],
233+
started_at_epoch_ms=started_at_str,
234+
completed_at_epoch_ms=completed_at_str,
197235
output=output_str,
198236
error=error_str,
199237
child_workflow_id=info["child_workflow_id"],

dbos/_core.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ def _init_workflow(
300300
if enqueue_options is not None
301301
else None
302302
),
303+
"forked_from": None,
303304
}
304305

305306
# Synchronously record the status and inputs for workflows
@@ -316,6 +317,7 @@ def _init_workflow(
316317
"function_name": wf_name,
317318
"output": None,
318319
"error": dbos._serializer.serialize(e),
320+
"started_at_epoch_ms": int(time.time() * 1000),
319321
}
320322
dbos._sys_db.record_operation_result(result)
321323
raise
@@ -1118,6 +1120,7 @@ def record_step_result(func: Callable[[], R]) -> R:
11181120
"function_name": step_name,
11191121
"output": None,
11201122
"error": None,
1123+
"started_at_epoch_ms": int(time.time() * 1000),
11211124
}
11221125

11231126
try:

dbos/_dbos.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,6 +1128,7 @@ def list_workflows(
11281128
name: Optional[str] = None,
11291129
app_version: Optional[str] = None,
11301130
user: Optional[str] = None,
1131+
queue_name: Optional[str] = None,
11311132
limit: Optional[int] = None,
11321133
offset: Optional[int] = None,
11331134
sort_desc: bool = False,
@@ -1151,6 +1152,7 @@ def fn() -> List[WorkflowStatus]:
11511152
workflow_id_prefix=workflow_id_prefix,
11521153
load_input=load_input,
11531154
load_output=load_output,
1155+
queue_name=queue_name,
11541156
)
11551157

11561158
return _get_dbos_instance()._sys_db.call_function_as_step(

dbos/_migration.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,11 +215,25 @@ def get_dbos_migration_three(schema: str) -> str:
215215
"""
216216

217217

218+
def get_dbos_migration_four(schema: str) -> str:
219+
return f"""
220+
ALTER TABLE \"{schema}\".workflow_status ADD COLUMN forked_from TEXT;
221+
"""
222+
223+
224+
def get_dbos_migration_five(schema: str) -> str:
225+
return f"""
226+
ALTER TABLE \"{schema}\".operation_outputs ADD COLUMN started_at_epoch_ms BIGINT, ADD COLUMN completed_at_epoch_ms BIGINT;
227+
"""
228+
229+
218230
def get_dbos_migrations(schema: str) -> list[str]:
219231
return [
220232
get_dbos_migration_one(schema),
221233
get_dbos_migration_two(schema),
222234
get_dbos_migration_three(schema),
235+
get_dbos_migration_four(schema),
236+
get_dbos_migration_five(schema),
223237
]
224238

225239

@@ -318,4 +332,20 @@ def get_sqlite_timestamp_expr() -> str:
318332
ON "workflow_status" ("queue_name", "status", "started_at_epoch_ms")
319333
"""
320334

321-
sqlite_migrations = [sqlite_migration_one, sqlite_migration_two, sqlite_migration_three]
335+
sqlite_migration_four = """
336+
ALTER TABLE workflow_status ADD COLUMN forked_from TEXT;
337+
"""
338+
339+
sqlite_migration_five = """
340+
ALTER TABLE operation_outputs ADD COLUMN started_at_epoch_ms BIGINT;
341+
ALTER TABLE operation_outputs ADD COLUMN completed_at_epoch_ms BIGINT;
342+
"""
343+
344+
345+
sqlite_migrations = [
346+
sqlite_migration_one,
347+
sqlite_migration_two,
348+
sqlite_migration_three,
349+
sqlite_migration_four,
350+
sqlite_migration_five,
351+
]

dbos/_schemas/system_database.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ def set_schema(cls, schema_name: Optional[str]) -> None:
7878
Column("inputs", Text()),
7979
Column("priority", Integer(), nullable=False, server_default=text("'0'::int")),
8080
Column("queue_partition_key", Text()),
81+
Column("forked_from", Text()),
8182
Index("workflow_status_created_at_index", "created_at"),
8283
Index("workflow_status_executor_id_index", "executor_id"),
8384
Index("workflow_status_status_index", "status"),
@@ -104,6 +105,8 @@ def set_schema(cls, schema_name: Optional[str]) -> None:
104105
Column("output", Text, nullable=True),
105106
Column("error", Text, nullable=True),
106107
Column("child_workflow_id", Text, nullable=True),
108+
Column("started_at_epoch_ms", BigInteger, nullable=True),
109+
Column("completed_at_epoch_ms", BigInteger, nullable=True),
107110
PrimaryKeyConstraint("workflow_uuid", "function_id"),
108111
)
109112

0 commit comments

Comments (0)