Skip to content

Commit a48cece

Browse files
Support RequestIdReference in Nexus Links (#1228)
* WIP add Nexus request id reference to links * Small cleanups. Move to request ID reference for workflow_execution_started_event_link_from_workflow_handle * Update tests/nexus/test_link_conversion.py Co-authored-by: James Watkins-Harvey <[email protected]> --------- Co-authored-by: James Watkins-Harvey <[email protected]>
1 parent cc19379 commit a48cece

File tree

3 files changed

+224
-28
lines changed

3 files changed

+224
-28
lines changed

temporalio/nexus/_link_conversion.py

Lines changed: 107 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,26 +24,31 @@
2424
)
2525
LINK_EVENT_ID_PARAM_NAME = "eventID"
2626
LINK_EVENT_TYPE_PARAM_NAME = "eventType"
27+
LINK_REQUEST_ID_PARAM_NAME = "requestID"
28+
LINK_REFERENCE_TYPE_PARAM_NAME = "referenceType"
29+
30+
EVENT_REFERENCE_TYPE = "EventReference"
31+
REQUEST_ID_REFERENCE_TYPE = "RequestIdReference"
2732

2833

2934
def workflow_execution_started_event_link_from_workflow_handle(
30-
handle: temporalio.client.WorkflowHandle[Any, Any],
35+
handle: temporalio.client.WorkflowHandle[Any, Any], request_id: str
3136
) -> temporalio.api.common.v1.Link.WorkflowEvent:
3237
"""Create a WorkflowEvent link corresponding to a started workflow"""
3338
if handle.first_execution_run_id is None:
3439
raise ValueError(
3540
f"Workflow handle {handle} has no first execution run ID. "
3641
f"Cannot create WorkflowExecutionStarted event link."
3742
)
43+
3844
return temporalio.api.common.v1.Link.WorkflowEvent(
3945
namespace=handle._client.namespace,
4046
workflow_id=handle.id,
4147
run_id=handle.first_execution_run_id,
42-
event_ref=temporalio.api.common.v1.Link.WorkflowEvent.EventReference(
43-
event_id=1,
48+
request_id_ref=temporalio.api.common.v1.Link.WorkflowEvent.RequestIdReference(
49+
request_id=request_id,
4450
event_type=temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
4551
),
46-
# TODO(nexus-preview): RequestIdReference
4752
)
4853

4954

@@ -60,9 +65,21 @@ def workflow_event_to_nexus_link(
6065
workflow_id = urllib.parse.quote(workflow_event.workflow_id)
6166
run_id = urllib.parse.quote(workflow_event.run_id)
6267
path = f"/namespaces/{namespace}/workflows/{workflow_id}/{run_id}/history"
63-
query_params = _event_reference_to_query_params(workflow_event.event_ref)
68+
69+
query_params = None
70+
match workflow_event.WhichOneof("reference"):
71+
case "event_ref":
72+
query_params = _event_reference_to_query_params(workflow_event.event_ref)
73+
case "request_id_ref":
74+
query_params = _request_id_reference_to_query_params(
75+
workflow_event.request_id_ref
76+
)
77+
78+
# urllib will omit '//' from the url if netloc is empty so we add the scheme manually
79+
url = f"{scheme}://{urllib.parse.urlunparse(('', '', path, '', query_params, ''))}"
80+
6481
return nexusrpc.Link(
65-
url=urllib.parse.urlunparse((scheme, "", path, "", query_params, "")),
82+
url=url,
6683
type=workflow_event.DESCRIPTOR.full_name,
6784
)
6885

@@ -83,7 +100,20 @@ def nexus_link_to_workflow_event(
83100
)
84101
return None
85102
try:
86-
event_ref = _query_params_to_event_reference(url.query)
103+
query_params = urllib.parse.parse_qs(url.query)
104+
105+
request_id_ref = None
106+
event_ref = None
107+
match query_params.get(LINK_REFERENCE_TYPE_PARAM_NAME):
108+
case ["EventReference"]:
109+
event_ref = _query_params_to_event_reference(query_params)
110+
case ["RequestIdReference"]:
111+
request_id_ref = _query_params_to_request_id_reference(query_params)
112+
case _:
113+
raise ValueError(
114+
f"Invalid Nexus link: {link}. Expected {LINK_REFERENCE_TYPE_PARAM_NAME} to be '{EVENT_REFERENCE_TYPE}' or '{REQUEST_ID_REFERENCE_TYPE}'"
115+
)
116+
87117
except ValueError as err:
88118
logger.warning(
89119
f"Failed to parse event reference from Nexus link URL query parameters: {link} ({err})"
@@ -96,6 +126,7 @@ def nexus_link_to_workflow_event(
96126
workflow_id=urllib.parse.unquote(groups["workflow_id"]),
97127
run_id=urllib.parse.unquote(groups["run_id"]),
98128
event_ref=event_ref,
129+
request_id_ref=request_id_ref,
99130
)
100131

101132

@@ -109,36 +140,58 @@ def _event_reference_to_query_params(
109140
)
110141
return urllib.parse.urlencode(
111142
{
112-
"eventID": event_ref.event_id,
113-
"eventType": event_type_name,
114-
"referenceType": "EventReference",
143+
LINK_EVENT_ID_PARAM_NAME: event_ref.event_id,
144+
LINK_EVENT_TYPE_PARAM_NAME: event_type_name,
145+
LINK_REFERENCE_TYPE_PARAM_NAME: EVENT_REFERENCE_TYPE,
115146
}
116147
)
117148

118149

150+
def _request_id_reference_to_query_params(
151+
request_id_ref: temporalio.api.common.v1.Link.WorkflowEvent.RequestIdReference,
152+
) -> str:
153+
params = {
154+
LINK_REFERENCE_TYPE_PARAM_NAME: REQUEST_ID_REFERENCE_TYPE,
155+
}
156+
157+
if request_id_ref.request_id:
158+
params[LINK_REQUEST_ID_PARAM_NAME] = request_id_ref.request_id
159+
160+
event_type_name = temporalio.api.enums.v1.EventType.Name(request_id_ref.event_type)
161+
if event_type_name.startswith("EVENT_TYPE_"):
162+
event_type_name = _event_type_constant_case_to_pascal_case(
163+
event_type_name.removeprefix("EVENT_TYPE_")
164+
)
165+
params[LINK_EVENT_TYPE_PARAM_NAME] = event_type_name
166+
167+
return urllib.parse.urlencode(params)
168+
169+
119170
def _query_params_to_event_reference(
120-
raw_query_params: str,
171+
query_params: dict[str, list[str]],
121172
) -> temporalio.api.common.v1.Link.WorkflowEvent.EventReference:
122173
"""Return an EventReference from the query params or raise ValueError."""
123-
query_params = urllib.parse.parse_qs(raw_query_params)
124-
125-
[reference_type] = query_params.get("referenceType") or [""]
126-
if reference_type != "EventReference":
174+
[reference_type] = query_params.get(LINK_REFERENCE_TYPE_PARAM_NAME) or [""]
175+
if reference_type != EVENT_REFERENCE_TYPE:
127176
raise ValueError(
128177
f"Expected Nexus link URL query parameter referenceType to be EventReference but got: {reference_type}"
129178
)
179+
130180
# event type
131-
[raw_event_type_name] = query_params.get(LINK_EVENT_TYPE_PARAM_NAME) or [""]
132-
if not raw_event_type_name:
133-
raise ValueError(f"query params do not contain event type: {query_params}")
134-
if raw_event_type_name.startswith("EVENT_TYPE_"):
135-
event_type_name = raw_event_type_name
136-
elif re.match("[A-Z][a-z]", raw_event_type_name):
137-
event_type_name = "EVENT_TYPE_" + _event_type_pascal_case_to_constant_case(
138-
raw_event_type_name
139-
)
140-
else:
141-
raise ValueError(f"Invalid event type name: {raw_event_type_name}")
181+
match query_params.get(LINK_EVENT_TYPE_PARAM_NAME):
182+
case None:
183+
raise ValueError(f"query params do not contain event type: {query_params}")
184+
185+
case [raw_event_type_name] if raw_event_type_name.startswith("EVENT_TYPE_"):
186+
event_type_name = raw_event_type_name
187+
188+
case [raw_event_type_name] if re.match("[A-Z][a-z]", raw_event_type_name):
189+
event_type_name = "EVENT_TYPE_" + _event_type_pascal_case_to_constant_case(
190+
raw_event_type_name
191+
)
192+
193+
case raw_event_type_name:
194+
raise ValueError(f"Invalid event type name: {raw_event_type_name}")
142195

143196
# event id
144197
event_id = 0
@@ -155,6 +208,34 @@ def _query_params_to_event_reference(
155208
)
156209

157210

211+
def _query_params_to_request_id_reference(
212+
query_params: dict[str, list[str]],
213+
) -> temporalio.api.common.v1.Link.WorkflowEvent.RequestIdReference:
214+
"""Return an EventReference from the query params or raise ValueError."""
215+
# event type
216+
match query_params.get(LINK_EVENT_TYPE_PARAM_NAME):
217+
case None:
218+
raise ValueError(f"query params do not contain event type: {query_params}")
219+
220+
case [raw_event_type_name] if raw_event_type_name.startswith("EVENT_TYPE_"):
221+
event_type_name = raw_event_type_name
222+
223+
case [raw_event_type_name] if re.match("[A-Z][a-z]", raw_event_type_name):
224+
event_type_name = "EVENT_TYPE_" + _event_type_pascal_case_to_constant_case(
225+
raw_event_type_name
226+
)
227+
228+
case raw_event_type_name:
229+
raise ValueError(f"Invalid event type name: {raw_event_type_name}")
230+
231+
[request_id] = query_params.get(LINK_REQUEST_ID_PARAM_NAME, [""])
232+
233+
return temporalio.api.common.v1.Link.WorkflowEvent.RequestIdReference(
234+
request_id=request_id,
235+
event_type=temporalio.api.enums.v1.EventType.Value(event_type_name),
236+
)
237+
238+
158239
def _event_type_constant_case_to_pascal_case(s: str) -> str:
159240
"""Convert a CONSTANT_CASE string to PascalCase.
160241

temporalio/nexus/_operation_context.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,8 @@ def _add_outbound_links(
183183
if not wf_event_links:
184184
wf_event_links = [
185185
_link_conversion.workflow_execution_started_event_link_from_workflow_handle(
186-
workflow_handle
186+
workflow_handle,
187+
self.nexus_context.request_id,
187188
)
188189
]
189190
self.nexus_context.outbound_links.extend(

tests/nexus/test_link_conversion.py

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import urllib.parse
22
from typing import Any
33

4+
import nexusrpc
45
import pytest
56

67
import temporalio.api.common.v1
@@ -39,8 +40,9 @@
3940
def test_query_params_to_event_reference(
4041
query_param_str: str, expected_event_ref: dict[str, Any]
4142
):
43+
query_params = urllib.parse.parse_qs(query_param_str)
4244
event_ref = temporalio.nexus._link_conversion._query_params_to_event_reference(
43-
query_param_str
45+
query_params
4446
)
4547
for k, v in expected_event_ref.items():
4648
assert getattr(event_ref, k) == v
@@ -72,6 +74,118 @@ def test_event_reference_to_query_params(
7274
assert query_params == expected_query_params
7375

7476

77+
@pytest.mark.parametrize(
78+
["query_param_str", "expected_event_ref"],
79+
[
80+
(
81+
"eventType=NexusOperationScheduled&referenceType=RequestIdReference&requestID=req-123",
82+
{
83+
"event_type": temporalio.api.enums.v1.EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED,
84+
"request_id": "req-123",
85+
},
86+
),
87+
# event ID is optional in query params; we leave it unset in the ref if missing
88+
(
89+
"eventType=NexusOperationScheduled&referenceType=RequestIdReference",
90+
{
91+
"event_type": temporalio.api.enums.v1.EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED,
92+
"request_id": "",
93+
},
94+
),
95+
# Older server sends EVENT_TYPE_CONSTANT_CASE event type name
96+
(
97+
"eventType=EVENT_TYPE_NEXUS_OPERATION_SCHEDULED&referenceType=RequestIdReference&requestID=req-123",
98+
{
99+
"event_type": temporalio.api.enums.v1.EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED,
100+
"request_id": "req-123",
101+
},
102+
),
103+
],
104+
)
105+
def test_query_params_to_request_id_reference(
106+
query_param_str: str, expected_event_ref: dict[str, Any]
107+
):
108+
query_params = urllib.parse.parse_qs(query_param_str)
109+
event_ref = temporalio.nexus._link_conversion._query_params_to_request_id_reference(
110+
query_params
111+
)
112+
for k, v in expected_event_ref.items():
113+
assert getattr(event_ref, k) == v
114+
115+
116+
@pytest.mark.parametrize(
117+
["event_ref", "expected_query_param_str"],
118+
[
119+
# We always send PascalCase event type names (no EventType prefix)
120+
(
121+
{
122+
"event_type": temporalio.api.enums.v1.EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED,
123+
"request_id": "req-123",
124+
},
125+
"eventType=NexusOperationScheduled&referenceType=RequestIdReference&requestID=req-123",
126+
),
127+
],
128+
)
129+
def test_request_id_reference_to_query_params(
130+
event_ref: dict[str, Any], expected_query_param_str: str
131+
):
132+
query_params_str = (
133+
temporalio.nexus._link_conversion._request_id_reference_to_query_params(
134+
temporalio.api.common.v1.Link.WorkflowEvent.RequestIdReference(**event_ref)
135+
)
136+
)
137+
query_params = urllib.parse.parse_qs(query_params_str)
138+
expected_query_params = urllib.parse.parse_qs(expected_query_param_str)
139+
assert query_params == expected_query_params
140+
141+
142+
@pytest.mark.parametrize(
143+
["event", "expected_link"],
144+
[
145+
(
146+
temporalio.api.common.v1.Link.WorkflowEvent(
147+
namespace="ns",
148+
workflow_id="wid",
149+
run_id="rid",
150+
request_id_ref=temporalio.api.common.v1.Link.WorkflowEvent.RequestIdReference(
151+
event_type=temporalio.api.enums.v1.event_type_pb2.EVENT_TYPE_WORKFLOW_TASK_COMPLETED,
152+
request_id="req-123",
153+
),
154+
),
155+
nexusrpc.Link(
156+
type=temporalio.api.common.v1.Link.WorkflowEvent.DESCRIPTOR.full_name,
157+
url="temporal:///namespaces/ns/workflows/wid/rid/history?referenceType=RequestIdReference&requestID=req-123&eventType=WorkflowTaskCompleted",
158+
),
159+
),
160+
(
161+
temporalio.api.common.v1.Link.WorkflowEvent(
162+
namespace="ns2",
163+
workflow_id="wid2",
164+
run_id="rid2",
165+
event_ref=temporalio.api.common.v1.Link.WorkflowEvent.EventReference(
166+
event_id=42,
167+
event_type=temporalio.api.enums.v1.event_type_pb2.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
168+
),
169+
),
170+
nexusrpc.Link(
171+
type=temporalio.api.common.v1.Link.WorkflowEvent.DESCRIPTOR.full_name,
172+
url="temporal:///namespaces/ns2/workflows/wid2/rid2/history?eventID=42&eventType=WorkflowExecutionCompleted&referenceType=EventReference",
173+
),
174+
),
175+
],
176+
)
177+
def test_link_conversion_workflow_event_to_link_and_back(
178+
event: temporalio.api.common.v1.Link.WorkflowEvent, expected_link: nexusrpc.Link
179+
):
180+
actual_link = temporalio.nexus._link_conversion.workflow_event_to_nexus_link(event)
181+
assert expected_link == actual_link
182+
183+
actual_event = temporalio.nexus._link_conversion.nexus_link_to_workflow_event(
184+
actual_link
185+
)
186+
assert event == actual_event
187+
188+
75189
def test_link_conversion_utilities():
76190
p2c = temporalio.nexus._link_conversion._event_type_pascal_case_to_constant_case
77191
c2p = temporalio.nexus._link_conversion._event_type_constant_case_to_pascal_case

0 commit comments

Comments
 (0)