Merged
2 changes: 2 additions & 0 deletions CLAUDE.md
@@ -198,6 +198,8 @@ yarn parse-schema # Parse JSON schemas for frontend (connection and
- Follow existing project patterns and conventions
- Generate production-ready code, not tutorial code
- Create integration tests in openmetadata-integration-tests
- Do not use fully qualified names in code (e.g., `org.openmetadata.schema.type.Status`); import the class name instead
- Do not use wildcard imports; import only the packages that are required

### TypeScript/Frontend Code Requirements
- **NEVER use `any` type** in TypeScript code - always use proper types
11 changes: 11 additions & 0 deletions bootstrap/sql/migrations/native/1.11.9/mysql/schemaChanges.sql
@@ -0,0 +1,11 @@
-- Change entity_extension_time_series.timestamp from VIRTUAL to STORED for performance.
-- STORED columns are materialized on disk, making unique constraint checks and range
-- queries on timestamp significantly faster (especially for bulk pipeline status upserts).
-- MySQL does not allow ALTER from VIRTUAL to STORED directly, so we drop and re-add.
-- NOTE: This will lock the table for a full rebuild. On large deployments with millions
-- of rows in entity_extension_time_series, plan for downtime accordingly.
ALTER TABLE entity_extension_time_series
DROP INDEX entity_extension_time_series_constraint,
DROP COLUMN `timestamp`,
ADD COLUMN `timestamp` bigint unsigned GENERATED ALWAYS AS (json_unquote(json_extract(`json`, _utf8mb4'$.timestamp'))) STORED NOT NULL,
ADD UNIQUE KEY `entity_extension_time_series_constraint` (`entityFQNHash`, `extension`, `timestamp`);
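
After the migration runs, the new definition can be sanity-checked from `information_schema`, where the `EXTRA` column reports `STORED GENERATED` for stored generated columns. A minimal verification sketch, assuming a reachable MySQL instance and the `pymysql` driver (neither the connection settings nor the driver choice are part of this PR):

```python
import pymysql  # assumption: any DB-API driver works; pymysql is only illustrative

# Placeholder credentials; adjust to the target deployment.
conn = pymysql.connect(
    host="localhost",
    user="openmetadata_user",
    password="openmetadata_password",
    database="openmetadata_db",
)
try:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT COLUMN_NAME, EXTRA FROM information_schema.COLUMNS "
            "WHERE TABLE_NAME = 'entity_extension_time_series' "
            "AND COLUMN_NAME = 'timestamp'"
        )
        # Expected after this migration: ('timestamp', 'STORED GENERATED')
        print(cur.fetchone())
finally:
    conn.close()
```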
@@ -0,0 +1,3 @@
-- No changes needed for entity_extension_time_series.timestamp on PostgreSQL.
-- PostgreSQL already uses STORED for the generated timestamp column (since table creation).
-- MySQL migration changes it from VIRTUAL to STORED for consistency and performance.
7 changes: 7 additions & 0 deletions ingestion/src/metadata/ingestion/models/pipeline_status.py
@@ -12,6 +12,8 @@
Model required to ingest pipeline status data
from the sample data
"""
from typing import List

from pydantic import BaseModel

from metadata.generated.schema.entity.data.pipeline import PipelineStatus
@@ -20,3 +22,8 @@
class OMetaPipelineStatus(BaseModel):
    pipeline_fqn: str
    pipeline_status: PipelineStatus


class OMetaBulkPipelineStatus(BaseModel):
    pipeline_fqn: str
    pipeline_statuses: List[PipelineStatus]
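
The new bulk model mirrors `OMetaPipelineStatus` but carries a list of statuses for one pipeline FQN. A minimal construction sketch, reusing the FQN and timestamp from the Databricks test fixtures further down in this diff (the second status is purely illustrative):

```python
from metadata.generated.schema.entity.data.pipeline import PipelineStatus
from metadata.ingestion.models.pipeline_status import OMetaBulkPipelineStatus

# Timestamps are epoch milliseconds, as in the test fixtures.
bulk_status = OMetaBulkPipelineStatus(
    pipeline_fqn="databricks_pipeline_test.11223344",
    pipeline_statuses=[
        PipelineStatus(timestamp=1625060460483, executionStatus="Successful"),
        PipelineStatus(timestamp=1625064060483, executionStatus="Failed"),  # illustrative second run
    ],
)
```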
@@ -13,7 +13,7 @@

To be used by OpenMetadata class
"""
from typing import List
from typing import List, Optional

from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.entity.data.pipeline import (
@@ -39,6 +39,26 @@ class OMetaPipelineMixin:

    client: REST

    def add_bulk_pipeline_status(
        self, fqn: str, statuses: List[PipelineStatus]
    ) -> Pipeline:
        """
        Send multiple PipelineStatus records to the Pipeline Entity
        in a single bulk request
        """
        try:
            parts = fqn_utils.split(fqn)
            normalized_fqn = fqn_utils._build(*parts, quote=True)
        except Exception:
            normalized_fqn = fqn

        resp = self.client.put(
            f"{self.get_suffix(Pipeline)}/{quote(normalized_fqn)}/status/bulk",
            data="[" + ",".join(status.model_dump_json() for status in statuses) + "]",
        )

        return Pipeline(**resp)

    def add_pipeline_status(self, fqn: str, status: PipelineStatus) -> Pipeline:
        """
        Given a pipeline and a PipelineStatus, send it
@@ -60,6 +80,34 @@ def add_pipeline_status(self, fqn: str, status: PipelineStatus) -> Pipeline:

        return Pipeline(**resp)

    def list_pipeline_statuses(
        self,
        fqn: str,
        start_ts: int,
        end_ts: int,
        limit: Optional[int] = None,
    ) -> List[PipelineStatus]:
        """
        List PipelineStatus records for a Pipeline within a time range.
        """
        try:
            parts = fqn_utils.split(fqn)
            normalized_fqn = fqn_utils._build(*parts, quote=True)
        except Exception:
            normalized_fqn = fqn

        params = f"startTs={start_ts}&endTs={end_ts}"
        if limit is not None:
            params += f"&limit={limit}"

        resp = self.client.get(
            f"{self.get_suffix(Pipeline)}/{quote(normalized_fqn)}/status?{params}",
        )

        if resp and "data" in resp:
            return [PipelineStatus(**status) for status in resp["data"]]
        return []

    def add_task_to_pipeline(self, pipeline: Pipeline, *tasks: Task) -> Pipeline:
        """
        The background logic for this method is that during
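
Taken together, the two new mixin methods give the `OpenMetadata` client a bulk write path and a time-range read path. A minimal usage sketch, assuming an already-configured `OpenMetadata` client; `push_and_read_statuses` is a hypothetical helper, not part of this PR:

```python
from typing import List

from metadata.generated.schema.entity.data.pipeline import PipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata


def push_and_read_statuses(
    metadata: OpenMetadata,
    fqn: str,
    statuses: List[PipelineStatus],
    start_ts: int,
    end_ts: int,
) -> List[PipelineStatus]:
    """Send all statuses in one bulk call, then read back the same time window."""
    metadata.add_bulk_pipeline_status(fqn=fqn, statuses=statuses)
    return metadata.list_pipeline_statuses(
        fqn=fqn, start_ts=start_ts, end_ts=end_ts, limit=len(statuses)
    )
```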
14 changes: 13 additions & 1 deletion ingestion/src/metadata/ingestion/sink/metadata_rest.py
@@ -89,7 +89,10 @@
    PatchedEntity,
    PatchRequest,
)
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.pipeline_status import (
    OMetaBulkPipelineStatus,
    OMetaPipelineStatus,
)
from metadata.ingestion.models.profile_data import OMetaTableProfileSampleData
from metadata.ingestion.models.search_index_data import OMetaIndexSampleData
from metadata.ingestion.models.tests_data import (
@@ -647,6 +650,15 @@ def write_pipeline_status(
        )
        return Either(right=pipeline)

    @_run_dispatch.register
    def write_bulk_pipeline_status(
        self, record: OMetaBulkPipelineStatus
    ) -> Either[Pipeline]:
        pipeline = self.metadata.add_bulk_pipeline_status(
            fqn=record.pipeline_fqn, statuses=record.pipeline_statuses
        )
        return Either(right=pipeline)

    @_run_dispatch.register
    def write_profile_sample_data(
        self, record: OMetaTableProfileSampleData
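
The `@_run_dispatch.register` decorator above routes each record to a writer based on the record's type, which is how `OMetaBulkPipelineStatus` records emitted by a source end up in `write_bulk_pipeline_status`. A self-contained sketch of that dispatch pattern (an illustration of the idea, not OpenMetadata's actual sink implementation):

```python
from dataclasses import dataclass
from functools import singledispatch
from typing import List


@dataclass
class BulkStatusRecord:  # hypothetical stand-in for OMetaBulkPipelineStatus
    pipeline_fqn: str
    statuses: List[str]


@singledispatch
def write_record(record) -> str:
    raise NotImplementedError(f"No writer registered for {type(record).__name__}")


@write_record.register
def _(record: BulkStatusRecord) -> str:
    # The real sink would call metadata.add_bulk_pipeline_status(...) here.
    return f"wrote {len(record.statuses)} statuses for {record.pipeline_fqn}"


print(write_record(BulkStatusRecord("svc.pipeline", ["Successful", "Failed"])))
```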
@@ -14,6 +14,7 @@
"""

import traceback
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional, Tuple

from pydantic import ValidationError
@@ -55,7 +56,7 @@
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.pipeline_status import OMetaBulkPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.databrickspipeline.kafka_parser import (
extract_dlt_table_dependencies,
@@ -250,13 +251,22 @@ def get_tasks(self, pipeline_details: DataBrickPipelineDetails) -> List[Task]:

    def yield_pipeline_status(
        self, pipeline_details: DataBrickPipelineDetails
    ) -> Iterable[OMetaPipelineStatus]:
    ) -> Iterable[Either[OMetaBulkPipelineStatus]]:
        try:
            if not pipeline_details.job_id:
                return

            lookback_days = self.source_config.statusLookbackDays or 1
            cutoff_ts = int(
                (datetime.now(timezone.utc) - timedelta(days=lookback_days)).timestamp()
                * 1000
            )
            statuses: List[PipelineStatus] = []

            for run in self.client.get_job_runs(job_id=pipeline_details.job_id) or []:
                run = DBRun(**run)
                if run.start_time and run.start_time < cutoff_ts:
                    break
Comment on lines +268 to +269

⚠️ Edge Case: Databricks connector break assumes descending run order

The yield_pipeline_status method uses break when it encounters a run with start_time < cutoff_ts. This assumes that self.client.get_job_runs() returns runs in strictly descending chronological order (newest first).

If the Databricks API returns runs in a different order (e.g., ascending, or unordered), this break will cause the method to skip recent runs that appear after an older one in the list. This would result in incomplete pipeline status data being ingested.

Suggestion: Either:

  1. Verify and document that get_job_runs() guarantees descending order (Databricks Jobs API does return runs in descending order by default when using list_runs)
  2. Use continue instead of break to safely handle all runs regardless of ordering, though this loses the early-termination optimization
  3. Add a sort before iterating: sorted(runs, key=lambda r: r.get('start_time', 0), reverse=True)
if run.start_time and run.start_time < cutoff_ts:
    continue  # safer: skip old runs without assuming order

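A standalone sketch of the reviewer's third suggestion: sort the raw run payloads newest-first before applying the cutoff, so the early `break` stays safe even if the API ordering assumption does not hold. `filter_runs_by_cutoff` is a hypothetical helper, not code from this PR:

```python
from typing import Any, Dict, List


def filter_runs_by_cutoff(runs: List[Dict[str, Any]], cutoff_ts: int) -> List[Dict[str, Any]]:
    """Keep only runs whose start_time (epoch ms) falls at or after cutoff_ts."""
    recent: List[Dict[str, Any]] = []
    for run in sorted(runs, key=lambda r: r.get("start_time", 0), reverse=True):
        start_time = run.get("start_time")
        if start_time and start_time < cutoff_ts:
            break  # sorted newest-first, so the remaining runs are all older
        recent.append(run)
    return recent
```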

                task_status = [
                    TaskStatus(
                        name=str(task.name),
@@ -269,24 +279,28 @@
                    )
                    for task in run.tasks or []
                ]
                pipeline_status = PipelineStatus(
                    taskStatus=task_status,
                    timestamp=Timestamp(run.start_time),
                    executionStatus=STATUS_MAP.get(
                        run.state.result_state,
                        StatusType.Failed,
                    ),
                statuses.append(
                    PipelineStatus(
                        taskStatus=task_status,
                        timestamp=Timestamp(run.start_time),
                        executionStatus=STATUS_MAP.get(
                            run.state.result_state,
                            StatusType.Failed,
                        ),
                    )
                )

            if statuses:
                pipeline_fqn = fqn.build(
                    metadata=self.metadata,
                    entity_type=Pipeline,
                    service_name=self.context.get().pipeline_service,
                    pipeline_name=self.context.get().pipeline,
                )
                yield Either(
                    right=OMetaPipelineStatus(
                    right=OMetaBulkPipelineStatus(
                        pipeline_fqn=pipeline_fqn,
                        pipeline_status=pipeline_status,
                        pipeline_statuses=statuses,
                    )
                )
        except Exception as exc:
34 changes: 32 additions & 2 deletions ingestion/src/metadata/sdk/entities/pipelines.py
@@ -1,10 +1,10 @@
"""
Pipelines entity SDK with fluent API
"""
from typing import Type
from typing import Any, List, Optional, Type, cast

from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineStatus
from metadata.sdk.entities.base import BaseEntity


@@ -15,3 +15,33 @@ class Pipelines(BaseEntity[Pipeline, CreatePipelineRequest]):
    def entity_type(cls) -> Type[Pipeline]:
        """Return the Pipeline entity type"""
        return Pipeline

    @classmethod
    def add_pipeline_status(cls, fqn: str, status: PipelineStatus) -> Pipeline:
        """Add a single pipeline execution status."""
        client = cls._get_client()
        result = cast(Any, client).add_pipeline_status(fqn=fqn, status=status)
        return cls._coerce_entity(result)

    @classmethod
    def add_bulk_pipeline_status(
        cls, fqn: str, statuses: List[PipelineStatus]
    ) -> Pipeline:
        """Add multiple pipeline execution statuses in a single bulk request."""
        client = cls._get_client()
        result = cast(Any, client).add_bulk_pipeline_status(fqn=fqn, statuses=statuses)
        return cls._coerce_entity(result)

    @classmethod
    def list_pipeline_statuses(
        cls,
        fqn: str,
        start_ts: int,
        end_ts: int,
        limit: Optional[int] = None,
    ) -> List[PipelineStatus]:
        """List pipeline execution statuses within a time range."""
        client = cls._get_client()
        return cast(Any, client).list_pipeline_statuses(
            fqn=fqn, start_ts=start_ts, end_ts=end_ts, limit=limit
        )
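
For SDK users, the facade forwards to the same client methods added in the mixin. A minimal usage sketch, assuming the SDK's default client has already been configured (FQN and timestamps are placeholders):

```python
from metadata.generated.schema.entity.data.pipeline import PipelineStatus
from metadata.sdk.entities.pipelines import Pipelines

fqn = "databricks_svc.daily_orders"  # hypothetical pipeline FQN
statuses = [
    PipelineStatus(timestamp=1700000000000, executionStatus="Successful"),
    PipelineStatus(timestamp=1700086400000, executionStatus="Failed"),
]

Pipelines.add_bulk_pipeline_status(fqn, statuses)
recent = Pipelines.list_pipeline_statuses(
    fqn, start_ts=1700000000000, end_ts=1700172800000, limit=10
)
```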
@@ -42,7 +42,7 @@
    LineageDetails,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.pipeline_status import OMetaBulkPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.databrickspipeline.metadata import (
    DatabrickspipelineSource,
@@ -82,7 +82,9 @@
                },
            }
        },
        "sourceConfig": {"config": {"type": "PipelineMetadata"}},
        "sourceConfig": {
            "config": {"type": "PipelineMetadata", "statusLookbackDays": 99999}
        },
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
@@ -174,35 +176,37 @@
)

EXPECTED_PIPELINE_STATUS = [
    OMetaPipelineStatus(
    OMetaBulkPipelineStatus(
        pipeline_fqn="databricks_pipeline_test.11223344",
        pipeline_status=PipelineStatus(
            timestamp=1625060460483,
            executionStatus="Successful",
            taskStatus=[
                TaskStatus(
                    name="Orders_Ingest",
                    executionStatus="Successful",
                    startTime=1625060460483,
                    endTime=1625060863413,
                    logLink="https://my-workspace.cloud.databricks.com/#job/11223344/run/123",
                ),
                TaskStatus(
                    name="Match",
                    executionStatus="Successful",
                    startTime=1625060460483,
                    endTime=1625060863413,
                    logLink="https://my-workspace.cloud.databricks.com/#job/11223344/run/123",
                ),
                TaskStatus(
                    name="Sessionize",
                    executionStatus="Successful",
                    startTime=1625060460483,
                    endTime=1625060863413,
                    logLink="https://my-workspace.cloud.databricks.com/#job/11223344/run/123",
                ),
            ],
        ),
        pipeline_statuses=[
            PipelineStatus(
                timestamp=1625060460483,
                executionStatus="Successful",
                taskStatus=[
                    TaskStatus(
                        name="Orders_Ingest",
                        executionStatus="Successful",
                        startTime=1625060460483,
                        endTime=1625060863413,
                        logLink="https://my-workspace.cloud.databricks.com/#job/11223344/run/123",
                    ),
                    TaskStatus(
                        name="Match",
                        executionStatus="Successful",
                        startTime=1625060460483,
                        endTime=1625060863413,
                        logLink="https://my-workspace.cloud.databricks.com/#job/11223344/run/123",
                    ),
                    TaskStatus(
                        name="Sessionize",
                        executionStatus="Successful",
                        startTime=1625060460483,
                        endTime=1625060863413,
                        logLink="https://my-workspace.cloud.databricks.com/#job/11223344/run/123",
                    ),
                ],
            ),
        ],
    ),
]
