Skip to content

Commit f1a642c

Browse files
authored
Merge branch 'main' into openai/mcp
2 parents 4c54c42 + a9c71aa commit f1a642c

16 files changed

+1302
-103
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ jobs:
7575
- run: poe build-develop
7676
- run: mkdir junit-xml
7777
- run: poe test ${{matrix.pytestExtraArgs}} -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml
78-
timeout-minutes: 10
78+
timeout-minutes: 15
7979
# Time skipping doesn't yet support ARM
8080
- if: ${{ !endsWith(matrix.os, '-arm') }}
8181
run: poe test ${{matrix.pytestExtraArgs}} -s --workflow-environment time-skipping --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}--time-skipping.xml

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "temporalio"
3-
version = "1.15.0"
3+
version = "1.16.0"
44
description = "Temporal.io Python SDK"
55
authors = [{ name = "Temporal Technologies Inc", email = "[email protected]" }]
66
requires-python = ">=3.9"

temporalio/client.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import temporalio.converter
5858
import temporalio.exceptions
5959
import temporalio.nexus
60+
import temporalio.nexus._operation_context
6061
import temporalio.runtime
6162
import temporalio.service
6263
import temporalio.workflow
@@ -5877,6 +5878,12 @@ async def _build_start_workflow_execution_request(
58775878
)
58785879
# Links are duplicated on request for compatibility with older server versions.
58795880
req.links.extend(links)
5881+
5882+
if temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context():
5883+
req.on_conflict_options.attach_request_id = True
5884+
req.on_conflict_options.attach_completion_callbacks = True
5885+
req.on_conflict_options.attach_links = True
5886+
58805887
return req
58815888

58825889
async def _build_signal_with_start_workflow_execution_request(
@@ -5932,6 +5939,7 @@ async def _populate_start_workflow_execution_request(
59325939
"temporalio.api.enums.v1.WorkflowIdConflictPolicy.ValueType",
59335940
int(input.id_conflict_policy),
59345941
)
5942+
59355943
if input.retry_policy is not None:
59365944
input.retry_policy.apply_to_proto(req.retry_policy)
59375945
req.cron_schedule = input.cron_schedule

temporalio/common.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -982,7 +982,7 @@ class Priority:
982982
2. Then consider "fairness_key" and "fairness_weight" for fairness balancing.
983983
"""
984984

985-
priority_key: Optional[int]
985+
priority_key: Optional[int] = None
986986
"""Priority key is a positive integer from 1 to n, where smaller integers correspond to higher
987987
priorities (tasks run sooner). In general, tasks in a queue should be processed in close to
988988
priority order, although small deviations are possible.
@@ -997,22 +997,22 @@ class Priority:
997997
fairness_key: Optional[str] = None
998998
"""A short string (max 64 bytes) that is used as a key for a fairness balancing mechanism.
999999
This can correspond to a tenant id or even fixed strings like "high", "low", etc.
1000-
1000+
10011001
The fairness mechanism attempts to dispatch tasks for a given key in proportion to its weight.
1002-
For example, using a thousand distinct tenant ids, each with a weight of 1.0 will result in
1002+
For example, using a thousand distinct tenant ids, each with a weight of 1.0 will result in
10031003
each tenant getting a roughly equal share of task dispatch throughput.
1004-
1004+
10051005
Default is an empty string.
10061006
"""
10071007

10081008
fairness_weight: Optional[float] = None
10091009
"""A float that represents the weight for task dispatch for the associated fairness key.
10101010
Tasks for a fairness key are dispatched in proportion to their weight.
1011-
1011+
10121012
Server will clamp the values between 0.001 and 1000. Default weight is 1.0.
1013-
1013+
10141014
The effective weight of a task is determined by the following precedence (highest to lowest):
1015-
1. Weights overridden in task queue configuration
1015+
1. Weights overridden in task queue configuration
10161016
2. Weights attached to workflow/activity
10171017
3. Default weight of 1.0
10181018
"""

temporalio/nexus/_operation_context.py

Lines changed: 60 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
import dataclasses
44
import logging
55
from collections.abc import Awaitable, Mapping, MutableMapping, Sequence
6+
from contextlib import contextmanager
67
from contextvars import ContextVar
78
from dataclasses import dataclass
89
from datetime import timedelta
910
from typing import (
1011
TYPE_CHECKING,
1112
Any,
1213
Callable,
14+
Generator,
1315
Optional,
1416
Union,
1517
overload,
@@ -47,6 +49,14 @@
4749
ContextVar("temporal-cancel-operation-context")
4850
)
4951

52+
# A Nexus start handler might start zero or more workflows as usual using a Temporal client. In
53+
# addition, it may start one "nexus-backing" workflow, using
54+
# WorkflowRunOperationContext.start_workflow. This context is active while the latter is being done.
55+
# It is thus a narrower context than _temporal_start_operation_context.
56+
_temporal_nexus_backing_workflow_start_context: ContextVar[bool] = ContextVar(
57+
"temporal-nexus-backing-workflow-start-context"
58+
)
59+
5060

5161
@dataclass(frozen=True)
5262
class Info:
@@ -96,6 +106,19 @@ def _try_temporal_context() -> (
96106
return start_ctx or cancel_ctx
97107

98108

109+
@contextmanager
110+
def _nexus_backing_workflow_start_context() -> Generator[None, None, None]:
111+
token = _temporal_nexus_backing_workflow_start_context.set(True)
112+
try:
113+
yield
114+
finally:
115+
_temporal_nexus_backing_workflow_start_context.reset(token)
116+
117+
118+
def _in_nexus_backing_workflow_start_context() -> bool:
119+
return _temporal_nexus_backing_workflow_start_context.get(False)
120+
121+
99122
@dataclass
100123
class _TemporalStartOperationContext:
101124
"""Context for a Nexus start operation being handled by a Temporal Nexus Worker."""
@@ -396,56 +419,46 @@ async def start_workflow(
396419
Nexus caller is itself a workflow, this means that the workflow in the caller
397420
namespace web UI will contain links to the started workflow, and vice versa.
398421
"""
399-
# TODO(nexus-preview): When sdk-python supports on_conflict_options, Typescript does this:
400-
# if (workflowOptions.workflowIdConflictPolicy === 'USE_EXISTING') {
401-
# internalOptions.onConflictOptions = {
402-
# attachLinks: true,
403-
# attachCompletionCallbacks: true,
404-
# attachRequestId: true,
405-
# };
406-
# }
407-
if (
408-
id_conflict_policy
409-
== temporalio.common.WorkflowIDConflictPolicy.USE_EXISTING
410-
):
411-
raise RuntimeError(
412-
"WorkflowIDConflictPolicy.USE_EXISTING is not yet supported when starting a workflow "
413-
"that backs a Nexus operation (Python SDK Nexus support is at Pre-release stage)."
414-
)
415-
416422
# We must pass nexus_completion_callbacks, workflow_event_links, and request_id,
417423
# but these are deliberately not exposed in overloads, hence the type-check
418424
# violation.
419-
wf_handle = await self._temporal_context.client.start_workflow( # type: ignore
420-
workflow=workflow,
421-
arg=arg,
422-
args=args,
423-
id=id,
424-
task_queue=task_queue or self._temporal_context.info().task_queue,
425-
result_type=result_type,
426-
execution_timeout=execution_timeout,
427-
run_timeout=run_timeout,
428-
task_timeout=task_timeout,
429-
id_reuse_policy=id_reuse_policy,
430-
id_conflict_policy=id_conflict_policy,
431-
retry_policy=retry_policy,
432-
cron_schedule=cron_schedule,
433-
memo=memo,
434-
search_attributes=search_attributes,
435-
static_summary=static_summary,
436-
static_details=static_details,
437-
start_delay=start_delay,
438-
start_signal=start_signal,
439-
start_signal_args=start_signal_args,
440-
rpc_metadata=rpc_metadata,
441-
rpc_timeout=rpc_timeout,
442-
request_eager_start=request_eager_start,
443-
priority=priority,
444-
versioning_override=versioning_override,
445-
callbacks=self._temporal_context._get_callbacks(),
446-
workflow_event_links=self._temporal_context._get_workflow_event_links(),
447-
request_id=self._temporal_context.nexus_context.request_id,
448-
)
425+
426+
# Here we are starting a "nexus-backing" workflow. That means that the StartWorkflow request
427+
# contains nexus-specific data such as a completion callback (used by the handler server
428+
# namespace to deliver the result to the caller namespace when the workflow reaches a
429+
# terminal state) and inbound links to the caller workflow (attached to history events of
430+
# the workflow started in the handler namespace, and displayed in the UI).
431+
with _nexus_backing_workflow_start_context():
432+
wf_handle = await self._temporal_context.client.start_workflow( # type: ignore
433+
workflow=workflow,
434+
arg=arg,
435+
args=args,
436+
id=id,
437+
task_queue=task_queue or self._temporal_context.info().task_queue,
438+
result_type=result_type,
439+
execution_timeout=execution_timeout,
440+
run_timeout=run_timeout,
441+
task_timeout=task_timeout,
442+
id_reuse_policy=id_reuse_policy,
443+
id_conflict_policy=id_conflict_policy,
444+
retry_policy=retry_policy,
445+
cron_schedule=cron_schedule,
446+
memo=memo,
447+
search_attributes=search_attributes,
448+
static_summary=static_summary,
449+
static_details=static_details,
450+
start_delay=start_delay,
451+
start_signal=start_signal,
452+
start_signal_args=start_signal_args,
453+
rpc_metadata=rpc_metadata,
454+
rpc_timeout=rpc_timeout,
455+
request_eager_start=request_eager_start,
456+
priority=priority,
457+
versioning_override=versioning_override,
458+
callbacks=self._temporal_context._get_callbacks(),
459+
workflow_event_links=self._temporal_context._get_workflow_event_links(),
460+
request_id=self._temporal_context.nexus_context.request_id,
461+
)
449462

450463
self._temporal_context._add_outbound_links(wf_handle)
451464

temporalio/service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import temporalio.exceptions
2727
import temporalio.runtime
2828

29-
__version__ = "1.15.0"
29+
__version__ = "1.16.0"
3030

3131
ServiceRequest = TypeVar("ServiceRequest", bound=google.protobuf.message.Message)
3232
ServiceResponse = TypeVar("ServiceResponse", bound=google.protobuf.message.Message)

temporalio/worker/_interceptor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ class StartNexusOperationInput(Generic[InputT, OutputT]):
298298
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]]
299299
input: InputT
300300
schedule_to_close_timeout: Optional[timedelta]
301+
cancellation_type: temporalio.workflow.NexusOperationCancellationType
301302
headers: Optional[Mapping[str, str]]
302303
output_type: Optional[Type[OutputT]] = None
303304

0 commit comments

Comments
 (0)