Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
package org.openmetadata.it.tests;

import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import es.org.elasticsearch.client.Request;
import es.org.elasticsearch.client.Response;
import es.org.elasticsearch.client.RestClient;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.openmetadata.it.bootstrap.TestSuiteBootstrap;
import org.openmetadata.it.util.SdkClients;
import org.openmetadata.it.util.TestNamespace;
import org.openmetadata.schema.api.data.CreateTable;
Expand All @@ -26,11 +35,14 @@
import org.openmetadata.schema.entity.services.DatabaseService;
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.schema.metadataIngestion.SourceConfig;
import org.openmetadata.schema.metadataIngestion.TestSuitePipeline;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.TestSuite;
import org.openmetadata.schema.tests.type.TestSummary;
import org.openmetadata.schema.type.Column;
import org.openmetadata.schema.type.ColumnDataType;
import org.openmetadata.schema.type.EntityHistory;
Expand All @@ -39,6 +51,7 @@
import org.openmetadata.sdk.fluent.builders.TestCaseBuilder;
import org.openmetadata.sdk.models.ListParams;
import org.openmetadata.sdk.models.ListResponse;
import org.openmetadata.sdk.network.HttpMethod;
import org.openmetadata.service.resources.dqtests.TestSuiteResource;

/**
Expand Down Expand Up @@ -756,6 +769,50 @@ void test_deleteLogicalTestSuiteWithPipeline(TestNamespace ns) {
Exception.class, () -> client.ingestionPipelines().get(pipeline.getId().toString()));
}

// ===================================================================
// PIPELINE COMPLETION AND SEARCH INDEX TESTS
// ===================================================================

@Test
void test_pipelineCompletionUpdatesSearchIndex(TestNamespace ns) throws Exception {
OpenMetadataClient client = SdkClients.adminClient();

Table table = createTableForBasicTestSuite(ns, "pipeline_es");
TestSuite testSuite = createBasicTestSuiteForTable(table);
List<TestCase> testCases = createTestCases(client, ns, table, 4);
recordTestCaseResults(client, testCases, 2, 2);

Double versionBefore = getEntity(testSuite.getId().toString()).getVersion();

IngestionPipeline pipeline = createTestSuitePipeline(client, ns, testSuite);
putPipelineStatus(client, pipeline, PipelineStatusType.SUCCESS);

try (RestClient searchClient = TestSuiteBootstrap.createSearchClient()) {
Awaitility.await("pipeline completion updates test suite and search index")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
TestSuite updated = getEntityWithFields(testSuite.getId().toString(), "summary");

assertTrue(updated.getVersion() > versionBefore);

TestSummary summary = updated.getSummary();
assertNotNull(summary);
assertAll(
() -> assertEquals(4, summary.getTotal()),
() -> assertEquals(2, summary.getSuccess()),
() -> assertEquals(2, summary.getFailed()));

assertEquals(4, updated.getTestCaseResultSummary().size());

String esBody = queryTestSuiteSearchIndex(searchClient, testSuite.getId());
assertTrue(esBody.contains(testSuite.getId().toString()));
assertTrue(esBody.contains("\"total\":4"));
});
}
}

// ===================================================================
// TEST SUITE FILTERING AND LISTING TESTS
// ===================================================================
Expand Down Expand Up @@ -1005,4 +1062,103 @@ private ListResponse<TestSuite> listTestSuites(Map<String, String> params) {
throw new RuntimeException("Failed to list test suites", e);
}
}

private TestSuite createBasicTestSuiteForTable(Table table) {
CreateTestSuite request = new CreateTestSuite();
request.setName(table.getFullyQualifiedName());
request.setBasicEntityReference(table.getFullyQualifiedName());
return createBasicTestSuite(request);
}

private List<TestCase> createTestCases(
OpenMetadataClient client, TestNamespace ns, Table table, int count) {
return IntStream.range(0, count)
.mapToObj(
i ->
TestCaseBuilder.create(client)
.name(ns.prefix("tc_pipeline_es_" + i))
.forTable(table)
.testDefinition("tableRowCountToEqual")
.parameter("value", "100")
.create())
.toList();
}

private void recordTestCaseResults(
OpenMetadataClient client, List<TestCase> testCases, int passCount, int failCount)
throws Exception {
for (int i = 0; i < passCount; i++) {
client
.testCaseResults()
.forTestCase(testCases.get(i).getFullyQualifiedName())
.passed()
.create();
}
for (int i = passCount; i < passCount + failCount; i++) {
client
.testCaseResults()
.forTestCase(testCases.get(i).getFullyQualifiedName())
.failed()
.create();
}
}

private IngestionPipeline createTestSuitePipeline(
OpenMetadataClient client, TestNamespace ns, TestSuite testSuite) {
CreateIngestionPipeline request = new CreateIngestionPipeline();
request.setName(ns.prefix("pipeline_es_run"));
request.setService(testSuite.getEntityReference());
request.setPipelineType(PipelineType.TEST_SUITE);
request.setSourceConfig(new SourceConfig().withConfig(new TestSuitePipeline()));
request.setAirflowConfig(new AirflowConfig().withStartDate(new java.util.Date()));
return client.ingestionPipelines().create(request);
}

private void putPipelineStatus(
OpenMetadataClient client, IngestionPipeline pipeline, PipelineStatusType statusType) {
PipelineStatus status =
new PipelineStatus()
.withPipelineState(statusType)
.withRunId(UUID.randomUUID().toString())
.withTimestamp(System.currentTimeMillis());
String path =
"/v1/services/ingestionPipelines/" + pipeline.getFullyQualifiedName() + "/pipelineStatus";
client.getHttpClient().execute(HttpMethod.PUT, path, status, PipelineStatus.class);
}

private String getTestSuiteSearchIndexName() {
return "openmetadata_test_suite_search_index";
}

private void refreshTestSuiteSearchIndex(RestClient searchClient) throws Exception {
Request request = new Request("POST", "/" + getTestSuiteSearchIndexName() + "/_refresh");
searchClient.performRequest(request);
}

private String queryTestSuiteSearchIndex(RestClient searchClient, UUID testSuiteId)
throws Exception {
refreshTestSuiteSearchIndex(searchClient);

String query =
"""
{
"size": 1,
"query": {
"bool": {
"must": [
{ "term": { "_id": "%s" } }
]
}
}
}
"""
.formatted(testSuiteId);

Request request = new Request("POST", "/" + getTestSuiteSearchIndexName() + "/_search");
request.setJsonEntity(query);
Response response = searchClient.performRequest(request);

assertEquals(200, response.getStatusLine().getStatusCode());
return new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static org.openmetadata.service.Entity.TEST_CASE;
import static org.openmetadata.service.Entity.TEST_CASE_RESULT;
import static org.openmetadata.service.Entity.TEST_DEFINITION;
import static org.openmetadata.service.Entity.TEST_SUITE;

import jakarta.json.JsonPatch;
import jakarta.ws.rs.core.Response;
Expand All @@ -18,9 +17,7 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.tests.ResultSummary;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.TestSuite;
import org.openmetadata.schema.tests.type.TestCaseDimensionResult;
import org.openmetadata.schema.tests.type.TestCaseResult;
import org.openmetadata.schema.tests.type.TestCaseStatus;
Expand Down Expand Up @@ -247,90 +244,6 @@ private void updateTestCaseStatus(TestCaseResult testCaseResult, OperationType o
EntityRepository.EntityUpdater entityUpdater =
testCaseRepository.getUpdater(original, updated, EntityRepository.Operation.PATCH, null);
entityUpdater.update();
updateTestSuiteSummary(updated);
}

private void updateTestSuiteSummary(TestCase testCase) {
List<String> fqns =
testCase.getTestSuites() != null
? testCase.getTestSuites().stream().map(TestSuite::getFullyQualifiedName).toList()
: null;
TestSuiteRepository testSuiteRepository = new TestSuiteRepository();
if (fqns != null) {
for (String fqn : fqns) {
TestSuite testSuite = Entity.getEntityByName(TEST_SUITE, fqn, "*", Include.ALL);
if (testSuite != null) {
// LOG 1: LOAD
int resultCountBefore =
testSuite.getTestCaseResultSummary() != null
? testSuite.getTestCaseResultSummary().size()
: 0;
LOG.info(
"[RACE-CONDITION-MONITOR] updateTestSuiteSummary LOAD | suiteId={} | "
+ "version={} | resultCount={} | threadId={}",
testSuite.getId(),
testSuite.getVersion(),
resultCountBefore,
Thread.currentThread().getId());

TestSuite original = JsonUtils.deepCopy(testSuite, TestSuite.class);
List<ResultSummary> resultSummaries = testSuite.getTestCaseResultSummary();

if (resultSummaries != null) {
resultSummaries.stream()
.filter(s -> s.getTestCaseName().equals(testCase.getFullyQualifiedName()))
.findFirst()
.ifPresentOrElse(
s -> {
s.setStatus(testCase.getTestCaseStatus());
s.setTimestamp(testCase.getTestCaseResult().getTimestamp());
},
() ->
resultSummaries.add(
new ResultSummary()
.withTestCaseName(testCase.getFullyQualifiedName())
.withStatus(testCase.getTestCaseStatus())
.withTimestamp(testCase.getTestCaseResult().getTimestamp())));
} else {
testSuite.setTestCaseResultSummary(
List.of(
new ResultSummary()
.withTestCaseName(testCase.getFullyQualifiedName())
.withStatus(testCase.getTestCaseStatus())
.withTimestamp(testCase.getTestCaseResult().getTimestamp())));
}

int resultCountAfter =
testSuite.getTestCaseResultSummary() != null
? testSuite.getTestCaseResultSummary().size()
: 0;

EntityRepository.EntityUpdater entityUpdater =
testSuiteRepository.getUpdater(
original, testSuite, EntityRepository.Operation.PATCH, null);

// LOG 2: SAVE START
LOG.info(
"[RACE-CONDITION-MONITOR] updateTestSuiteSummary SAVE START | suiteId={} | "
+ "version={} | resultCountBefore={} | resultCountAfter={} | threadId={}",
testSuite.getId(),
testSuite.getVersion(),
resultCountBefore,
resultCountAfter,
Thread.currentThread().getId());

entityUpdater.update();

// LOG 3: SAVE COMPLETE
LOG.info(
"[RACE-CONDITION-MONITOR] updateTestSuiteSummary SAVE COMPLETE | suiteId={} | "
+ "resultCount={} | threadId={}",
testSuite.getId(),
resultCountAfter,
Thread.currentThread().getId());
}
}
}
}

@Override
Expand Down
Loading
Loading