Skip to content

Commit 109d4ba

Browse files
authored
Check and set workflow executor_id (#503)
At each step checkpoint, check the workflow executor_id and reset to current if not matching.
1 parent 583733c commit 109d4ba

File tree

7 files changed

+67
-2
lines changed

7 files changed

+67
-2
lines changed

DEVELOPING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pdm run pre-commit install
4545
To run unit tests:
4646

4747
```
48-
pdm run pytest
48+
pdm run pytest tests
4949
```
5050

5151
NOTE: The tests need a Postgres database running on `localhost:5432`. To start

dbos/_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ def __init__(
158158
engine=system_database_engine,
159159
schema=dbos_system_schema,
160160
serializer=serializer,
161+
executor_id=None,
161162
)
162163
self._sys_db.check_connection()
163164
if application_database_url:

dbos/_dbos.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,7 @@ def _launch(self, *, debug_mode: bool = False) -> None:
460460
debug_mode=debug_mode,
461461
schema=schema,
462462
serializer=self._serializer,
463+
executor_id=GlobalParams.executor_id,
463464
)
464465
assert self._config["database"]["db_engine_kwargs"] is not None
465466
if self._config["database_url"]:

dbos/_sys_db.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ def create(
351351
engine: Optional[sa.Engine],
352352
schema: Optional[str],
353353
serializer: Serializer,
354+
executor_id: Optional[str],
354355
debug_mode: bool = False,
355356
) -> "SystemDatabase":
356357
"""Factory method to create the appropriate SystemDatabase implementation based on URL."""
@@ -363,6 +364,7 @@ def create(
363364
engine=engine,
364365
schema=schema,
365366
serializer=serializer,
367+
executor_id=executor_id,
366368
debug_mode=debug_mode,
367369
)
368370
else:
@@ -374,6 +376,7 @@ def create(
374376
engine=engine,
375377
schema=schema,
376378
serializer=serializer,
379+
executor_id=executor_id,
377380
debug_mode=debug_mode,
378381
)
379382

@@ -385,6 +388,7 @@ def __init__(
385388
engine: Optional[sa.Engine],
386389
schema: Optional[str],
387390
serializer: Serializer,
391+
executor_id: Optional[str],
388392
debug_mode: bool = False,
389393
):
390394
import sqlalchemy.dialects.postgresql as pg
@@ -410,6 +414,8 @@ def __init__(
410414

411415
self.notifications_map = ThreadSafeConditionDict()
412416
self.workflow_events_map = ThreadSafeConditionDict()
417+
self.executor_id = executor_id
418+
413419
self._listener_thread_lock = threading.Lock()
414420

415421
# Now we can run background processes
@@ -1069,6 +1075,27 @@ def _record_operation_result_txn(
10691075
error = result["error"]
10701076
output = result["output"]
10711077
assert error is None or output is None, "Only one of error or output can be set"
1078+
wf_executor_id_row = conn.execute(
1079+
sa.select(
1080+
SystemSchema.workflow_status.c.executor_id,
1081+
).where(
1082+
SystemSchema.workflow_status.c.workflow_uuid == result["workflow_uuid"]
1083+
)
1084+
).fetchone()
1085+
assert wf_executor_id_row is not None
1086+
wf_executor_id = wf_executor_id_row[0]
1087+
if self.executor_id is not None and wf_executor_id != self.executor_id:
1088+
dbos_logger.debug(
1089+
f'Resetting executor_id from {wf_executor_id} to {self.executor_id} for workflow {result["workflow_uuid"]}'
1090+
)
1091+
conn.execute(
1092+
sa.update(SystemSchema.workflow_status)
1093+
.values(executor_id=self.executor_id)
1094+
.where(
1095+
SystemSchema.workflow_status.c.workflow_uuid
1096+
== result["workflow_uuid"]
1097+
)
1098+
)
10721099
sql = sa.insert(SystemSchema.operation_outputs).values(
10731100
workflow_uuid=result["workflow_uuid"],
10741101
function_id=result["function_id"],

dbos/cli/migration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def migrate_dbos_databases(
2424
engine=None,
2525
schema=schema,
2626
serializer=DefaultSerializer(),
27+
executor_id=None,
2728
)
2829
sys_db.run_migrations()
2930
if app_database_url:

tests/test_dbos.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,40 @@ def noop() -> None:
117117
assert updated_at >= created_at
118118

119119

120+
def test_eid_reset(dbos: DBOS) -> None:
121+
@DBOS.step()
122+
def test_step() -> str:
123+
return "hello"
124+
125+
@DBOS.workflow()
126+
def test_workflow() -> str:
127+
DBOS.set_event("started", 1)
128+
DBOS.recv("run_step")
129+
return test_step()
130+
131+
wfuuid = str(uuid.uuid4())
132+
with SetWorkflowID(wfuuid):
133+
wfh = dbos.start_workflow(test_workflow)
134+
DBOS.get_event(wfuuid, "started")
135+
with dbos._sys_db.engine.connect() as c:
136+
c.execute(
137+
sa.update(SystemSchema.workflow_status)
138+
.values(executor_id="some_other_executor")
139+
.where(SystemSchema.workflow_status.c.workflow_uuid == wfuuid)
140+
)
141+
c.commit()
142+
DBOS.send(wfuuid, 1, "run_step")
143+
wfh.get_result()
144+
with dbos._sys_db.engine.connect() as c:
145+
x = c.execute(
146+
sa.select(SystemSchema.workflow_status.c.executor_id).where(
147+
SystemSchema.workflow_status.c.workflow_uuid == wfuuid
148+
)
149+
).fetchone()
150+
assert x is not None
151+
assert x[0] == "local"
152+
153+
120154
def test_child_workflow(dbos: DBOS) -> None:
121155
txn_counter: int = 0
122156
wf_counter: int = 0

tests/test_schema_migration.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,9 @@ def test_sqlite_systemdb_migration() -> None:
135135
engine_kwargs={},
136136
engine=None,
137137
schema=None,
138-
debug_mode=False,
138+
executor_id=None,
139139
serializer=DefaultSerializer(),
140+
debug_mode=False,
140141
)
141142

142143
# Run migrations

0 commit comments

Comments
 (0)