@@ -15,6 +15,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.IOUtils;
@@ -40,6 +41,8 @@
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

@@ -246,10 +249,7 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th
return bulkClient.performRequest(request);
} else {
Request[] clones = cloneRequests(request, 2);
Response resp1 = remoteClient.performRequest(clones[0]);
Response resp2 = localClient.performRequest(clones[1]);
assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode());
return resp2;
return runInParallel(localClient, remoteClient, clones);
}
});
doAnswer(invocation -> {
@@ -284,6 +284,44 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException {
return clones;
}

/**
* Run {@link #cloneRequests cloned} requests in parallel.
*/
static Response runInParallel(RestClient localClient, RestClient remoteClient, Request[] clones) throws Throwable {
CompletableFuture<Response> remoteResponse = new CompletableFuture<>();
CompletableFuture<Response> localResponse = new CompletableFuture<>();
remoteClient.performRequestAsync(clones[0], new ResponseListener() {
@Override
public void onSuccess(Response response) {
remoteResponse.complete(response);
}

@Override
public void onFailure(Exception exception) {
remoteResponse.completeExceptionally(exception);
}
});
localClient.performRequestAsync(clones[1], new ResponseListener() {
@Override
public void onSuccess(Response response) {
localResponse.complete(response);
}

@Override
public void onFailure(Exception exception) {
localResponse.completeExceptionally(exception);
}
});
try {
Response remote = remoteResponse.get();
Response local = localResponse.get();
Contributor: NIT: you could use PlainActionFuture instead of anonymous listener implementations here. (An illustrative sketch follows the method below.)
assertEquals(remote.getStatusLine().getStatusCode(), local.getStatusLine().getStatusCode());
return local;
} catch (ExecutionException e) {
throw e.getCause();
}
}
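
The NIT above is not applied in this change; purely as an illustration, a version built on PlainActionFuture (org.elasticsearch.action.support.PlainActionFuture, assumed to be available on the test classpath) could look roughly like the sketch below. The asResponseListener adapter is hypothetical, since the low-level RestClient's ResponseListener is not an ActionListener, and actionGet() wraps failures rather than rethrowing the original cause the way the ExecutionException handling above does.

```java
// Illustrative sketch only, not code from this PR.
static Response runInParallelWithFutures(RestClient localClient, RestClient remoteClient, Request[] clones) {
    PlainActionFuture<Response> remoteFuture = new PlainActionFuture<>();
    PlainActionFuture<Response> localFuture = new PlainActionFuture<>();
    remoteClient.performRequestAsync(clones[0], asResponseListener(remoteFuture));
    localClient.performRequestAsync(clones[1], asResponseListener(localFuture));
    // actionGet() blocks until completion; note it wraps failures in an unchecked
    // exception instead of exposing the cause of an ExecutionException directly.
    Response remote = remoteFuture.actionGet();
    Response local = localFuture.actionGet();
    assertEquals(remote.getStatusLine().getStatusCode(), local.getStatusLine().getStatusCode());
    return local;
}

// Hypothetical adapter from PlainActionFuture (an ActionListener) to the
// low-level client's ResponseListener callbacks.
static ResponseListener asResponseListener(PlainActionFuture<Response> future) {
    return new ResponseListener() {
        @Override
        public void onSuccess(Response response) {
            future.onResponse(response);
        }

        @Override
        public void onFailure(Exception exception) {
            future.onFailure(exception);
        }
    };
}
```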

/**
* Convert FROM employees ... => FROM *:employees,employees
*/
@@ -51,6 +51,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
@@ -122,23 +123,59 @@ protected EsqlSpecTestCase(
this.mode = randomFrom(Mode.values());
}

private static boolean dataLoaded = false;
private static class Protected {
private volatile boolean completed = false;
private volatile boolean started = false;
private volatile Throwable failure = null;

private void protectedBlock(Callable<Void> callable) {
if (completed) {
return;
}
// In case tests get run in parallel, we ensure only one setup is run, and other tests wait for this
synchronized (this) {
if (completed) {
return;
}
if (started) {
// This should only happen if a previous test setup failed, possibly leaving partial setup; fail the current test fast
if (failure != null) {
fail(failure, "Previous test setup failed: " + failure.getMessage());
}
fail("Previous test setup failed with unknown error");
}
started = true;
try {
callable.call();
completed = true;
} catch (Throwable t) {
failure = t;
fail(failure, "Current test setup failed: " + failure.getMessage());
}
}
}

private synchronized void reset() {
completed = false;
started = false;
failure = null;
}
}

private static final Protected INGEST = new Protected();
Contributor (author): In the Views PR we use a separate VIEWS protector, semi-decoupling views from index creation. We could break the index parts into finer-grained pieces too, but I see risks with that. (An illustrative sketch follows this comment.)
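
Purely as an illustration of the comment above (none of this is in the diff): a second Protected instance could guard view creation separately from data ingestion, so each can be reset and retried independently. The VIEWS name and the createViewsForTests helper are assumptions, not part of this PR.

```java
// Hypothetical sketch: a dedicated protector for view setup, decoupled from INGEST.
private static final Protected VIEWS = new Protected();

@Before
public void setupViews() {
    VIEWS.protectedBlock(() -> {
        createViewsForTests(adminClient()); // assumed helper, not part of this PR
        return null;
    });
}
```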

protected static boolean testClustersOk = true;

@Before
public void setup() throws IOException {
public void setup() {
assumeTrue("test clusters were broken", testClustersOk);
boolean supportsLookup = supportsIndexModeLookup();
boolean supportsSourceMapping = supportsSourceFieldMapping();
boolean supportsInferenceTestService = supportsInferenceTestService();
if (dataLoaded == false) {
if (supportsInferenceTestService) {
INGEST.protectedBlock(() -> {
// Inference endpoints must be created before ingesting any datasets that rely on them (mapping of inference_id)
if (supportsInferenceTestService()) {
createInferenceEndpoints(adminClient());
}

loadDataSetIntoEs(client(), supportsLookup, supportsSourceMapping, supportsInferenceTestService);
dataLoaded = true;
}
loadDataSetIntoEs(client(), supportsIndexModeLookup(), supportsSourceFieldMapping(), supportsInferenceTestService());
return null;
});
}

@AfterClass
@@ -147,15 +184,14 @@ public static void wipeTestData() throws IOException {
return;
}
try {
dataLoaded = false;
adminClient().performRequest(new Request("DELETE", "/*"));
} catch (ResponseException e) {
// 404 here just means we had no indexes
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
throw e;
}
}

INGEST.reset();
deleteInferenceEndpoints(adminClient());
}

@@ -17,6 +17,7 @@
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
import org.apache.lucene.util.IOConsumer;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
@@ -30,6 +31,7 @@
import org.elasticsearch.inference.TaskType;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xcontent.XContentType;

@@ -43,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -55,6 +58,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.reader;

public class CsvTestsDataLoader {
private static final int PARALLEL_THREADS = 10;
private static final int BULK_DATA_SIZE = 100_000;
private static final TestDataset EMPLOYEES = new TestDataset("employees", "mapping-default.json", "employees.csv").noSubfields();
private static final TestDataset EMPLOYEES_INCOMPATIBLE = new TestDataset(
@@ -421,16 +425,39 @@ private static void loadDataSetIntoEs(
IndexCreator indexCreator
) throws IOException {
Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
List<TestDataset> datasets = availableDatasetsForEs(
supportsIndexModeLookup,
supportsSourceFieldMapping,
inferenceEnabled,
timeSeriesOnly
).stream().toList();

executeInParallel(datasets, dataset -> createIndex(client, dataset, indexCreator), "Failed to create indices in parallel");

executeInParallel(datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");

forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);

executeInParallel(
ENRICH_POLICIES,
policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
"Failed to load enrich policies in parallel"
);

Set<String> loadedDatasets = new HashSet<>();
for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled, timeSeriesOnly)) {
load(client, dataset, logger, indexCreator);
loadedDatasets.add(dataset.indexName);
}
forceMerge(client, loadedDatasets, logger);
for (var policy : ENRICH_POLICIES) {
loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger);
}
}

private static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
ESTestCase.runInParallel(items.size(), i -> {
try {
semaphore.acquire();
consumer.accept(items.get(i));
} catch (IOException | InterruptedException e) {
throw new RuntimeException(errorMessage, e);
} finally {
semaphore.release();
}
});
}

public static void createInferenceEndpoints(RestClient client) throws IOException {
@@ -587,11 +614,13 @@ private static URL getResource(String name) {
return result;
}

private static void load(RestClient client, TestDataset dataset, Logger logger, IndexCreator indexCreator) throws IOException {
private static void createIndex(RestClient client, TestDataset dataset, IndexCreator indexCreator) throws IOException {
URL mapping = getResource("/" + dataset.mappingFileName);
Settings indexSettings = dataset.readSettingsFile();
indexCreator.createIndex(client, dataset.indexName, readMappingFile(mapping, dataset.typeMapping), indexSettings);
}

private static void loadData(RestClient client, TestDataset dataset, Logger logger) throws IOException {
// Some examples only test that the query and mappings are valid, and don't need example data. Use .noData() for those
if (dataset.dataFileName != null) {
URL data = getResource("/data/" + dataset.dataFileName);
@@ -11,12 +11,18 @@

import java.net.ConnectException;

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;

public class CsvTestsDataLoaderTests extends ESTestCase {

public void testCsvTestsDataLoaderExecution() {
ConnectException ce = expectThrows(ConnectException.class, () -> CsvTestsDataLoader.main(new String[] {}));
assertThat(ce.getMessage(), startsWith("Connection refused"));
Throwable cause = expectThrows(AssertionError.class, () -> CsvTestsDataLoader.main(new String[] {}));
// find the root cause
while (cause.getCause() != null) {
cause = cause.getCause();
}
assertThat(cause, instanceOf(ConnectException.class));
assertThat(cause.getMessage(), startsWith("Connection refused"));
}
}