Skip to content

Commit e129ceb

Browse files
committed
Refactor: don't use default PayloadVisitor to set command context
1 parent 9c9a3fe commit e129ceb

File tree

10 files changed

+192
-276
lines changed

10 files changed

+192
-276
lines changed

scripts/gen_payload_visitor.py

Lines changed: 4 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -67,55 +67,6 @@ def emit_singular(
6767
await self._visit_{child_method}(fs, {access_expr})"""
6868

6969

70-
def emit_singular_with_seq(
71-
field_name: str, access_expr: str, child_method: str, presence_word: str
72-
) -> str:
73-
# Helper to emit a singular field visit that sets the seq contextvar, with presence check but
74-
# without headers guard since this is used for commands only.
75-
76-
# Map field names to command types
77-
field_to_command_type = {
78-
# Commands
79-
"schedule_activity": "COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK",
80-
"schedule_local_activity": "COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK",
81-
"start_child_workflow_execution": "COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION",
82-
"signal_external_workflow_execution": "COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION",
83-
"schedule_nexus_operation": "COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION",
84-
"request_cancel_activity": "COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK",
85-
"request_cancel_local_activity": "COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK",
86-
"request_cancel_external_workflow_execution": "COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION",
87-
"request_cancel_nexus_operation": "COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION",
88-
"cancel_timer": "COMMAND_TYPE_CANCEL_TIMER",
89-
"cancel_signal_workflow": "COMMAND_TYPE_CANCEL_SIGNAL_WORKFLOW",
90-
"start_timer": "COMMAND_TYPE_START_TIMER",
91-
# Resolutions (use the corresponding command type)
92-
"resolve_activity": "COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK",
93-
"resolve_child_workflow_execution_start": "COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION",
94-
"resolve_child_workflow_execution": "COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION",
95-
"resolve_signal_external_workflow": "COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION",
96-
"resolve_request_cancel_external_workflow": "COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION",
97-
"resolve_nexus_operation_start": "COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION",
98-
"resolve_nexus_operation": "COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION",
99-
}
100-
101-
command_type = field_to_command_type.get(field_name)
102-
if not command_type:
103-
raise ValueError(f"Unknown field with seq: {field_name}")
104-
105-
return f"""\
106-
{presence_word} o.HasField("{field_name}"):
107-
token = current_command_info.set(
108-
CommandInfo(
109-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.{command_type},
110-
command_seq={access_expr}.seq,
111-
)
112-
)
113-
try:
114-
await self._visit_{child_method}(fs, {access_expr})
115-
finally:
116-
current_command_info.reset(token)"""
117-
118-
11970
class VisitorGenerator:
12071
def generate(self, roots: list[Descriptor]) -> str:
12172
"""
@@ -134,26 +85,11 @@ def generate(self, roots: list[Descriptor]) -> str:
13485
header = """
13586
# This file is generated by gen_payload_visitor.py. Changes should be made there.
13687
import abc
137-
import contextvars
138-
from dataclasses import dataclass
139-
from typing import Any, MutableSequence, Optional
88+
from typing import Any, MutableSequence
14089
141-
import temporalio.api.enums.v1.command_type_pb2
14290
from temporalio.api.common.v1.message_pb2 import Payload
14391
14492
145-
@dataclass(frozen=True)
146-
class CommandInfo:
147-
\"\"\"Information identifying a specific command instance.\"\"\"
148-
149-
command_type: temporalio.api.enums.v1.command_type_pb2.CommandType
150-
command_seq: int
151-
152-
153-
current_command_info: contextvars.ContextVar[Optional[CommandInfo]] = (
154-
contextvars.ContextVar("current_command_info", default=None)
155-
)
156-
15793
class VisitorFunctions(abc.ABC):
15894
\"\"\"Set of functions which can be called by the visitor.
15995
Allows handling payloads as a sequence.
@@ -323,29 +259,6 @@ def walk(self, desc: Descriptor) -> bool:
323259
)
324260
)
325261

326-
commands_with_seq = {
327-
"cancel_signal_workflow",
328-
"cancel_timer",
329-
"request_cancel_activity",
330-
"request_cancel_external_workflow_execution",
331-
"request_cancel_local_activity",
332-
"request_cancel_nexus_operation",
333-
"schedule_activity",
334-
"schedule_local_activity",
335-
"schedule_nexus_operation",
336-
"signal_external_workflow_execution",
337-
"start_child_workflow_execution",
338-
"start_timer",
339-
}
340-
activation_jobs_with_seq = {
341-
"resolve_activity",
342-
"resolve_child_workflow_execution_start",
343-
"resolve_child_workflow_execution",
344-
"resolve_nexus_operation_start",
345-
"resolve_nexus_operation",
346-
"resolve_request_cancel_external_workflow",
347-
"resolve_signal_external_workflow",
348-
}
349262
# Process oneof fields as if/elif chains
350263
for oneof_idx, fields in oneof_fields.items():
351264
oneof_lines = []
@@ -357,25 +270,9 @@ def walk(self, desc: Descriptor) -> bool:
357270
if child_has_payload:
358271
if_word = "if" if first else "elif"
359272
first = False
360-
if (
361-
desc.full_name == "coresdk.workflow_commands.WorkflowCommand"
362-
and field.name in commands_with_seq
363-
):
364-
line = emit_singular_with_seq(
365-
field.name, f"o.{field.name}", name_for(child_desc), if_word
366-
)
367-
elif (
368-
desc.full_name
369-
== "coresdk.workflow_activation.WorkflowActivationJob"
370-
and field.name in activation_jobs_with_seq
371-
):
372-
line = emit_singular_with_seq(
373-
field.name, f"o.{field.name}", name_for(child_desc), if_word
374-
)
375-
else:
376-
line = emit_singular(
377-
field.name, f"o.{field.name}", name_for(child_desc), if_word
378-
)
273+
line = emit_singular(
274+
field.name, f"o.{field.name}", name_for(child_desc), if_word
275+
)
379276
oneof_lines.append(line)
380277
if oneof_lines:
381278
lines.extend(oneof_lines)

temporalio/bridge/_visitor.py

Lines changed: 25 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,10 @@
11
# This file is generated by gen_payload_visitor.py. Changes should be made there.
22
import abc
3-
import contextvars
4-
from dataclasses import dataclass
5-
from typing import Any, MutableSequence, Optional
3+
from typing import Any, MutableSequence
64

7-
import temporalio.api.enums.v1.command_type_pb2
85
from temporalio.api.common.v1.message_pb2 import Payload
96

107

11-
@dataclass(frozen=True)
12-
class CommandInfo:
13-
"""Information identifying a specific command instance."""
14-
15-
command_type: temporalio.api.enums.v1.command_type_pb2.CommandType.ValueType
16-
command_seq: int
17-
18-
19-
current_command_info: contextvars.ContextVar[Optional[CommandInfo]] = (
20-
contextvars.ContextVar("current_command_info", default=None)
21-
)
22-
23-
248
class VisitorFunctions(abc.ABC):
259
"""Set of functions which can be called by the visitor.
2610
Allows handling payloads as a sequence.
@@ -263,100 +247,35 @@ async def _visit_coresdk_workflow_activation_WorkflowActivationJob(self, fs, o):
263247
fs, o.signal_workflow
264248
)
265249
elif o.HasField("resolve_activity"):
266-
token = current_command_info.set(
267-
CommandInfo(
268-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
269-
command_seq=o.resolve_activity.seq,
270-
)
250+
await self._visit_coresdk_workflow_activation_ResolveActivity(
251+
fs, o.resolve_activity
271252
)
272-
try:
273-
await self._visit_coresdk_workflow_activation_ResolveActivity(
274-
fs, o.resolve_activity
275-
)
276-
finally:
277-
current_command_info.reset(token)
278253
elif o.HasField("resolve_child_workflow_execution_start"):
279-
token = current_command_info.set(
280-
CommandInfo(
281-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
282-
command_seq=o.resolve_child_workflow_execution_start.seq,
283-
)
254+
await self._visit_coresdk_workflow_activation_ResolveChildWorkflowExecutionStart(
255+
fs, o.resolve_child_workflow_execution_start
284256
)
285-
try:
286-
await self._visit_coresdk_workflow_activation_ResolveChildWorkflowExecutionStart(
287-
fs, o.resolve_child_workflow_execution_start
288-
)
289-
finally:
290-
current_command_info.reset(token)
291257
elif o.HasField("resolve_child_workflow_execution"):
292-
token = current_command_info.set(
293-
CommandInfo(
294-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
295-
command_seq=o.resolve_child_workflow_execution.seq,
296-
)
258+
await self._visit_coresdk_workflow_activation_ResolveChildWorkflowExecution(
259+
fs, o.resolve_child_workflow_execution
297260
)
298-
try:
299-
await self._visit_coresdk_workflow_activation_ResolveChildWorkflowExecution(
300-
fs, o.resolve_child_workflow_execution
301-
)
302-
finally:
303-
current_command_info.reset(token)
304261
elif o.HasField("resolve_signal_external_workflow"):
305-
token = current_command_info.set(
306-
CommandInfo(
307-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION,
308-
command_seq=o.resolve_signal_external_workflow.seq,
309-
)
262+
await self._visit_coresdk_workflow_activation_ResolveSignalExternalWorkflow(
263+
fs, o.resolve_signal_external_workflow
310264
)
311-
try:
312-
await self._visit_coresdk_workflow_activation_ResolveSignalExternalWorkflow(
313-
fs, o.resolve_signal_external_workflow
314-
)
315-
finally:
316-
current_command_info.reset(token)
317265
elif o.HasField("resolve_request_cancel_external_workflow"):
318-
token = current_command_info.set(
319-
CommandInfo(
320-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION,
321-
command_seq=o.resolve_request_cancel_external_workflow.seq,
322-
)
266+
await self._visit_coresdk_workflow_activation_ResolveRequestCancelExternalWorkflow(
267+
fs, o.resolve_request_cancel_external_workflow
323268
)
324-
try:
325-
await self._visit_coresdk_workflow_activation_ResolveRequestCancelExternalWorkflow(
326-
fs, o.resolve_request_cancel_external_workflow
327-
)
328-
finally:
329-
current_command_info.reset(token)
330269
elif o.HasField("do_update"):
331270
await self._visit_coresdk_workflow_activation_DoUpdate(fs, o.do_update)
332271
elif o.HasField("resolve_nexus_operation_start"):
333-
token = current_command_info.set(
334-
CommandInfo(
335-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
336-
command_seq=o.resolve_nexus_operation_start.seq,
337-
)
272+
await self._visit_coresdk_workflow_activation_ResolveNexusOperationStart(
273+
fs, o.resolve_nexus_operation_start
338274
)
339-
try:
340-
await (
341-
self._visit_coresdk_workflow_activation_ResolveNexusOperationStart(
342-
fs, o.resolve_nexus_operation_start
343-
)
344-
)
345-
finally:
346-
current_command_info.reset(token)
347275
elif o.HasField("resolve_nexus_operation"):
348-
token = current_command_info.set(
349-
CommandInfo(
350-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
351-
command_seq=o.resolve_nexus_operation.seq,
352-
)
276+
await self._visit_coresdk_workflow_activation_ResolveNexusOperation(
277+
fs, o.resolve_nexus_operation
353278
)
354-
try:
355-
await self._visit_coresdk_workflow_activation_ResolveNexusOperation(
356-
fs, o.resolve_nexus_operation
357-
)
358-
finally:
359-
current_command_info.reset(token)
360279

361280
async def _visit_coresdk_workflow_activation_WorkflowActivation(self, fs, o):
362281
for v in o.jobs:
@@ -455,18 +374,9 @@ async def _visit_coresdk_workflow_commands_WorkflowCommand(self, fs, o):
455374
if o.HasField("user_metadata"):
456375
await self._visit_temporal_api_sdk_v1_UserMetadata(fs, o.user_metadata)
457376
if o.HasField("schedule_activity"):
458-
token = current_command_info.set(
459-
CommandInfo(
460-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
461-
command_seq=o.schedule_activity.seq,
462-
)
377+
await self._visit_coresdk_workflow_commands_ScheduleActivity(
378+
fs, o.schedule_activity
463379
)
464-
try:
465-
await self._visit_coresdk_workflow_commands_ScheduleActivity(
466-
fs, o.schedule_activity
467-
)
468-
finally:
469-
current_command_info.reset(token)
470380
elif o.HasField("respond_to_query"):
471381
await self._visit_coresdk_workflow_commands_QueryResult(
472382
fs, o.respond_to_query
@@ -484,44 +394,17 @@ async def _visit_coresdk_workflow_commands_WorkflowCommand(self, fs, o):
484394
fs, o.continue_as_new_workflow_execution
485395
)
486396
elif o.HasField("start_child_workflow_execution"):
487-
token = current_command_info.set(
488-
CommandInfo(
489-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
490-
command_seq=o.start_child_workflow_execution.seq,
491-
)
397+
await self._visit_coresdk_workflow_commands_StartChildWorkflowExecution(
398+
fs, o.start_child_workflow_execution
492399
)
493-
try:
494-
await self._visit_coresdk_workflow_commands_StartChildWorkflowExecution(
495-
fs, o.start_child_workflow_execution
496-
)
497-
finally:
498-
current_command_info.reset(token)
499400
elif o.HasField("signal_external_workflow_execution"):
500-
token = current_command_info.set(
501-
CommandInfo(
502-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION,
503-
command_seq=o.signal_external_workflow_execution.seq,
504-
)
401+
await self._visit_coresdk_workflow_commands_SignalExternalWorkflowExecution(
402+
fs, o.signal_external_workflow_execution
505403
)
506-
try:
507-
await self._visit_coresdk_workflow_commands_SignalExternalWorkflowExecution(
508-
fs, o.signal_external_workflow_execution
509-
)
510-
finally:
511-
current_command_info.reset(token)
512404
elif o.HasField("schedule_local_activity"):
513-
token = current_command_info.set(
514-
CommandInfo(
515-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
516-
command_seq=o.schedule_local_activity.seq,
517-
)
405+
await self._visit_coresdk_workflow_commands_ScheduleLocalActivity(
406+
fs, o.schedule_local_activity
518407
)
519-
try:
520-
await self._visit_coresdk_workflow_commands_ScheduleLocalActivity(
521-
fs, o.schedule_local_activity
522-
)
523-
finally:
524-
current_command_info.reset(token)
525408
elif o.HasField("upsert_workflow_search_attributes"):
526409
await self._visit_coresdk_workflow_commands_UpsertWorkflowSearchAttributes(
527410
fs, o.upsert_workflow_search_attributes
@@ -535,18 +418,9 @@ async def _visit_coresdk_workflow_commands_WorkflowCommand(self, fs, o):
535418
fs, o.update_response
536419
)
537420
elif o.HasField("schedule_nexus_operation"):
538-
token = current_command_info.set(
539-
CommandInfo(
540-
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
541-
command_seq=o.schedule_nexus_operation.seq,
542-
)
421+
await self._visit_coresdk_workflow_commands_ScheduleNexusOperation(
422+
fs, o.schedule_nexus_operation
543423
)
544-
try:
545-
await self._visit_coresdk_workflow_commands_ScheduleNexusOperation(
546-
fs, o.schedule_nexus_operation
547-
)
548-
finally:
549-
current_command_info.reset(token)
550424

551425
async def _visit_coresdk_workflow_completion_Success(self, fs, o):
552426
for v in o.commands:

0 commit comments

Comments
 (0)