7 changes: 7 additions & 0 deletions ingestion/src/metadata/ingestion/models/pipeline_status.py
@@ -12,6 +12,8 @@
Model required to ingest pipeline status data
from the sample data
"""
from typing import List

from pydantic import BaseModel

from metadata.generated.schema.entity.data.pipeline import PipelineStatus
@@ -20,3 +22,8 @@
class OMetaPipelineStatus(BaseModel):
    pipeline_fqn: str
    pipeline_status: PipelineStatus


class OMetaBulkPipelineStatus(BaseModel):
    pipeline_fqn: str
    pipeline_statuses: List[PipelineStatus]
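For orientation, a minimal sketch of how a connector can assemble the new bulk record; the helper name is illustrative only, and the statuses are assumed to have been built elsewhere. The Databricks source changes later in this PR build the same record from collected run statuses.

from typing import List

from metadata.generated.schema.entity.data.pipeline import PipelineStatus
from metadata.ingestion.models.pipeline_status import OMetaBulkPipelineStatus


def to_bulk_record(
    pipeline_fqn: str, statuses: List[PipelineStatus]
) -> OMetaBulkPipelineStatus:
    # One record carries every status collected for a single pipeline FQN.
    return OMetaBulkPipelineStatus(
        pipeline_fqn=pipeline_fqn,
        pipeline_statuses=statuses,
    )

A source then wraps this record in Either(right=...) so the REST sink can dispatch it (see metadata_rest.py below).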
20 changes: 20 additions & 0 deletions ingestion/src/metadata/ingestion/ometa/mixins/pipeline_mixin.py
@@ -39,6 +39,26 @@ class OMetaPipelineMixin:

    client: REST

    def add_bulk_pipeline_status(
        self, fqn: str, statuses: List[PipelineStatus]
    ) -> Pipeline:
        """
        Send multiple PipelineStatus records to the Pipeline Entity
        in a single bulk request
        """
        try:
            parts = fqn_utils.split(fqn)
            normalized_fqn = fqn_utils._build(*parts, quote=True)
        except Exception:
            normalized_fqn = fqn

        resp = self.client.put(
            f"{self.get_suffix(Pipeline)}/{quote(normalized_fqn)}/status/bulk",
            data="[" + ",".join(status.model_dump_json() for status in statuses) + "]",
        )

        return Pipeline(**resp)

    def add_pipeline_status(self, fqn: str, status: PipelineStatus) -> Pipeline:
        """
        Given a pipeline and a PipelineStatus, send it
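A hedged usage sketch of the new client method; the metadata client is assumed to be an already configured OpenMetadata instance, and the wrapper function and FQN are placeholders:

from typing import List

from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineStatus


def publish_bulk_status(metadata, fqn: str, statuses: List[PipelineStatus]) -> Pipeline:
    # Single PUT to .../{fqn}/status/bulk instead of one request per status.
    return metadata.add_bulk_pipeline_status(fqn=fqn, statuses=statuses)

The REST sink below calls this method whenever it receives an OMetaBulkPipelineStatus record.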
14 changes: 13 additions & 1 deletion ingestion/src/metadata/ingestion/sink/metadata_rest.py
@@ -89,7 +89,10 @@
    PatchedEntity,
    PatchRequest,
)
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.pipeline_status import (
    OMetaBulkPipelineStatus,
    OMetaPipelineStatus,
)
from metadata.ingestion.models.profile_data import OMetaTableProfileSampleData
from metadata.ingestion.models.search_index_data import OMetaIndexSampleData
from metadata.ingestion.models.tests_data import (
@@ -647,6 +650,15 @@ def write_pipeline_status(
        )
        return Either(right=pipeline)

    @_run_dispatch.register
    def write_bulk_pipeline_status(
        self, record: OMetaBulkPipelineStatus
    ) -> Either[Pipeline]:
        pipeline = self.metadata.add_bulk_pipeline_status(
            fqn=record.pipeline_fqn, statuses=record.pipeline_statuses
        )
        return Either(right=pipeline)

    @_run_dispatch.register
    def write_profile_sample_data(
        self, record: OMetaTableProfileSampleData
@@ -14,6 +14,7 @@
"""

import traceback
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional, Tuple

from pydantic import ValidationError
@@ -55,7 +56,7 @@
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.pipeline_status import OMetaBulkPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.databrickspipeline.kafka_parser import (
extract_dlt_table_dependencies,
@@ -250,13 +251,22 @@ def get_tasks(self, pipeline_details: DataBrickPipelineDetails) -> List[Task]:

    def yield_pipeline_status(
        self, pipeline_details: DataBrickPipelineDetails
    ) -> Iterable[OMetaPipelineStatus]:
    ) -> Iterable[Either[OMetaBulkPipelineStatus]]:
        try:
            if not pipeline_details.job_id:
                return

            lookback_days = self.source_config.numberOfStatus or 1
            cutoff_ts = int(
                (datetime.now(timezone.utc) - timedelta(days=lookback_days)).timestamp()
                * 1000
            )
            statuses: List[PipelineStatus] = []

            for run in self.client.get_job_runs(job_id=pipeline_details.job_id) or []:
                run = DBRun(**run)
                if run.start_time and run.start_time < cutoff_ts:
                    break
Review comment on lines +268 to +269:
⚠️ Edge Case: Databricks connector break assumes descending run order

The yield_pipeline_status method uses break when it encounters a run with start_time < cutoff_ts. This assumes that self.client.get_job_runs() returns runs in strictly descending chronological order (newest first).

If the Databricks API returns runs in a different order (e.g., ascending, or unordered), this break will cause the method to skip recent runs that appear after an older one in the list. This would result in incomplete pipeline status data being ingested.

Suggestion: do one of the following:

  1. Verify and document that get_job_runs() guarantees descending order (Databricks Jobs API does return runs in descending order by default when using list_runs)
  2. Use continue instead of break to safely handle all runs regardless of ordering, though this loses the early-termination optimization
  3. Add a sort before iterating: sorted(runs, key=lambda r: r.get('start_time', 0), reverse=True)
if run.start_time and run.start_time < cutoff_ts:
    continue  # safer: skip old runs without assuming order
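If the early-termination optimization is worth keeping, options 1 and 3 can be combined: rely on the documented ordering but also sort defensively before iterating. A sketch, assuming (as the surrounding code implies) that get_job_runs() yields dicts with a start_time key:

runs = self.client.get_job_runs(job_id=pipeline_details.job_id) or []
# Defensive newest-first sort keeps the early break correct even if the API
# ever returns runs out of order.
for raw_run in sorted(runs, key=lambda r: r.get("start_time", 0), reverse=True):
    run = DBRun(**raw_run)
    if run.start_time and run.start_time < cutoff_ts:
        break  # every remaining run is older than the cutoff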


                task_status = [
                    TaskStatus(
                        name=str(task.name),
@@ -269,24 +279,28 @@
                    )
                    for task in run.tasks or []
                ]
                pipeline_status = PipelineStatus(
                    taskStatus=task_status,
                    timestamp=Timestamp(run.start_time),
                    executionStatus=STATUS_MAP.get(
                        run.state.result_state,
                        StatusType.Failed,
                    ),
                statuses.append(
                    PipelineStatus(
                        taskStatus=task_status,
                        timestamp=Timestamp(run.start_time),
                        executionStatus=STATUS_MAP.get(
                            run.state.result_state,
                            StatusType.Failed,
                        ),
                    )
                )

            if statuses:
                pipeline_fqn = fqn.build(
                    metadata=self.metadata,
                    entity_type=Pipeline,
                    service_name=self.context.get().pipeline_service,
                    pipeline_name=self.context.get().pipeline,
                )
                yield Either(
                    right=OMetaPipelineStatus(
                    right=OMetaBulkPipelineStatus(
                        pipeline_fqn=pipeline_fqn,
                        pipeline_status=pipeline_status,
                        pipeline_statuses=statuses,
                    )
                )
        except Exception as exc:
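Aside: the lookback handling above boils down to converting a day count into an epoch-milliseconds threshold that run.start_time is compared against; a standalone sketch of the same computation:

from datetime import datetime, timedelta, timezone


def cutoff_millis(lookback_days: int) -> int:
    # Runs that started before this value fall outside the requested window.
    return int(
        (datetime.now(timezone.utc) - timedelta(days=lookback_days)).timestamp() * 1000
    )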
@@ -1093,6 +1093,111 @@ void test_pipelineTaskDisplayName(TestNamespace ns) {
assertEquals("extract_task", pipeline.getTasks().get(0).getName());
}

@Test
void put_bulkPipelineStatus_200_OK(TestNamespace ns) throws Exception {
OpenMetadataClient client = SdkClients.adminClient();
PipelineService service = PipelineServiceTestFactory.createAirflow(ns);

CreatePipeline request = new CreatePipeline();
request.setName(ns.prefix("pipeline_bulk_status"));
request.setService(service.getFullyQualifiedName());

List<Task> tasks =
Arrays.asList(
new Task().withName("task1").withDescription("First task"),
new Task().withName("task2").withDescription("Second task"));
request.setTasks(tasks);

Pipeline pipeline = createEntity(request);
assertNotNull(pipeline);

org.openmetadata.schema.type.Status t1Status =
new org.openmetadata.schema.type.Status()
.withName("task1")
.withExecutionStatus(org.openmetadata.schema.type.StatusType.Successful);
org.openmetadata.schema.type.Status t2Status =
new org.openmetadata.schema.type.Status()
.withName("task2")
.withExecutionStatus(org.openmetadata.schema.type.StatusType.Failed);
List<org.openmetadata.schema.type.Status> taskStatuses = Arrays.asList(t1Status, t2Status);

long baseTime = System.currentTimeMillis() - 5 * 3600000;
List<org.openmetadata.schema.entity.data.PipelineStatus> bulkStatuses =
new java.util.ArrayList<>();
for (int i = 0; i < 5; i++) {
org.openmetadata.schema.entity.data.PipelineStatus ps =
new org.openmetadata.schema.entity.data.PipelineStatus()
.withExecutionStatus(org.openmetadata.schema.type.StatusType.Successful)
.withTimestamp(baseTime + i * 3600000)
.withTaskStatus(taskStatuses);
bulkStatuses.add(ps);
}

Pipeline putResponse =
client.pipelines().addBulkPipelineStatus(pipeline.getFullyQualifiedName(), bulkStatuses);
assertNotNull(putResponse);
assertNotNull(putResponse.getPipelineStatus());
assertEquals(
bulkStatuses.get(4).getTimestamp(), putResponse.getPipelineStatus().getTimestamp());

Pipeline fetched = client.pipelines().get(pipeline.getId().toString(), "pipelineStatus");
assertNotNull(fetched.getPipelineStatus());
assertEquals(bulkStatuses.get(4).getTimestamp(), fetched.getPipelineStatus().getTimestamp());

org.openmetadata.schema.entity.data.PipelineStatus overlapStatus =
new org.openmetadata.schema.entity.data.PipelineStatus()
.withExecutionStatus(org.openmetadata.schema.type.StatusType.Failed)
.withTimestamp(bulkStatuses.get(4).getTimestamp())
.withTaskStatus(taskStatuses);
org.openmetadata.schema.entity.data.PipelineStatus newStatus =
new org.openmetadata.schema.entity.data.PipelineStatus()
.withExecutionStatus(org.openmetadata.schema.type.StatusType.Failed)
.withTimestamp(baseTime + 5 * 3600000)
.withTaskStatus(taskStatuses);

List<org.openmetadata.schema.entity.data.PipelineStatus> overlapStatuses =
Arrays.asList(overlapStatus, newStatus);

putResponse =
client.pipelines().addBulkPipelineStatus(pipeline.getFullyQualifiedName(), overlapStatuses);
assertNotNull(putResponse);
assertEquals(newStatus.getTimestamp(), putResponse.getPipelineStatus().getTimestamp());
assertEquals(
org.openmetadata.schema.type.StatusType.Failed,
putResponse.getPipelineStatus().getExecutionStatus());
}

@Test
void put_bulkPipelineStatus_invalidTask_4xx(TestNamespace ns) {
OpenMetadataClient client = SdkClients.adminClient();
PipelineService service = PipelineServiceTestFactory.createAirflow(ns);

CreatePipeline request = new CreatePipeline();
request.setName(ns.prefix("pipeline_bulk_invalid"));
request.setService(service.getFullyQualifiedName());
request.setTasks(Arrays.asList(new Task().withName("task1").withDescription("Task 1")));

Pipeline pipeline = createEntity(request);

org.openmetadata.schema.type.Status invalidTaskStatus =
new org.openmetadata.schema.type.Status()
.withName("nonExistentTask")
.withExecutionStatus(org.openmetadata.schema.type.StatusType.Failed);

org.openmetadata.schema.entity.data.PipelineStatus ps =
new org.openmetadata.schema.entity.data.PipelineStatus()
.withExecutionStatus(org.openmetadata.schema.type.StatusType.Failed)
.withTimestamp(System.currentTimeMillis())
.withTaskStatus(Arrays.asList(invalidTaskStatus));

assertThrows(
Exception.class,
() ->
client
.pipelines()
.addBulkPipelineStatus(pipeline.getFullyQualifiedName(), Arrays.asList(ps)));
}

@Test
void test_pipelineStatusWithTaskTiming_200_OK(TestNamespace ns) throws Exception {
OpenMetadataClient client = SdkClients.adminClient();
@@ -1,5 +1,6 @@
package org.openmetadata.sdk.services.dataassets;

import java.util.List;
import org.openmetadata.schema.api.data.CreatePipeline;
import org.openmetadata.schema.entity.data.Pipeline;
import org.openmetadata.schema.entity.data.PipelineStatus;
@@ -31,8 +32,19 @@ public Pipeline addPipelineStatus(String fqn, PipelineStatus status)
} catch (java.io.UnsupportedEncodingException e) {
encodedFqn = fqn;
}
// API path is /{fqn}/status (not /name/{fqn}/status)
String path = basePath + "/" + encodedFqn + "/status";
return httpClient.execute(HttpMethod.PUT, path, status, Pipeline.class);
}

public Pipeline addBulkPipelineStatus(String fqn, List<PipelineStatus> statuses)
throws OpenMetadataException {
String encodedFqn;
try {
encodedFqn = java.net.URLEncoder.encode(fqn, "UTF-8").replace("+", "%20");
} catch (java.io.UnsupportedEncodingException e) {
encodedFqn = fqn;
}
String path = basePath + "/" + encodedFqn + "/status/bulk";
return httpClient.execute(HttpMethod.PUT, path, statuses, Pipeline.class);
}
}
@@ -66,6 +66,36 @@ default void insert(String entityFQNHash, String extension, String jsonSchema, S
insert(getTimeSeriesTableName(), entityFQNHash, extension, jsonSchema, json);
}

@ConnectionAwareSqlUpdate(
value =
"INSERT INTO <table>(entityFQNHash, extension, jsonSchema, json) "
+ "VALUES (:entityFQNHash, :extension, :jsonSchema, :json) "
+ "ON DUPLICATE KEY UPDATE json = VALUES(json)",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"INSERT INTO <table>(entityFQNHash, extension, jsonSchema, json) "
+ "VALUES (:entityFQNHash, :extension, :jsonSchema, (:json :: jsonb)) "
+ "ON CONFLICT (entityFQNHash, extension, timestamp) DO UPDATE SET json = EXCLUDED.json",
connectionType = POSTGRES)
void upsert(
@Define("table") String table,
@BindFQN("entityFQNHash") String entityFQNHash,
@Bind("extension") String extension,
@Bind("jsonSchema") String jsonSchema,
@Bind("json") String json);

default void upsert(String entityFQN, String extension, String jsonSchema, String json) {
upsert(getTimeSeriesTableName(), entityFQN, extension, jsonSchema, json);
}

default void upsertBatch(
String entityFQN, String extension, String jsonSchema, List<String> jsons) {
for (String json : jsons) {
upsert(entityFQN, extension, jsonSchema, json);
}
}

@ConnectionAwareSqlUpdate(
value =
"INSERT INTO <table>(entityFQNHash, jsonSchema, json) "