Skip to content

Commit 1f93f04

Browse files
committed
Use (command_type, command_seq) dataclass
1 parent 3c5b2ce commit 1f93f04

File tree

7 files changed

+186
-53
lines changed

7 files changed

+186
-53
lines changed

scripts/gen_payload_visitor.py

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,48 @@ def emit_singular_with_seq(
6767
) -> str:
6868
# Helper to emit a singular field visit that sets the seq contextvar, with presence check but
6969
# without headers guard since this is used for commands only.
70+
71+
# Map field names to command types
72+
field_to_command_type = {
73+
# Commands
74+
"schedule_activity": "COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK",
75+
"schedule_local_activity": "COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK",
76+
"start_child_workflow_execution": "COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION",
77+
"signal_external_workflow_execution": "COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION",
78+
"schedule_nexus_operation": "COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION",
79+
"request_cancel_activity": "COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK",
80+
"request_cancel_local_activity": "COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK",
81+
"request_cancel_external_workflow_execution": "COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION",
82+
"request_cancel_nexus_operation": "COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION",
83+
"cancel_timer": "COMMAND_TYPE_CANCEL_TIMER",
84+
"cancel_signal_workflow": "COMMAND_TYPE_CANCEL_SIGNAL_WORKFLOW",
85+
"start_timer": "COMMAND_TYPE_START_TIMER",
86+
# Resolutions (use the corresponding command type)
87+
"resolve_activity": "COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK",
88+
"resolve_child_workflow_execution_start": "COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION",
89+
"resolve_child_workflow_execution": "COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION",
90+
"resolve_signal_external_workflow": "COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION",
91+
"resolve_request_cancel_external_workflow": "COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION",
92+
"resolve_nexus_operation_start": "COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION",
93+
"resolve_nexus_operation": "COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION",
94+
}
95+
96+
command_type = field_to_command_type.get(field_name)
97+
if not command_type:
98+
raise ValueError(f"Unknown field with seq: {field_name}")
99+
70100
return f"""\
71101
{presence_word} o.HasField("{field_name}"):
72-
token = current_command_seq.set({access_expr}.seq)
102+
token = current_command_info.set(
103+
CommandInfo(
104+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.{command_type},
105+
command_seq={access_expr}.seq,
106+
)
107+
)
73108
try:
74109
await self._visit_{child_method}(fs, {access_expr})
75110
finally:
76-
current_command_seq.reset(token)"""
111+
current_command_info.reset(token)"""
77112

78113

79114
class VisitorGenerator:
@@ -95,13 +130,23 @@ def generate(self, roots: list[Descriptor]) -> str:
95130
# This file is generated by gen_payload_visitor.py. Changes should be made there.
96131
import abc
97132
import contextvars
133+
from dataclasses import dataclass
98134
from typing import Any, MutableSequence, Optional
99135
136+
import temporalio.api.enums.v1.command_type_pb2
100137
from temporalio.api.common.v1.message_pb2 import Payload
101138
102-
# Current workflow command sequence number
103-
current_command_seq: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar(
104-
"current_command_seq", default=None
139+
140+
@dataclass(frozen=True)
141+
class CommandInfo:
142+
\"\"\"Information identifying a specific command instance.\"\"\"
143+
144+
command_type: temporalio.api.enums.v1.command_type_pb2.CommandType
145+
command_seq: int
146+
147+
148+
current_command_info: contextvars.ContextVar[Optional[CommandInfo]] = (
149+
contextvars.ContextVar("current_command_info", default=None)
105150
)
106151
107152
class VisitorFunctions(abc.ABC):

temporalio/bridge/_visitor.py

Lines changed: 95 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
11
# This file is generated by gen_payload_visitor.py. Changes should be made there.
22
import abc
33
import contextvars
4+
from dataclasses import dataclass
45
from typing import Any, MutableSequence, Optional
56

7+
import temporalio.api.enums.v1.command_type_pb2
68
from temporalio.api.common.v1.message_pb2 import Payload
79

8-
# Current workflow command sequence number
9-
current_command_seq: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar(
10-
"current_command_seq", default=None
10+
11+
@dataclass(frozen=True)
12+
class CommandInfo:
13+
"""Information identifying a specific command instance."""
14+
15+
command_type: temporalio.api.enums.v1.command_type_pb2.CommandType.ValueType
16+
command_seq: int
17+
18+
19+
current_command_info: contextvars.ContextVar[Optional[CommandInfo]] = (
20+
contextvars.ContextVar("current_command_info", default=None)
1121
)
1222

1323

@@ -253,69 +263,100 @@ async def _visit_coresdk_workflow_activation_WorkflowActivationJob(self, fs, o):
253263
fs, o.signal_workflow
254264
)
255265
elif o.HasField("resolve_activity"):
256-
token = current_command_seq.set(o.resolve_activity.seq)
266+
token = current_command_info.set(
267+
CommandInfo(
268+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
269+
command_seq=o.resolve_activity.seq,
270+
)
271+
)
257272
try:
258273
await self._visit_coresdk_workflow_activation_ResolveActivity(
259274
fs, o.resolve_activity
260275
)
261276
finally:
262-
current_command_seq.reset(token)
277+
current_command_info.reset(token)
263278
elif o.HasField("resolve_child_workflow_execution_start"):
264-
token = current_command_seq.set(
265-
o.resolve_child_workflow_execution_start.seq
279+
token = current_command_info.set(
280+
CommandInfo(
281+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
282+
command_seq=o.resolve_child_workflow_execution_start.seq,
283+
)
266284
)
267285
try:
268286
await self._visit_coresdk_workflow_activation_ResolveChildWorkflowExecutionStart(
269287
fs, o.resolve_child_workflow_execution_start
270288
)
271289
finally:
272-
current_command_seq.reset(token)
290+
current_command_info.reset(token)
273291
elif o.HasField("resolve_child_workflow_execution"):
274-
token = current_command_seq.set(o.resolve_child_workflow_execution.seq)
292+
token = current_command_info.set(
293+
CommandInfo(
294+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
295+
command_seq=o.resolve_child_workflow_execution.seq,
296+
)
297+
)
275298
try:
276299
await self._visit_coresdk_workflow_activation_ResolveChildWorkflowExecution(
277300
fs, o.resolve_child_workflow_execution
278301
)
279302
finally:
280-
current_command_seq.reset(token)
303+
current_command_info.reset(token)
281304
elif o.HasField("resolve_signal_external_workflow"):
282-
token = current_command_seq.set(o.resolve_signal_external_workflow.seq)
305+
token = current_command_info.set(
306+
CommandInfo(
307+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION,
308+
command_seq=o.resolve_signal_external_workflow.seq,
309+
)
310+
)
283311
try:
284312
await self._visit_coresdk_workflow_activation_ResolveSignalExternalWorkflow(
285313
fs, o.resolve_signal_external_workflow
286314
)
287315
finally:
288-
current_command_seq.reset(token)
316+
current_command_info.reset(token)
289317
elif o.HasField("resolve_request_cancel_external_workflow"):
290-
token = current_command_seq.set(
291-
o.resolve_request_cancel_external_workflow.seq
318+
token = current_command_info.set(
319+
CommandInfo(
320+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION,
321+
command_seq=o.resolve_request_cancel_external_workflow.seq,
322+
)
292323
)
293324
try:
294325
await self._visit_coresdk_workflow_activation_ResolveRequestCancelExternalWorkflow(
295326
fs, o.resolve_request_cancel_external_workflow
296327
)
297328
finally:
298-
current_command_seq.reset(token)
329+
current_command_info.reset(token)
299330
elif o.HasField("do_update"):
300331
await self._visit_coresdk_workflow_activation_DoUpdate(fs, o.do_update)
301332
elif o.HasField("resolve_nexus_operation_start"):
302-
token = current_command_seq.set(o.resolve_nexus_operation_start.seq)
333+
token = current_command_info.set(
334+
CommandInfo(
335+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
336+
command_seq=o.resolve_nexus_operation_start.seq,
337+
)
338+
)
303339
try:
304340
await (
305341
self._visit_coresdk_workflow_activation_ResolveNexusOperationStart(
306342
fs, o.resolve_nexus_operation_start
307343
)
308344
)
309345
finally:
310-
current_command_seq.reset(token)
346+
current_command_info.reset(token)
311347
elif o.HasField("resolve_nexus_operation"):
312-
token = current_command_seq.set(o.resolve_nexus_operation.seq)
348+
token = current_command_info.set(
349+
CommandInfo(
350+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
351+
command_seq=o.resolve_nexus_operation.seq,
352+
)
353+
)
313354
try:
314355
await self._visit_coresdk_workflow_activation_ResolveNexusOperation(
315356
fs, o.resolve_nexus_operation
316357
)
317358
finally:
318-
current_command_seq.reset(token)
359+
current_command_info.reset(token)
319360

320361
async def _visit_coresdk_workflow_activation_WorkflowActivation(self, fs, o):
321362
for v in o.jobs:
@@ -411,13 +452,18 @@ async def _visit_coresdk_workflow_commands_WorkflowCommand(self, fs, o):
411452
if o.HasField("user_metadata"):
412453
await self._visit_temporal_api_sdk_v1_UserMetadata(fs, o.user_metadata)
413454
if o.HasField("schedule_activity"):
414-
token = current_command_seq.set(o.schedule_activity.seq)
455+
token = current_command_info.set(
456+
CommandInfo(
457+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
458+
command_seq=o.schedule_activity.seq,
459+
)
460+
)
415461
try:
416462
await self._visit_coresdk_workflow_commands_ScheduleActivity(
417463
fs, o.schedule_activity
418464
)
419465
finally:
420-
current_command_seq.reset(token)
466+
current_command_info.reset(token)
421467
elif o.HasField("respond_to_query"):
422468
await self._visit_coresdk_workflow_commands_QueryResult(
423469
fs, o.respond_to_query
@@ -435,29 +481,44 @@ async def _visit_coresdk_workflow_commands_WorkflowCommand(self, fs, o):
435481
fs, o.continue_as_new_workflow_execution
436482
)
437483
elif o.HasField("start_child_workflow_execution"):
438-
token = current_command_seq.set(o.start_child_workflow_execution.seq)
484+
token = current_command_info.set(
485+
CommandInfo(
486+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
487+
command_seq=o.start_child_workflow_execution.seq,
488+
)
489+
)
439490
try:
440491
await self._visit_coresdk_workflow_commands_StartChildWorkflowExecution(
441492
fs, o.start_child_workflow_execution
442493
)
443494
finally:
444-
current_command_seq.reset(token)
495+
current_command_info.reset(token)
445496
elif o.HasField("signal_external_workflow_execution"):
446-
token = current_command_seq.set(o.signal_external_workflow_execution.seq)
497+
token = current_command_info.set(
498+
CommandInfo(
499+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION,
500+
command_seq=o.signal_external_workflow_execution.seq,
501+
)
502+
)
447503
try:
448504
await self._visit_coresdk_workflow_commands_SignalExternalWorkflowExecution(
449505
fs, o.signal_external_workflow_execution
450506
)
451507
finally:
452-
current_command_seq.reset(token)
508+
current_command_info.reset(token)
453509
elif o.HasField("schedule_local_activity"):
454-
token = current_command_seq.set(o.schedule_local_activity.seq)
510+
token = current_command_info.set(
511+
CommandInfo(
512+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
513+
command_seq=o.schedule_local_activity.seq,
514+
)
515+
)
455516
try:
456517
await self._visit_coresdk_workflow_commands_ScheduleLocalActivity(
457518
fs, o.schedule_local_activity
458519
)
459520
finally:
460-
current_command_seq.reset(token)
521+
current_command_info.reset(token)
461522
elif o.HasField("upsert_workflow_search_attributes"):
462523
await self._visit_coresdk_workflow_commands_UpsertWorkflowSearchAttributes(
463524
fs, o.upsert_workflow_search_attributes
@@ -471,13 +532,18 @@ async def _visit_coresdk_workflow_commands_WorkflowCommand(self, fs, o):
471532
fs, o.update_response
472533
)
473534
elif o.HasField("schedule_nexus_operation"):
474-
token = current_command_seq.set(o.schedule_nexus_operation.seq)
535+
token = current_command_info.set(
536+
CommandInfo(
537+
command_type=temporalio.api.enums.v1.command_type_pb2.CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
538+
command_seq=o.schedule_nexus_operation.seq,
539+
)
540+
)
475541
try:
476542
await self._visit_coresdk_workflow_commands_ScheduleNexusOperation(
477543
fs, o.schedule_nexus_operation
478544
)
479545
finally:
480-
current_command_seq.reset(token)
546+
current_command_info.reset(token)
481547

482548
async def _visit_coresdk_workflow_completion_Success(self, fs, o):
483549
for v in o.commands:

temporalio/worker/_workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -752,7 +752,7 @@ def _get_current_command_codec(self) -> temporalio.converter.PayloadCodec:
752752
return self.instance.get_payload_codec_with_context(
753753
self.context_free_payload_codec,
754754
self.workflow_context_payload_codec,
755-
temporalio.bridge._visitor.current_command_seq.get(),
755+
temporalio.bridge._visitor.current_command_info.get(),
756756
)
757757

758758

0 commit comments

Comments
 (0)