From 041b5447660b862a4eaa8236af238bdd1fd90a29 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 14 Jul 2025 16:30:05 -0400 Subject: [PATCH 1/5] Install nexus-rpc from GitHub --- .github/workflows/build-binaries.yml | 5 +++++ pyproject.toml | 3 +++ uv.lock | 8 ++------ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/.github/workflows/build-binaries.yml b/.github/workflows/build-binaries.yml index 3c7834b5b..be818370f 100644 --- a/.github/workflows/build-binaries.yml +++ b/.github/workflows/build-binaries.yml @@ -74,3 +74,8 @@ jobs: with: name: packages-${{ matrix.package-suffix }} path: dist + + - name: Deliberately fail to prevent releasing nexus-rpc w/ GitHub link in pyproject.toml + run: | + echo "This is a deliberate failure to prevent releasing nexus-rpc with a GitHub link in pyproject.toml" + exit 1 \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 5f0eb916a..2dc122f14 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -236,3 +236,6 @@ exclude = [ [tool.uv] # Prevent uv commands from building the package by default package = false + +[tool.uv.sources] +nexus-rpc = { git = "https://github.com/nexus-rpc/sdk-python.git", rev = "35f574c711193a6e2560d3e6665732a5bb7ae92c" } diff --git a/uv.lock b/uv.lock index 356a2a313..638b7a118 100644 --- a/uv.lock +++ b/uv.lock @@ -1044,14 +1044,10 @@ wheels = [ [[package]] name = "nexus-rpc" version = "1.1.0" -source = { registry = "https://pypi.org/simple" } +source = { git = "https://github.com/nexus-rpc/sdk-python.git?rev=35f574c711193a6e2560d3e6665732a5bb7ae92c#35f574c711193a6e2560d3e6665732a5bb7ae92c" } dependencies = [ { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ef/66/540687556bd28cf1ec370cc6881456203dfddb9dab047b8979c6865b5984/nexus_rpc-1.1.0.tar.gz", hash = "sha256:d65ad6a2f54f14e53ebe39ee30555eaeb894102437125733fb13034a04a44553", size = 77383, upload-time = "2025-07-07T19:03:58.368Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/bf/2f/9e9d0dcaa4c6ffa22b7aa31069a8a264c753ff8027b36af602cce038c92f/nexus_rpc-1.1.0-py3-none-any.whl", hash = "sha256:d1b007af2aba186a27e736f8eaae39c03aed05b488084ff6c3d1785c9ba2ad38", size = 27743, upload-time = "2025-07-07T19:03:57.556Z" }, -] [[package]] name = "nh3" @@ -1761,7 +1757,7 @@ dev = [ requires-dist = [ { name = "eval-type-backport", marker = "python_full_version < '3.10' and extra == 'openai-agents'", specifier = ">=0.2.2" }, { name = "grpcio", marker = "extra == 'grpc'", specifier = ">=1.48.2,<2" }, - { name = "nexus-rpc", specifier = ">=1.1.0" }, + { name = "nexus-rpc", git = "https://github.com/nexus-rpc/sdk-python.git?rev=35f574c711193a6e2560d3e6665732a5bb7ae92c" }, { name = "openai-agents", marker = "extra == 'openai-agents'", specifier = ">=0.1,<0.2" }, { name = "opentelemetry-api", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, { name = "opentelemetry-sdk", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, From bc8fd8f105a48a248a14580ad0577692f0c74787 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Sat, 12 Jul 2025 09:50:50 -0400 Subject: [PATCH 2/5] Respond to upstream: {get,set}_operation_definition -> {get,set}_operation --- temporalio/nexus/_decorators.py | 2 +- temporalio/nexus/_util.py | 10 +++++----- .../test_dynamic_creation_of_user_handler_classes.py | 4 ++-- tests/nexus/test_handler.py | 5 +---- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/temporalio/nexus/_decorators.py b/temporalio/nexus/_decorators.py index 1266fd29e..d8675afdb 100644 --- a/temporalio/nexus/_decorators.py +++ b/temporalio/nexus/_decorators.py @@ -123,7 +123,7 @@ async def _start( return WorkflowRunOperationHandler(_start, input_type, output_type) method_name = get_callable_name(start) - nexusrpc.set_operation_definition( + nexusrpc.set_operation( operation_handler_factory, nexusrpc.Operation( name=name or method_name, diff --git a/temporalio/nexus/_util.py b/temporalio/nexus/_util.py index ef005d0c4..553dc683d 100644 --- a/temporalio/nexus/_util.py +++ b/temporalio/nexus/_util.py @@ -129,15 +129,15 @@ def get_operation_factory( ``obj`` should be a decorated operation start method. """ - op_defn = nexusrpc.get_operation_definition(obj) - if op_defn: + op = nexusrpc.get_operation(obj) + if op: factory = obj else: if factory := getattr(obj, "__nexus_operation_factory__", None): - op_defn = nexusrpc.get_operation_definition(factory) - if not isinstance(op_defn, nexusrpc.Operation): + op = nexusrpc.get_operation(factory) + if not isinstance(op, nexusrpc.Operation): return None, None - return factory, op_defn + return factory, op # TODO(nexus-preview) Copied from nexusrpc diff --git a/tests/nexus/test_dynamic_creation_of_user_handler_classes.py b/tests/nexus/test_dynamic_creation_of_user_handler_classes.py index 0eef14b84..018cf569d 100644 --- a/tests/nexus/test_dynamic_creation_of_user_handler_classes.py +++ b/tests/nexus/test_dynamic_creation_of_user_handler_classes.py @@ -78,8 +78,8 @@ async def test_run_nexus_service_from_programmatically_created_service_handler( service_handler = nexusrpc.handler._core.ServiceHandler( service=nexusrpc.ServiceDefinition( name="MyService", - operations={ - "increment": nexusrpc.Operation[int, int]( + operation_definitions={ + "increment": nexusrpc.OperationDefinition[int, int]( name="increment", method_name="increment", input_type=int, diff --git a/tests/nexus/test_handler.py b/tests/nexus/test_handler.py index c805a967c..81cb760b4 100644 --- a/tests/nexus/test_handler.py +++ b/tests/nexus/test_handler.py @@ -773,10 +773,7 @@ async def test_start_operation_without_type_annotations( def test_operation_without_type_annotations_without_service_definition_raises_validation_error(): - with pytest.raises( - ValueError, - match=r"has no input type.+has no output type", - ): + with pytest.raises(ValueError, match=r"has no input type"): service_handler(MyServiceHandlerWithOperationsWithoutTypeAnnotations) From 14fe03bddeadf95d5a3c6dbc3a8a3ec6b88e005d Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Sun, 13 Jul 2025 23:34:12 -0400 Subject: [PATCH 3/5] Respond to upstream: do not use get_operation_factory --- temporalio/nexus/_util.py | 23 ------- temporalio/worker/_interceptor.py | 8 +-- ...ynamic_creation_of_user_handler_classes.py | 69 ------------------- .../test_handler_operation_definitions.py | 3 +- 4 files changed, 4 insertions(+), 99 deletions(-) diff --git a/temporalio/nexus/_util.py b/temporalio/nexus/_util.py index 553dc683d..4c9d5997b 100644 --- a/temporalio/nexus/_util.py +++ b/temporalio/nexus/_util.py @@ -13,7 +13,6 @@ TypeVar, ) -import nexusrpc from nexusrpc import ( InputT, OutputT, @@ -118,28 +117,6 @@ def get_callable_name(fn: Callable[..., Any]) -> str: return method_name -# TODO(nexus-preview) Copied from nexusrpc -def get_operation_factory( - obj: Any, -) -> tuple[ - Optional[Callable[[Any], Any]], - Optional[nexusrpc.Operation[Any, Any]], -]: - """Return the :py:class:`Operation` for the object along with the factory function. - - ``obj`` should be a decorated operation start method. - """ - op = nexusrpc.get_operation(obj) - if op: - factory = obj - else: - if factory := getattr(obj, "__nexus_operation_factory__", None): - op = nexusrpc.get_operation(factory) - if not isinstance(op, nexusrpc.Operation): - return None, None - return factory, op - - # TODO(nexus-preview) Copied from nexusrpc def set_operation_factory( obj: Any, diff --git a/temporalio/worker/_interceptor.py b/temporalio/worker/_interceptor.py index 1b412cb7f..32ce66e0b 100644 --- a/temporalio/worker/_interceptor.py +++ b/temporalio/worker/_interceptor.py @@ -299,15 +299,14 @@ class StartNexusOperationInput(Generic[InputT, OutputT]): input: InputT schedule_to_close_timeout: Optional[timedelta] headers: Optional[Mapping[str, str]] - output_type: Optional[Type[OutputT]] = None + output_type: Optional[type[OutputT]] = None def __post_init__(self) -> None: """Initialize operation-specific attributes after dataclass creation.""" if isinstance(self.operation, nexusrpc.Operation): self.output_type = self.operation.output_type elif callable(self.operation): - _, op = temporalio.nexus._util.get_operation_factory(self.operation) - if isinstance(op, nexusrpc.Operation): + if op := nexusrpc.get_operation(self.operation): self.output_type = op.output_type else: raise ValueError( @@ -326,8 +325,7 @@ def operation_name(self) -> str: elif isinstance(self.operation, str): return self.operation elif callable(self.operation): - _, op = temporalio.nexus._util.get_operation_factory(self.operation) - if isinstance(op, nexusrpc.Operation): + if op := nexusrpc.get_operation(self.operation): return op.name else: raise ValueError( diff --git a/tests/nexus/test_dynamic_creation_of_user_handler_classes.py b/tests/nexus/test_dynamic_creation_of_user_handler_classes.py index 018cf569d..3df085d01 100644 --- a/tests/nexus/test_dynamic_creation_of_user_handler_classes.py +++ b/tests/nexus/test_dynamic_creation_of_user_handler_classes.py @@ -3,11 +3,9 @@ import httpx import nexusrpc.handler import pytest -from nexusrpc.handler import sync_operation from temporalio import nexus, workflow from temporalio.client import Client -from temporalio.nexus._util import get_operation_factory from temporalio.testing import WorkflowEnvironment from temporalio.worker import Worker from tests.helpers.nexus import ServiceClient, create_nexus_endpoint @@ -107,70 +105,3 @@ async def test_run_nexus_service_from_programmatically_created_service_handler( json=1, ) assert response.status_code == 201 - - -def make_incrementer_user_service_definition_and_service_handler_classes( - op_names: list[str], -) -> tuple[type, type]: - # - # service contract - # - - ops = {name: nexusrpc.Operation[int, int] for name in op_names} - service_cls: type = nexusrpc.service(type("ServiceContract", (), ops)) - - # - # service handler - # - @sync_operation - async def _increment_op( - self, - ctx: nexusrpc.handler.StartOperationContext, - input: int, - ) -> int: - return input + 1 - - op_handler_factories = {} - for name in op_names: - op_handler_factory, _ = get_operation_factory(_increment_op) - assert op_handler_factory - op_handler_factories[name] = op_handler_factory - - handler_cls: type = nexusrpc.handler.service_handler(service=service_cls)( - type("ServiceImpl", (), op_handler_factories) - ) - - return service_cls, handler_cls - - -@pytest.mark.skip( - reason="Dynamic creation of service contract using type() is not supported" -) -async def test_dynamic_creation_of_user_handler_classes( - client: Client, env: WorkflowEnvironment -): - task_queue = str(uuid.uuid4()) - - service_cls, handler_cls = ( - make_incrementer_user_service_definition_and_service_handler_classes( - ["increment"] - ) - ) - - assert (service_defn := nexusrpc.get_service_definition(service_cls)) - service_name = service_defn.name - - endpoint = (await create_nexus_endpoint(task_queue, client)).endpoint.id - async with Worker( - client, - task_queue=task_queue, - nexus_service_handlers=[handler_cls()], - ): - server_address = ServiceClient.default_server_address(env) - async with httpx.AsyncClient() as http_client: - response = await http_client.post( - f"http://{server_address}/nexus/endpoints/{endpoint}/services/{service_name}/increment", - json=1, - ) - assert response.status_code == 200 - assert response.json() == 2 diff --git a/tests/nexus/test_handler_operation_definitions.py b/tests/nexus/test_handler_operation_definitions.py index 8e41c1efa..82a0682fb 100644 --- a/tests/nexus/test_handler_operation_definitions.py +++ b/tests/nexus/test_handler_operation_definitions.py @@ -11,7 +11,6 @@ from temporalio import nexus from temporalio.nexus import WorkflowRunOperationContext, workflow_run_operation -from temporalio.nexus._util import get_operation_factory @dataclass @@ -96,7 +95,7 @@ async def test_collected_operation_names( assert isinstance(service_defn, nexusrpc.ServiceDefinition) assert service_defn.name == "Service" for method_name, expected_op in test_case.expected_operations.items(): - _, actual_op = get_operation_factory(getattr(test_case.Service, method_name)) + actual_op = nexusrpc.get_operation(getattr(test_case.Service, method_name)) assert isinstance(actual_op, nexusrpc.Operation) assert actual_op.name == expected_op.name assert actual_op.input_type == expected_op.input_type From 46232b092be584b6d92df0f53d37c4170844b1c2 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 14 Jul 2025 16:33:39 -0400 Subject: [PATCH 4/5] Don't warn on lack of type annotations on op handler --- temporalio/nexus/_util.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/temporalio/nexus/_util.py b/temporalio/nexus/_util.py index 4c9d5997b..70793be64 100644 --- a/temporalio/nexus/_util.py +++ b/temporalio/nexus/_util.py @@ -78,19 +78,10 @@ def _get_start_method_input_and_output_type_annotations( try: type_annotations = typing.get_type_hints(start) except TypeError: - warnings.warn( - f"Expected decorated start method {start} to have type annotations" - ) return None, None output_type = type_annotations.pop("return", None) if len(type_annotations) != 2: - suffix = f": {type_annotations}" if type_annotations else "" - warnings.warn( - f"Expected decorated start method {start} to have exactly 2 " - f"type-annotated parameters (ctx and input), but it has {len(type_annotations)}" - f"{suffix}." - ) input_type = None else: ctx_type, input_type = type_annotations.values() From eafaa767e5a0fa833990a76cd24f4af120202e77 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Sun, 13 Jul 2025 11:30:10 -0400 Subject: [PATCH 5/5] Nexus typing fixes --- temporalio/workflow.py | 36 ++++++++++++++--------------- tests/nexus/test_handler.py | 10 ++++++-- tests/nexus/test_workflow_caller.py | 11 +++++---- 3 files changed, 32 insertions(+), 25 deletions(-) diff --git a/temporalio/workflow.py b/temporalio/workflow.py index df78664ca..a627cec1d 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -5145,7 +5145,7 @@ async def start_operation( operation: nexusrpc.Operation[InputT, OutputT], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, + output_type: Optional[type[OutputT]] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> NexusOperationHandle[OutputT]: ... @@ -5158,7 +5158,7 @@ async def start_operation( operation: str, input: Any, *, - output_type: Optional[Type[OutputT]] = None, + output_type: Optional[type[OutputT]] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> NexusOperationHandle[OutputT]: ... @@ -5174,7 +5174,7 @@ async def start_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, + output_type: Optional[type[OutputT]] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> NexusOperationHandle[OutputT]: ... @@ -5190,7 +5190,7 @@ async def start_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, + output_type: Optional[type[OutputT]] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> NexusOperationHandle[OutputT]: ... @@ -5206,7 +5206,7 @@ async def start_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, + output_type: Optional[type[OutputT]] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> NexusOperationHandle[OutputT]: ... @@ -5217,7 +5217,7 @@ async def start_operation( operation: Any, input: Any, *, - output_type: Optional[Type[OutputT]] = None, + output_type: Optional[type[OutputT]] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> Any: @@ -5246,7 +5246,7 @@ async def execute_operation( operation: nexusrpc.Operation[InputT, OutputT], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, + output_type: Optional[type[OutputT]] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> OutputT: ... @@ -5259,7 +5259,7 @@ async def execute_operation( operation: str, input: Any, *, - output_type: Optional[Type[OutputT]] = None, + output_type: Optional[type[OutputT]] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> OutputT: ... @@ -5275,7 +5275,7 @@ async def execute_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, + output_type: Optional[type[OutputT]] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> OutputT: ... @@ -5291,7 +5291,7 @@ async def execute_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, + output_type: Optional[type[OutputT]] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> OutputT: ... @@ -5307,7 +5307,7 @@ async def execute_operation( ], input: InputT, *, - output_type: Optional[Type[OutputT]] = None, + output_type: Optional[type[OutputT]] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> OutputT: ... @@ -5318,7 +5318,7 @@ async def execute_operation( operation: Any, input: Any, *, - output_type: Optional[Type[OutputT]] = None, + output_type: Optional[type[OutputT]] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> Any: @@ -5342,7 +5342,7 @@ def __init__( self, *, endpoint: str, - service: Union[Type[ServiceT], str], + service: Union[type[ServiceT], str], ) -> None: """Create a Nexus client. @@ -5369,7 +5369,7 @@ async def start_operation( operation: Any, input: Any, *, - output_type: Optional[Type] = None, + output_type: Optional[type] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> Any: @@ -5390,7 +5390,7 @@ async def execute_operation( operation: Any, input: Any, *, - output_type: Optional[Type] = None, + output_type: Optional[type] = None, schedule_to_close_timeout: Optional[timedelta] = None, headers: Optional[Mapping[str, str]] = None, ) -> Any: @@ -5407,7 +5407,7 @@ async def execute_operation( @overload def create_nexus_client( *, - service: Type[ServiceT], + service: type[ServiceT], endpoint: str, ) -> NexusClient[ServiceT]: ... @@ -5422,9 +5422,9 @@ def create_nexus_client( def create_nexus_client( *, - service: Union[Type[ServiceT], str], + service: Union[type[ServiceT], str], endpoint: str, -) -> NexusClient[ServiceT]: +) -> NexusClient[Any]: """Create a Nexus client. .. warning:: diff --git a/tests/nexus/test_handler.py b/tests/nexus/test_handler.py index 81cb760b4..d95db5731 100644 --- a/tests/nexus/test_handler.py +++ b/tests/nexus/test_handler.py @@ -45,6 +45,7 @@ sync_operation, ) from nexusrpc.handler._decorators import operation_handler +from typing_extensions import dataclass_transform from temporalio import nexus, workflow from temporalio.client import Client @@ -328,12 +329,17 @@ class UnsuccessfulResponse: headers: Mapping[str, str] = UNSUCCESSFUL_RESPONSE_HEADERS -class _TestCase: +@dataclass_transform() +class _BaseTestCase: + pass + + +class _TestCase(_BaseTestCase): operation: str + expected: SuccessfulResponse service_defn: str = "MyService" input: Input = Input("") headers: dict[str, str] = {} - expected: SuccessfulResponse expected_without_service_definition: Optional[SuccessfulResponse] = None skip = "" diff --git a/tests/nexus/test_workflow_caller.py b/tests/nexus/test_workflow_caller.py index c9417ef58..085febb78 100644 --- a/tests/nexus/test_workflow_caller.py +++ b/tests/nexus/test_workflow_caller.py @@ -253,11 +253,12 @@ def __init__( request_cancel: bool, task_queue: str, ) -> None: + service: type[Any] = { + CallerReference.IMPL_WITH_INTERFACE: ServiceImpl, + CallerReference.INTERFACE: ServiceInterface, + }[input.op_input.caller_reference] self.nexus_client = workflow.create_nexus_client( - service={ - CallerReference.IMPL_WITH_INTERFACE: ServiceImpl, - CallerReference.INTERFACE: ServiceInterface, - }[input.op_input.caller_reference], + service=service, endpoint=make_nexus_endpoint_name(task_queue), ) self._nexus_operation_started = False @@ -883,7 +884,7 @@ async def run( task_queue: str, ) -> ServiceClassNameOutput: C, N = CallerReference, NameOverride - service_cls: type + service_cls: type[Any] if (caller_reference, name_override) == (C.INTERFACE, N.YES): service_cls = ServiceInterfaceWithNameOverride elif (caller_reference, name_override) == (C.INTERFACE, N.NO):