Skip to content

Commit 9aabef8

Browse files
authored
DBOS.get_events (#477)
Adds new methods that retrieve all the published events for a given workflow. ```python def get_all_events(workflow_id: str) -> Dict[str, Any]: async def get_all_events_async(workflow_id: str) -> Dict[str, Any]: ``` Addresses #476
1 parent dde9129 commit 9aabef8

File tree

4 files changed

+144
-1
lines changed

4 files changed

+144
-1
lines changed

dbos/_dbos.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
AsyncGenerator,
1717
Callable,
1818
Coroutine,
19+
Dict,
1920
Generator,
2021
Generic,
2122
List,
@@ -974,6 +975,33 @@ async def get_event_async(
974975
lambda: DBOS.get_event(workflow_id, key, timeout_seconds)
975976
)
976977

978+
@classmethod
979+
def get_all_events(cls, workflow_id: str) -> Dict[str, Any]:
980+
"""
981+
Get all events currently present for a workflow ID.
982+
Args:
983+
workflow_id: The workflow ID for which to get events
984+
Returns:
985+
A dictionary mapping event keys to their deserialized values
986+
"""
987+
988+
def fn() -> Dict[str, Any]:
989+
return _get_dbos_instance()._sys_db.get_all_events(workflow_id)
990+
991+
return _get_dbos_instance()._sys_db.call_function_as_step(fn, "DBOS.get_events")
992+
993+
@classmethod
994+
async def get_all_events_async(cls, workflow_id: str) -> Dict[str, Any]:
995+
"""
996+
Get all events currently present for a workflow ID.
997+
Args:
998+
workflow_id: The workflow ID for which to get events
999+
Returns:
1000+
A dictionary mapping event keys to their deserialized values
1001+
"""
1002+
await cls._configure_asyncio_thread_pool()
1003+
return await asyncio.to_thread(cls.get_all_events, workflow_id)
1004+
9771005
@classmethod
9781006
def _execute_workflow_id(cls, workflow_id: str) -> WorkflowHandle[Any]:
9791007
"""Execute a workflow by ID (for recovery)."""
@@ -1225,7 +1253,7 @@ def fn() -> List[StepInfo]:
12251253
async def list_workflow_steps_async(cls, workflow_id: str) -> List[StepInfo]:
12261254
await cls._configure_asyncio_thread_pool()
12271255
return await asyncio.to_thread(cls.list_workflow_steps, workflow_id)
1228-
1256+
12291257
@classproperty
12301258
def application_version(cls) -> str:
12311259
return GlobalParams.app_version

dbos/_sys_db.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1567,6 +1567,32 @@ def set_event(
15671567
}
15681568
self._record_operation_result_txn(output, conn=c)
15691569

1570+
def get_all_events(self, workflow_id: str) -> Dict[str, Any]:
1571+
"""
1572+
Get all events currently present for a workflow ID.
1573+
1574+
Args:
1575+
workflow_id: The workflow UUID to get events for
1576+
1577+
Returns:
1578+
A dictionary mapping event keys to their deserialized values
1579+
"""
1580+
with self.engine.begin() as c:
1581+
rows = c.execute(
1582+
sa.select(
1583+
SystemSchema.workflow_events.c.key,
1584+
SystemSchema.workflow_events.c.value,
1585+
).where(SystemSchema.workflow_events.c.workflow_uuid == workflow_id)
1586+
).fetchall()
1587+
1588+
events: Dict[str, Any] = {}
1589+
for row in rows:
1590+
key = row[0]
1591+
value = _serialization.deserialize(row[1])
1592+
events[key] = value
1593+
1594+
return events
1595+
15701596
@db_retry()
15711597
def get_event(
15721598
self,

tests/test_async.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,3 +606,43 @@ async def run_workflow_task() -> str:
606606
# Verify the workflow completes despite the task cancellation
607607
handle: WorkflowHandleAsync[str] = await DBOS.retrieve_workflow_async(wfid)
608608
assert await handle.get_result() == "completed"
609+
610+
611+
@pytest.mark.asyncio
612+
async def test_get_events_async(dbos: DBOS) -> None:
613+
"""Test the async version of get_events function that retrieves all events for a workflow."""
614+
615+
@DBOS.workflow()
616+
async def async_events_workflow() -> str:
617+
# Set multiple events using async methods
618+
await DBOS.set_event_async("event1", "value1")
619+
await DBOS.set_event_async("event2", {"nested": "data", "count": 42})
620+
await DBOS.set_event_async("event3", [1, 2, 3, 4, 5])
621+
return "completed"
622+
623+
# Execute the workflow
624+
handle = await DBOS.start_workflow_async(async_events_workflow)
625+
result = await handle.get_result()
626+
assert result == "completed"
627+
628+
# Get all events for the workflow using async method
629+
events = await DBOS.get_all_events_async(handle.workflow_id)
630+
631+
# Verify all events are present with correct values
632+
assert len(events) == 3
633+
assert events["event1"] == "value1"
634+
assert events["event2"] == {"nested": "data", "count": 42}
635+
assert events["event3"] == [1, 2, 3, 4, 5]
636+
637+
# Test with a workflow that has no events
638+
@DBOS.workflow()
639+
async def no_events_workflow() -> str:
640+
await DBOS.sleep_async(0.01)
641+
return "no events"
642+
643+
handle2 = await DBOS.start_workflow_async(no_events_workflow)
644+
await handle2.get_result()
645+
646+
# Should return empty dict for workflow with no events
647+
events2 = await DBOS.get_all_events_async(handle2.workflow_id)
648+
assert events2 == {}

tests/test_dbos.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1890,3 +1890,52 @@ def recv_workflow() -> Any:
18901890
steps = client.list_workflow_steps(handle.workflow_id)
18911891
assert len(steps) == 3
18921892
assert "setEvent" in steps[0]["function_name"]
1893+
1894+
1895+
def test_get_events(dbos: DBOS) -> None:
1896+
1897+
@DBOS.workflow()
1898+
def events_workflow() -> str:
1899+
# Set multiple events
1900+
DBOS.set_event("event1", "value1")
1901+
DBOS.set_event("event2", {"nested": "data", "count": 42})
1902+
DBOS.set_event("event3", [1, 2, 3, 4, 5])
1903+
return "completed"
1904+
1905+
# Execute the workflow
1906+
handle = DBOS.start_workflow(events_workflow)
1907+
result = handle.get_result()
1908+
assert result == "completed"
1909+
1910+
# Get events, verify they are present with correct values
1911+
def get_events() -> None:
1912+
events = DBOS.get_all_events(handle.workflow_id)
1913+
1914+
assert len(events) == 3
1915+
assert events["event1"] == "value1"
1916+
assert events["event2"] == {"nested": "data", "count": 42}
1917+
assert events["event3"] == [1, 2, 3, 4, 5]
1918+
1919+
# Verify it works
1920+
get_events()
1921+
1922+
# Run it as a workflow, verify it still works
1923+
get_events_workflow = DBOS.workflow()(get_events)
1924+
wfid = str(uuid.uuid4())
1925+
with SetWorkflowID(wfid):
1926+
get_events_workflow()
1927+
steps = DBOS.list_workflow_steps(wfid)
1928+
assert len(steps) == 1
1929+
assert steps[0]["function_name"] == "DBOS.get_events"
1930+
1931+
# Test with a workflow that has no events
1932+
@DBOS.workflow()
1933+
def no_events_workflow() -> str:
1934+
return "no events"
1935+
1936+
handle2 = DBOS.start_workflow(no_events_workflow)
1937+
handle2.get_result()
1938+
1939+
# Should return empty dict for workflow with no events
1940+
events2 = DBOS.get_all_events(handle2.workflow_id)
1941+
assert events2 == {}

0 commit comments

Comments
 (0)