Skip to content

Commit fca72af

Browse files
javabrett, claude, treff7es
authored
fix(ingestion): Prevent duplicate [CLI] source creation during executor-managed ingestion (#16663)
Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: treff7es <treff7es@gmail.com>
1 parent 00fa226 commit fca72af

File tree

2 files changed

+121
-21
lines changed

2 files changed

+121
-21
lines changed

metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py

Lines changed: 55 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,9 @@
2828
StructuredExecutionReportClass,
2929
_Aspect,
3030
)
31+
from datahub.metadata.urns import DataHubExecutionRequestUrn
3132
from datahub.utilities.logging_manager import get_log_buffer
33+
from datahub.utilities.urns.error import InvalidUrnError
3234
from datahub.utilities.urns.urn import Urn
3335

3436
logger = logging.getLogger(__name__)
@@ -118,35 +120,63 @@ def __init__(self, sink: Sink, report_recipe: bool, ctx: PipelineContext) -> Non
118120
ingestion_source_key = self.generate_unique_key(ctx.pipeline_config)
119121
self.entity_name: str = self.generate_entity_name(ingestion_source_key)
120122

123+
# If run_id is an execution request URN, the executor owns the source/request lifecycle.
124+
try:
125+
parsed = Urn.from_string(ctx.run_id)
126+
self._is_running_under_executor = (
127+
parsed.entity_type == DataHubExecutionRequestUrn.ENTITY_TYPE
128+
)
129+
except InvalidUrnError:
130+
self._is_running_under_executor = False
131+
except Exception:
132+
logger.warning(
133+
f"Unexpected error parsing run_id={ctx.run_id!r} as URN; "
134+
"assuming standalone CLI context.",
135+
exc_info=True,
136+
)
137+
self._is_running_under_executor = False
138+
139+
if self._is_running_under_executor:
140+
logger.debug(f"Executor-managed run detected (run_id={ctx.run_id}).")
141+
121142
self.ingestion_source_urn: Urn = Urn(
122143
entity_type="dataHubIngestionSource",
123144
entity_id=["cli-" + datahub_guid(ingestion_source_key)],
124145
)
125146
logger.debug(f"Ingestion source urn = {self.ingestion_source_urn}")
126-
self.execution_request_input_urn: Urn = Urn(
127-
entity_type="dataHubExecutionRequest", entity_id=[ctx.run_id]
128-
)
147+
# Use typed URN only in the executor path (run_id already validated as such).
148+
# For standalone CLI runs, run_id is a plain string; passing a foreign URN type
149+
# to DataHubExecutionRequestUrn would raise InvalidUrnError.
150+
if self._is_running_under_executor:
151+
self.execution_request_input_urn: Urn = DataHubExecutionRequestUrn(
152+
ctx.run_id
153+
)
154+
else:
155+
self.execution_request_input_urn = Urn(
156+
entity_type="dataHubExecutionRequest", entity_id=[ctx.run_id]
157+
)
129158
self.start_time_ms: int = self.get_cur_time_in_ms()
130159

131-
# Construct the dataHubIngestionSourceInfo aspect
132-
source_info_aspect = DataHubIngestionSourceInfoClass(
133-
name=self.entity_name,
134-
type=ctx.pipeline_config.source.type,
135-
platform=make_data_platform_urn(
136-
getattr(ctx.pipeline_config.source, "platform", "unknown")
137-
),
138-
config=DataHubIngestionSourceConfigClass(
139-
recipe=self._get_recipe_to_report(ctx),
140-
version=nice_version_name(),
141-
executorId=self._EXECUTOR_ID,
142-
),
143-
)
160+
if not self._is_running_under_executor:
161+
# Construct the dataHubIngestionSourceInfo aspect
162+
source_info_aspect = DataHubIngestionSourceInfoClass(
163+
name=self.entity_name,
164+
type=ctx.pipeline_config.source.type,
165+
platform=make_data_platform_urn(
166+
getattr(ctx.pipeline_config.source, "platform", "unknown")
167+
),
168+
config=DataHubIngestionSourceConfigClass(
169+
recipe=self._get_recipe_to_report(ctx),
170+
version=nice_version_name(),
171+
executorId=self._EXECUTOR_ID,
172+
),
173+
)
144174

145-
# Emit the dataHubIngestionSourceInfo aspect
146-
self._emit_aspect(
147-
entity_urn=self.ingestion_source_urn,
148-
aspect_value=source_info_aspect,
149-
)
175+
# Emit the dataHubIngestionSourceInfo aspect
176+
self._emit_aspect(
177+
entity_urn=self.ingestion_source_urn,
178+
aspect_value=source_info_aspect,
179+
)
150180

151181
@staticmethod
152182
def _convert_sets_to_lists(obj: Any) -> Any:
@@ -214,6 +244,10 @@ def _emit_aspect(
214244

215245
def on_start(self, ctx: PipelineContext) -> None:
216246
assert ctx.pipeline_config is not None
247+
248+
if self._is_running_under_executor:
249+
return
250+
217251
# Construct the dataHubExecutionRequestInput aspect
218252
execution_input_aspect = ExecutionRequestInputClass(
219253
task=self._INGESTION_TASK_NAME,

metadata-ingestion/tests/unit/reporting/test_datahub_ingestion_reporter.py

Lines changed: 66 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,12 +1,19 @@
11
from typing import Any, Dict, List, Set, Tuple, Union
2+
from unittest.mock import MagicMock
23

34
import pytest
45

6+
from datahub.ingestion.api.common import PipelineContext
7+
from datahub.ingestion.api.sink import Sink
58
from datahub.ingestion.reporting.datahub_ingestion_run_summary_provider import (
69
DatahubIngestionRunSummaryProvider,
710
DatahubIngestionRunSummaryProviderConfig,
811
)
912
from datahub.ingestion.run.pipeline_config import PipelineConfig
13+
from datahub.metadata.schema_classes import (
14+
DataHubIngestionSourceInfoClass,
15+
ExecutionRequestInputClass,
16+
)
1017

1118

1219
@pytest.mark.parametrize(
@@ -185,3 +192,62 @@ def test_json_serializable() -> None:
185192
except TypeError:
186193
serializable = False
187194
assert serializable
195+
196+
197+
def _make_provider(run_id: str, report_recipe: bool = True) -> tuple:
198+
pipeline_config = PipelineConfig.from_dict(
199+
{"source": {"type": "bigquery"}, "sink": {"type": "console"}}
200+
)
201+
ctx = PipelineContext(run_id=run_id, pipeline_config=pipeline_config)
202+
mock_sink = MagicMock(spec=Sink)
203+
provider = DatahubIngestionRunSummaryProvider(
204+
sink=mock_sink, report_recipe=report_recipe, ctx=ctx
205+
)
206+
return provider, ctx, mock_sink
207+
208+
209+
def test_executor_detection_with_execution_request_urn() -> None:
210+
"""Test that CLI source creation is skipped when run_id is an execution request URN"""
211+
_provider, _ctx, mock_sink = _make_provider("urn:li:dataHubExecutionRequest:12345")
212+
213+
mock_sink.write_record_async.assert_not_called()
214+
215+
216+
def test_cli_source_creation_with_normal_run_id() -> None:
217+
"""Test that CLI source is created when run_id is NOT an execution request URN"""
218+
provider, _ctx, mock_sink = _make_provider("normal-cli-run-id-12345")
219+
220+
assert mock_sink.write_record_async.call_count == 1
221+
record_envelope = mock_sink.write_record_async.call_args[0][0]
222+
assert record_envelope.record.entityUrn == str(provider.ingestion_source_urn)
223+
assert isinstance(record_envelope.record.aspect, DataHubIngestionSourceInfoClass)
224+
225+
226+
def test_on_start_skips_execution_request_input_under_executor() -> None:
227+
"""Test that on_start skips ExecutionRequestInput creation when under executor"""
228+
provider, ctx, mock_sink = _make_provider("urn:li:dataHubExecutionRequest:12345")
229+
mock_sink.reset_mock() # Reset after __init__ (which emits nothing in executor path)
230+
231+
provider.on_start(ctx)
232+
233+
mock_sink.write_record_async.assert_not_called()
234+
235+
236+
def test_on_start_creates_execution_request_input_for_cli() -> None:
237+
"""Test that on_start creates ExecutionRequestInput for standalone CLI runs"""
238+
provider, ctx, mock_sink = _make_provider("normal-cli-run-id")
239+
mock_sink.reset_mock() # Reset after __init__
240+
241+
provider.on_start(ctx)
242+
243+
assert mock_sink.write_record_async.call_count == 1
244+
record_envelope = mock_sink.write_record_async.call_args[0][0]
245+
assert isinstance(record_envelope.record.aspect, ExecutionRequestInputClass)
246+
247+
248+
def test_non_execution_request_urn_as_run_id_does_not_crash() -> None:
249+
"""A valid URN of a different entity type must not crash __init__."""
250+
provider, _ctx, _mock_sink = _make_provider(
251+
"urn:li:ingestionSource:some-source-id", report_recipe=False
252+
)
253+
assert not provider._is_running_under_executor

0 commit comments

Comments
 (0)