Skip to content

Commit b137e6a

Browse files
committed
Revert back to explicitly statically defined methods
1 parent ea17e88 commit b137e6a

File tree

2 files changed

+215
-74
lines changed

2 files changed

+215
-74
lines changed

temporalio/worker/_command_aware_visitor.py

Lines changed: 169 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -48,81 +48,180 @@ class CommandInfo:
4848
)
4949

5050

51-
def _create_override_method(
52-
parent_method: Any, command_type: CommandType.ValueType
53-
) -> Any:
54-
"""Create an override method that sets command context."""
55-
56-
async def override_method(self: Any, fs: VisitorFunctions, o: Any) -> None:
57-
with current_command(command_type, o.seq):
58-
await parent_method(self, fs, o)
59-
60-
return override_method
61-
62-
6351
class CommandAwarePayloadVisitor(PayloadVisitor):
6452
"""Payload visitor that sets command context during traversal.
6553
66-
Override methods are created at class definition time for all workflow
67-
commands and activation jobs that have a 'seq' field.
54+
Override methods are explicitly defined for all workflow commands and
55+
activation jobs that have a 'seq' field.
6856
"""
6957

70-
_COMMAND_TYPE_MAP: dict[type[Any], Optional[CommandType.ValueType]] = {
71-
# Commands
72-
ScheduleActivity: CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
73-
ScheduleLocalActivity: CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
74-
StartChildWorkflowExecution: CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
75-
SignalExternalWorkflowExecution: CommandType.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION,
76-
RequestCancelExternalWorkflowExecution: CommandType.COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION,
77-
ScheduleNexusOperation: CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
78-
RequestCancelNexusOperation: CommandType.COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION,
79-
StartTimer: CommandType.COMMAND_TYPE_START_TIMER,
80-
CancelTimer: CommandType.COMMAND_TYPE_CANCEL_TIMER,
81-
RequestCancelActivity: CommandType.COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK,
82-
RequestCancelLocalActivity: CommandType.COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK,
83-
CancelSignalWorkflow: None,
84-
# Workflow activation jobs
85-
ResolveActivity: CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
86-
ResolveChildWorkflowExecutionStart: CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
87-
ResolveChildWorkflowExecution: CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
88-
ResolveSignalExternalWorkflow: CommandType.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION,
89-
ResolveRequestCancelExternalWorkflow: CommandType.COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION,
90-
ResolveNexusOperationStart: CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
91-
ResolveNexusOperation: CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
92-
FireTimer: CommandType.COMMAND_TYPE_START_TIMER,
93-
}
94-
95-
96-
# Add override methods to CommandAwarePayloadVisitor at class definition time
97-
def _add_class_overrides() -> None:
98-
"""Add override methods to CommandAwarePayloadVisitor class."""
99-
# Process workflow commands
100-
for proto_class in _get_workflow_command_protos_with_seq():
101-
if command_type := CommandAwarePayloadVisitor._COMMAND_TYPE_MAP.get(
102-
proto_class
58+
# Workflow commands
59+
async def _visit_coresdk_workflow_commands_ScheduleActivity(
60+
self, fs: VisitorFunctions, o: ScheduleActivity
61+
) -> None:
62+
with current_command(CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, o.seq):
63+
await super()._visit_coresdk_workflow_commands_ScheduleActivity(fs, o)
64+
65+
async def _visit_coresdk_workflow_commands_ScheduleLocalActivity(
66+
self, fs: VisitorFunctions, o: ScheduleLocalActivity
67+
) -> None:
68+
with current_command(CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, o.seq):
69+
await super()._visit_coresdk_workflow_commands_ScheduleLocalActivity(fs, o)
70+
71+
async def _visit_coresdk_workflow_commands_StartChildWorkflowExecution(
72+
self, fs: VisitorFunctions, o: StartChildWorkflowExecution
73+
) -> None:
74+
with current_command(
75+
CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION, o.seq
76+
):
77+
await super()._visit_coresdk_workflow_commands_StartChildWorkflowExecution(
78+
fs, o
79+
)
80+
81+
async def _visit_coresdk_workflow_commands_SignalExternalWorkflowExecution(
82+
self, fs: VisitorFunctions, o: SignalExternalWorkflowExecution
83+
) -> None:
84+
with current_command(
85+
CommandType.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION, o.seq
86+
):
87+
await super()._visit_coresdk_workflow_commands_SignalExternalWorkflowExecution(
88+
fs, o
89+
)
90+
91+
async def _visit_coresdk_workflow_commands_RequestCancelExternalWorkflowExecution(
92+
self, fs: VisitorFunctions, o: RequestCancelExternalWorkflowExecution
93+
) -> None:
94+
with current_command(
95+
CommandType.COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION, o.seq
96+
):
97+
# Note: Base class doesn't have this visitor (no payloads to visit)
98+
pass
99+
100+
async def _visit_coresdk_workflow_commands_ScheduleNexusOperation(
101+
self, fs: VisitorFunctions, o: ScheduleNexusOperation
102+
) -> None:
103+
with current_command(CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION, o.seq):
104+
await super()._visit_coresdk_workflow_commands_ScheduleNexusOperation(fs, o)
105+
106+
async def _visit_coresdk_workflow_commands_RequestCancelNexusOperation(
107+
self, fs: VisitorFunctions, o: RequestCancelNexusOperation
108+
) -> None:
109+
with current_command(
110+
CommandType.COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION, o.seq
103111
):
104-
method_name = f"_visit_coresdk_workflow_commands_{proto_class.__name__}"
105-
parent_method = getattr(PayloadVisitor, method_name, None)
106-
if parent_method:
107-
setattr(
108-
CommandAwarePayloadVisitor,
109-
method_name,
110-
_create_override_method(parent_method, command_type),
111-
)
112-
113-
# Process activation jobs
114-
for proto_class in _get_workflow_activation_job_protos_with_seq():
115-
if command_type := CommandAwarePayloadVisitor._COMMAND_TYPE_MAP.get(
116-
proto_class
112+
# Note: Base class doesn't have this visitor (no payloads to visit)
113+
pass
114+
115+
async def _visit_coresdk_workflow_commands_StartTimer(
116+
self, fs: VisitorFunctions, o: StartTimer
117+
) -> None:
118+
with current_command(CommandType.COMMAND_TYPE_START_TIMER, o.seq):
119+
# Note: Base class doesn't have this visitor (no payloads to visit)
120+
pass
121+
122+
async def _visit_coresdk_workflow_commands_CancelTimer(
123+
self, fs: VisitorFunctions, o: CancelTimer
124+
) -> None:
125+
with current_command(CommandType.COMMAND_TYPE_CANCEL_TIMER, o.seq):
126+
# Note: Base class doesn't have this visitor (no payloads to visit)
127+
pass
128+
129+
async def _visit_coresdk_workflow_commands_RequestCancelActivity(
130+
self, fs: VisitorFunctions, o: RequestCancelActivity
131+
) -> None:
132+
with current_command(
133+
CommandType.COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK, o.seq
117134
):
118-
method_name = f"_visit_coresdk_workflow_activation_{proto_class.__name__}"
119-
parent_method = getattr(PayloadVisitor, method_name, None)
120-
if parent_method:
121-
setattr(
122-
CommandAwarePayloadVisitor,
123-
method_name,
124-
_create_override_method(parent_method, command_type),
125-
)
135+
# Note: Base class doesn't have this visitor (no payloads to visit)
136+
pass
137+
138+
async def _visit_coresdk_workflow_commands_RequestCancelLocalActivity(
139+
self, fs: VisitorFunctions, o: RequestCancelLocalActivity
140+
) -> None:
141+
with current_command(
142+
CommandType.COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK, o.seq
143+
):
144+
# Note: Base class doesn't have this visitor (no payloads to visit)
145+
pass
146+
147+
async def _visit_coresdk_workflow_commands_CancelSignalWorkflow(
148+
self, fs: VisitorFunctions, o: CancelSignalWorkflow
149+
) -> None:
150+
# CancelSignalWorkflow has seq but no server command type
151+
# (it's an internal SDK command). Set context to None.
152+
with current_command(None, o.seq): # type: ignore
153+
# Note: Base class doesn't have this visitor (no payloads to visit)
154+
pass
155+
156+
# Workflow activation jobs
157+
async def _visit_coresdk_workflow_activation_ResolveActivity(
158+
self, fs: VisitorFunctions, o: ResolveActivity
159+
) -> None:
160+
with current_command(CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, o.seq):
161+
await super()._visit_coresdk_workflow_activation_ResolveActivity(fs, o)
162+
163+
async def _visit_coresdk_workflow_activation_ResolveChildWorkflowExecutionStart(
164+
self, fs: VisitorFunctions, o: ResolveChildWorkflowExecutionStart
165+
) -> None:
166+
with current_command(
167+
CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION, o.seq
168+
):
169+
await super()._visit_coresdk_workflow_activation_ResolveChildWorkflowExecutionStart(
170+
fs, o
171+
)
172+
173+
async def _visit_coresdk_workflow_activation_ResolveChildWorkflowExecution(
174+
self, fs: VisitorFunctions, o: ResolveChildWorkflowExecution
175+
) -> None:
176+
with current_command(
177+
CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION, o.seq
178+
):
179+
await super()._visit_coresdk_workflow_activation_ResolveChildWorkflowExecution(
180+
fs, o
181+
)
182+
183+
async def _visit_coresdk_workflow_activation_ResolveSignalExternalWorkflow(
184+
self, fs: VisitorFunctions, o: ResolveSignalExternalWorkflow
185+
) -> None:
186+
with current_command(
187+
CommandType.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION, o.seq
188+
):
189+
await super()._visit_coresdk_workflow_activation_ResolveSignalExternalWorkflow(
190+
fs, o
191+
)
192+
193+
async def _visit_coresdk_workflow_activation_ResolveRequestCancelExternalWorkflow(
194+
self, fs: VisitorFunctions, o: ResolveRequestCancelExternalWorkflow
195+
) -> None:
196+
with current_command(
197+
CommandType.COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION, o.seq
198+
):
199+
await super()._visit_coresdk_workflow_activation_ResolveRequestCancelExternalWorkflow(
200+
fs, o
201+
)
202+
203+
async def _visit_coresdk_workflow_activation_ResolveNexusOperationStart(
204+
self, fs: VisitorFunctions, o: ResolveNexusOperationStart
205+
) -> None:
206+
with current_command(CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION, o.seq):
207+
await super()._visit_coresdk_workflow_activation_ResolveNexusOperationStart(
208+
fs, o
209+
)
210+
211+
async def _visit_coresdk_workflow_activation_ResolveNexusOperation(
212+
self, fs: VisitorFunctions, o: ResolveNexusOperation
213+
) -> None:
214+
with current_command(CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION, o.seq):
215+
await super()._visit_coresdk_workflow_activation_ResolveNexusOperation(
216+
fs, o
217+
)
218+
219+
async def _visit_coresdk_workflow_activation_FireTimer(
220+
self, fs: VisitorFunctions, o: FireTimer
221+
) -> None:
222+
with current_command(CommandType.COMMAND_TYPE_START_TIMER, o.seq):
223+
# Note: Base class doesn't have this visitor (no payloads to visit)
224+
pass
126225

127226

128227
def _get_workflow_command_protos_with_seq() -> Iterator[Type[Any]]:
@@ -141,18 +240,14 @@ def _get_workflow_activation_job_protos_with_seq() -> Iterator[Type[Any]]:
141240

142241
@contextmanager
143242
def current_command(
144-
command_type: CommandType.ValueType, command_seq: int
243+
command_type: Optional[CommandType.ValueType], command_seq: int
145244
) -> Iterator[None]:
146245
"""Context manager for setting command info."""
147246
token = current_command_info.set(
148-
CommandInfo(command_type=command_type, command_seq=command_seq)
247+
CommandInfo(command_type=command_type, command_seq=command_seq) # type: ignore
149248
)
150249
try:
151250
yield
152251
finally:
153252
if token:
154253
current_command_info.reset(token)
155-
156-
157-
# Create all override methods on the class when the module is imported
158-
_add_class_overrides()
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
"""Test that CommandAwarePayloadVisitor handles all commands with seq fields."""
2+
3+
from temporalio.worker._command_aware_visitor import (
4+
CommandAwarePayloadVisitor,
5+
_get_workflow_activation_job_protos_with_seq,
6+
_get_workflow_command_protos_with_seq,
7+
)
8+
9+
10+
def test_command_aware_visitor_has_methods_for_all_seq_protos():
11+
"""Verify CommandAwarePayloadVisitor has methods for all protos with seq fields."""
12+
visitor = CommandAwarePayloadVisitor()
13+
14+
# Check all workflow commands with seq have corresponding methods
15+
16+
command_protos = list(_get_workflow_command_protos_with_seq())
17+
job_protos = list(_get_workflow_activation_job_protos_with_seq())
18+
assert command_protos, "Should find workflow commands with seq"
19+
assert job_protos, "Should find workflow activation jobs with seq"
20+
21+
commands_missing = []
22+
for proto_class in command_protos:
23+
method_name = f"_visit_coresdk_workflow_commands_{proto_class.__name__}"
24+
if not hasattr(visitor, method_name):
25+
commands_missing.append(proto_class.__name__)
26+
27+
# Check all workflow activation jobs with seq have corresponding methods
28+
jobs_missing = []
29+
for proto_class in job_protos:
30+
method_name = f"_visit_coresdk_workflow_activation_{proto_class.__name__}"
31+
if not hasattr(visitor, method_name):
32+
jobs_missing.append(proto_class.__name__)
33+
34+
errors = []
35+
if commands_missing:
36+
errors.append(
37+
f"Missing visitor methods for commands with seq: {commands_missing}\n"
38+
f"Add methods to CommandAwarePayloadVisitor for these commands."
39+
)
40+
if jobs_missing:
41+
errors.append(
42+
f"Missing visitor methods for activation jobs with seq: {jobs_missing}\n"
43+
f"Add methods to CommandAwarePayloadVisitor for these jobs."
44+
)
45+
46+
assert not errors, "\n".join(errors)

0 commit comments

Comments
 (0)