Skip to content

Commit 5c76eba

Browse files
authored
Merge branch 'main' into otel-refactor
2 parents 0dfeaf8 + 28f43f1 commit 5c76eba

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+8341
-570
lines changed

.github/workflows/build-binaries.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ jobs:
6565
if [ "$RUNNER_OS" = "Windows" ]; then
6666
bindir=Scripts
6767
fi
68-
./.venv/$bindir/pip install 'protobuf>=3.20,<6' 'types-protobuf>=3.20,<6' 'typing-extensions>=4.2.0,<5' pytest pytest_asyncio grpcio pydantic opentelemetry-api opentelemetry-sdk python-dateutil openai-agents
68+
./.venv/$bindir/pip install 'protobuf>=3.20,<6' 'types-protobuf>=3.20,<6' 'typing-extensions>=4.2.0,<5' pytest pytest_asyncio grpcio 'nexus-rpc>=1.1.0' pydantic opentelemetry-api opentelemetry-sdk python-dateutil openai-agents
6969
./.venv/$bindir/pip install --no-index --find-links=../dist temporalio
7070
./.venv/$bindir/python -m pytest -s -k test_workflow_hello
7171

.github/workflows/ci.yml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ jobs:
2828
python: "3.13"
2929
docsTarget: true
3030
cloudTestTarget: true
31+
openaiTestTarget: true
3132
clippyLinter: true
3233
- os: ubuntu-latest
3334
python: "3.9"
@@ -75,8 +76,6 @@ jobs:
7576
- run: mkdir junit-xml
7677
- run: poe test ${{matrix.pytestExtraArgs}} -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml
7778
timeout-minutes: 10
78-
env:
79-
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
8079
# Time skipping doesn't yet support ARM
8180
- if: ${{ !endsWith(matrix.os, '-arm') }}
8281
run: poe test ${{matrix.pytestExtraArgs}} -s --workflow-environment time-skipping --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}--time-skipping.xml
@@ -89,6 +88,11 @@ jobs:
8988
TEMPORAL_CLIENT_CLOUD_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
9089
TEMPORAL_CLIENT_CLOUD_API_VERSION: 2024-05-13-00
9190
TEMPORAL_CLIENT_CLOUD_NAMESPACE: sdk-ci.a2dd6
91+
- if: ${{ matrix.openaiTestTarget && (github.event.pull_request.head.repo.full_name == '' || github.event.pull_request.head.repo.full_name == 'temporalio/sdk-python') }}
92+
run: poe test tests/contrib/openai_agents/test_openai.py ${{matrix.pytestExtraArgs}} -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}--openai.xml
93+
timeout-minutes: 10
94+
env:
95+
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
9296
- name: "Upload junit-xml artifacts"
9397
uses: actions/upload-artifact@v4
9498
if: always()

README.md

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ informal introduction to the features and their implementation.
9494
- [Heartbeating and Cancellation](#heartbeating-and-cancellation)
9595
- [Worker Shutdown](#worker-shutdown)
9696
- [Testing](#testing-1)
97+
- [Nexus](#nexus)
9798
- [Workflow Replay](#workflow-replay)
9899
- [Observability](#observability)
99100
- [Metrics](#metrics)
@@ -1308,6 +1309,113 @@ affect calls activity code might make to functions on the `temporalio.activity`
13081309
* `cancel()` can be invoked to simulate a cancellation of the activity
13091310
* `worker_shutdown()` can be invoked to simulate a worker shutdown during execution of the activity
13101311

1312+
1313+
### Nexus
1314+
1315+
⚠️ **Nexus support is currently at an experimental release stage. Backwards-incompatible changes are anticipated until a stable release is announced.** ⚠️
1316+
1317+
[Nexus](https://github.com/nexus-rpc/) is a synchronous RPC protocol. Arbitrary duration operations that can respond
1318+
asynchronously are modeled on top of a set of pre-defined synchronous RPCs.
1319+
1320+
Temporal supports calling Nexus operations **from a workflow**. See https://docs.temporal.io/nexus. There is no support
1321+
currently for calling a Nexus operation from non-workflow code.
1322+
1323+
To get started quickly using Nexus with Temporal, see the Python Nexus sample:
1324+
https://github.com/temporalio/samples-python/tree/nexus/hello_nexus.
1325+
1326+
1327+
Two types of Nexus operation are supported, each using a decorator:
1328+
1329+
- `@temporalio.nexus.workflow_run_operation`: a Nexus operation that is backed by a Temporal workflow. The operation
1330+
handler you write will start the handler workflow and then respond with a token indicating that the handler workflow
1331+
is in progress. When the handler workflow completes, Temporal server will automatically deliver the result (success or
1332+
failure) to the caller workflow.
1333+
- `@nexusrpc.handler.sync_operation`: an operation that responds synchronously. It may be `def` or `async def` and it
1334+
may do network I/O, but it must respond within 10 seconds.
1335+
1336+
The following steps are an overview of the [Python Nexus sample](
1337+
https://github.com/temporalio/samples-python/tree/nexus/hello_nexus).
1338+
1339+
1. Create the caller and handler namespaces, and the Nexus endpoint. For example,
1340+
```
1341+
temporal operator namespace create --namespace my-handler-namespace
1342+
temporal operator namespace create --namespace my-caller-namespace
1343+
1344+
temporal operator nexus endpoint create \
1345+
--name my-nexus-endpoint \
1346+
--target-namespace my-handler-namespace \
1347+
--target-task-queue my-handler-task-queue
1348+
```
1349+
1350+
2. Define your service contract. This specifies the names and input/output types of your operations. You will use this
1351+
to refer to the operations when calling them from a workflow.
1352+
```python
1353+
@nexusrpc.service
1354+
class MyNexusService:
1355+
my_sync_operation: nexusrpc.Operation[MyInput, MyOutput]
1356+
my_workflow_run_operation: nexusrpc.Operation[MyInput, MyOutput]
1357+
```
1358+
1359+
3. Implement your operation handlers in a service handler:
1360+
```python
1361+
@service_handler(service=MyNexusService)
1362+
class MyNexusServiceHandler:
1363+
@sync_operation
1364+
async def my_sync_operation(
1365+
self, ctx: StartOperationContext, input: MyInput
1366+
) -> MyOutput:
1367+
return MyOutput(message=f"Hello {input.name} from sync operation!")
1368+
1369+
@workflow_run_operation
1370+
async def my_workflow_run_operation(
1371+
self, ctx: WorkflowRunOperationContext, input: MyInput
1372+
) -> nexus.WorkflowHandle[MyOutput]:
1373+
return await ctx.start_workflow(
1374+
WorkflowStartedByNexusOperation.run,
1375+
input,
1376+
id=str(uuid.uuid4()),
1377+
)
1378+
```
1379+
1380+
4. Register your service handler with a Temporal worker.
1381+
```python
1382+
client = await Client.connect("localhost:7233", namespace="my-handler-namespace")
1383+
worker = Worker(
1384+
client,
1385+
task_queue="my-handler-task-queue",
1386+
workflows=[WorkflowStartedByNexusOperation],
1387+
nexus_service_handlers=[MyNexusServiceHandler()],
1388+
)
1389+
await worker.run()
1390+
```
1391+
1392+
5. Call your Nexus operations from your caller workflow.
1393+
```python
1394+
@workflow.defn
1395+
class CallerWorkflow:
1396+
def __init__(self):
1397+
self.nexus_client = workflow.create_nexus_client(
1398+
service=MyNexusService, endpoint="my-nexus-endpoint"
1399+
)
1400+
1401+
@workflow.run
1402+
async def run(self, name: str) -> tuple[MyOutput, MyOutput]:
1403+
# Start the Nexus operation and wait for the result in one go, using execute_operation.
1404+
wf_result = await self.nexus_client.execute_operation(
1405+
MyNexusService.my_workflow_run_operation,
1406+
MyInput(name),
1407+
)
1408+
# Or alternatively, obtain the operation handle using start_operation,
1409+
# and then use it to get the result:
1410+
sync_operation_handle = await self.nexus_client.start_operation(
1411+
MyNexusService.my_sync_operation,
1412+
MyInput(name),
1413+
)
1414+
sync_result = await sync_operation_handle
1415+
return sync_result, wf_result
1416+
```
1417+
1418+
13111419
### Workflow Replay
13121420
13131421
Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,

pyproject.toml

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
[project]
22
name = "temporalio"
3-
version = "1.13.0"
3+
version = "1.14.1"
44
description = "Temporal.io Python SDK"
55
authors = [{ name = "Temporal Technologies Inc", email = "[email protected]" }]
6-
requires-python = "~=3.9"
6+
requires-python = ">=3.9"
77
readme = "README.md"
88
license = { file = "LICENSE" }
99
keywords = [
1010
"temporal",
1111
"workflow",
1212
]
1313
dependencies = [
14+
"nexus-rpc>=1.1.0",
1415
"protobuf>=3.20,<6",
1516
"python-dateutil>=2.8.2,<3 ; python_version < '3.11'",
1617
"types-protobuf>=3.20",
@@ -25,7 +26,7 @@ opentelemetry = [
2526
]
2627
pydantic = ["pydantic>=2.0.0,<3"]
2728
openai-agents = [
28-
"openai-agents >= 0.0.19,<0.1",
29+
"openai-agents >= 0.1,<0.2",
2930
"eval-type-backport>=0.2.2; python_version < '3.10'"
3031
]
3132

@@ -44,7 +45,7 @@ dev = [
4445
"psutil>=5.9.3,<6",
4546
"pydocstyle>=6.3.0,<7",
4647
"pydoctor>=24.11.1,<25",
47-
"pyright==1.1.377",
48+
"pyright==1.1.402",
4849
"pytest~=7.4",
4950
"pytest-asyncio>=0.21,<0.22",
5051
"pytest-timeout~=2.2",
@@ -53,6 +54,8 @@ dev = [
5354
"twine>=4.0.1,<5",
5455
"ruff>=0.5.0,<0.6",
5556
"maturin>=1.8.2",
57+
"pytest-cov>=6.1.1",
58+
"httpx>=0.28.1",
5659
"pytest-pretty>=1.3.0",
5760
]
5861

@@ -155,13 +158,24 @@ project-name = "Temporal Python"
155158
sidebar-expand-depth = 2
156159

157160
[tool.pyright]
161+
reportUnknownVariableType = "none"
162+
reportUnknownParameterType = "none"
163+
reportUnusedCallResult = "none"
164+
reportImplicitStringConcatenation = "none"
165+
reportPrivateUsage = "none"
166+
reportExplicitAny = "none"
167+
reportMissingTypeArgument = "none"
168+
reportAny = "none"
169+
enableTypeIgnoreComments = true
170+
158171
include = ["temporalio", "tests"]
159172
exclude = [
160173
"temporalio/api",
161174
"temporalio/bridge/proto",
162175
"tests/worker/workflow_sandbox/testmodules/proto",
163176
"temporalio/bridge/worker.py",
164177
"temporalio/contrib/opentelemetry.py",
178+
"temporalio/contrib/pydantic.py",
165179
"temporalio/converter.py",
166180
"temporalio/testing/_workflow.py",
167181
"temporalio/worker/_activity.py",
@@ -173,6 +187,10 @@ exclude = [
173187
"tests/api/test_grpc_stub.py",
174188
"tests/conftest.py",
175189
"tests/contrib/test_opentelemetry.py",
190+
"tests/contrib/pydantic/models.py",
191+
"tests/contrib/pydantic/models_2.py",
192+
"tests/contrib/pydantic/test_pydantic.py",
193+
"tests/contrib/pydantic/workflows.py",
176194
"tests/test_converter.py",
177195
"tests/test_service.py",
178196
"tests/test_workflow.py",

temporalio/bridge/src/worker.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use temporal_sdk_core_api::worker::{
1919
};
2020
use temporal_sdk_core_api::Worker;
2121
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
22-
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion};
22+
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion, nexus::NexusTaskCompletion};
2323
use temporal_sdk_core_protos::temporal::api::history::v1::History;
2424
use tokio::sync::mpsc::{channel, Sender};
2525
use tokio_stream::wrappers::ReceiverStream;
@@ -60,6 +60,7 @@ pub struct WorkerConfig {
6060
graceful_shutdown_period_millis: u64,
6161
nondeterminism_as_workflow_fail: bool,
6262
nondeterminism_as_workflow_fail_for_types: HashSet<String>,
63+
nexus_task_poller_behavior: PollerBehavior,
6364
}
6465

6566
#[derive(FromPyObject)]
@@ -557,6 +558,18 @@ impl WorkerRef {
557558
})
558559
}
559560

561+
fn poll_nexus_task<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
562+
let worker = self.worker.as_ref().unwrap().clone();
563+
self.runtime.future_into_py(py, async move {
564+
let bytes = match worker.poll_nexus_task().await {
565+
Ok(task) => task.encode_to_vec(),
566+
Err(PollError::ShutDown) => return Err(PollShutdownError::new_err(())),
567+
Err(err) => return Err(PyRuntimeError::new_err(format!("Poll failure: {err}"))),
568+
};
569+
Ok(bytes)
570+
})
571+
}
572+
560573
fn complete_workflow_activation<'p>(
561574
&self,
562575
py: Python<'p>,
@@ -591,6 +604,22 @@ impl WorkerRef {
591604
})
592605
}
593606

607+
fn complete_nexus_task<'p>(&self,
608+
py: Python<'p>,
609+
proto: &Bound<'_, PyBytes>,
610+
) -> PyResult<Bound<'p, PyAny>> {
611+
let worker = self.worker.as_ref().unwrap().clone();
612+
let completion = NexusTaskCompletion::decode(proto.as_bytes())
613+
.map_err(|err| PyValueError::new_err(format!("Invalid proto: {err}")))?;
614+
self.runtime.future_into_py(py, async move {
615+
worker
616+
.complete_nexus_task(completion)
617+
.await
618+
.context("Completion failure")
619+
.map_err(Into::into)
620+
})
621+
}
622+
594623
fn record_activity_heartbeat(&self, proto: &Bound<'_, PyBytes>) -> PyResult<()> {
595624
enter_sync!(self.runtime);
596625
let heartbeat = ActivityHeartbeat::decode(proto.as_bytes())
@@ -688,6 +717,7 @@ fn convert_worker_config(
688717
})
689718
.collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
690719
)
720+
.nexus_task_poller_behavior(conf.nexus_task_poller_behavior)
691721
.build()
692722
.map_err(|err| PyValueError::new_err(format!("Invalid worker config: {err}")))
693723
}

0 commit comments

Comments
 (0)