Adding links to Nexus signals#1593
Conversation
|
|
| links are added to the request's ``links`` field so the callee's history event links | ||
| back to the caller workflow that scheduled this Nexus operation. | ||
| """ | ||
| return self._get_links() |
There was a problem hiding this comment.
Should we consider renaming _get_links() rather than just wrapping it like this?
| return workflow_handle | ||
|
|
||
| def _get_outgoing_request_links(self) -> list[temporalio.api.common.v1.Link]: | ||
| """Inbound Nexus task links to attach to RPCs the operation handler issues. |
There was a problem hiding this comment.
The word "Inbound" here is a bit confusing so close to the work "outgoing" in the method name. I think I understand it to mean that the Nexus context has links attached to it that the server adds which makes them "inbound" to the handler. In general though I think we should align that these are 'outgoing' in the sense that they are meant to be sent along with requests made from the handler.
| When the operation handler signals, signal-with-starts, or starts a workflow, these | ||
| links are added to the request's ``links`` field so the callee's history event links | ||
| back to the caller workflow that scheduled this Nexus operation. |
There was a problem hiding this comment.
Nit: This isn't quite accurate now that SANO exists. In the workflow caller scenario you are correct, but with SANO the link will be to the nexus operation not a calling workflow.
| # If this signal-with-start is issued from inside a Nexus operation handler (but not as the | ||
| # nexus-backing workflow, whose links are handled separately by | ||
| # WorkflowRunOperationContext.start_workflow), capture the signal backlink the server | ||
| # returned so the caller workflow's Nexus history event links to the signaled event. A | ||
| # plain start does not capture a backlink: it only forwards the inbound links onto the | ||
| # start request. | ||
| nexus_ctx = self._try_nexus_start_operation_context() | ||
| if ( | ||
| nexus_ctx is not None | ||
| and not temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context() | ||
| and isinstance( | ||
| resp, | ||
| temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse, | ||
| ) | ||
| ): | ||
| # Server >= 1.31 with EnableCHASMSignalBacklinks returns signal_link pointing at | ||
| # the WorkflowExecutionSignaled event; older servers leave it unset. | ||
| if resp.HasField("signal_link"): | ||
| nexus_ctx._add_backlink(resp.signal_link) |
There was a problem hiding this comment.
I'm wondering if nexus_ctx._add_outbound_links() should be renamed to reflect that it's from workflows (e.g. nexus_ctx._add_workflow_backlinks() or something similar) and updated to handle this and be called here? It would also require removing the call in _start_nexus_backing_workflow
Seems like it might be more correct to just attach any back links to the nexus context if it exists, regardless of if the target workflow will complete the nexus operation.
| if temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context(): | ||
| req.on_conflict_options.attach_request_id = True | ||
| req.on_conflict_options.attach_completion_callbacks = True | ||
| req.on_conflict_options.attach_links = True | ||
| else: | ||
| # If this is a plain start_workflow issued from inside a Nexus operation handler | ||
| # (not the nexus-backing workflow, which already carries inbound links via | ||
| # input.links), forward the inbound Nexus task links so the started callee's | ||
| # WorkflowExecutionStarted event links back to the caller. | ||
| nexus_ctx = self._try_nexus_start_operation_context() | ||
| if nexus_ctx is not None: | ||
| req.links.extend(nexus_ctx._get_outgoing_request_links()) |
There was a problem hiding this comment.
I like this change overall, but wonder if the flags are not set correctly. In the backing_workflow_start_context case, we definitely want the callbacks attacked.
In any Nexus case, I think we want the request ID and the links attached.
| @@ -0,0 +1,480 @@ | |||
| """Unit tests for Nexus signal-backlink propagation. | |||
|
|
|||
| These mirror the Java SDK's RootWorkflowClientInvokerLinkPropagationTest. They exercise the | |||
There was a problem hiding this comment.
Please make sure comments and docstrings don't reference other SDKS.
| @@ -0,0 +1,341 @@ | |||
| """End-to-end (server-based) tests for Nexus signal-backlink propagation. | |||
|
|
|||
| These mirror the Java SDK's ``SignalOperationLinkingTest`` and the Go SDK's | |||
There was a problem hiding this comment.
Please make sure comments and docstrings don't reference other SDKS.
There was a problem hiding this comment.
Please add tests that exercise this via standalone nexus operations.
| async def fetch_info(self, ctx, token: str): # type: ignore[no-untyped-def] | ||
| raise NotImplementedError | ||
|
|
||
| async def fetch_result(self, ctx, token: str): # type: ignore[no-untyped-def] | ||
| raise NotImplementedError |
There was a problem hiding this comment.
| async def fetch_info(self, ctx, token: str): # type: ignore[no-untyped-def] | |
| raise NotImplementedError | |
| async def fetch_result(self, ctx, token: str): # type: ignore[no-untyped-def] | |
| raise NotImplementedError |
These are not required.
| @service_handler(service=SignalingService) | ||
| class SignalingServiceHandler: | ||
| @sync_operation | ||
| async def op(self, _ctx: StartOperationContext, input: str) -> str: |
There was a problem hiding this comment.
Please use a dataclass rather than a string split by ':' for input
This adds links to Nexus signals - the receiving workflow has a link to the workflow sending the signal. and the sending workflow has a link to the receiving workflow.