diff --git a/temporalio/worker/_interceptor.py b/temporalio/worker/_interceptor.py index 1b412cb7f..0e1d253db 100644 --- a/temporalio/worker/_interceptor.py +++ b/temporalio/worker/_interceptor.py @@ -351,6 +351,8 @@ class StartLocalActivityInput: local_retry_threshold: Optional[timedelta] cancellation_type: temporalio.workflow.ActivityCancellationType headers: Mapping[str, temporalio.api.common.v1.Payload] + summary: Optional[str] + # The types may be absent arg_types: Optional[List[Type]] ret_type: Optional[Type] diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 81f21588f..2133f10f8 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1458,6 +1458,7 @@ def workflow_start_local_activity( local_retry_threshold: Optional[timedelta], cancellation_type: temporalio.workflow.ActivityCancellationType, activity_id: Optional[str], + summary: Optional[str], ) -> temporalio.workflow.ActivityHandle[Any]: # Get activity definition if it's callable name: str @@ -1490,6 +1491,7 @@ def workflow_start_local_activity( retry_policy=retry_policy, local_retry_threshold=local_retry_threshold, cancellation_type=cancellation_type, + summary=summary, headers={}, arg_types=arg_types, ret_type=ret_type, @@ -2756,6 +2758,11 @@ def _apply_schedule_command( v.start_to_close_timeout.FromTimedelta(self._input.start_to_close_timeout) if self._input.retry_policy: self._input.retry_policy.apply_to_proto(v.retry_policy) + if self._input.summary: + print("copying summary") + command.user_metadata.summary.CopyFrom( + self._instance._payload_converter.to_payload(self._input.summary) + ) v.cancellation_type = cast( "temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType", int(self._input.cancellation_type), @@ -2777,10 +2784,6 @@ def _apply_schedule_command( command.schedule_activity.versioning_intent = ( self._input.versioning_intent._to_proto() ) - if self._input.summary: - command.user_metadata.summary.CopyFrom( - self._instance._payload_converter.to_payload(self._input.summary) - ) if self._input.priority: command.schedule_activity.priority.CopyFrom( self._input.priority._to_proto() diff --git a/temporalio/workflow.py b/temporalio/workflow.py index f3a514e26..c776cb1ab 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -849,6 +849,7 @@ def workflow_start_local_activity( local_retry_threshold: Optional[timedelta], cancellation_type: ActivityCancellationType, activity_id: Optional[str], + summary: Optional[str], ) -> ActivityHandle[Any]: ... @abstractmethod @@ -3061,6 +3062,7 @@ class LocalActivityConfig(TypedDict, total=False): local_retry_threshold: Optional[timedelta] cancellation_type: ActivityCancellationType activity_id: Optional[str] + summary: Optional[str] # Overload for async no-param activity @@ -3075,6 +3077,7 @@ def start_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[ReturnType]: ... @@ -3090,6 +3093,7 @@ def start_local_activity( retry_policy: Optional[temporalio.common.RetryPolicy] = None, local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, + summary: Optional[str] = None, ) -> ActivityHandle[ReturnType]: ... @@ -3106,6 +3110,7 @@ def start_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[ReturnType]: ... @@ -3122,6 +3127,7 @@ def start_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[ReturnType]: ... @@ -3138,6 +3144,7 @@ def start_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[ReturnType]: ... @@ -3154,6 +3161,7 @@ def start_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[ReturnType]: ... @@ -3172,6 +3180,7 @@ def start_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[Any]: ... @@ -3188,6 +3197,7 @@ def start_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[Any]: """Start a local activity and return its handle. @@ -3215,6 +3225,7 @@ def start_local_activity( activity_id: Optional unique identifier for the activity. This is an advanced setting that should not be set unless users are sure they need to. Contact Temporal before setting this value. + summary: Optional summary for the activity. Returns: An activity handle to the activity which is an async task. @@ -3230,6 +3241,7 @@ def start_local_activity( local_retry_threshold=local_retry_threshold, cancellation_type=cancellation_type, activity_id=activity_id, + summary=summary, ) @@ -3245,6 +3257,7 @@ async def execute_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3260,6 +3273,7 @@ async def execute_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3276,6 +3290,7 @@ async def execute_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3292,6 +3307,7 @@ async def execute_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3308,6 +3324,7 @@ async def execute_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3324,6 +3341,7 @@ async def execute_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3342,6 +3360,7 @@ async def execute_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> Any: ... @@ -3358,6 +3377,7 @@ async def execute_local_activity( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> Any: """Start a local activity and wait for completion. @@ -3376,6 +3396,7 @@ async def execute_local_activity( local_retry_threshold=local_retry_threshold, cancellation_type=cancellation_type, activity_id=activity_id, + summary=summary, ) @@ -3485,6 +3506,7 @@ def start_local_activity_class( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[Any]: """Start a local activity from a callable class. @@ -3501,6 +3523,7 @@ def start_local_activity_class( local_retry_threshold=local_retry_threshold, cancellation_type=cancellation_type, activity_id=activity_id, + summary=summary, ) @@ -3516,6 +3539,7 @@ async def execute_local_activity_class( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3531,6 +3555,7 @@ async def execute_local_activity_class( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3547,6 +3572,7 @@ async def execute_local_activity_class( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3563,6 +3589,7 @@ async def execute_local_activity_class( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3579,6 +3606,7 @@ async def execute_local_activity_class( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3595,6 +3623,7 @@ async def execute_local_activity_class( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3610,6 +3639,7 @@ async def execute_local_activity_class( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> Any: """Start a local activity from a callable class and wait for completion. @@ -3628,6 +3658,7 @@ async def execute_local_activity_class( local_retry_threshold=local_retry_threshold, cancellation_type=cancellation_type, activity_id=activity_id, + summary=summary, ) @@ -3643,6 +3674,7 @@ def start_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[ReturnType]: ... @@ -3658,6 +3690,7 @@ def start_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[ReturnType]: ... @@ -3674,6 +3707,7 @@ def start_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[ReturnType]: ... @@ -3690,6 +3724,7 @@ def start_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[ReturnType]: ... @@ -3706,6 +3741,7 @@ def start_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[ReturnType]: ... @@ -3722,6 +3758,7 @@ def start_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[ReturnType]: ... @@ -3737,6 +3774,7 @@ def start_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ActivityHandle[Any]: """Start a local activity from a method. @@ -3753,6 +3791,7 @@ def start_local_activity_method( local_retry_threshold=local_retry_threshold, cancellation_type=cancellation_type, activity_id=activity_id, + summary=summary, ) @@ -3768,6 +3807,7 @@ async def execute_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3783,6 +3823,7 @@ async def execute_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3799,6 +3840,7 @@ async def execute_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3815,6 +3857,7 @@ async def execute_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3831,6 +3874,7 @@ async def execute_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3847,6 +3891,7 @@ async def execute_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> ReturnType: ... @@ -3862,6 +3907,7 @@ async def execute_local_activity_method( local_retry_threshold: Optional[timedelta] = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, activity_id: Optional[str] = None, + summary: Optional[str] = None, ) -> Any: """Start a local activity from a method and wait for completion. @@ -3880,6 +3926,7 @@ async def execute_local_activity_method( local_retry_threshold=local_retry_threshold, cancellation_type=cancellation_type, activity_id=activity_id, + summary=summary, ) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 52dd97d8e..643058f5d 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -2564,6 +2564,7 @@ async def run(self) -> None: local_activity_config = workflow.LocalActivityConfig( retry_policy=retry_policy, schedule_to_close_timeout=timedelta(seconds=5), + summary="Summary", ) result = await workflow.execute_local_activity( fail_until_attempt_activity, 2, **local_activity_config