Skip to content

Commit 6e7630c

Browse files
authored
feat(worker): return workflow completed result on decision processing (#54)
<!-- Describe what has changed in this PR --> **What changed?** * moved event loop, workflow task and dataconverter from `WorkflowEngine` inside of the `WorkflowInstance` (A similar implementation of [ReplayWorkflow in java](https://github.com/cadence-workflow/cadence-java-client/blob/e559e1d31a3b4674c8e7972afdd541e5f79f89ea/src/main/java/com/uber/cadence/internal/replay/ReplayWorkflow.java#L25) * removed decisionhandler dependency in WorkflowEngine * set workflow completion result decision if workflow has finished. <!-- Tell your future self why have you made these changes --> **Why?** Current processing doesn't return workflow completion result <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** WIP Unit Test <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: Shijie Sheng <[email protected]>
1 parent 72cb7c7 commit 6e7630c

12 files changed

+272
-457
lines changed

cadence/_internal/workflow/context.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
from contextlib import contextmanager
12
from datetime import timedelta
23
from math import ceil
3-
from typing import Optional, Any, Unpack, Type, cast
4+
from typing import Iterator, Optional, Any, Unpack, Type, cast
45

56
from cadence._internal.workflow.statemachine.decision_manager import DecisionManager
67
from cadence._internal.workflow.decisions_helper import DecisionsHelper
@@ -15,13 +16,12 @@ class Context(WorkflowContext):
1516
def __init__(
1617
self,
1718
info: WorkflowInfo,
18-
decision_helper: DecisionsHelper,
1919
decision_manager: DecisionManager,
2020
):
2121
self._info = info
2222
self._replay_mode = True
2323
self._replay_current_time_milliseconds: Optional[int] = None
24-
self._decision_helper = decision_helper
24+
self._decision_helper = DecisionsHelper()
2525
self._decision_manager = decision_manager
2626

2727
def info(self) -> WorkflowInfo:
@@ -110,6 +110,12 @@ def get_replay_current_time_milliseconds(self) -> Optional[int]:
110110
"""Get the current replay time in milliseconds."""
111111
return self._replay_current_time_milliseconds
112112

113+
@contextmanager
114+
def _activate(self) -> Iterator["Context"]:
115+
token = WorkflowContext._var.set(self)
116+
yield self
117+
WorkflowContext._var.reset(token)
118+
113119

114120
def _round_to_nearest_second(delta: timedelta) -> timedelta:
115121
return timedelta(seconds=ceil(delta.total_seconds()))

cadence/_internal/workflow/decision_events_iterator.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
from cadence._internal.workflow.history_event_iterator import HistoryEventsIterator
1313
from cadence.api.v1.history_pb2 import HistoryEvent
14-
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
1514

1615

1716
@dataclass
@@ -44,10 +43,8 @@ class DecisionEventsIterator(Iterator[DecisionEvents]):
4443

4544
def __init__(
4645
self,
47-
decision_task: PollForDecisionTaskResponse,
4846
events: List[HistoryEvent],
4947
):
50-
self._decision_task = decision_task
5148
self._events: HistoryEventsIterator = HistoryEventsIterator(events)
5249
self._next_decision_event_id: Optional[int] = None
5350
self._replay_current_time_milliseconds: Optional[int] = None

cadence/_internal/workflow/workflow_engine.py

Lines changed: 107 additions & 162 deletions
Large diffs are not rendered by default.
Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,38 @@
1+
from asyncio import Task
2+
from typing import Any, Optional
3+
from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop
4+
from cadence.api.v1.common_pb2 import Payload
5+
from cadence.data_converter import DataConverter
16
from cadence.workflow import WorkflowDefinition
27

38

49
class WorkflowInstance:
5-
def __init__(self, workflow_definition: WorkflowDefinition):
10+
def __init__(
11+
self, workflow_definition: WorkflowDefinition, data_converter: DataConverter
12+
):
613
self._definition = workflow_definition
7-
self._instance = workflow_definition.cls().__init__()
14+
self._data_converter = data_converter
15+
self._instance = workflow_definition.cls() # construct a new workflow object
16+
self._loop = DeterministicEventLoop()
17+
self._task: Optional[Task] = None
818

9-
async def run(self, *args):
10-
run_method = self._definition.get_run_method(self._instance)
11-
return run_method(*args)
19+
def start(self, input: Payload):
20+
if self._task is None:
21+
run_method = self._definition.get_run_method(self._instance)
22+
# TODO handle multiple inputs
23+
workflow_input = self._data_converter.from_data(input, [Any])
24+
self._task = self._loop.create_task(run_method(*workflow_input))
25+
26+
def run_once(self):
27+
self._loop.run_until_yield()
28+
29+
def is_done(self) -> bool:
30+
return self._task is not None and self._task.done()
31+
32+
# TODO: consider cache result to avoid multiple data conversions
33+
def get_result(self) -> Payload:
34+
if self._task is None:
35+
raise RuntimeError("Workflow is not started yet")
36+
result = self._task.result()
37+
# TODO: handle result with multiple outputs
38+
return self._data_converter.to_data([result])

cadence/worker/_decision_task_handler.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ async def _handle_task_implementation(
121121
workflow_run_id=run_id,
122122
workflow_task_list=self.task_list,
123123
data_converter=self._client.data_converter,
124-
workflow_events=workflow_events,
125124
)
126125

127126
# Use thread-safe cache to get or create workflow engine
@@ -136,7 +135,7 @@ async def _handle_task_implementation(
136135
self._workflow_engines[cache_key] = workflow_engine
137136

138137
decision_result = await asyncio.get_running_loop().run_in_executor(
139-
self._executor, workflow_engine.process_decision, task
138+
self._executor, workflow_engine.process_decision, workflow_events
140139
)
141140

142141
# Clean up completed workflows from cache to prevent memory leaks

cadence/workflow.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from datetime import timedelta
66
from typing import (
77
Callable,
8-
List,
98
cast,
109
Optional,
1110
Union,
@@ -15,10 +14,10 @@
1514
Type,
1615
Unpack,
1716
Any,
17+
Generic,
1818
)
1919
import inspect
2020

21-
from cadence.api.v1.history_pb2 import HistoryEvent
2221
from cadence.data_converter import DataConverter
2322

2423
ResultType = TypeVar("ResultType")
@@ -44,6 +43,7 @@ async def execute_activity(
4443

4544

4645
T = TypeVar("T", bound=Callable[..., Any])
46+
C = TypeVar("C")
4747

4848

4949
class WorkflowDefinitionOptions(TypedDict, total=False):
@@ -52,16 +52,16 @@ class WorkflowDefinitionOptions(TypedDict, total=False):
5252
name: str
5353

5454

55-
class WorkflowDefinition:
55+
class WorkflowDefinition(Generic[C]):
5656
"""
5757
Definition of a workflow class with metadata.
5858
5959
Similar to ActivityDefinition but for workflow classes.
6060
Provides type safety and metadata for workflow classes.
6161
"""
6262

63-
def __init__(self, cls: Type, name: str, run_method_name: str):
64-
self._cls = cls
63+
def __init__(self, cls: Type[C], name: str, run_method_name: str):
64+
self._cls: Type[C] = cls
6565
self._name = name
6666
self._run_method_name = run_method_name
6767

@@ -71,7 +71,7 @@ def name(self) -> str:
7171
return self._name
7272

7373
@property
74-
def cls(self) -> Type:
74+
def cls(self) -> Type[C]:
7575
"""Get the workflow class."""
7676
return self._cls
7777

@@ -151,7 +151,7 @@ def decorator(f: T) -> T:
151151
raise ValueError(f"Workflow run method '{f.__name__}' must be async")
152152

153153
# Attach metadata to the function
154-
f._workflow_run = True # type: ignore
154+
setattr(f, "_workflow_run", None)
155155
return f
156156

157157
# Support both @workflow.run and @workflow.run()
@@ -163,14 +163,13 @@ def decorator(f: T) -> T:
163163
return decorator(func)
164164

165165

166-
@dataclass
166+
@dataclass(frozen=True)
167167
class WorkflowInfo:
168168
workflow_type: str
169169
workflow_domain: str
170170
workflow_id: str
171171
workflow_run_id: str
172172
workflow_task_list: str
173-
workflow_events: List[HistoryEvent]
174173
data_converter: DataConverter
175174

176175

@@ -193,9 +192,9 @@ async def execute_activity(
193192
) -> ResultType: ...
194193

195194
@contextmanager
196-
def _activate(self) -> Iterator[None]:
195+
def _activate(self) -> Iterator["WorkflowContext"]:
197196
token = WorkflowContext._var.set(self)
198-
yield None
197+
yield self
199198
WorkflowContext._var.reset(token)
200199

201200
@staticmethod

tests/cadence/_internal/workflow/test_decision_events_iterator.py

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,14 @@
33
Tests for Decision Events Iterator.
44
"""
55

6-
import pytest
76
from typing import List
7+
import pytest
88

9-
from cadence.api.v1.history_pb2 import HistoryEvent, History
10-
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
11-
from cadence.api.v1.common_pb2 import WorkflowExecution
129

1310
from cadence._internal.workflow.decision_events_iterator import (
1411
DecisionEventsIterator,
1512
)
13+
from cadence.api.v1.history_pb2 import HistoryEvent
1614

1715

1816
class TestDecisionEventsIterator:
@@ -95,8 +93,7 @@ class TestDecisionEventsIterator:
9593
)
9694
def test_successful_cases(self, name, event_types, expected):
9795
events = create_mock_history_event(event_types)
98-
decision_task = create_mock_decision_task(events)
99-
iterator = DecisionEventsIterator(decision_task, events)
96+
iterator = DecisionEventsIterator(events)
10097

10198
batches = [decision_events for decision_events in iterator]
10299
assert len(expected) == len(batches)
@@ -166,26 +163,3 @@ def create_mock_history_event(event_types: List[str]) -> List[HistoryEvent]:
166163
events.append(event)
167164

168165
return events
169-
170-
171-
def create_mock_decision_task(
172-
events: List[HistoryEvent], next_page_token: bytes = None
173-
) -> PollForDecisionTaskResponse:
174-
"""Create a mock decision task for testing."""
175-
task = PollForDecisionTaskResponse()
176-
177-
# Mock history
178-
history = History()
179-
history.events.extend(events)
180-
task.history.CopyFrom(history)
181-
182-
# Mock workflow execution
183-
workflow_execution = WorkflowExecution()
184-
workflow_execution.workflow_id = "test-workflow"
185-
workflow_execution.run_id = "test-run"
186-
task.workflow_execution.CopyFrom(workflow_execution)
187-
188-
if next_page_token:
189-
task.next_page_token = next_page_token
190-
191-
return task
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
#!/usr/bin/env python3
2+
from typing import List
3+
import pytest
4+
from cadence.api.v1.common_pb2 import Payload
5+
from cadence.api.v1.history_pb2 import (
6+
DecisionTaskCompletedEventAttributes,
7+
DecisionTaskScheduledEventAttributes,
8+
DecisionTaskStartedEventAttributes,
9+
HistoryEvent,
10+
WorkflowExecutionCompletedEventAttributes,
11+
WorkflowExecutionStartedEventAttributes,
12+
)
13+
from cadence._internal.workflow.workflow_engine import WorkflowEngine
14+
from cadence import workflow
15+
from cadence.data_converter import DefaultDataConverter
16+
from cadence.workflow import WorkflowInfo, WorkflowDefinition, WorkflowDefinitionOptions
17+
18+
19+
class TestWorkflow:
20+
@workflow.run
21+
async def echo(self, input_data):
22+
return f"echo: {input_data}"
23+
24+
25+
class TestWorkflowEngine:
26+
"""Unit tests for WorkflowEngine."""
27+
28+
@pytest.fixture
29+
def echo_workflow_definition(self) -> WorkflowDefinition:
30+
"""Create a mock workflow definition."""
31+
workflow_opts = WorkflowDefinitionOptions(name="test_workflow")
32+
return WorkflowDefinition.wrap(TestWorkflow, workflow_opts)
33+
34+
@pytest.fixture
35+
def simple_workflow_events(self) -> List[HistoryEvent]:
36+
return [
37+
HistoryEvent(
38+
event_id=1,
39+
workflow_execution_started_event_attributes=WorkflowExecutionStartedEventAttributes(
40+
input=Payload(data=b'"test-input"')
41+
),
42+
),
43+
HistoryEvent(
44+
event_id=2,
45+
decision_task_scheduled_event_attributes=DecisionTaskScheduledEventAttributes(),
46+
),
47+
HistoryEvent(
48+
event_id=3,
49+
decision_task_started_event_attributes=DecisionTaskStartedEventAttributes(
50+
scheduled_event_id=2
51+
),
52+
),
53+
HistoryEvent(
54+
event_id=4,
55+
decision_task_completed_event_attributes=DecisionTaskCompletedEventAttributes(
56+
scheduled_event_id=2,
57+
),
58+
),
59+
HistoryEvent(
60+
event_id=5,
61+
workflow_execution_completed_event_attributes=WorkflowExecutionCompletedEventAttributes(
62+
result=Payload(data=b'"echo: test-input"')
63+
),
64+
),
65+
]
66+
67+
def test_process_simple_workflow(
68+
self,
69+
echo_workflow_definition: WorkflowDefinition,
70+
simple_workflow_events: List[HistoryEvent],
71+
):
72+
workflow_engine = create_workflow_engine(echo_workflow_definition)
73+
decision_result = workflow_engine.process_decision(simple_workflow_events[:3])
74+
assert len(decision_result.decisions) == 1
75+
assert decision_result.decisions[
76+
0
77+
].complete_workflow_execution_decision_attributes.result == Payload(
78+
data=b'"echo: test-input"'
79+
)
80+
81+
82+
def create_workflow_engine(workflow_definition: WorkflowDefinition) -> WorkflowEngine:
83+
"""Create workflow engine."""
84+
return WorkflowEngine(
85+
info=WorkflowInfo(
86+
workflow_type="test_workflow",
87+
workflow_domain="test-domain",
88+
workflow_id="test-workflow-id",
89+
workflow_run_id="test-run-id",
90+
workflow_task_list="test-task-list",
91+
data_converter=DefaultDataConverter(),
92+
),
93+
workflow_definition=workflow_definition,
94+
)

0 commit comments

Comments
 (0)