Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ tests:
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
method: test {p0=nodes.stats/11_indices_metrics/indices mappings exact count test for indices level}
issue: https://github.com/elastic/elasticsearch/issues/120950
- class: org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT
issue: https://github.com/elastic/elasticsearch/issues/121407
- class: org.elasticsearch.analysis.common.CommonAnalysisClientYamlTestSuiteIT
method: test {yaml=analysis-common/40_token_filters/stemmer_override file access}
issue: https://github.com/elastic/elasticsearch/issues/121625
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -78,6 +80,12 @@ public class CcsCommonYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
// the remote cluster is the one we write index operations etc... to
private static final String REMOTE_CLUSTER_NAME = "remote_cluster";

private static final AtomicBoolean isRemoteConfigured = new AtomicBoolean(false);
private static final AtomicBoolean isCombinedComputed = new AtomicBoolean(false);
private static final AtomicReference<TestFeatureService> combinedTestFeatureServiceRef = new AtomicReference<>();
private static final AtomicReference<Set<String>> combinedOsSetRef = new AtomicReference<>();
private static final AtomicReference<Set<String>> combinedNodeVersionsRef = new AtomicReference<>();

private static LocalClusterConfigProvider commonClusterConfig = cluster -> cluster.module("x-pack-async-search")
.module("aggregations")
.module("analysis-common")
Expand Down Expand Up @@ -163,25 +171,26 @@ public void initSearchClient() throws IOException {
}
clusterHosts = unmodifiableList(hosts);
logger.info("initializing REST search clients against {}", clusterHosts);
searchClient = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()]));
adminSearchClient = buildClient(restAdminSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()]));
searchClient = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
adminSearchClient = buildClient(restAdminSettings(), clusterHosts.toArray(new HttpHost[0]));

searchYamlTestClient = new TestCandidateAwareClient(getRestSpec(), searchClient, hosts, this::getClientBuilderWithSniffedHosts);

// check that we have an established CCS connection
Request request = new Request("GET", "_remote/info");
Response response = adminSearchClient.performRequest(request);
assertOK(response);
ObjectPath responseObject = ObjectPath.createFromResponse(response);
assertNotNull(responseObject.evaluate(REMOTE_CLUSTER_NAME));
assertNull(responseObject.evaluate(REMOTE_CLUSTER_NAME + ".cluster_credentials"));
logger.info("Established connection to remote cluster [" + REMOTE_CLUSTER_NAME + "]");
assert searchClient != null;
assert adminSearchClient != null;
assert clusterHosts != null;

if (isRemoteConfigured.compareAndSet(false, true)) {
// check that we have an established CCS connection
Request request = new Request("GET", "_remote/info");
Response response = adminSearchClient.performRequest(request);
assertOK(response);
ObjectPath responseObject = ObjectPath.createFromResponse(response);
assertNotNull(responseObject.evaluate(REMOTE_CLUSTER_NAME));
assertNull(responseObject.evaluate(REMOTE_CLUSTER_NAME + ".cluster_credentials"));
logger.info("Established connection to remote cluster [" + REMOTE_CLUSTER_NAME + "]");
}
}

assert searchClient != null;
assert adminSearchClient != null;
assert clusterHosts != null;

searchYamlTestClient.setTestCandidate(getTestCandidate());
}

Expand Down Expand Up @@ -299,44 +308,46 @@ protected ClientYamlTestExecutionContext createRestTestExecutionContext(
final Set<String> osSet
) {
try {
// Ensure the test specific initialization is run by calling it explicitly (@Before annotations on base-derived class may
// be called in a different order)
initSearchClient();
// Reconcile and provide unified features, os, version(s), based on both clientYamlTestClient and searchYamlTestClient
var searchOs = readOsFromNodesInfo(adminSearchClient);
var searchNodeVersions = readVersionsFromNodesInfo(adminSearchClient);
var semanticNodeVersions = searchNodeVersions.stream()
.map(ESRestTestCase::parseLegacyVersion)
.flatMap(Optional::stream)
.collect(Collectors.toSet());
final TestFeatureService searchTestFeatureService = createTestFeatureService(
getClusterStateFeatures(adminSearchClient),
semanticNodeVersions
);
final TestFeatureService combinedTestFeatureService = (featureId, any) -> {
boolean adminFeature = testFeatureService.clusterHasFeature(featureId, any);
boolean searchFeature = searchTestFeatureService.clusterHasFeature(featureId, any);
return any ? adminFeature || searchFeature : adminFeature && searchFeature;
};
final Set<String> combinedOsSet = Stream.concat(osSet.stream(), Stream.of(searchOs)).collect(Collectors.toSet());
final Set<String> combinedNodeVersions = Stream.concat(nodesVersions.stream(), searchNodeVersions.stream())
.collect(Collectors.toSet());
if (isCombinedComputed.compareAndSet(false, true)) {
// Ensure the test specific initialization is run by calling it explicitly (@Before annotations on base-derived class may
// be called in a different order)
initSearchClient();
// Reconcile and provide unified features, os, version(s), based on both clientYamlTestClient and searchYamlTestClient
var searchOs = readOsFromNodesInfo(adminSearchClient);
var searchNodeVersions = readVersionsFromNodesInfo(adminSearchClient);
var semanticNodeVersions = searchNodeVersions.stream()
.map(ESRestTestCase::parseLegacyVersion)
.flatMap(Optional::stream)
.collect(Collectors.toSet());
final TestFeatureService searchTestFeatureService = createTestFeatureService(
getClusterStateFeatures(adminSearchClient),
semanticNodeVersions
);
final TestFeatureService combinedTestFeatureService = (featureId, any) -> {
boolean adminFeature = testFeatureService.clusterHasFeature(featureId, any);
boolean searchFeature = searchTestFeatureService.clusterHasFeature(featureId, any);
return any ? adminFeature || searchFeature : adminFeature && searchFeature;
};
final Set<String> combinedOsSet = Stream.concat(osSet.stream(), Stream.of(searchOs)).collect(Collectors.toSet());
final Set<String> combinedNodeVersions = Stream.concat(nodesVersions.stream(), searchNodeVersions.stream())
.collect(Collectors.toSet());

combinedTestFeatureServiceRef.set(combinedTestFeatureService);
combinedOsSetRef.set(combinedOsSet);
combinedNodeVersionsRef.set(combinedNodeVersions);
}

return new ClientYamlTestExecutionContext(
clientYamlTestCandidate,
clientYamlTestClient,
randomizeContentType(),
combinedNodeVersions,
combinedTestFeatureService,
combinedOsSet
combinedNodeVersionsRef.get(),
combinedTestFeatureServiceRef.get(),
combinedOsSetRef.get()
) {
// depending on the API called, we either return the client running against the "write" or the "search" cluster here
protected ClientYamlTestClient clientYamlTestClient(String apiName) {
if (CCS_APIS.contains(apiName)) {
return searchYamlTestClient;
} else {
return super.clientYamlTestClient(apiName);
}
return CCS_APIS.contains(apiName) ? searchYamlTestClient : super.clientYamlTestClient(apiName);
}
};
} catch (IOException e) {
Expand Down