Skip to content

Commit 2de92dc

Browse files
qianl15kraftp
andauthored
Fix Stream Recovery (#461) (#463)
Co-authored-by: Peter Kraft <[email protected]>
1 parent 17e859d commit 2de92dc

File tree

2 files changed

+21
-8
lines changed

2 files changed

+21
-8
lines changed

dbos/_sys_db.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1901,8 +1901,13 @@ def write_stream_from_workflow(
19011901
)
19021902
if self._debug_mode and recorded_output is None:
19031903
raise Exception(
1904-
"called set_event in debug mode without a previous execution"
1904+
"called writeStream in debug mode without a previous execution"
19051905
)
1906+
if recorded_output is not None:
1907+
dbos_logger.debug(
1908+
f"Replaying writeStream, id: {function_id}, key: {key}"
1909+
)
1910+
return
19061911
# Find the maximum offset for this workflow_uuid and key combination
19071912
max_offset_result = c.execute(
19081913
sa.select(sa.func.max(SystemSchema.streams.c.offset)).where(

tests/test_streaming.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -231,16 +231,19 @@ def test_stream_error_cases(dbos: DBOS) -> None:
231231
def test_stream_workflow_recovery(dbos: DBOS) -> None:
232232
"""Test that stream operations are properly recovered during workflow replay."""
233233

234-
call_count = 0
234+
workflow_call_count = 0
235+
step_call_count = 0
235236

236237
@DBOS.step()
237238
def counting_step() -> int:
238-
nonlocal call_count
239-
call_count += 1
240-
return call_count
239+
nonlocal step_call_count
240+
step_call_count += 1
241+
return step_call_count
241242

242243
@DBOS.workflow()
243244
def recovery_test_workflow() -> None:
245+
nonlocal workflow_call_count
246+
workflow_call_count += 1
244247
count1 = counting_step()
245248
DBOS.write_stream("recovery_stream", f"step_{count1}")
246249

@@ -254,13 +257,18 @@ def recovery_test_workflow() -> None:
254257
with SetWorkflowID(wfid):
255258
recovery_test_workflow()
256259

260+
# Validate stream contents
261+
values = list(DBOS.read_stream(wfid, "recovery_stream"))
262+
assert values == ["step_1", "step_2"]
263+
257264
# Reset call count and run the same workflow ID again (should replay)
258-
call_count = 0
265+
dbos._sys_db.update_workflow_outcome(wfid, "PENDING")
259266
with SetWorkflowID(wfid):
260267
recovery_test_workflow()
261268

262-
# The counting step should not have been called again (replayed from recorded results)
263-
assert call_count == 0
269+
# The workflow should have been called again
270+
assert workflow_call_count == 2
271+
assert step_call_count == 2
264272

265273
# Stream should still be readable and contain the same values
266274
values = list(DBOS.read_stream(wfid, "recovery_stream"))

0 commit comments

Comments
 (0)