Skip to content

Commit dee7bb4

Browse files
authored
Set Event Multiple Times (#108)
You can now set the same event multiple times from a workflow with `set_event`. `get_event` will retrieve the latest value of the event.
1 parent a3be1bb commit dee7bb4

File tree

4 files changed

+36
-32
lines changed

4 files changed

+36
-32
lines changed

dbos/error.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ class DBOSErrorCode(Enum):
3232
InitializationError = 3
3333
WorkflowFunctionNotFound = 4
3434
NonExistentWorkflowError = 5
35-
DuplicateWorkflowEventError = 6
3635
MaxStepRetriesExceeded = 7
3736
NotAuthorized = 8
3837

@@ -87,16 +86,6 @@ def __init__(self, destination_id: str):
8786
)
8887

8988

90-
class DBOSDuplicateWorkflowEventError(DBOSException):
91-
"""Exception raised when a workflow attempts to set an event value more than once per key."""
92-
93-
def __init__(self, workflow_id: str, key: str):
94-
super().__init__(
95-
f"Workflow {workflow_id} has already emitted an event with key {key}",
96-
dbos_error_code=DBOSErrorCode.DuplicateWorkflowEventError.value,
97-
)
98-
99-
10089
class DBOSNotAuthorizedError(DBOSException):
10190
"""Exception raised by DBOS role-based security when the user is not authorized to access a function."""
10291

dbos/system_database.py

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import datetime
22
import os
3-
import select
43
import threading
54
import time
65
from enum import Enum
@@ -14,11 +13,7 @@
1413
from sqlalchemy.exc import DBAPIError
1514

1615
import dbos.utils as utils
17-
from dbos.error import (
18-
DBOSDuplicateWorkflowEventError,
19-
DBOSNonExistentWorkflowError,
20-
DBOSWorkflowConflictIDError,
21-
)
16+
from dbos.error import DBOSNonExistentWorkflowError, DBOSWorkflowConflictIDError
2217

2318
from .dbos_config import ConfigFile
2419
from .logger import dbos_logger
@@ -832,18 +827,18 @@ def set_event(
832827
if recorded_output is not None:
833828
return # Already sent before
834829

835-
try:
836-
c.execute(
837-
pg.insert(SystemSchema.workflow_events).values(
838-
workflow_uuid=workflow_uuid,
839-
key=key,
840-
value=utils.serialize(message),
841-
)
830+
c.execute(
831+
pg.insert(SystemSchema.workflow_events)
832+
.values(
833+
workflow_uuid=workflow_uuid,
834+
key=key,
835+
value=utils.serialize(message),
842836
)
843-
except DBAPIError as dbapi_error:
844-
if dbapi_error.orig.sqlstate == "23505": # type: ignore
845-
raise DBOSDuplicateWorkflowEventError(workflow_uuid, key)
846-
raise
837+
.on_conflict_do_update(
838+
index_elements=["workflow_uuid", "key"],
839+
set_={"value": utils.serialize(message)},
840+
)
841+
)
847842
output: OperationResultInternal = {
848843
"workflow_uuid": workflow_uuid,
849844
"function_id": function_id,

tests/test_dbos.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import datetime
2+
import threading
23
import time
34
import uuid
4-
from typing import Any, Optional
5+
from typing import Optional
56

67
import pytest
78
import sqlalchemy as sa
@@ -13,7 +14,6 @@
1314
from dbos.context import assert_current_dbos_context, get_local_dbos_context
1415
from dbos.error import DBOSMaxStepRetriesExceeded
1516
from dbos.system_database import GetWorkflowsInput
16-
from tests.conftest import default_config
1717

1818

1919
def test_simple_workflow(dbos: DBOS) -> None:
@@ -882,3 +882,23 @@ def test_getevent_workflow(
882882
with pytest.raises(Exception) as exc_info:
883883
dbos.set_event("key1", "value1")
884884
assert "set_event() must be called from within a workflow" in str(exc_info.value)
885+
886+
887+
def test_multi_set_event(dbos: DBOS) -> None:
888+
event = threading.Event()
889+
890+
wfid = str(uuid.uuid4())
891+
892+
@DBOS.workflow()
893+
def test_setevent_workflow() -> None:
894+
assert DBOS.workflow_id == wfid
895+
DBOS.set_event("key", "value1")
896+
event.wait()
897+
DBOS.set_event("key", "value2")
898+
899+
with SetWorkflowID(wfid):
900+
handle = DBOS.start_workflow(test_setevent_workflow)
901+
assert DBOS.get_event(wfid, "key") == "value1"
902+
event.set()
903+
assert handle.get_result() == None
904+
assert DBOS.get_event(wfid, "key") == "value2"

tests/test_fastapi_roles.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
# Private API because this is a unit test
2020
from dbos.context import assert_current_dbos_context
21-
from dbos.error import DBOSDuplicateWorkflowEventError, DBOSNotAuthorizedError
21+
from dbos.error import DBOSInitializationError, DBOSNotAuthorizedError
2222
from dbos.system_database import GetWorkflowsInput
2323
from dbos.tracer import dbos_tracer
2424

@@ -54,7 +54,7 @@ def test_dbos_error() -> None:
5454

5555
@app.get("/dbosinternalerror")
5656
def test_dbos_error_internal() -> None:
57-
raise DBOSDuplicateWorkflowEventError("nosuchwf", "test")
57+
raise DBOSInitializationError("oh no")
5858

5959
@app.get("/open/{var1}")
6060
@DBOS.required_roles([])

0 commit comments

Comments
 (0)