Skip to content

Commit 3b385cb

Browse files
committed
respond to comments
Signed-off-by: Tim Li <[email protected]>
1 parent 073dc34 commit 3b385cb

File tree

7 files changed

+704
-49
lines changed

7 files changed

+704
-49
lines changed
Lines changed: 395 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,395 @@
1+
"""
2+
DecisionsHelper manages the next decision ID which is used for tracking decision state machines.
3+
4+
This helper ensures that decision IDs are properly assigned and tracked to maintain
5+
consistency in the workflow execution state.
6+
"""
7+
8+
import logging
9+
from dataclasses import dataclass
10+
from typing import Dict, Optional
11+
12+
from cadence._internal.decision_state_machine import DecisionId, DecisionType
13+
from cadence.api.v1.history_pb2 import HistoryEvent
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
@dataclass
19+
class DecisionTracker:
20+
"""Tracks a decision with its ID and current state."""
21+
22+
decision_id: DecisionId
23+
scheduled_event_id: Optional[int] = None
24+
initiated_event_id: Optional[int] = None
25+
started_event_id: Optional[int] = None
26+
is_completed: bool = False
27+
28+
29+
class DecisionsHelper:
30+
"""
31+
Helper class to manage decision IDs and track decision state across workflow execution.
32+
33+
This class ensures that each decision gets a unique ID and tracks the lifecycle
34+
of decisions through the workflow execution.
35+
"""
36+
37+
def __init__(self):
38+
"""Initialize the DecisionsHelper."""
39+
self._next_decision_counters: Dict[DecisionType, int] = {}
40+
self._tracked_decisions: Dict[str, DecisionTracker] = {}
41+
self._decision_id_to_key: Dict[str, str] = {}
42+
logger.debug("DecisionsHelper initialized")
43+
44+
def _get_next_counter(self, decision_type: DecisionType) -> int:
45+
"""
46+
Get the next counter value for a given decision type.
47+
48+
Args:
49+
decision_type: The type of decision
50+
51+
Returns:
52+
The next counter value
53+
"""
54+
if decision_type not in self._next_decision_counters:
55+
self._next_decision_counters[decision_type] = 1
56+
else:
57+
self._next_decision_counters[decision_type] += 1
58+
59+
return self._next_decision_counters[decision_type]
60+
61+
def generate_activity_id(self, activity_name: str) -> str:
62+
"""
63+
Generate a unique activity ID.
64+
65+
Args:
66+
activity_name: The name of the activity
67+
68+
Returns:
69+
A unique activity ID
70+
"""
71+
counter = self._get_next_counter(DecisionType.ACTIVITY)
72+
activity_id = f"{activity_name}_{counter}"
73+
74+
# Track this decision
75+
decision_id = DecisionId(DecisionType.ACTIVITY, activity_id)
76+
tracker = DecisionTracker(decision_id)
77+
self._tracked_decisions[activity_id] = tracker
78+
self._decision_id_to_key[str(decision_id)] = activity_id
79+
80+
logger.debug(f"Generated activity ID: {activity_id}")
81+
return activity_id
82+
83+
def generate_timer_id(self, timer_name: str = "timer") -> str:
84+
"""
85+
Generate a unique timer ID.
86+
87+
Args:
88+
timer_name: The name/prefix for the timer
89+
90+
Returns:
91+
A unique timer ID
92+
"""
93+
counter = self._get_next_counter(DecisionType.TIMER)
94+
timer_id = f"{timer_name}_{counter}"
95+
96+
# Track this decision
97+
decision_id = DecisionId(DecisionType.TIMER, timer_id)
98+
tracker = DecisionTracker(decision_id)
99+
self._tracked_decisions[timer_id] = tracker
100+
self._decision_id_to_key[str(decision_id)] = timer_id
101+
102+
logger.debug(f"Generated timer ID: {timer_id}")
103+
return timer_id
104+
105+
def generate_child_workflow_id(self, workflow_name: str) -> str:
106+
"""
107+
Generate a unique child workflow ID.
108+
109+
Args:
110+
workflow_name: The name of the child workflow
111+
112+
Returns:
113+
A unique child workflow ID
114+
"""
115+
counter = self._get_next_counter(DecisionType.CHILD_WORKFLOW)
116+
workflow_id = f"{workflow_name}_{counter}"
117+
118+
# Track this decision
119+
decision_id = DecisionId(DecisionType.CHILD_WORKFLOW, workflow_id)
120+
tracker = DecisionTracker(decision_id)
121+
self._tracked_decisions[workflow_id] = tracker
122+
self._decision_id_to_key[str(decision_id)] = workflow_id
123+
124+
logger.debug(f"Generated child workflow ID: {workflow_id}")
125+
return workflow_id
126+
127+
def generate_marker_id(self, marker_name: str) -> str:
128+
"""
129+
Generate a unique marker ID.
130+
131+
Args:
132+
marker_name: The name of the marker
133+
134+
Returns:
135+
A unique marker ID
136+
"""
137+
counter = self._get_next_counter(DecisionType.MARKER)
138+
marker_id = f"{marker_name}_{counter}"
139+
140+
# Track this decision
141+
decision_id = DecisionId(DecisionType.MARKER, marker_id)
142+
tracker = DecisionTracker(decision_id)
143+
self._tracked_decisions[marker_id] = tracker
144+
self._decision_id_to_key[str(decision_id)] = marker_id
145+
146+
logger.debug(f"Generated marker ID: {marker_id}")
147+
return marker_id
148+
149+
def get_decision_tracker(self, decision_key: str) -> Optional[DecisionTracker]:
150+
"""
151+
Get the decision tracker for a given decision key.
152+
153+
Args:
154+
decision_key: The decision key (activity_id, timer_id, etc.)
155+
156+
Returns:
157+
The DecisionTracker if found, None otherwise
158+
"""
159+
return self._tracked_decisions.get(decision_key)
160+
161+
def update_decision_scheduled(
162+
self, decision_key: str, scheduled_event_id: int
163+
) -> None:
164+
"""
165+
Update a decision tracker when it gets scheduled.
166+
167+
Args:
168+
decision_key: The decision key
169+
scheduled_event_id: The event ID when the decision was scheduled
170+
"""
171+
tracker = self._tracked_decisions.get(decision_key)
172+
if tracker:
173+
tracker.scheduled_event_id = scheduled_event_id
174+
logger.debug(
175+
f"Updated decision {decision_key} with scheduled event ID {scheduled_event_id}"
176+
)
177+
else:
178+
logger.warning(f"No tracker found for decision key: {decision_key}")
179+
180+
def update_decision_initiated(
181+
self, decision_key: str, initiated_event_id: int
182+
) -> None:
183+
"""
184+
Update a decision tracker when it gets initiated.
185+
186+
Args:
187+
decision_key: The decision key
188+
initiated_event_id: The event ID when the decision was initiated
189+
"""
190+
tracker = self._tracked_decisions.get(decision_key)
191+
if tracker:
192+
tracker.initiated_event_id = initiated_event_id
193+
logger.debug(
194+
f"Updated decision {decision_key} with initiated event ID {initiated_event_id}"
195+
)
196+
else:
197+
logger.warning(f"No tracker found for decision key: {decision_key}")
198+
199+
def update_decision_started(self, decision_key: str, started_event_id: int) -> None:
200+
"""
201+
Update a decision tracker when it gets started.
202+
203+
Args:
204+
decision_key: The decision key
205+
started_event_id: The event ID when the decision was started
206+
"""
207+
tracker = self._tracked_decisions.get(decision_key)
208+
if tracker:
209+
tracker.started_event_id = started_event_id
210+
logger.debug(
211+
f"Updated decision {decision_key} with started event ID {started_event_id}"
212+
)
213+
else:
214+
logger.warning(f"No tracker found for decision key: {decision_key}")
215+
216+
def update_decision_completed(self, decision_key: str) -> None:
217+
"""
218+
Mark a decision as completed.
219+
220+
Args:
221+
decision_key: The decision key
222+
"""
223+
tracker = self._tracked_decisions.get(decision_key)
224+
if tracker:
225+
tracker.is_completed = True
226+
logger.debug(f"Marked decision {decision_key} as completed")
227+
else:
228+
logger.warning(f"No tracker found for decision key: {decision_key}")
229+
230+
def process_history_event(self, event: HistoryEvent) -> None:
231+
"""
232+
Process a history event and update decision trackers accordingly.
233+
234+
Args:
235+
event: The history event to process
236+
"""
237+
attr = event.WhichOneof("attributes")
238+
if not attr:
239+
return
240+
241+
# Handle activity events
242+
if attr == "activity_task_scheduled_event_attributes":
243+
attrs = event.activity_task_scheduled_event_attributes
244+
if hasattr(attrs, "activity_id"):
245+
self.update_decision_scheduled(attrs.activity_id, event.event_id)
246+
247+
elif attr == "activity_task_started_event_attributes":
248+
attrs = event.activity_task_started_event_attributes
249+
if hasattr(attrs, "scheduled_event_id"):
250+
# Find the decision by scheduled event ID
251+
decision_key = self._find_decision_by_scheduled_event_id(
252+
attrs.scheduled_event_id
253+
)
254+
if decision_key:
255+
self.update_decision_started(decision_key, event.event_id)
256+
257+
elif attr in [
258+
"activity_task_completed_event_attributes",
259+
"activity_task_failed_event_attributes",
260+
"activity_task_timed_out_event_attributes",
261+
]:
262+
attrs = getattr(event, attr)
263+
if hasattr(attrs, "scheduled_event_id"):
264+
# Find the decision by scheduled event ID
265+
decision_key = self._find_decision_by_scheduled_event_id(
266+
attrs.scheduled_event_id
267+
)
268+
if decision_key:
269+
self.update_decision_completed(decision_key)
270+
271+
# Handle timer events
272+
elif attr == "timer_started_event_attributes":
273+
attrs = event.timer_started_event_attributes
274+
if hasattr(attrs, "timer_id"):
275+
self.update_decision_initiated(attrs.timer_id, event.event_id)
276+
277+
elif attr == "timer_fired_event_attributes":
278+
attrs = event.timer_fired_event_attributes
279+
if hasattr(attrs, "started_event_id"):
280+
# Find the decision by started event ID
281+
decision_key = self._find_decision_by_started_event_id(
282+
attrs.started_event_id
283+
)
284+
if decision_key:
285+
self.update_decision_completed(decision_key)
286+
287+
# Handle child workflow events
288+
elif attr == "start_child_workflow_execution_initiated_event_attributes":
289+
attrs = event.start_child_workflow_execution_initiated_event_attributes
290+
if hasattr(attrs, "workflow_id"):
291+
self.update_decision_initiated(attrs.workflow_id, event.event_id)
292+
293+
elif attr == "child_workflow_execution_started_event_attributes":
294+
attrs = event.child_workflow_execution_started_event_attributes
295+
if hasattr(attrs, "initiated_event_id"):
296+
# Find the decision by initiated event ID
297+
decision_key = self._find_decision_by_initiated_event_id(
298+
attrs.initiated_event_id
299+
)
300+
if decision_key:
301+
self.update_decision_started(decision_key, event.event_id)
302+
303+
elif attr in [
304+
"child_workflow_execution_completed_event_attributes",
305+
"child_workflow_execution_failed_event_attributes",
306+
"child_workflow_execution_timed_out_event_attributes",
307+
]:
308+
attrs = getattr(event, attr)
309+
if hasattr(attrs, "initiated_event_id"):
310+
# Find the decision by initiated event ID
311+
decision_key = self._find_decision_by_initiated_event_id(
312+
attrs.initiated_event_id
313+
)
314+
if decision_key:
315+
self.update_decision_completed(decision_key)
316+
317+
def _find_decision_by_scheduled_event_id(
318+
self, scheduled_event_id: int
319+
) -> Optional[str]:
320+
"""Find a decision key by its scheduled event ID."""
321+
for key, tracker in self._tracked_decisions.items():
322+
if tracker.scheduled_event_id == scheduled_event_id:
323+
return key
324+
return None
325+
326+
def _find_decision_by_initiated_event_id(
327+
self, initiated_event_id: int
328+
) -> Optional[str]:
329+
"""Find a decision key by its initiated event ID."""
330+
for key, tracker in self._tracked_decisions.items():
331+
if tracker.initiated_event_id == initiated_event_id:
332+
return key
333+
return None
334+
335+
def _find_decision_by_started_event_id(
336+
self, started_event_id: int
337+
) -> Optional[str]:
338+
"""Find a decision key by its started event ID."""
339+
for key, tracker in self._tracked_decisions.items():
340+
if tracker.started_event_id == started_event_id:
341+
return key
342+
return None
343+
344+
def get_pending_decisions_count(self) -> int:
345+
"""
346+
Get the count of decisions that are not yet completed.
347+
348+
Returns:
349+
The number of pending decisions
350+
"""
351+
return sum(
352+
1
353+
for tracker in self._tracked_decisions.values()
354+
if not tracker.is_completed
355+
)
356+
357+
def get_completed_decisions_count(self) -> int:
358+
"""
359+
Get the count of decisions that have been completed.
360+
361+
Returns:
362+
The number of completed decisions
363+
"""
364+
return sum(
365+
1 for tracker in self._tracked_decisions.values() if tracker.is_completed
366+
)
367+
368+
def reset(self) -> None:
369+
"""Reset all decision tracking state."""
370+
self._next_decision_counters.clear()
371+
self._tracked_decisions.clear()
372+
self._decision_id_to_key.clear()
373+
logger.debug("DecisionsHelper reset")
374+
375+
def get_stats(self) -> Dict[str, int]:
376+
"""
377+
Get statistics about tracked decisions.
378+
379+
Returns:
380+
Dictionary with decision statistics
381+
"""
382+
stats = {
383+
"total_decisions": len(self._tracked_decisions),
384+
"pending_decisions": self.get_pending_decisions_count(),
385+
"completed_decisions": self.get_completed_decisions_count(),
386+
}
387+
388+
# Add per-type counts
389+
for decision_type in DecisionType:
390+
type_name = decision_type.name.lower()
391+
stats[f"{type_name}_count"] = self._next_decision_counters.get(
392+
decision_type, 0
393+
)
394+
395+
return stats

0 commit comments

Comments
 (0)