Skip to content

Commit 88f797a

Browse files
committed
Use (command_type, command_seq) dataclass
1 parent c80452b commit 88f797a

File tree

7 files changed

+186
-53
lines changed

7 files changed

+186
-53
lines changed

scripts/gen_payload_visitor.py

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,48 @@ def emit_singular_with_seq(
7272
) -> str:
7373
# Helper to emit a singular field visit that sets the seq contextvar, with presence check but
7474
# without headers guard since this is used for commands only.
75+
76+
# Map field names to command types
77+
field_to_command_type = {
78+
# Commands
79+
"schedule_activity": "COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK",
80+
"schedule_local_activity": "COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK",
81+
"start_child_workflow_execution": "COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION",
82+
"signal_external_workflow_execution": "COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION",
83+
"schedule_nexus_operation": "COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION",
84+
"request_cancel_activity": "COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK",
85+
"request_cancel_local_activity": "COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK",
86+
"request_cancel_external_workflow_execution": "COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION",
87+
"request_cancel_nexus_operation": "COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION",
88+
"cancel_timer": "COMMAND_TYPE_CANCEL_TIMER",
89+
"cancel_signal_workflow": "COMMAND_TYPE_CANCEL_SIGNAL_WORKFLOW",
90+
"start_timer": "COMMAND_TYPE_START_TIMER",
91+
# Resolutions (use the corresponding command type)
92+
"resolve_activity": "COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK",
93+
"resolve_child_workflow_execution_start": "COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION",
94+
"resolve_child_workflow_execution": "COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION",
95+
"resolve_signal_external_workflow": "COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION",
96+
"resolve_request_cancel_external_workflow": "COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION",
97+
"resolve_nexus_operation_start": "COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION",
98+
"resolve_nexus_operation": "COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION",
99+
}
100+
101+
command_type = field_to_command_type.get(field_name)
102+
if not command_type:
103+
raise ValueError(f"Unknown field with seq: {field_name}")
104+
75105
return f"""\
76106
{presence_word} o.HasField("{field_name}"):
77-
token = current_command_seq.set({access_expr}.seq)
107+
token = current_command_info.set(
108+
CommandInfo(
109+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.{command_type},
110+
command_seq={access_expr}.seq,
111+
)
112+
)
78113
try:
79114
await self._visit_{child_method}(fs, {access_expr})
80115
finally:
81-
current_command_seq.reset(token)"""
116+
current_command_info.reset(token)"""
82117

83118

84119
class VisitorGenerator:
@@ -100,13 +135,23 @@ def generate(self, roots: list[Descriptor]) -> str:
100135
# This file is generated by gen_payload_visitor.py. Changes should be made there.
101136
import abc
102137
import contextvars
138+
from dataclasses import dataclass
103139
from typing import Any, MutableSequence, Optional
104140
141+
import temporalio.api.enums.v1.command_type_pb2
105142
from temporalio.api.common.v1.message_pb2 import Payload
106143
107-
# Current workflow command sequence number
108-
current_command_seq: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar(
109-
"current_command_seq", default=None
144+
145+
@dataclass(frozen=True)
146+
class CommandInfo:
147+
\"\"\"Information identifying a specific command instance.\"\"\"
148+
149+
command_type: temporalio.api.enums.v1.command_type_pb2.CommandType
150+
command_seq: int
151+
152+
153+
current_command_info: contextvars.ContextVar[Optional[CommandInfo]] = (
154+
contextvars.ContextVar("current_command_info", default=None)
110155
)
111156
112157
class VisitorFunctions(abc.ABC):

temporalio/bridge/_visitor.py

Lines changed: 95 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
11
# This file is generated by gen_payload_visitor.py. Changes should be made there.
22
import abc
33
import contextvars
4+
from dataclasses import dataclass
45
from typing import Any, MutableSequence, Optional
56

7+
import temporalio.api.enums.v1.command_type_pb2
68
from temporalio.api.common.v1.message_pb2 import Payload
79

8-
# Current workflow command sequence number
9-
current_command_seq: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar(
10-
"current_command_seq", default=None
10+
11+
@dataclass(frozen=True)
12+
class CommandInfo:
13+
"""Information identifying a specific command instance."""
14+
15+
command_type: temporalio.api.enums.v1.command_type_pb2.CommandType.ValueType
16+
command_seq: int
17+
18+
19+
current_command_info: contextvars.ContextVar[Optional[CommandInfo]] = (
20+
contextvars.ContextVar("current_command_info", default=None)
1121
)
1222

1323

@@ -253,69 +263,100 @@ async def _visit_coresdk_workflow_activation_WorkflowActivationJob(self, fs, o):
253263
fs, o.signal_workflow
254264
)
255265
elif o.HasField("resolve_activity"):
256-
token = current_command_seq.set(o.resolve_activity.seq)
266+
token = current_command_info.set(
267+
CommandInfo(
268+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
269+
command_seq=o.resolve_activity.seq,
270+
)
271+
)
257272
try:
258273
await self._visit_coresdk_workflow_activation_ResolveActivity(
259274
fs, o.resolve_activity
260275
)
261276
finally:
262-
current_command_seq.reset(token)
277+
current_command_info.reset(token)
263278
elif o.HasField("resolve_child_workflow_execution_start"):
264-
token = current_command_seq.set(
265-
o.resolve_child_workflow_execution_start.seq
279+
token = current_command_info.set(
280+
CommandInfo(
281+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
282+
command_seq=o.resolve_child_workflow_execution_start.seq,
283+
)
266284
)
267285
try:
268286
await self._visit_coresdk_workflow_activation_ResolveChildWorkflowExecutionStart(
269287
fs, o.resolve_child_workflow_execution_start
270288
)
271289
finally:
272-
current_command_seq.reset(token)
290+
current_command_info.reset(token)
273291
elif o.HasField("resolve_child_workflow_execution"):
274-
token = current_command_seq.set(o.resolve_child_workflow_execution.seq)
292+
token = current_command_info.set(
293+
CommandInfo(
294+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
295+
command_seq=o.resolve_child_workflow_execution.seq,
296+
)
297+
)
275298
try:
276299
await self._visit_coresdk_workflow_activation_ResolveChildWorkflowExecution(
277300
fs, o.resolve_child_workflow_execution
278301
)
279302
finally:
280-
current_command_seq.reset(token)
303+
current_command_info.reset(token)
281304
elif o.HasField("resolve_signal_external_workflow"):
282-
token = current_command_seq.set(o.resolve_signal_external_workflow.seq)
305+
token = current_command_info.set(
306+
CommandInfo(
307+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION,
308+
command_seq=o.resolve_signal_external_workflow.seq,
309+
)
310+
)
283311
try:
284312
await self._visit_coresdk_workflow_activation_ResolveSignalExternalWorkflow(
285313
fs, o.resolve_signal_external_workflow
286314
)
287315
finally:
288-
current_command_seq.reset(token)
316+
current_command_info.reset(token)
289317
elif o.HasField("resolve_request_cancel_external_workflow"):
290-
token = current_command_seq.set(
291-
o.resolve_request_cancel_external_workflow.seq
318+
token = current_command_info.set(
319+
CommandInfo(
320+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION,
321+
command_seq=o.resolve_request_cancel_external_workflow.seq,
322+
)
292323
)
293324
try:
294325
await self._visit_coresdk_workflow_activation_ResolveRequestCancelExternalWorkflow(
295326
fs, o.resolve_request_cancel_external_workflow
296327
)
297328
finally:
298-
current_command_seq.reset(token)
329+
current_command_info.reset(token)
299330
elif o.HasField("do_update"):
300331
await self._visit_coresdk_workflow_activation_DoUpdate(fs, o.do_update)
301332
elif o.HasField("resolve_nexus_operation_start"):
302-
token = current_command_seq.set(o.resolve_nexus_operation_start.seq)
333+
token = current_command_info.set(
334+
CommandInfo(
335+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
336+
command_seq=o.resolve_nexus_operation_start.seq,
337+
)
338+
)
303339
try:
304340
await (
305341
self._visit_coresdk_workflow_activation_ResolveNexusOperationStart(
306342
fs, o.resolve_nexus_operation_start
307343
)
308344
)
309345
finally:
310-
current_command_seq.reset(token)
346+
current_command_info.reset(token)
311347
elif o.HasField("resolve_nexus_operation"):
312-
token = current_command_seq.set(o.resolve_nexus_operation.seq)
348+
token = current_command_info.set(
349+
CommandInfo(
350+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
351+
command_seq=o.resolve_nexus_operation.seq,
352+
)
353+
)
313354
try:
314355
await self._visit_coresdk_workflow_activation_ResolveNexusOperation(
315356
fs, o.resolve_nexus_operation
316357
)
317358
finally:
318-
current_command_seq.reset(token)
359+
current_command_info.reset(token)
319360

320361
async def _visit_coresdk_workflow_activation_WorkflowActivation(self, fs, o):
321362
for v in o.jobs:
@@ -414,13 +455,18 @@ async def _visit_coresdk_workflow_commands_WorkflowCommand(self, fs, o):
414455
if o.HasField("user_metadata"):
415456
await self._visit_temporal_api_sdk_v1_UserMetadata(fs, o.user_metadata)
416457
if o.HasField("schedule_activity"):
417-
token = current_command_seq.set(o.schedule_activity.seq)
458+
token = current_command_info.set(
459+
CommandInfo(
460+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
461+
command_seq=o.schedule_activity.seq,
462+
)
463+
)
418464
try:
419465
await self._visit_coresdk_workflow_commands_ScheduleActivity(
420466
fs, o.schedule_activity
421467
)
422468
finally:
423-
current_command_seq.reset(token)
469+
current_command_info.reset(token)
424470
elif o.HasField("respond_to_query"):
425471
await self._visit_coresdk_workflow_commands_QueryResult(
426472
fs, o.respond_to_query
@@ -438,29 +484,44 @@ async def _visit_coresdk_workflow_commands_WorkflowCommand(self, fs, o):
438484
fs, o.continue_as_new_workflow_execution
439485
)
440486
elif o.HasField("start_child_workflow_execution"):
441-
token = current_command_seq.set(o.start_child_workflow_execution.seq)
487+
token = current_command_info.set(
488+
CommandInfo(
489+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
490+
command_seq=o.start_child_workflow_execution.seq,
491+
)
492+
)
442493
try:
443494
await self._visit_coresdk_workflow_commands_StartChildWorkflowExecution(
444495
fs, o.start_child_workflow_execution
445496
)
446497
finally:
447-
current_command_seq.reset(token)
498+
current_command_info.reset(token)
448499
elif o.HasField("signal_external_workflow_execution"):
449-
token = current_command_seq.set(o.signal_external_workflow_execution.seq)
500+
token = current_command_info.set(
501+
CommandInfo(
502+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION,
503+
command_seq=o.signal_external_workflow_execution.seq,
504+
)
505+
)
450506
try:
451507
await self._visit_coresdk_workflow_commands_SignalExternalWorkflowExecution(
452508
fs, o.signal_external_workflow_execution
453509
)
454510
finally:
455-
current_command_seq.reset(token)
511+
current_command_info.reset(token)
456512
elif o.HasField("schedule_local_activity"):
457-
token = current_command_seq.set(o.schedule_local_activity.seq)
513+
token = current_command_info.set(
514+
CommandInfo(
515+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
516+
command_seq=o.schedule_local_activity.seq,
517+
)
518+
)
458519
try:
459520
await self._visit_coresdk_workflow_commands_ScheduleLocalActivity(
460521
fs, o.schedule_local_activity
461522
)
462523
finally:
463-
current_command_seq.reset(token)
524+
current_command_info.reset(token)
464525
elif o.HasField("upsert_workflow_search_attributes"):
465526
await self._visit_coresdk_workflow_commands_UpsertWorkflowSearchAttributes(
466527
fs, o.upsert_workflow_search_attributes
@@ -474,13 +535,18 @@ async def _visit_coresdk_workflow_commands_WorkflowCommand(self, fs, o):
474535
fs, o.update_response
475536
)
476537
elif o.HasField("schedule_nexus_operation"):
477-
token = current_command_seq.set(o.schedule_nexus_operation.seq)
538+
token = current_command_info.set(
539+
CommandInfo(
540+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
541+
command_seq=o.schedule_nexus_operation.seq,
542+
)
543+
)
478544
try:
479545
await self._visit_coresdk_workflow_commands_ScheduleNexusOperation(
480546
fs, o.schedule_nexus_operation
481547
)
482548
finally:
483-
current_command_seq.reset(token)
549+
current_command_info.reset(token)
484550

485551
async def _visit_coresdk_workflow_completion_Success(self, fs, o):
486552
for v in o.commands:

temporalio/worker/_workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -752,7 +752,7 @@ def _get_current_command_codec(self) -> temporalio.converter.PayloadCodec:
752752
return self.instance.get_payload_codec_with_context(
753753
self.context_free_payload_codec,
754754
self.workflow_context_payload_codec,
755-
temporalio.bridge._visitor.current_command_seq.get(),
755+
temporalio.bridge._visitor.current_command_info.get(),
756756
)
757757

758758

0 commit comments

Comments
 (0)