Skip to content

Commit 8b800f2

Browse files
committed
feat(jobqueue): add job queue SDK with CAS-based claim mechanism and smoke tests
Introduces the job queue SDK for scalable execution request processing: - ConditionalWriter: CAS (Compare-And-Swap) layer using If-Version-Match headers - Claim: atomic claim/release of entities with is_claimed predicate to prevent sequential overwrites - Discovery and JobQueue orchestration layers - Fix OpenAPI request_helper to forward per-MCP headers (If-Version-Match) - Add executorInstanceId field to ExecutionRequestResult PDL - Smoke tests validating claim lifecycle, concurrent claim races, and sequential conflict detection against live GMS
1 parent 6f8733a commit 8b800f2

File tree

25 files changed

+5833
-3
lines changed

25 files changed

+5833
-3
lines changed

docs-website/sidebars.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,6 +1146,7 @@ module.exports = {
11461146
label: "Patch",
11471147
},
11481148
"docs/api/tutorials/sdk/search_client",
1149+
"docs/api/tutorials/sdk/job-queue",
11491150
],
11501151
},
11511152
// Admin.

docs/api/tutorials/sdk/job-queue.md

Lines changed: 585 additions & 0 deletions
Large diffs are not rendered by default.

docs/design_notes/job-queue-design.md

Lines changed: 835 additions & 0 deletions
Large diffs are not rendered by default.

metadata-ingestion/src/datahub/emitter/request_helper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def from_mcp(
8383
url = f"{gms_server}/openapi/v3/entity/{mcp.entityType}/{mcp.entityUrn}"
8484
else:
8585
if mcp.aspect:
86-
mcp_headers = {}
86+
mcp_headers = dict(mcp.headers) if mcp.headers else {}
8787

8888
if not async_flag and search_sync_flag:
8989
mcp_headers["X-DataHub-Sync-Index-Update"] = "true"
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"""Reusable platform coordination patterns built on the DataHub SDK.
2+
3+
Typical usage::
4+
5+
from datahub.sdk.patterns.job_queue import JobQueue, Claim, SearchDiscovery
6+
7+
queue = JobQueue(
8+
discovery=SearchDiscovery(graph=graph, entity_type="dataHubAction", filters={"state": ["ACTIVE"]}),
9+
claim=Claim(graph=graph, aspect_name="dataHubActionStatus", ...),
10+
owner_id=instance_id,
11+
)
12+
13+
for job in queue.poll():
14+
try:
15+
process(job.urn)
16+
finally:
17+
job.release()
18+
"""
19+
20+
from datahub.sdk.patterns._shared.conditional_writer import (
21+
CASResult,
22+
ConditionalWriter,
23+
VersionedAspect,
24+
)
25+
from datahub.sdk.patterns.job_queue.claim import Claim
26+
from datahub.sdk.patterns.job_queue.discovery import (
27+
Discovery,
28+
MCLDiscovery,
29+
SearchDiscovery,
30+
WorkItem,
31+
)
32+
from datahub.sdk.patterns.job_queue.job_queue import Job, JobQueue
33+
from datahub.sdk.patterns.job_queue.sweeper import Sweeper, SweepResult
34+
35+
__all__ = [
36+
"CASResult",
37+
"Claim",
38+
"ConditionalWriter",
39+
"Discovery",
40+
"Job",
41+
"JobQueue",
42+
"MCLDiscovery",
43+
"SearchDiscovery",
44+
"SweepResult",
45+
"Sweeper",
46+
"VersionedAspect",
47+
"WorkItem",
48+
]
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from datahub.sdk.patterns._shared.conditional_writer import (
2+
CASResult,
3+
ConditionalWriter,
4+
VersionedAspect,
5+
)
6+
7+
__all__ = [
8+
"CASResult",
9+
"ConditionalWriter",
10+
"VersionedAspect",
11+
]
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
"""Layer 1: CAS (Compare-And-Swap) mechanics over DataHub aspects.
2+
3+
Provides conditional read-then-write operations using the If-Version-Match
4+
header, backed by GMS's ConditionalWriteValidator. Zero domain knowledge --
5+
this module only knows about aspects, versions, and the DataHubGraph client.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import logging
11+
from dataclasses import dataclass
12+
from typing import Optional, Type
13+
14+
from datahub._codegen.aspect import _Aspect
15+
from datahub.configuration.common import OperationalError
16+
from datahub.emitter.mcp import MetadataChangeProposalWrapper
17+
from datahub.ingestion.graph.client import DataHubGraph
18+
from datahub.metadata.schema_classes import SystemMetadataClass
19+
20+
logger = logging.getLogger(__name__)
21+
22+
# Matches ConditionalWriteValidator.UNVERSIONED_ASPECT_VERSION on the server.
23+
_UNVERSIONED = "-1"
24+
25+
26+
@dataclass
27+
class CASResult:
28+
"""Outcome of a conditional write attempt."""
29+
30+
success: bool
31+
new_version: Optional[str] = None # version after write (if success)
32+
reason: Optional[str] = None # "conflict", "error", etc. (if failure)
33+
34+
35+
@dataclass
36+
class VersionedAspect:
37+
"""An aspect value paired with its version string.
38+
39+
Returned by :meth:`ConditionalWriter.read_versioned_aspect` to combine
40+
both the version and the typed aspect in a single round trip.
41+
"""
42+
43+
version: str
44+
aspect: Optional[_Aspect] = None
45+
46+
47+
class ConditionalWriter:
48+
"""Thin wrapper around DataHubGraph for conditional aspect writes.
49+
50+
All version strings use the same scheme as GMS: numeric strings for
51+
existing aspects, ``"-1"`` for aspects that do not yet exist.
52+
"""
53+
54+
def __init__(self, graph: DataHubGraph) -> None:
55+
self._graph = graph
56+
57+
def read_version(self, entity_urn: str, aspect_name: str) -> str:
58+
"""Read the current version of an aspect.
59+
60+
Returns:
61+
Version string. ``"-1"`` if the aspect does not exist.
62+
"""
63+
mcps = self._graph.get_entity_as_mcps(entity_urn, aspects=[aspect_name])
64+
for mcp in mcps:
65+
if (
66+
mcp.aspect is not None
67+
and mcp.aspectName == aspect_name
68+
and mcp.systemMetadata is not None
69+
):
70+
return _extract_version(mcp.systemMetadata)
71+
return _UNVERSIONED
72+
73+
def read_versioned_aspect(
74+
self,
75+
entity_urn: str,
76+
aspect_name: str,
77+
aspect_class: Type[_Aspect],
78+
) -> VersionedAspect:
79+
"""Read the version and typed aspect in a single GMS call.
80+
81+
Returns:
82+
A :class:`VersionedAspect` with the version string and typed
83+
aspect (``None`` if the aspect is absent or doesn't match
84+
*aspect_class*).
85+
"""
86+
mcps = self._graph.get_entity_as_mcps(entity_urn, aspects=[aspect_name])
87+
for mcp in mcps:
88+
if (
89+
mcp.aspect is not None
90+
and mcp.aspectName == aspect_name
91+
and mcp.systemMetadata is not None
92+
):
93+
version = _extract_version(mcp.systemMetadata)
94+
aspect = mcp.aspect if isinstance(mcp.aspect, aspect_class) else None
95+
return VersionedAspect(version=version, aspect=aspect)
96+
return VersionedAspect(version=_UNVERSIONED)
97+
98+
def read_aspect(
99+
self,
100+
entity_urn: str,
101+
aspect_name: str,
102+
aspect_class: Type[_Aspect],
103+
) -> Optional[_Aspect]:
104+
"""Read the current value of an aspect.
105+
106+
Returns:
107+
The aspect instance, or ``None`` if absent.
108+
"""
109+
return self._graph.get_aspect(
110+
entity_urn=entity_urn,
111+
aspect_type=aspect_class,
112+
)
113+
114+
def write_if_version(
115+
self,
116+
entity_urn: str,
117+
aspect_name: str,
118+
aspect_value: _Aspect,
119+
expected_version: str,
120+
track_new_version: bool = True,
121+
) -> CASResult:
122+
"""Write an aspect only if the current version matches *expected_version*.
123+
124+
Uses the ``If-Version-Match`` header so that GMS validates server-side:
125+
126+
* Version matches -- write succeeds, returns the new version.
127+
* Version mismatch -- write rejected, returns a conflict result.
128+
129+
When *track_new_version* is ``False`` the post-write read is skipped
130+
and ``new_version`` will be ``None``. Use this when the caller does
131+
not need the updated version (e.g. release writes).
132+
133+
All other exceptions propagate to the caller.
134+
"""
135+
mcpw = MetadataChangeProposalWrapper(
136+
entityUrn=entity_urn,
137+
aspect=aspect_value,
138+
headers={"If-Version-Match": expected_version},
139+
)
140+
141+
try:
142+
self._graph.emit(mcpw)
143+
except OperationalError as exc:
144+
if _is_precondition_failure(exc):
145+
logger.debug(
146+
"CAS conflict for %s/%s (expected version %s): %s",
147+
entity_urn,
148+
aspect_name,
149+
expected_version,
150+
exc,
151+
)
152+
return CASResult(success=False, reason="conflict")
153+
raise
154+
155+
if not track_new_version:
156+
return CASResult(success=True)
157+
158+
# After a successful write, read the new version.
159+
new_version = self.read_version(entity_urn, aspect_name)
160+
return CASResult(success=True, new_version=new_version)
161+
162+
163+
def _extract_version(system_metadata: SystemMetadataClass) -> str:
164+
"""Extract the version string from system metadata.
165+
166+
Mirrors the Java ``AspectWithMetadata.extractVersion`` logic:
167+
uses ``systemMetadata.version`` if present, falls back to ``"-1"``.
168+
"""
169+
version = system_metadata.get("version")
170+
if version is not None:
171+
return str(version)
172+
return _UNVERSIONED
173+
174+
175+
def _is_precondition_failure(exc: OperationalError) -> bool:
176+
"""Detect a 412 Precondition Failed response from GMS.
177+
178+
The rest emitter wraps HTTP errors in OperationalError. We check both
179+
the embedded info dict and the message string for precondition signals.
180+
"""
181+
# Check for HTTP 412 status code in the info dict.
182+
status = exc.info.get("status")
183+
if status == 412:
184+
return True
185+
186+
# Fallback: check message text for the precondition keyword.
187+
msg = (exc.message or "").lower()
188+
if "precondition" in msg or "expected version" in msg:
189+
return True
190+
191+
return False
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from datahub.sdk.patterns.job_queue.claim import Claim
2+
from datahub.sdk.patterns.job_queue.discovery import (
3+
Discovery,
4+
MCLDiscovery,
5+
SearchDiscovery,
6+
)
7+
from datahub.sdk.patterns.job_queue.job_queue import Job, JobQueue
8+
from datahub.sdk.patterns.job_queue.sweeper import Sweeper, SweepResult
9+
10+
__all__ = [
11+
"Claim",
12+
"Discovery",
13+
"Job",
14+
"JobQueue",
15+
"MCLDiscovery",
16+
"SearchDiscovery",
17+
"SweepResult",
18+
"Sweeper",
19+
]

0 commit comments

Comments
 (0)