Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,18 @@ public interface SearchClient
}
""";

// Cascade variant: full-object replace (handles add/update) plus removal on
// null params, so child docs stay in sync when a parent's cert is added,
// changed, or removed.
String CASCADE_CERTIFICATION_SCRIPT =
"""
if (params.certification == null) {
ctx._source.remove('certification');
} else {
ctx._source.certification = params.certification;
}
""";

String UPDATE_GLOSSARY_TERM_TAG_FQN_BY_PREFIX_SCRIPT =
"""
if (ctx._source.containsKey('tags')) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.openmetadata.service.Entity.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA;
import static org.openmetadata.service.Entity.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA;
import static org.openmetadata.service.search.SearchClient.ADD_FOLLOWERS_SCRIPT;
import static org.openmetadata.service.search.SearchClient.CASCADE_CERTIFICATION_SCRIPT;
import static org.openmetadata.service.search.SearchClient.DATA_ASSET_SEARCH_ALIAS;
import static org.openmetadata.service.search.SearchClient.DEFAULT_UPDATE_SCRIPT;
import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS;
Expand Down Expand Up @@ -1816,6 +1817,47 @@ private void handleEntityCertificationUpdate(EntityInterface entity, ChangeDescr

AssetCertification certification = getCertificationFromEntity(entity);
updateEntityCertificationInSearch(entity, certification);
cascadeCertificationToChildren(entity, certification);
}

// Pushes the cert change onto every child search doc denormalized from this
// entity. Without this the cert filter on the DQ dashboard (which queries
// children like test_case/test_case_result/test_case_resolution_status by
// `certification.tagLabel.tagFQN`) would silently use the stale cert until a
// reindex. RAW_REPLACE in PropagationDescriptor can't be used because it
// restores the old value on delete; we drive a dedicated script instead.
private void cascadeCertificationToChildren(
EntityInterface entity, AssetCertification certification) {
String type = entity.getEntityReference().getType();
Comment thread
gitar-bot[bot] marked this conversation as resolved.
if (!Entity.TABLE.equalsIgnoreCase(type)) {
// Scope: Table only. Dashboard/ApiCollection children also have cert in
// their mappings; extend here when those denormalization paths are added.
return;
}
IndexMapping indexMapping = entityIndexMap.get(Entity.TABLE);
if (indexMapping == null) {
return;
}
List<String> childAliases = indexMapping.getChildAliases(clusterAlias);
if (nullOrEmpty(childAliases)) {
return;
}

Map<String, Object> params = new HashMap<>();
params.put("certification", certification); // null when cert was removed

Pair<String, String> parentMatch = new ImmutablePair<>("table.id", entity.getId().toString());

try {
searchClient.updateChildren(
childAliases, parentMatch, new ImmutablePair<>(CASCADE_CERTIFICATION_SCRIPT, params));
} catch (Exception e) {
LOG.error(
"Failed to cascade certification for table [{}]: {}",
entity.getFullyQualifiedName(),
e.getMessage(),
e);
}
}

private boolean isCertificationUpdated(ChangeDescription change) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ public Map<String, Object> buildSearchIndexDocInternal(Map<String, Object> doc)
}
}

if (parentTable.getCertification() != null) {
doc.put("certification", parentTable.getCertification());
}

if (column.getExtension() != null) {
doc.put("extension", column.getExtension());
doc.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public Map<String, Object> buildSearchIndexDocInternal(Map<String, Object> doc)
}

private void setParentRelationships(Map<String, Object> doc, TestCase testCase) {
// Denormalize parent relationships and inherit domains from the linked table.
// addTestSuiteParentEntityRelations already fetches the Table with "domains",
// Denormalize parent relationships and inherit domains/certification from the linked table.
// addTestSuiteParentEntityRelations already fetches the Table with these fields,
// so we reuse it to avoid an extra DB query per test case.
EntityInterface linkedTable = denormalizeTestSuiteParents(doc, testCase);

Expand All @@ -83,6 +83,12 @@ private void setParentRelationships(Map<String, Object> doc, TestCase testCase)
&& !nullOrEmpty(linkedTable.getDomains())) {
doc.put("domains", getEntitiesWithDisplayName(linkedTable.getDomains()));
}

if (testCase.getCertification() == null
&& linkedTable != null
&& linkedTable.getCertification() != null) {
doc.put("certification", linkedTable.getCertification());
}
}

private EntityInterface denormalizeTestSuiteParents(Map<String, Object> doc, TestCase testCase) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.util.HashMap;
import java.util.Map;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.TestSuite;
import org.openmetadata.schema.tests.type.TestCaseResolutionStatus;
Expand Down Expand Up @@ -62,7 +63,12 @@ private void setParentRelationships(Map<String, Object> doc) {
if (testSuite == null) return;
doc.put("testSuite", testSuite.getEntityReference());
if (testSuite.getBasicEntityReference() != null) {
TestSuiteIndex.addTestSuiteParentEntityRelations(testSuite.getBasicEntityReference(), doc);
Table linkedTable =
TestSuiteIndex.addTestSuiteParentEntityRelations(
testSuite.getBasicEntityReference(), doc);
if (linkedTable != null && linkedTable.getCertification() != null) {
doc.put("certification", linkedTable.getCertification());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private void setTableEntityParentRelations(
Entity.getEntityByName(
Entity.TABLE,
entityLink.getEntityFQN(),
"database,databaseSchema,service",
"database,databaseSchema,service,certification",
Include.ALL);
esDoc.put("database", table.getDatabase());
esDoc.put("databaseSchema", table.getDatabaseSchema());
Expand All @@ -127,6 +127,9 @@ private void setTableEntityParentRelations(
esDoc.put("serviceType", table.getServiceType());
}
esDoc.put("table", table.getEntityReference());
if (table.getCertification() != null) {
esDoc.put("certification", table.getCertification());
}
} catch (EntityNotFoundException ex) {
LOG.warn(
"Table [{}] not found during search indexing: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,17 @@ public Map<String, Object> buildSearchIndexDocInternal(Map<String, Object> doc)
private void setParentRelationships(Map<String, Object> doc, TestSuite testSuite) {
EntityReference entityReference = testSuite.getBasicEntityReference();
if (entityReference == null) return;
addTestSuiteParentEntityRelations(entityReference, doc);
Table linkedTable = addTestSuiteParentEntityRelations(entityReference, doc);
if (linkedTable != null && linkedTable.getCertification() != null) {
doc.put("certification", linkedTable.getCertification());
}
}

static Table addTestSuiteParentEntityRelations(
EntityReference testSuiteRef, Map<String, Object> doc) {
if (testSuiteRef.getType().equals(Entity.TABLE)) {
try {
Table table = Entity.getEntity(testSuiteRef, "domains", Include.ALL);
Table table = Entity.getEntity(testSuiteRef, "domains,certification", Include.ALL);
doc.put("table", table.getEntityReference());
doc.put("database", table.getDatabase());
doc.put("databaseSchema", table.getDatabaseSchema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,101 @@ void propagateCertificationTagsUsesOldFqnWhenTagRenamed() {
assertEquals("Certification.Gold", keyCaptor.getValue().getRight());
}

@Test
void propagateCertificationTagsCascadesToTableChildrenOnAdd() throws IOException {
Table table = mock(Table.class);
UUID entityId = UUID.randomUUID();
when(table.getId()).thenReturn(entityId);
when(table.getEntityReference())
.thenReturn(new EntityReference().withId(entityId).withType(Entity.TABLE));
AssetCertification cert =
new AssetCertification()
.withTagLabel(
new TagLabel()
.withName("Gold")
.withDescription("Certified")
.withTagFQN("Certification.Gold"));
when(table.getCertification()).thenReturn(cert);

ChangeDescription changeDescription =
changeDescription(
List.of(),
List.of(
new FieldChange().withName("certification").withOldValue("{}").withNewValue("{}")),
List.of());

repository.propagateCertificationTags(Entity.TABLE, table, changeDescription);

@SuppressWarnings("unchecked")
ArgumentCaptor<Pair<String, Map<String, Object>>> updatesCaptor =
ArgumentCaptor.forClass(Pair.class);
@SuppressWarnings("unchecked")
ArgumentCaptor<Pair<String, String>> matchCaptor = ArgumentCaptor.forClass(Pair.class);
verify(searchClient)
.updateChildren(
eq(List.of("cluster_column_search_index")),
matchCaptor.capture(),
updatesCaptor.capture());
assertEquals("table.id", matchCaptor.getValue().getLeft());
assertEquals(entityId.toString(), matchCaptor.getValue().getRight());
assertEquals(SearchClient.CASCADE_CERTIFICATION_SCRIPT, updatesCaptor.getValue().getLeft());
assertSame(cert, updatesCaptor.getValue().getRight().get("certification"));
}

@Test
void propagateCertificationTagsCascadesNullToTableChildrenOnRemove() throws IOException {
Table table = mock(Table.class);
UUID entityId = UUID.randomUUID();
when(table.getId()).thenReturn(entityId);
when(table.getEntityReference())
.thenReturn(new EntityReference().withId(entityId).withType(Entity.TABLE));
when(table.getCertification()).thenReturn(null);

ChangeDescription changeDescription =
changeDescription(
List.of(),
List.of(),
List.of(new FieldChange().withName("certification").withOldValue("{}")));

repository.propagateCertificationTags(Entity.TABLE, table, changeDescription);

@SuppressWarnings("unchecked")
ArgumentCaptor<Pair<String, Map<String, Object>>> updatesCaptor =
ArgumentCaptor.forClass(Pair.class);
verify(searchClient)
.updateChildren(
eq(List.of("cluster_column_search_index")), any(Pair.class), updatesCaptor.capture());
assertEquals(SearchClient.CASCADE_CERTIFICATION_SCRIPT, updatesCaptor.getValue().getLeft());
assertNull(updatesCaptor.getValue().getRight().get("certification"));
}

@Test
void propagateCertificationTagsDoesNotCascadeForNonTableEntities() throws IOException {
// Pipelines carry a native certification but DQ dashboard cascade is
// scoped to Table — children of Pipeline aren't part of the test_case
// family. Verify we don't blast an updateByQuery against unrelated
// child indices.
Pipeline pipeline = mock(Pipeline.class);
UUID entityId = UUID.randomUUID();
when(pipeline.getId()).thenReturn(entityId);
when(pipeline.getEntityReference())
.thenReturn(new EntityReference().withId(entityId).withType(Entity.PIPELINE));
when(pipeline.getCertification())
.thenReturn(
new AssetCertification().withTagLabel(new TagLabel().withTagFQN("Certification.Gold")));

ChangeDescription changeDescription =
changeDescription(
List.of(),
List.of(
new FieldChange().withName("certification").withOldValue("{}").withNewValue("{}")),
List.of());

repository.propagateCertificationTags(Entity.PIPELINE, pipeline, changeDescription);

verify(searchClient, never()).updateChildren(any(List.class), any(Pair.class), any(Pair.class));
}

@Test
void propagateCertificationTagsUsesQuotedOldNameWhenTagHasNoParentFqn() {
Tag tag = mock(Tag.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,46 @@
}
}
},
"certification": {
"type": "object",
"properties": {
"tagLabel": {
"type": "object",
"properties": {
"tagFQN": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"fields": {
"text": {
"type": "text",
"analyzer": "om_analyzer"
}
}
},
"labelType": {
"type": "keyword"
},
"description": {
"type": "text"
},
"source": {
"type": "keyword"
},
"state": {
"type": "keyword"
}
}
},
"appliedDate": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"expiryDate": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
}
}
},
"classificationTags": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,46 @@
}
}
},
"certification": {
"type": "object",
"properties": {
"tagLabel": {
"type": "object",
"properties": {
"tagFQN": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"fields": {
"text": {
"type": "text",
"analyzer": "om_analyzer"
}
}
},
"labelType": {
"type": "keyword"
},
"description": {
"type": "text"
},
"source": {
"type": "keyword"
},
"state": {
"type": "keyword"
}
}
},
"appliedDate": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"expiryDate": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
}
}
},
"classificationTags": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
Expand Down
Loading
Loading