Skip to content

Commit 116c063

Browse files
authored
Merge branch 'main' into 2025/09/09/cluster-applier-thread-watchdog
2 parents 6e69aef + 87fec4f commit 116c063

File tree

8 files changed

+95
-32
lines changed

8 files changed

+95
-32
lines changed

build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/InternalTestClustersPlugin.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@
1111

1212
import org.elasticsearch.gradle.VersionProperties;
1313
import org.elasticsearch.gradle.internal.info.GlobalBuildInfoPlugin;
14-
import org.elasticsearch.gradle.testclusters.ElasticsearchCluster;
1514
import org.elasticsearch.gradle.testclusters.TestClustersPlugin;
16-
import org.gradle.api.NamedDomainObjectContainer;
1715
import org.gradle.api.Plugin;
1816
import org.gradle.api.Project;
1917

@@ -33,17 +31,5 @@ public void apply(Project project) {
3331
version -> (version.equals(VersionProperties.getElasticsearchVersion()) && buildParams.getSnapshotBuild() == false)
3432
|| buildParams.getBwcVersions().unreleasedInfo(version) == null
3533
);
36-
37-
NamedDomainObjectContainer<ElasticsearchCluster> testClusters = (NamedDomainObjectContainer<ElasticsearchCluster>) project
38-
.getExtensions()
39-
.getByName(TestClustersPlugin.EXTENSION_NAME);
40-
// Limit the number of allocated processors for all nodes to 2 in the cluster by default.
41-
// This is to ensure that the tests run consistently across different environments.
42-
String processorCount = shouldConfigureTestClustersWithOneProcessor() ? "1" : "2";
43-
testClusters.configureEach(elasticsearchCluster -> elasticsearchCluster.setting("node.processors", processorCount));
44-
}
45-
46-
private boolean shouldConfigureTestClustersWithOneProcessor() {
47-
return Boolean.parseBoolean(System.getProperty("tests.configure_test_clusters_with_one_processor", "false"));
4834
}
4935
}

build-tools/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1404,6 +1404,15 @@ private void createConfiguration() {
14041404
baseConfig.put("cluster.service.slow_master_task_logging_threshold", "5s");
14051405
}
14061406

1407+
// Limit the number of allocated processors for all nodes in the cluster by default.
1408+
// This is to ensure that the tests run consistently across different environments.
1409+
String processorCount = shouldConfigureTestClustersWithOneProcessor() ? "1" : "2";
1410+
if (getVersion().onOrAfter("7.6.0")) {
1411+
baseConfig.put("node.processors", processorCount);
1412+
} else {
1413+
baseConfig.put("processors", processorCount);
1414+
}
1415+
14071416
baseConfig.put("action.destructive_requires_name", "false");
14081417

14091418
HashSet<String> overriden = new HashSet<>(baseConfig.keySet());
@@ -1789,4 +1798,8 @@ private static class LinkCreationException extends UncheckedIOException {
17891798
super(message, cause);
17901799
}
17911800
}
1801+
1802+
private boolean shouldConfigureTestClustersWithOneProcessor() {
1803+
return Boolean.parseBoolean(System.getProperty("tests.configure_test_clusters_with_one_processor", "false"));
1804+
}
17921805
}

docs/changelog/135734.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 135734
2+
summary: Fill in topn values if competitive
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

muted-tests.yml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,33 @@ tests:
633633
- class: org.elasticsearch.datastreams.DataStreamIndexSettingsProviderTests
634634
method: testGetAdditionalIndexSettingsMappingsMerging
635635
issue: https://github.com/elastic/elasticsearch/issues/135884
636+
- class: org.elasticsearch.upgrades.FullClusterRestartDownsampleIT
637+
method: testRollupIndex {cluster=UPGRADED}
638+
issue: https://github.com/elastic/elasticsearch/issues/135906
639+
- class: org.elasticsearch.upgrades.FullClusterRestartIT
640+
method: testNewReplicasTimeSeriesMode {cluster=UPGRADED}
641+
issue: https://github.com/elastic/elasticsearch/issues/135907
642+
- class: org.elasticsearch.upgrades.FullClusterRestartIT
643+
method: testSingleDoc {cluster=UPGRADED}
644+
issue: https://github.com/elastic/elasticsearch/issues/135908
645+
- class: org.elasticsearch.upgrades.FullClusterRestartIT
646+
method: testResize {cluster=UPGRADED}
647+
issue: https://github.com/elastic/elasticsearch/issues/135909
648+
- class: org.elasticsearch.upgrades.FullClusterRestartIT
649+
method: testSystemIndexMetadataIsUpgraded {cluster=UPGRADED}
650+
issue: https://github.com/elastic/elasticsearch/issues/135923
651+
- class: org.elasticsearch.upgrades.FullClusterRestartIT
652+
method: testSearch {cluster=UPGRADED}
653+
issue: https://github.com/elastic/elasticsearch/issues/135927
654+
- class: org.elasticsearch.xpack.inference.chunking.ChunkingSettingsBuilderTests
655+
method: testBuildChunkingSettingsForElasticReranker_QueryTokenCountLessThanHalfOfTokenLimit
656+
issue: https://github.com/elastic/elasticsearch/issues/135928
657+
- class: org.elasticsearch.upgrades.FullClusterRestartIT
658+
method: testPeerRecoveryRetentionLeases {cluster=UPGRADED}
659+
issue: https://github.com/elastic/elasticsearch/issues/135929
660+
- class: org.elasticsearch.upgrades.FullClusterRestartIT
661+
method: testPersianAnalyzerBWC {cluster=UPGRADED}
662+
issue: https://github.com/elastic/elasticsearch/issues/135930
636663

637664
# Examples:
638665
#
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"project.tags": {
3+
"documentation": {
4+
"url": null,
5+
"description": "Return tags defined for the project"
6+
},
7+
"stability": "experimental",
8+
"visibility": "public",
9+
"headers": {
10+
"accept": ["application/json"]
11+
},
12+
"url": {
13+
"paths": [
14+
{
15+
"path": "/_project/tags",
16+
"methods": ["GET"]
17+
}
18+
]
19+
}
20+
}
21+
}

test/test-clusters/src/main/java/org/elasticsearch/test/cluster/local/DefaultSettingsProvider.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ public Map<String, String> get(LocalNodeSpec nodeSpec) {
4444

4545
// Limit the number of allocated processors for all nodes in the cluster by default.
4646
// This is to ensure that the tests run consistently across different environments.
47-
settings.put("node.processors", "2");
47+
if (nodeSpec.getVersion().onOrAfter("7.6.0")) {
48+
settings.put("node.processors", "2");
49+
} else {
50+
settings.put("processors", "2");
51+
}
4852

4953
// Default the watermarks to absurdly low to prevent the tests from failing on nodes without enough disk space
5054
settings.put("cluster.routing.allocation.disk.watermark.low", "1b");

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -199,15 +199,7 @@ static final class RowFiller {
199199
}
200200
}
201201

202-
/**
203-
* Fill a {@link Row} for {@code position}.
204-
*/
205-
void row(int position, Row destination) {
206-
writeKey(position, destination);
207-
writeValues(position, destination);
208-
}
209-
210-
private void writeKey(int position, Row row) {
202+
void writeKey(int position, Row row) {
211203
int orderByCompositeKeyCurrentPosition = 0;
212204
for (int i = 0; i < keyFactories.length; i++) {
213205
int valueAsBytesSize = keyFactories[i].extractor.writeKey(row.keys, position);
@@ -217,7 +209,7 @@ private void writeKey(int position, Row row) {
217209
}
218210
}
219211

220-
private void writeValues(int position, Row destination) {
212+
void writeValues(int position, Row destination) {
221213
for (ValueExtractor e : valueExtractors) {
222214
var refCounted = e.getRefCountedForShard(position);
223215
if (refCounted != null) {
@@ -416,15 +408,28 @@ public void addInput(Page page) {
416408
spare.values.clear();
417409
spare.clearRefCounters();
418410
}
419-
rowFiller.row(i, spare);
411+
rowFiller.writeKey(i, spare);
420412

421413
// When rows are very long, appending the values one by one can lead to lots of allocations.
422414
// To avoid this, pre-allocate at least as much size as in the last seen row.
423415
// Let the pre-allocation size decay in case we only have 1 huge row and smaller rows otherwise.
424416
spareKeysPreAllocSize = Math.max(spare.keys.length(), spareKeysPreAllocSize / 2);
425-
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
426417

427-
spare = inputQueue.insertWithOverflow(spare);
418+
// This is `inputQueue.insertWithOverflow` with followed by filling in the value only if we inserted.
419+
if (inputQueue.size() < inputQueue.topCount) {
420+
// Heap not yet full, just add elements
421+
rowFiller.writeValues(i, spare);
422+
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
423+
inputQueue.add(spare);
424+
spare = null;
425+
} else if (inputQueue.lessThan(inputQueue.top(), spare)) {
426+
// Heap full AND this node fit in it.
427+
Row nextSpare = inputQueue.top();
428+
rowFiller.writeValues(i, spare);
429+
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
430+
inputQueue.updateTop(spare);
431+
spare = nextSpare;
432+
}
428433
}
429434
} finally {
430435
page.releaseBlocks();

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@
9292
import static org.hamcrest.Matchers.equalTo;
9393
import static org.hamcrest.Matchers.greaterThan;
9494
import static org.hamcrest.Matchers.hasSize;
95-
import static org.hamcrest.Matchers.is;
9695
import static org.hamcrest.Matchers.lessThan;
9796
import static org.hamcrest.Matchers.lessThanOrEqualTo;
9897

@@ -470,7 +469,8 @@ private TopNOperator.Row row(
470469
page
471470
);
472471
TopNOperator.Row row = new TopNOperator.Row(nonBreakingBigArrays().breakerService().getBreaker("request"), sortOrders, 0, 0);
473-
rf.row(position, row);
472+
rf.writeKey(position, row);
473+
rf.writeValues(position, row);
474474
return row;
475475
}
476476

@@ -1468,14 +1468,15 @@ public void testRowResizes() {
14681468
);
14691469
List<ElementType> types = Collections.nCopies(columns, INT);
14701470
List<TopNEncoder> encoders = Collections.nCopies(columns, DEFAULT_UNSORTABLE);
1471+
boolean asc = randomBoolean();
14711472
try (
14721473
TopNOperator op = new TopNOperator(
14731474
driverContext().blockFactory(),
14741475
breaker,
14751476
10,
14761477
types,
14771478
encoders,
1478-
List.of(new TopNOperator.SortOrder(0, randomBoolean(), randomBoolean())),
1479+
List.of(new TopNOperator.SortOrder(0, asc, randomBoolean())),
14791480
randomPageSize()
14801481
)
14811482
) {
@@ -1491,7 +1492,8 @@ public void testRowResizes() {
14911492

14921493
// 105 are from the objects
14931494
// 1 is for the min-heap itself
1494-
assertThat(breaker.getMemoryRequestCount(), is(106L));
1495+
// -1 IF we're sorting ascending. We encode one less value.
1496+
assertThat(breaker.getMemoryRequestCount(), equalTo(asc ? 105L : 106L));
14951497
}
14961498
}
14971499

0 commit comments

Comments
 (0)