Skip to content

Commit 963dc60

Browse files
Merge branch 'main' into es-134088-fix
2 parents 1a46028 + 93d0896 commit 963dc60

File tree

3 files changed

+85
-12
lines changed

3 files changed

+85
-12
lines changed

server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,16 @@
4949
import static org.mockito.ArgumentMatchers.any;
5050
import static org.mockito.Mockito.spy;
5151
import static org.mockito.Mockito.verify;
52+
import static org.mockito.Mockito.verifyNoInteractions;
5253

5354
public class InternalClusterInfoServiceSchedulingTests extends ESTestCase {
5455

5556
public void testScheduling() {
5657
final DiscoveryNode discoveryNode = DiscoveryNodeUtils.create("test");
5758
final DiscoveryNodes noMaster = DiscoveryNodes.builder().add(discoveryNode).localNodeId(discoveryNode.getId()).build();
5859
final DiscoveryNodes localMaster = noMaster.withMasterNodeId(discoveryNode.getId());
60+
final DiscoveryNode joiner = DiscoveryNodeUtils.create("joiner");
61+
final DiscoveryNodes withJoiner = DiscoveryNodes.builder(localMaster).add(joiner).build();
5962

6063
final Settings.Builder settingsBuilder = Settings.builder()
6164
.put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName())
@@ -129,6 +132,45 @@ public void reroute(String reason, Priority priority, ActionListener<Void> liste
129132
);
130133
runUntilFlag(deterministicTaskQueue, becameMaster1);
131134

135+
// A node joins the cluster
136+
{
137+
Mockito.clearInvocations(mockEstimatedHeapUsageCollector, nodeUsageStatsForThreadPoolsCollector);
138+
final int initialRequestCount = client.requestCount;
139+
final AtomicBoolean nodeJoined = new AtomicBoolean();
140+
clusterApplierService.onNewClusterState(
141+
"node joins",
142+
() -> ClusterState.builder(new ClusterName("cluster")).nodes(withJoiner).build(),
143+
setFlagOnSuccess(nodeJoined)
144+
);
145+
// Don't use runUntilFlag because we don't want the scheduled task to run
146+
deterministicTaskQueue.runAllRunnableTasks();
147+
assertTrue(nodeJoined.get());
148+
// Addition of node should have triggered refresh
149+
// should have run two client requests: nodes stats request and indices stats request
150+
assertThat(client.requestCount, equalTo(initialRequestCount + 2));
151+
verify(mockEstimatedHeapUsageCollector).collectClusterHeapUsage(any()); // Should have polled for heap usage
152+
verify(nodeUsageStatsForThreadPoolsCollector).collectUsageStats(any(), any(), any());
153+
}
154+
155+
// ... then leaves
156+
{
157+
Mockito.clearInvocations(mockEstimatedHeapUsageCollector, nodeUsageStatsForThreadPoolsCollector);
158+
final int initialRequestCount = client.requestCount;
159+
final AtomicBoolean nodeLeft = new AtomicBoolean();
160+
clusterApplierService.onNewClusterState(
161+
"node leaves",
162+
() -> ClusterState.builder(new ClusterName("cluster")).nodes(localMaster).build(),
163+
setFlagOnSuccess(nodeLeft)
164+
);
165+
// Don't use runUntilFlag because we don't want the scheduled task to run
166+
deterministicTaskQueue.runAllRunnableTasks();
167+
assertTrue(nodeLeft.get());
168+
// departing nodes don't trigger refreshes
169+
assertThat(client.requestCount, equalTo(initialRequestCount));
170+
verifyNoInteractions(mockEstimatedHeapUsageCollector);
171+
verifyNoInteractions(nodeUsageStatsForThreadPoolsCollector);
172+
}
173+
132174
final AtomicBoolean failMaster1 = new AtomicBoolean();
133175
clusterApplierService.onNewClusterState(
134176
"fail master 1",

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.apache.http.client.CredentialsProvider;
1818
import org.apache.http.impl.client.BasicCredentialsProvider;
1919
import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
20+
import org.apache.lucene.util.IOConsumer;
2021
import org.elasticsearch.client.Request;
2122
import org.elasticsearch.client.Response;
2223
import org.elasticsearch.client.ResponseException;
@@ -30,6 +31,7 @@
3031
import org.elasticsearch.inference.TaskType;
3132
import org.elasticsearch.logging.LogManager;
3233
import org.elasticsearch.logging.Logger;
34+
import org.elasticsearch.test.ESTestCase;
3335
import org.elasticsearch.test.rest.ESRestTestCase;
3436
import org.elasticsearch.xcontent.XContentType;
3537

@@ -43,6 +45,7 @@
4345
import java.util.List;
4446
import java.util.Map;
4547
import java.util.Set;
48+
import java.util.concurrent.Semaphore;
4649
import java.util.stream.Collectors;
4750
import java.util.stream.Stream;
4851

@@ -53,6 +56,7 @@
5356
import static org.elasticsearch.xpack.esql.EsqlTestUtils.reader;
5457

5558
public class CsvTestsDataLoader {
59+
private static final int PARALLEL_THREADS = 10;
5660
private static final int BULK_DATA_SIZE = 100_000;
5761
private static final TestDataset EMPLOYEES = new TestDataset("employees", "mapping-default.json", "employees.csv").noSubfields();
5862
private static final TestDataset EMPLOYEES_INCOMPATIBLE = new TestDataset(
@@ -398,16 +402,35 @@ private static void loadDataSetIntoEs(
398402
IndexCreator indexCreator
399403
) throws IOException {
400404
Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
405+
List<TestDataset> datasets = availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled).stream()
406+
.toList();
407+
408+
executeInParallel(datasets, dataset -> createIndex(client, dataset, indexCreator), "Failed to create indices in parallel");
409+
410+
executeInParallel(datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");
411+
412+
forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);
413+
414+
executeInParallel(
415+
ENRICH_POLICIES,
416+
policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
417+
"Failed to load enrich policies in parallel"
418+
);
401419

402-
Set<String> loadedDatasets = new HashSet<>();
403-
for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled)) {
404-
load(client, dataset, logger, indexCreator);
405-
loadedDatasets.add(dataset.indexName);
406-
}
407-
forceMerge(client, loadedDatasets, logger);
408-
for (var policy : ENRICH_POLICIES) {
409-
loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger);
410-
}
420+
}
421+
422+
private static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
423+
Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
424+
ESTestCase.runInParallel(items.size(), i -> {
425+
try {
426+
semaphore.acquire();
427+
consumer.accept(items.get(i));
428+
} catch (IOException | InterruptedException e) {
429+
throw new RuntimeException(errorMessage, e);
430+
} finally {
431+
semaphore.release();
432+
}
433+
});
411434
}
412435

413436
public static void createInferenceEndpoints(RestClient client) throws IOException {
@@ -564,11 +587,13 @@ private static URL getResource(String name) {
564587
return result;
565588
}
566589

567-
private static void load(RestClient client, TestDataset dataset, Logger logger, IndexCreator indexCreator) throws IOException {
590+
private static void createIndex(RestClient client, TestDataset dataset, IndexCreator indexCreator) throws IOException {
568591
URL mapping = getResource("/" + dataset.mappingFileName);
569592
Settings indexSettings = dataset.readSettingsFile();
570593
indexCreator.createIndex(client, dataset.indexName, readMappingFile(mapping, dataset.typeMapping), indexSettings);
594+
}
571595

596+
private static void loadData(RestClient client, TestDataset dataset, Logger logger) throws IOException {
572597
// Some examples only test that the query and mappings are valid, and don't need example data. Use .noData() for those
573598
if (dataset.dataFileName != null) {
574599
URL data = getResource("/data/" + dataset.dataFileName);

x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,18 @@
1111

1212
import java.net.ConnectException;
1313

14+
import static org.hamcrest.Matchers.instanceOf;
1415
import static org.hamcrest.Matchers.startsWith;
1516

1617
public class CsvTestsDataLoaderTests extends ESTestCase {
1718

1819
public void testCsvTestsDataLoaderExecution() {
19-
ConnectException ce = expectThrows(ConnectException.class, () -> CsvTestsDataLoader.main(new String[] {}));
20-
assertThat(ce.getMessage(), startsWith("Connection refused"));
20+
Throwable cause = expectThrows(AssertionError.class, () -> CsvTestsDataLoader.main(new String[] {}));
21+
// find the root cause
22+
while (cause.getCause() != null) {
23+
cause = cause.getCause();
24+
}
25+
assertThat(cause, instanceOf(ConnectException.class));
26+
assertThat(cause.getMessage(), startsWith("Connection refused"));
2127
}
2228
}

0 commit comments

Comments
 (0)