Skip to content

Commit a8432b6

Browse files
committed
integrate with event iterator
Signed-off-by: Tim Li <[email protected]>
1 parent 4dbed67 commit a8432b6

File tree

6 files changed

+391
-407
lines changed

6 files changed

+391
-407
lines changed

cadence/_internal/workflow/context.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from typing import Optional
12
from cadence.client import Client
23
from cadence.workflow import WorkflowContext, WorkflowInfo
34

@@ -7,9 +8,27 @@ class Context(WorkflowContext):
78
def __init__(self, client: Client, info: WorkflowInfo):
89
self._client = client
910
self._info = info
11+
self._replay_mode = True
12+
self._replay_current_time_milliseconds: Optional[int] = None
1013

1114
def info(self) -> WorkflowInfo:
1215
return self._info
1316

1417
def client(self) -> Client:
1518
return self._client
19+
20+
def set_replay_mode(self, replay: bool) -> None:
21+
"""Set whether the workflow is currently in replay mode."""
22+
self._replay_mode = replay
23+
24+
def is_replay_mode(self) -> bool:
25+
"""Check if the workflow is currently in replay mode."""
26+
return self._replay_mode
27+
28+
def set_replay_current_time_milliseconds(self, time_millis: int) -> None:
29+
"""Set the current replay time in milliseconds."""
30+
self._replay_current_time_milliseconds = time_millis
31+
32+
def get_replay_current_time_milliseconds(self) -> Optional[int]:
33+
"""Get the current replay time in milliseconds."""
34+
return self._replay_current_time_milliseconds

cadence/_internal/workflow/decisions_helper.py

Lines changed: 13 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@
99
from dataclasses import dataclass
1010
from typing import Dict, Optional
1111

12-
from cadence._internal.decision_state_machine import DecisionId, DecisionType
13-
from cadence.api.v1.history_pb2 import HistoryEvent
12+
from cadence._internal.decision_state_machine import DecisionId, DecisionType, DecisionManager
1413

1514
logger = logging.getLogger(__name__)
1615

@@ -28,18 +27,24 @@ class DecisionTracker:
2827

2928
class DecisionsHelper:
3029
"""
31-
Helper class to manage decision IDs and track decision state across workflow execution.
30+
Helper class to manage decision IDs and work with DecisionManager state machines.
3231
33-
This class ensures that each decision gets a unique ID and tracks the lifecycle
34-
of decisions through the workflow execution.
32+
This class generates unique decision IDs and integrates with the DecisionManager
33+
state machines for proper decision lifecycle tracking.
3534
"""
3635

37-
def __init__(self):
38-
"""Initialize the DecisionsHelper."""
36+
def __init__(self, decision_manager: DecisionManager):
37+
"""
38+
Initialize the DecisionsHelper with a DecisionManager reference.
39+
40+
Args:
41+
decision_manager: The DecisionManager containing the state machines
42+
"""
3943
self._next_decision_counters: Dict[DecisionType, int] = {}
4044
self._tracked_decisions: Dict[str, DecisionTracker] = {}
4145
self._decision_id_to_key: Dict[str, str] = {}
42-
logger.debug("DecisionsHelper initialized")
46+
self._decision_manager = decision_manager
47+
logger.debug("DecisionsHelper initialized with DecisionManager integration")
4348

4449
def _get_next_counter(self, decision_type: DecisionType) -> int:
4550
"""
@@ -227,92 +232,6 @@ def update_decision_completed(self, decision_key: str) -> None:
227232
else:
228233
logger.warning(f"No tracker found for decision key: {decision_key}")
229234

230-
def process_history_event(self, event: HistoryEvent) -> None:
231-
"""
232-
Process a history event and update decision trackers accordingly.
233-
234-
Args:
235-
event: The history event to process
236-
"""
237-
attr = event.WhichOneof("attributes")
238-
if not attr:
239-
return
240-
241-
# Handle activity events
242-
if attr == "activity_task_scheduled_event_attributes":
243-
attrs = event.activity_task_scheduled_event_attributes
244-
if hasattr(attrs, "activity_id"):
245-
self.update_decision_scheduled(attrs.activity_id, event.event_id)
246-
247-
elif attr == "activity_task_started_event_attributes":
248-
attrs = event.activity_task_started_event_attributes
249-
if hasattr(attrs, "scheduled_event_id"):
250-
# Find the decision by scheduled event ID
251-
decision_key = self._find_decision_by_scheduled_event_id(
252-
attrs.scheduled_event_id
253-
)
254-
if decision_key:
255-
self.update_decision_started(decision_key, event.event_id)
256-
257-
elif attr in [
258-
"activity_task_completed_event_attributes",
259-
"activity_task_failed_event_attributes",
260-
"activity_task_timed_out_event_attributes",
261-
]:
262-
attrs = getattr(event, attr)
263-
if hasattr(attrs, "scheduled_event_id"):
264-
# Find the decision by scheduled event ID
265-
decision_key = self._find_decision_by_scheduled_event_id(
266-
attrs.scheduled_event_id
267-
)
268-
if decision_key:
269-
self.update_decision_completed(decision_key)
270-
271-
# Handle timer events
272-
elif attr == "timer_started_event_attributes":
273-
attrs = event.timer_started_event_attributes
274-
if hasattr(attrs, "timer_id"):
275-
self.update_decision_initiated(attrs.timer_id, event.event_id)
276-
277-
elif attr == "timer_fired_event_attributes":
278-
attrs = event.timer_fired_event_attributes
279-
if hasattr(attrs, "started_event_id"):
280-
# Find the decision by started event ID
281-
decision_key = self._find_decision_by_started_event_id(
282-
attrs.started_event_id
283-
)
284-
if decision_key:
285-
self.update_decision_completed(decision_key)
286-
287-
# Handle child workflow events
288-
elif attr == "start_child_workflow_execution_initiated_event_attributes":
289-
attrs = event.start_child_workflow_execution_initiated_event_attributes
290-
if hasattr(attrs, "workflow_id"):
291-
self.update_decision_initiated(attrs.workflow_id, event.event_id)
292-
293-
elif attr == "child_workflow_execution_started_event_attributes":
294-
attrs = event.child_workflow_execution_started_event_attributes
295-
if hasattr(attrs, "initiated_event_id"):
296-
# Find the decision by initiated event ID
297-
decision_key = self._find_decision_by_initiated_event_id(
298-
attrs.initiated_event_id
299-
)
300-
if decision_key:
301-
self.update_decision_started(decision_key, event.event_id)
302-
303-
elif attr in [
304-
"child_workflow_execution_completed_event_attributes",
305-
"child_workflow_execution_failed_event_attributes",
306-
"child_workflow_execution_timed_out_event_attributes",
307-
]:
308-
attrs = getattr(event, attr)
309-
if hasattr(attrs, "initiated_event_id"):
310-
# Find the decision by initiated event ID
311-
decision_key = self._find_decision_by_initiated_event_id(
312-
attrs.initiated_event_id
313-
)
314-
if decision_key:
315-
self.update_decision_completed(decision_key)
316235

317236
def _find_decision_by_scheduled_event_id(
318237
self, scheduled_event_id: int

0 commit comments

Comments
 (0)