
Commit 617d8b2

Merge remote-tracking branch 'origin/main' into semantic-search
2 parents: d286908 + d8265b7

98 files changed: +2085 additions, −400 deletions

.github/workflows/docker-k8s-operator.yml

Lines changed: 0 additions & 3 deletions

@@ -20,9 +20,6 @@ on:
        description: "Do you want to update docker image latest tag as well ?"
        type: boolean

-permissions:
-  contents: read
-
 concurrency:
   group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
   cancel-in-progress: true

CLAUDE.md

Lines changed: 2 additions & 0 deletions

@@ -198,6 +198,8 @@ yarn parse-schema # Parse JSON schemas for frontend (connection and
 - Follow existing project patterns and conventions
 - Generate production-ready code, not tutorial code
 - Create integration tests in openmetadata-integration-tests
+- Do not use Fully Qualified Names in the code such as org.openmetadata.schema.type.Status instead import the class name
+- Do not import wild-card packages instead import exactly required packages

 ### TypeScript/Frontend Code Requirements
 - **NEVER use `any` type** in TypeScript code - always use proper types
Lines changed: 11 additions & 0 deletions

@@ -0,0 +1,11 @@
+-- Change entity_extension_time_series.timestamp from VIRTUAL to STORED for performance.
+-- STORED columns are materialized on disk, making unique constraint checks and range
+-- queries on timestamp significantly faster (especially for bulk pipeline status upserts).
+-- MySQL does not allow ALTER from VIRTUAL to STORED directly, so we drop and re-add.
+-- NOTE: This will lock the table for a full rebuild. On large deployments with millions
+-- of rows in entity_extension_time_series, plan for downtime accordingly.
+ALTER TABLE entity_extension_time_series
+DROP INDEX entity_extension_time_series_constraint,
+DROP COLUMN `timestamp`,
+ADD COLUMN `timestamp` bigint unsigned GENERATED ALWAYS AS (json_unquote(json_extract(`json`, _utf8mb4'$.timestamp'))) STORED NOT NULL,
+ADD UNIQUE KEY `entity_extension_time_series_constraint` (`entityFQNHash`, `extension`, `timestamp`);
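Whether the rebuild took effect is visible in MySQL's information_schema: a generated column reports VIRTUAL GENERATED or STORED GENERATED in its EXTRA metadata. A minimal Python sketch with invented connection details (the query itself is standard MySQL):

import mysql.connector  # assumed driver; any MySQL client would do

conn = mysql.connector.connect(
    host="localhost",  # hypothetical connection settings
    user="openmetadata_user",
    password="...",
    database="openmetadata_db",
)
cur = conn.cursor()
cur.execute(
    "SELECT COLUMN_NAME, EXTRA FROM information_schema.COLUMNS "
    "WHERE TABLE_NAME = 'entity_extension_time_series' "
    "AND COLUMN_NAME = 'timestamp'"
)
# EXTRA reads 'STORED GENERATED' after the migration, 'VIRTUAL GENERATED' before it
print(cur.fetchone())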
Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+-- No changes needed for entity_extension_time_series.timestamp on PostgreSQL.
+-- PostgreSQL already uses STORED for the generated timestamp column (since table creation).
+-- MySQL migration changes it from VIRTUAL to STORED for consistency and performance.

ingestion/src/metadata/ingestion/models/pipeline_status.py

Lines changed: 7 additions & 0 deletions

@@ -12,6 +12,8 @@
 Model required to ingest pipeline status data
 from the sample data
 """
+from typing import List
+
 from pydantic import BaseModel

 from metadata.generated.schema.entity.data.pipeline import PipelineStatus
@@ -20,3 +22,8 @@
 class OMetaPipelineStatus(BaseModel):
     pipeline_fqn: str
     pipeline_status: PipelineStatus
+
+
+class OMetaBulkPipelineStatus(BaseModel):
+    pipeline_fqn: str
+    pipeline_statuses: List[PipelineStatus]
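A minimal construction sketch for the two models (FQN and timestamp invented; the StatusType and Timestamp import paths are assumptions based on the generated schema). The existing model carries one run per record; the new one carries many runs under a single FQN:

from metadata.generated.schema.entity.data.pipeline import PipelineStatus, StatusType
from metadata.generated.schema.type.basic import Timestamp
from metadata.ingestion.models.pipeline_status import (
    OMetaBulkPipelineStatus,
    OMetaPipelineStatus,
)

status = PipelineStatus(
    timestamp=Timestamp(1700000000000),  # epoch millis, invented value
    executionStatus=StatusType.Successful,
)
single = OMetaPipelineStatus(pipeline_fqn="svc.pipe", pipeline_status=status)
bulk = OMetaBulkPipelineStatus(pipeline_fqn="svc.pipe", pipeline_statuses=[status])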

ingestion/src/metadata/ingestion/ometa/mixins/pipeline_mixin.py

Lines changed: 49 additions & 1 deletion

@@ -13,7 +13,7 @@

 To be used by OpenMetadata class
 """
-from typing import List
+from typing import List, Optional

 from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
 from metadata.generated.schema.entity.data.pipeline import (
@@ -39,6 +39,26 @@ class OMetaPipelineMixin:

     client: REST

+    def add_bulk_pipeline_status(
+        self, fqn: str, statuses: List[PipelineStatus]
+    ) -> Pipeline:
+        """
+        Send multiple PipelineStatus records to the Pipeline Entity
+        in a single bulk request
+        """
+        try:
+            parts = fqn_utils.split(fqn)
+            normalized_fqn = fqn_utils._build(*parts, quote=True)
+        except Exception:
+            normalized_fqn = fqn
+
+        resp = self.client.put(
+            f"{self.get_suffix(Pipeline)}/{quote(normalized_fqn)}/status/bulk",
+            data="[" + ",".join(status.model_dump_json() for status in statuses) + "]",
+        )
+
+        return Pipeline(**resp)
+
     def add_pipeline_status(self, fqn: str, status: PipelineStatus) -> Pipeline:
         """
         Given a pipeline and a PipelineStatus, send it
@@ -60,6 +80,34 @@ def add_pipeline_status(self, fqn: str, status: PipelineStatus) -> Pipeline:

         return Pipeline(**resp)

+    def list_pipeline_statuses(
+        self,
+        fqn: str,
+        start_ts: int,
+        end_ts: int,
+        limit: Optional[int] = None,
+    ) -> List[PipelineStatus]:
+        """
+        List PipelineStatus records for a Pipeline within a time range.
+        """
+        try:
+            parts = fqn_utils.split(fqn)
+            normalized_fqn = fqn_utils._build(*parts, quote=True)
+        except Exception:
+            normalized_fqn = fqn
+
+        params = f"startTs={start_ts}&endTs={end_ts}"
+        if limit is not None:
+            params += f"&limit={limit}"
+
+        resp = self.client.get(
+            f"{self.get_suffix(Pipeline)}/{quote(normalized_fqn)}/status?{params}",
+        )
+
+        if resp and "data" in resp:
+            return [PipelineStatus(**status) for status in resp["data"]]
+        return []
+
     def add_task_to_pipeline(self, pipeline: Pipeline, *tasks: Task) -> Pipeline:
         """
         The background logic for this method is that during
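A hedged usage sketch of the two new mixin methods as exposed through the OpenMetadata client; the server address and FQN are invented, real deployments also need auth configuration, and statuses is any List[PipelineStatus] such as the one built in the model example above:

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata

# Hypothetical local server
metadata = OpenMetadata(OpenMetadataConnection(hostPort="http://localhost:8585/api"))

# One PUT to .../status/bulk instead of one request per run
metadata.add_bulk_pipeline_status(fqn="my_service.my_pipeline", statuses=statuses)

# Read the statuses back for a window (timestamps are epoch millis)
recent = metadata.list_pipeline_statuses(
    fqn="my_service.my_pipeline",
    start_ts=1700000000000,
    end_ts=1700086400000,
    limit=100,
)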

ingestion/src/metadata/ingestion/sink/metadata_rest.py

Lines changed: 13 additions & 1 deletion

@@ -90,7 +90,10 @@
     PatchedEntity,
     PatchRequest,
 )
-from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
+from metadata.ingestion.models.pipeline_status import (
+    OMetaBulkPipelineStatus,
+    OMetaPipelineStatus,
+)
 from metadata.ingestion.models.profile_data import OMetaTableProfileSampleData
 from metadata.ingestion.models.search_index_data import OMetaIndexSampleData
 from metadata.ingestion.models.tests_data import (
@@ -649,6 +652,15 @@ def write_pipeline_status(
         )
         return Either(right=pipeline)

+    @_run_dispatch.register
+    def write_bulk_pipeline_status(
+        self, record: OMetaBulkPipelineStatus
+    ) -> Either[Pipeline]:
+        pipeline = self.metadata.add_bulk_pipeline_status(
+            fqn=record.pipeline_fqn, statuses=record.pipeline_statuses
+        )
+        return Either(right=pipeline)
+
     @_run_dispatch.register
     def write_profile_sample_data(
         self, record: OMetaTableProfileSampleData
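The sink routes each record to a handler based on its Python type, so registering write_bulk_pipeline_status is all it takes for OMetaBulkPipelineStatus records to flow through. A standalone sketch of that mechanism, assuming _run_dispatch is a functools.singledispatchmethod (its definition is not shown in this commit):

from functools import singledispatchmethod


class Sink:
    @singledispatchmethod
    def _run_dispatch(self, record):
        raise NotImplementedError(f"No handler for {type(record).__name__}")

    @_run_dispatch.register
    def _(self, record: int):  # toy handler; ints stand in for a record model
        return f"int handler got {record}"


print(Sink()._run_dispatch(5))  # -> int handler got 5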

ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py

Lines changed: 25 additions & 11 deletions

@@ -14,6 +14,7 @@
 """

 import traceback
+from datetime import datetime, timedelta, timezone
 from typing import Iterable, List, Optional, Tuple

 from pydantic import ValidationError
@@ -55,7 +56,7 @@
 from metadata.ingestion.api.models import Either
 from metadata.ingestion.api.steps import InvalidSourceException
 from metadata.ingestion.lineage.sql_lineage import get_column_fqn
-from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
+from metadata.ingestion.models.pipeline_status import OMetaBulkPipelineStatus
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.pipeline.databrickspipeline.kafka_parser import (
     extract_dlt_table_dependencies,
@@ -250,13 +251,22 @@ def get_tasks(self, pipeline_details: DataBrickPipelineDetails) -> List[Task]:

     def yield_pipeline_status(
         self, pipeline_details: DataBrickPipelineDetails
-    ) -> Iterable[OMetaPipelineStatus]:
+    ) -> Iterable[Either[OMetaBulkPipelineStatus]]:
         try:
             if not pipeline_details.job_id:
                 return

+            lookback_days = self.source_config.statusLookbackDays or 1
+            cutoff_ts = int(
+                (datetime.now(timezone.utc) - timedelta(days=lookback_days)).timestamp()
+                * 1000
+            )
+            statuses: List[PipelineStatus] = []
+
             for run in self.client.get_job_runs(job_id=pipeline_details.job_id) or []:
                 run = DBRun(**run)
+                if run.start_time and run.start_time < cutoff_ts:
+                    break
                 task_status = [
                     TaskStatus(
                         name=str(task.name),
@@ -269,24 +279,28 @@ def yield_pipeline_status(
                     )
                     for task in run.tasks or []
                 ]
-                pipeline_status = PipelineStatus(
-                    taskStatus=task_status,
-                    timestamp=Timestamp(run.start_time),
-                    executionStatus=STATUS_MAP.get(
-                        run.state.result_state,
-                        StatusType.Failed,
-                    ),
+                statuses.append(
+                    PipelineStatus(
+                        taskStatus=task_status,
+                        timestamp=Timestamp(run.start_time),
+                        executionStatus=STATUS_MAP.get(
+                            run.state.result_state,
+                            StatusType.Failed,
+                        ),
+                    )
                 )
+
+            if statuses:
                 pipeline_fqn = fqn.build(
                     metadata=self.metadata,
                     entity_type=Pipeline,
                     service_name=self.context.get().pipeline_service,
                     pipeline_name=self.context.get().pipeline,
                 )
                 yield Either(
-                    right=OMetaPipelineStatus(
+                    right=OMetaBulkPipelineStatus(
                         pipeline_fqn=pipeline_fqn,
-                        pipeline_status=pipeline_status,
+                        pipeline_statuses=statuses,
                     )
                 )
         except Exception as exc:
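Two details above are easy to miss: the cutoff is computed once in epoch milliseconds, and the loop breaks (rather than continues) at the first stale run, which suggests the Databricks API returns runs newest-first. A self-contained sketch of the cutoff arithmetic with invented values:

from datetime import datetime, timedelta, timezone

lookback_days = 7  # invented; the source falls back to 1 when statusLookbackDays is unset
cutoff_ts = int(
    (datetime.now(timezone.utc) - timedelta(days=lookback_days)).timestamp() * 1000
)

# Toy run timestamps: one just inside the window, one just outside
run_start_times = [cutoff_ts + 1, cutoff_ts - 1]
kept = [ts for ts in run_start_times if ts >= cutoff_ts]
print(len(kept))  # -> 1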

ingestion/src/metadata/readers/dataframe/dsv.py

Lines changed: 2 additions & 0 deletions

@@ -140,6 +140,8 @@ def chunk_generator():
             storage_options=storage_options,
             compression=compression,
             encoding_errors="ignore",
+            escapechar="\\",
+            engine="python",
         ) as reader:
             for chunks in reader:
                 chunks = self._fix_malformed_quoted_chunk(
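A toy illustration of what the two new reader options do, on in-memory data: escapechar="\\" lets backslash-escaped quotes inside fields survive parsing, and engine="python" selects pandas' slower but more forgiving parser.

import io

import pandas as pd

raw = 'id,comment\n1,"a \\"quoted\\" word"\n'  # toy data with escaped quotes
df = pd.read_csv(io.StringIO(raw), escapechar="\\", engine="python")
print(df.loc[0, "comment"])  # -> a "quoted" word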

ingestion/src/metadata/sdk/entities/pipelines.py

Lines changed: 32 additions & 2 deletions

@@ -1,10 +1,10 @@
 """
 Pipelines entity SDK with fluent API
 """
-from typing import Type
+from typing import Any, List, Optional, Type, cast

 from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
-from metadata.generated.schema.entity.data.pipeline import Pipeline
+from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineStatus
 from metadata.sdk.entities.base import BaseEntity


@@ -15,3 +15,33 @@ class Pipelines(BaseEntity[Pipeline, CreatePipelineRequest]):
     def entity_type(cls) -> Type[Pipeline]:
         """Return the Pipeline entity type"""
         return Pipeline
+
+    @classmethod
+    def add_pipeline_status(cls, fqn: str, status: PipelineStatus) -> Pipeline:
+        """Add a single pipeline execution status."""
+        client = cls._get_client()
+        result = cast(Any, client).add_pipeline_status(fqn=fqn, status=status)
+        return cls._coerce_entity(result)
+
+    @classmethod
+    def add_bulk_pipeline_status(
+        cls, fqn: str, statuses: List[PipelineStatus]
+    ) -> Pipeline:
+        """Add multiple pipeline execution statuses in a single bulk request."""
+        client = cls._get_client()
+        result = cast(Any, client).add_bulk_pipeline_status(fqn=fqn, statuses=statuses)
+        return cls._coerce_entity(result)
+
+    @classmethod
+    def list_pipeline_statuses(
+        cls,
+        fqn: str,
+        start_ts: int,
+        end_ts: int,
+        limit: Optional[int] = None,
+    ) -> List[PipelineStatus]:
+        """List pipeline execution statuses within a time range."""
+        client = cls._get_client()
+        return cast(Any, client).list_pipeline_statuses(
+            fqn=fqn, start_ts=start_ts, end_ts=end_ts, limit=limit
+        )