Skip to content

Commit db5a67f

Browse files
committed
Refactor: don't use default PayloadVisitor to set command context
1 parent a50f436 commit db5a67f

File tree

9 files changed

+197
-275
lines changed

9 files changed

+197
-275
lines changed

scripts/gen_payload_visitor.py

Lines changed: 4 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -62,55 +62,6 @@ def emit_singular(
6262
await self._visit_{child_method}(fs, {access_expr})"""
6363

6464

65-
def emit_singular_with_seq(
66-
field_name: str, access_expr: str, child_method: str, presence_word: str
67-
) -> str:
68-
# Helper to emit a singular field visit that sets the seq contextvar, with presence check but
69-
# without headers guard since this is used for commands only.
70-
71-
# Map field names to command types
72-
field_to_command_type = {
73-
# Commands
74-
"schedule_activity": "COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK",
75-
"schedule_local_activity": "COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK",
76-
"start_child_workflow_execution": "COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION",
77-
"signal_external_workflow_execution": "COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION",
78-
"schedule_nexus_operation": "COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION",
79-
"request_cancel_activity": "COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK",
80-
"request_cancel_local_activity": "COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK",
81-
"request_cancel_external_workflow_execution": "COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION",
82-
"request_cancel_nexus_operation": "COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION",
83-
"cancel_timer": "COMMAND_TYPE_CANCEL_TIMER",
84-
"cancel_signal_workflow": "COMMAND_TYPE_CANCEL_SIGNAL_WORKFLOW",
85-
"start_timer": "COMMAND_TYPE_START_TIMER",
86-
# Resolutions (use the corresponding command type)
87-
"resolve_activity": "COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK",
88-
"resolve_child_workflow_execution_start": "COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION",
89-
"resolve_child_workflow_execution": "COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION",
90-
"resolve_signal_external_workflow": "COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION",
91-
"resolve_request_cancel_external_workflow": "COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION",
92-
"resolve_nexus_operation_start": "COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION",
93-
"resolve_nexus_operation": "COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION",
94-
}
95-
96-
command_type = field_to_command_type.get(field_name)
97-
if not command_type:
98-
raise ValueError(f"Unknown field with seq: {field_name}")
99-
100-
return f"""\
101-
{presence_word} o.HasField("{field_name}"):
102-
token = current_command_info.set(
103-
CommandInfo(
104-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.{command_type},
105-
command_seq={access_expr}.seq,
106-
)
107-
)
108-
try:
109-
await self._visit_{child_method}(fs, {access_expr})
110-
finally:
111-
current_command_info.reset(token)"""
112-
113-
11465
class VisitorGenerator:
11566
def generate(self, roots: list[Descriptor]) -> str:
11667
"""
@@ -129,26 +80,11 @@ def generate(self, roots: list[Descriptor]) -> str:
12980
header = """
13081
# This file is generated by gen_payload_visitor.py. Changes should be made there.
13182
import abc
132-
import contextvars
133-
from dataclasses import dataclass
134-
from typing import Any, MutableSequence, Optional
83+
from typing import Any, MutableSequence
13584
136-
import temporalio.api.enums.v1.command_type_pb2
13785
from temporalio.api.common.v1.message_pb2 import Payload
13886
13987
140-
@dataclass(frozen=True)
141-
class CommandInfo:
142-
\"\"\"Information identifying a specific command instance.\"\"\"
143-
144-
command_type: temporalio.api.enums.v1.command_type_pb2.CommandType
145-
command_seq: int
146-
147-
148-
current_command_info: contextvars.ContextVar[Optional[CommandInfo]] = (
149-
contextvars.ContextVar("current_command_info", default=None)
150-
)
151-
15288
class VisitorFunctions(abc.ABC):
15389
\"\"\"Set of functions which can be called by the visitor.
15490
Allows handling payloads as a sequence.
@@ -318,29 +254,6 @@ def walk(self, desc: Descriptor) -> bool:
318254
)
319255
)
320256

321-
commands_with_seq = {
322-
"cancel_signal_workflow",
323-
"cancel_timer",
324-
"request_cancel_activity",
325-
"request_cancel_external_workflow_execution",
326-
"request_cancel_local_activity",
327-
"request_cancel_nexus_operation",
328-
"schedule_activity",
329-
"schedule_local_activity",
330-
"schedule_nexus_operation",
331-
"signal_external_workflow_execution",
332-
"start_child_workflow_execution",
333-
"start_timer",
334-
}
335-
activation_jobs_with_seq = {
336-
"resolve_activity",
337-
"resolve_child_workflow_execution_start",
338-
"resolve_child_workflow_execution",
339-
"resolve_nexus_operation_start",
340-
"resolve_nexus_operation",
341-
"resolve_request_cancel_external_workflow",
342-
"resolve_signal_external_workflow",
343-
}
344257
# Process oneof fields as if/elif chains
345258
for oneof_idx, fields in oneof_fields.items():
346259
oneof_lines = []
@@ -352,25 +265,9 @@ def walk(self, desc: Descriptor) -> bool:
352265
if child_has_payload:
353266
if_word = "if" if first else "elif"
354267
first = False
355-
if (
356-
desc.full_name == "coresdk.workflow_commands.WorkflowCommand"
357-
and field.name in commands_with_seq
358-
):
359-
line = emit_singular_with_seq(
360-
field.name, f"o.{field.name}", name_for(child_desc), if_word
361-
)
362-
elif (
363-
desc.full_name
364-
== "coresdk.workflow_activation.WorkflowActivationJob"
365-
and field.name in activation_jobs_with_seq
366-
):
367-
line = emit_singular_with_seq(
368-
field.name, f"o.{field.name}", name_for(child_desc), if_word
369-
)
370-
else:
371-
line = emit_singular(
372-
field.name, f"o.{field.name}", name_for(child_desc), if_word
373-
)
268+
line = emit_singular(
269+
field.name, f"o.{field.name}", name_for(child_desc), if_word
270+
)
374271
oneof_lines.append(line)
375272
if oneof_lines:
376273
lines.extend(oneof_lines)

temporalio/bridge/_visitor.py

Lines changed: 25 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,10 @@
11
# This file is generated by gen_payload_visitor.py. Changes should be made there.
22
import abc
3-
import contextvars
4-
from dataclasses import dataclass
5-
from typing import Any, MutableSequence, Optional
3+
from typing import Any, MutableSequence
64

7-
import temporalio.api.enums.v1.command_type_pb2
85
from temporalio.api.common.v1.message_pb2 import Payload
96

107

11-
@dataclass(frozen=True)
12-
class CommandInfo:
13-
"""Information identifying a specific command instance."""
14-
15-
command_type: temporalio.api.enums.v1.command_type_pb2.CommandType.ValueType
16-
command_seq: int
17-
18-
19-
current_command_info: contextvars.ContextVar[Optional[CommandInfo]] = (
20-
contextvars.ContextVar("current_command_info", default=None)
21-
)
22-
23-
248
class VisitorFunctions(abc.ABC):
259
"""Set of functions which can be called by the visitor.
2610
Allows handling payloads as a sequence.
@@ -263,100 +247,35 @@ async def _visit_coresdk_workflow_activation_WorkflowActivationJob(self, fs, o):
263247
fs, o.signal_workflow
264248
)
265249
elif o.HasField("resolve_activity"):
266-
token = current_command_info.set(
267-
CommandInfo(
268-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
269-
command_seq=o.resolve_activity.seq,
270-
)
250+
await self._visit_coresdk_workflow_activation_ResolveActivity(
251+
fs, o.resolve_activity
271252
)
272-
try:
273-
await self._visit_coresdk_workflow_activation_ResolveActivity(
274-
fs, o.resolve_activity
275-
)
276-
finally:
277-
current_command_info.reset(token)
278253
elif o.HasField("resolve_child_workflow_execution_start"):
279-
token = current_command_info.set(
280-
CommandInfo(
281-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
282-
command_seq=o.resolve_child_workflow_execution_start.seq,
283-
)
254+
await self._visit_coresdk_workflow_activation_ResolveChildWorkflowExecutionStart(
255+
fs, o.resolve_child_workflow_execution_start
284256
)
285-
try:
286-
await self._visit_coresdk_workflow_activation_ResolveChildWorkflowExecutionStart(
287-
fs, o.resolve_child_workflow_execution_start
288-
)
289-
finally:
290-
current_command_info.reset(token)
291257
elif o.HasField("resolve_child_workflow_execution"):
292-
token = current_command_info.set(
293-
CommandInfo(
294-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
295-
command_seq=o.resolve_child_workflow_execution.seq,
296-
)
258+
await self._visit_coresdk_workflow_activation_ResolveChildWorkflowExecution(
259+
fs, o.resolve_child_workflow_execution
297260
)
298-
try:
299-
await self._visit_coresdk_workflow_activation_ResolveChildWorkflowExecution(
300-
fs, o.resolve_child_workflow_execution
301-
)
302-
finally:
303-
current_command_info.reset(token)
304261
elif o.HasField("resolve_signal_external_workflow"):
305-
token = current_command_info.set(
306-
CommandInfo(
307-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION,
308-
command_seq=o.resolve_signal_external_workflow.seq,
309-
)
262+
await self._visit_coresdk_workflow_activation_ResolveSignalExternalWorkflow(
263+
fs, o.resolve_signal_external_workflow
310264
)
311-
try:
312-
await self._visit_coresdk_workflow_activation_ResolveSignalExternalWorkflow(
313-
fs, o.resolve_signal_external_workflow
314-
)
315-
finally:
316-
current_command_info.reset(token)
317265
elif o.HasField("resolve_request_cancel_external_workflow"):
318-
token = current_command_info.set(
319-
CommandInfo(
320-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION,
321-
command_seq=o.resolve_request_cancel_external_workflow.seq,
322-
)
266+
await self._visit_coresdk_workflow_activation_ResolveRequestCancelExternalWorkflow(
267+
fs, o.resolve_request_cancel_external_workflow
323268
)
324-
try:
325-
await self._visit_coresdk_workflow_activation_ResolveRequestCancelExternalWorkflow(
326-
fs, o.resolve_request_cancel_external_workflow
327-
)
328-
finally:
329-
current_command_info.reset(token)
330269
elif o.HasField("do_update"):
331270
await self._visit_coresdk_workflow_activation_DoUpdate(fs, o.do_update)
332271
elif o.HasField("resolve_nexus_operation_start"):
333-
token = current_command_info.set(
334-
CommandInfo(
335-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
336-
command_seq=o.resolve_nexus_operation_start.seq,
337-
)
272+
await self._visit_coresdk_workflow_activation_ResolveNexusOperationStart(
273+
fs, o.resolve_nexus_operation_start
338274
)
339-
try:
340-
await (
341-
self._visit_coresdk_workflow_activation_ResolveNexusOperationStart(
342-
fs, o.resolve_nexus_operation_start
343-
)
344-
)
345-
finally:
346-
current_command_info.reset(token)
347275
elif o.HasField("resolve_nexus_operation"):
348-
token = current_command_info.set(
349-
CommandInfo(
350-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
351-
command_seq=o.resolve_nexus_operation.seq,
352-
)
276+
await self._visit_coresdk_workflow_activation_ResolveNexusOperation(
277+
fs, o.resolve_nexus_operation
353278
)
354-
try:
355-
await self._visit_coresdk_workflow_activation_ResolveNexusOperation(
356-
fs, o.resolve_nexus_operation
357-
)
358-
finally:
359-
current_command_info.reset(token)
360279

361280
async def _visit_coresdk_workflow_activation_WorkflowActivation(self, fs, o):
362281
for v in o.jobs:
@@ -452,18 +371,9 @@ async def _visit_coresdk_workflow_commands_WorkflowCommand(self, fs, o):
452371
if o.HasField("user_metadata"):
453372
await self._visit_temporal_api_sdk_v1_UserMetadata(fs, o.user_metadata)
454373
if o.HasField("schedule_activity"):
455-
token = current_command_info.set(
456-
CommandInfo(
457-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
458-
command_seq=o.schedule_activity.seq,
459-
)
374+
await self._visit_coresdk_workflow_commands_ScheduleActivity(
375+
fs, o.schedule_activity
460376
)
461-
try:
462-
await self._visit_coresdk_workflow_commands_ScheduleActivity(
463-
fs, o.schedule_activity
464-
)
465-
finally:
466-
current_command_info.reset(token)
467377
elif o.HasField("respond_to_query"):
468378
await self._visit_coresdk_workflow_commands_QueryResult(
469379
fs, o.respond_to_query
@@ -481,44 +391,17 @@ async def _visit_coresdk_workflow_commands_WorkflowCommand(self, fs, o):
481391
fs, o.continue_as_new_workflow_execution
482392
)
483393
elif o.HasField("start_child_workflow_execution"):
484-
token = current_command_info.set(
485-
CommandInfo(
486-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
487-
command_seq=o.start_child_workflow_execution.seq,
488-
)
394+
await self._visit_coresdk_workflow_commands_StartChildWorkflowExecution(
395+
fs, o.start_child_workflow_execution
489396
)
490-
try:
491-
await self._visit_coresdk_workflow_commands_StartChildWorkflowExecution(
492-
fs, o.start_child_workflow_execution
493-
)
494-
finally:
495-
current_command_info.reset(token)
496397
elif o.HasField("signal_external_workflow_execution"):
497-
token = current_command_info.set(
498-
CommandInfo(
499-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION,
500-
command_seq=o.signal_external_workflow_execution.seq,
501-
)
398+
await self._visit_coresdk_workflow_commands_SignalExternalWorkflowExecution(
399+
fs, o.signal_external_workflow_execution
502400
)
503-
try:
504-
await self._visit_coresdk_workflow_commands_SignalExternalWorkflowExecution(
505-
fs, o.signal_external_workflow_execution
506-
)
507-
finally:
508-
current_command_info.reset(token)
509401
elif o.HasField("schedule_local_activity"):
510-
token = current_command_info.set(
511-
CommandInfo(
512-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
513-
command_seq=o.schedule_local_activity.seq,
514-
)
402+
await self._visit_coresdk_workflow_commands_ScheduleLocalActivity(
403+
fs, o.schedule_local_activity
515404
)
516-
try:
517-
await self._visit_coresdk_workflow_commands_ScheduleLocalActivity(
518-
fs, o.schedule_local_activity
519-
)
520-
finally:
521-
current_command_info.reset(token)
522405
elif o.HasField("upsert_workflow_search_attributes"):
523406
await self._visit_coresdk_workflow_commands_UpsertWorkflowSearchAttributes(
524407
fs, o.upsert_workflow_search_attributes
@@ -532,18 +415,9 @@ async def _visit_coresdk_workflow_commands_WorkflowCommand(self, fs, o):
532415
fs, o.update_response
533416
)
534417
elif o.HasField("schedule_nexus_operation"):
535-
token = current_command_info.set(
536-
CommandInfo(
537-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
538-
command_seq=o.schedule_nexus_operation.seq,
539-
)
418+
await self._visit_coresdk_workflow_commands_ScheduleNexusOperation(
419+
fs, o.schedule_nexus_operation
540420
)
541-
try:
542-
await self._visit_coresdk_workflow_commands_ScheduleNexusOperation(
543-
fs, o.schedule_nexus_operation
544-
)
545-
finally:
546-
current_command_info.reset(token)
547421

548422
async def _visit_coresdk_workflow_completion_Success(self, fs, o):
549423
for v in o.commands:

0 commit comments

Comments
 (0)