Skip to content

Commit b39cb13

Browse files
committed
Merge remote-tracking branch 'origin/main' into otel/benign_exceptions
2 parents 3247f71 + 9372d47 commit b39cb13

File tree

11 files changed

+460
-194
lines changed

11 files changed

+460
-194
lines changed

temporalio/bridge/Cargo.lock

Lines changed: 164 additions & 185 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/contrib/openai_agents/_temporal_openai_agents.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
313313
config["data_converter"] = DataConverter(
314314
payload_converter_class=_OpenAIPayloadConverter
315315
)
316-
return config
316+
return self.next_worker_plugin.configure_replayer(config)
317317

318318
@asynccontextmanager
319319
async def run_replayer(

temporalio/worker/_interceptor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,8 @@ class StartLocalActivityInput:
352352
local_retry_threshold: Optional[timedelta]
353353
cancellation_type: temporalio.workflow.ActivityCancellationType
354354
headers: Mapping[str, temporalio.api.common.v1.Payload]
355+
summary: Optional[str]
356+
355357
# The types may be absent
356358
arg_types: Optional[List[Type]]
357359
ret_type: Optional[Type]

temporalio/worker/_workflow.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,10 @@ def _create_workflow_instance(
552552
priority=temporalio.common.Priority._from_proto(init.priority),
553553
)
554554

555+
last_failure = (
556+
init.continued_failure if init.HasField("continued_failure") else None
557+
)
558+
555559
# Create instance from details
556560
det = WorkflowInstanceDetails(
557561
payload_converter_class=self._data_converter.payload_converter_class,
@@ -563,6 +567,8 @@ def _create_workflow_instance(
563567
extern_functions=self._extern_functions,
564568
disable_eager_activity_execution=self._disable_eager_activity_execution,
565569
worker_level_failure_exception_types=self._workflow_failure_exception_types,
570+
last_completion_result=init.last_completion_result,
571+
last_failure=last_failure,
566572
)
567573
if defn.sandboxed:
568574
return self._workflow_runner.create_instance(det)

temporalio/worker/_workflow_instance.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import temporalio.workflow
6565
from temporalio.service import __version__
6666

67+
from ..api.failure.v1.message_pb2 import Failure
6768
from ._interceptor import (
6869
ContinueAsNewInput,
6970
ExecuteWorkflowInput,
@@ -143,6 +144,8 @@ class WorkflowInstanceDetails:
143144
extern_functions: Mapping[str, Callable]
144145
disable_eager_activity_execution: bool
145146
worker_level_failure_exception_types: Sequence[Type[BaseException]]
147+
last_completion_result: temporalio.api.common.v1.Payloads
148+
last_failure: Optional[Failure]
146149

147150

148151
class WorkflowInstance(ABC):
@@ -320,6 +323,9 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
320323
# metadata query
321324
self._current_details = ""
322325

326+
self._last_completion_result = det.last_completion_result
327+
self._last_failure = det.last_failure
328+
323329
# The versioning behavior of this workflow, as established by annotation or by the dynamic
324330
# config function. Is only set once upon initialization.
325331
self._versioning_behavior: Optional[temporalio.common.VersioningBehavior] = None
@@ -1464,6 +1470,7 @@ def workflow_start_local_activity(
14641470
local_retry_threshold: Optional[timedelta],
14651471
cancellation_type: temporalio.workflow.ActivityCancellationType,
14661472
activity_id: Optional[str],
1473+
summary: Optional[str],
14671474
) -> temporalio.workflow.ActivityHandle[Any]:
14681475
# Get activity definition if it's callable
14691476
name: str
@@ -1496,6 +1503,7 @@ def workflow_start_local_activity(
14961503
retry_policy=retry_policy,
14971504
local_retry_threshold=local_retry_threshold,
14981505
cancellation_type=cancellation_type,
1506+
summary=summary,
14991507
headers={},
15001508
arg_types=arg_types,
15011509
ret_type=ret_type,
@@ -1703,6 +1711,37 @@ def workflow_is_failure_exception(self, err: BaseException) -> bool:
17031711
)
17041712
)
17051713

1714+
def workflow_has_last_completion_result(self) -> bool:
1715+
return len(self._last_completion_result.payloads) > 0
1716+
1717+
def workflow_last_completion_result(
1718+
self, type_hint: Optional[Type]
1719+
) -> Optional[Any]:
1720+
if len(self._last_completion_result.payloads) == 0:
1721+
return None
1722+
elif len(self._last_completion_result.payloads) > 1:
1723+
warnings.warn(
1724+
f"Expected single last completion result, got {len(self._last_completion_result.payloads)}"
1725+
)
1726+
return None
1727+
1728+
if type_hint is None:
1729+
return self._payload_converter.from_payload(
1730+
self._last_completion_result.payloads[0]
1731+
)
1732+
else:
1733+
return self._payload_converter.from_payload(
1734+
self._last_completion_result.payloads[0], type_hint
1735+
)
1736+
1737+
def workflow_last_failure(self) -> Optional[BaseException]:
1738+
if self._last_failure:
1739+
return self._failure_converter.from_failure(
1740+
self._last_failure, self._payload_converter
1741+
)
1742+
1743+
return None
1744+
17061745
#### Calls from outbound impl ####
17071746
# These are in alphabetical order and all start with "_outbound_".
17081747

@@ -2766,6 +2805,10 @@ def _apply_schedule_command(
27662805
v.start_to_close_timeout.FromTimedelta(self._input.start_to_close_timeout)
27672806
if self._input.retry_policy:
27682807
self._input.retry_policy.apply_to_proto(v.retry_policy)
2808+
if self._input.summary:
2809+
command.user_metadata.summary.CopyFrom(
2810+
self._instance._payload_converter.to_payload(self._input.summary)
2811+
)
27692812
v.cancellation_type = cast(
27702813
temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType,
27712814
int(self._input.cancellation_type),
@@ -2787,11 +2830,6 @@ def _apply_schedule_command(
27872830
command.schedule_activity.versioning_intent = (
27882831
self._input.versioning_intent._to_proto()
27892832
)
2790-
if self._input.summary:
2791-
command.user_metadata.summary.CopyFrom(
2792-
self._instance._payload_converter.to_payload(self._input.summary)
2793-
)
2794-
print("Activity summary: ", command.user_metadata.summary)
27952833
if self._input.priority:
27962834
command.schedule_activity.priority.CopyFrom(
27972835
self._input.priority._to_proto()

temporalio/worker/workflow_sandbox/_runner.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import temporalio.worker._workflow_instance
1919
import temporalio.workflow
2020

21+
from ...api.common.v1.message_pb2 import Payloads
22+
from ...api.failure.v1.message_pb2 import Failure
23+
2124
# Workflow instance has to be relative import
2225
from .._workflow_instance import (
2326
UnsandboxedWorkflowRunner,
@@ -84,6 +87,8 @@ def prepare_workflow(self, defn: temporalio.workflow._Definition) -> None:
8487
extern_functions={},
8588
disable_eager_activity_execution=False,
8689
worker_level_failure_exception_types=self._worker_level_failure_exception_types,
90+
last_completion_result=Payloads(),
91+
last_failure=Failure(),
8792
),
8893
)
8994

0 commit comments

Comments
 (0)