Skip to content

Commit 5c6a74a

Browse files
committed
Merge branch 'main' into esql_danger_zone
2 parents 6faf095 + 5a4961b commit 5c6a74a

File tree

35 files changed

+1104
-559
lines changed

35 files changed

+1104
-559
lines changed

.github/workflows/docs-preview-comment.yml

Lines changed: 0 additions & 71 deletions
This file was deleted.

docs/changelog/130452.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 130452
2+
summary: "Aggs: Add cancellation checks to `FilterByFilter` aggregator"
3+
area: Aggregations
4+
type: bug
5+
issues: []

muted-tests.yml

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -492,11 +492,6 @@ tests:
492492
method: "builds distribution from branches via archives extractedAssemble [bwcDistVersion: 8.2.1, bwcProject: bugfix, expectedAssembleTaskName:
493493
extractedAssemble, #2]"
494494
issue: https://github.com/elastic/elasticsearch/issues/119871
495-
- class: geoip.GeoIpMultiProjectIT
496-
issue: https://github.com/elastic/elasticsearch/issues/130073
497-
- class: org.elasticsearch.xpack.esql.action.EnrichIT
498-
method: testTopN
499-
issue: https://github.com/elastic/elasticsearch/issues/130122
500495
- class: org.elasticsearch.action.support.ThreadedActionListenerTests
501496
method: testRejectionHandling
502497
issue: https://github.com/elastic/elasticsearch/issues/130129
@@ -518,12 +513,6 @@ tests:
518513
- class: org.elasticsearch.xpack.esql.inference.bulk.BulkInferenceExecutorTests
519514
method: testSuccessfulExecution
520515
issue: https://github.com/elastic/elasticsearch/issues/130306
521-
- class: org.elasticsearch.multiproject.test.CoreWithMultipleProjectsClientYamlTestSuiteIT
522-
method: test {yaml=cluster.stats/10_basic/Dense vector stats}
523-
issue: https://github.com/elastic/elasticsearch/issues/130307
524-
- class: org.elasticsearch.multiproject.test.CoreWithMultipleProjectsClientYamlTestSuiteIT
525-
method: test {yaml=nodes.stats/11_indices_metrics/Lucene segment level fields stats}
526-
issue: https://github.com/elastic/elasticsearch/issues/130360
527516
- class: org.elasticsearch.ingest.PipelineFactoryTests
528517
method: testCreateUnsupportedFieldAccessPattern
529518
issue: https://github.com/elastic/elasticsearch/issues/130422
@@ -536,9 +525,6 @@ tests:
536525
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
537526
method: testTopNPushedToLucene
538527
issue: https://github.com/elastic/elasticsearch/issues/130505
539-
- class: org.elasticsearch.multiproject.test.CoreWithMultipleProjectsClientYamlTestSuiteIT
540-
method: test {yaml=indices.resolve_index/10_basic_resolve_index/Resolve index with hidden and closed indices}
541-
issue: https://github.com/elastic/elasticsearch/issues/130568
542528
- class: org.elasticsearch.xpack.monitoring.exporter.http.HttpExporterTests
543529
method: testExporterWithHostOnly
544530
issue: https://github.com/elastic/elasticsearch/issues/130599
@@ -569,21 +555,24 @@ tests:
569555
- class: org.elasticsearch.test.rest.yaml.RcsCcsCommonYamlTestSuiteIT
570556
method: test {p0=search.vectors/41_knn_search_half_byte_quantized/Test create, merge, and search cosine}
571557
issue: https://github.com/elastic/elasticsearch/issues/130663
572-
- class: org.elasticsearch.multiproject.test.CoreWithMultipleProjectsClientYamlTestSuiteIT
573-
method: test {yaml=search.vectors/70_dense_vector_telemetry/Field mapping stats}
574-
issue: https://github.com/elastic/elasticsearch/issues/130672
575558
- class: org.elasticsearch.indices.stats.IndexStatsIT
576559
method: testFilterCacheStats
577560
issue: https://github.com/elastic/elasticsearch/issues/124447
578561
- class: org.elasticsearch.search.sort.FieldSortIT
579562
method: testSortMixedFieldTypes
580563
issue: https://github.com/elastic/elasticsearch/issues/129445
581-
- class: org.elasticsearch.multiproject.test.XpackWithMultipleProjectsClientYamlTestSuiteIT
582-
method: test {yaml=dlm/10_usage/Test data stream lifecycle usage stats}
583-
issue: https://github.com/elastic/elasticsearch/issues/130677
584564
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT
585565
method: testStopQueryLocal
586566
issue: https://github.com/elastic/elasticsearch/issues/121672
567+
- class: org.elasticsearch.test.rest.yaml.RcsCcsCommonYamlTestSuiteIT
568+
method: test {p0=search/160_exists_query/Test exists query on mapped ip field with no doc values}
569+
issue: https://github.com/elastic/elasticsearch/issues/130730
570+
- class: org.elasticsearch.xpack.esql.qa.multi_node.GenerativeIT
571+
method: test
572+
issue: https://github.com/elastic/elasticsearch/issues/130067
573+
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT
574+
method: test
575+
issue: https://github.com/elastic/elasticsearch/issues/130067
587576

588577
# Examples:
589578
#

rest-api-spec/src/main/resources/rest-api-spec/api/open_point_in_time.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@
5959
"allow_partial_search_results": {
6060
"type": "boolean",
6161
"description": "Specify whether to tolerate shards missing when creating the point-in-time, or otherwise throw an exception. (default: false)"
62+
},
63+
"max_concurrent_shard_requests": {
64+
"type": "number",
65+
"description": "The number of concurrent shard requests per node executed concurrently when opening this point-in-time. This value should be used to limit the impact of opening the point-in-time on the cluster",
66+
"default": 5
6267
}
6368
},
6469
"body":{

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/40_knn_search.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,9 +621,15 @@ setup:
621621
index:
622622
index: test_index
623623
id: "0"
624+
refresh: true
624625
body:
625626
embedding: [ 0.5, 111.3, -13.0, 14.8, -156.0 ]
626627

628+
# wait and ensure that the mapping update is replicated
629+
- do:
630+
cluster.health:
631+
wait_for_events: languid
632+
627633
- do:
628634
indices.get_mapping:
629635
index: test_index
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.search.aggregations.bucket;
11+
12+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
13+
import org.elasticsearch.action.search.SearchRequestBuilder;
14+
import org.elasticsearch.action.search.TransportSearchAction;
15+
import org.elasticsearch.action.support.WriteRequest;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.util.CollectionUtils;
18+
import org.elasticsearch.core.TimeValue;
19+
import org.elasticsearch.index.mapper.OnScriptError;
20+
import org.elasticsearch.plugins.Plugin;
21+
import org.elasticsearch.plugins.ScriptPlugin;
22+
import org.elasticsearch.script.LongFieldScript;
23+
import org.elasticsearch.script.ScriptContext;
24+
import org.elasticsearch.script.ScriptEngine;
25+
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator.KeyedFilter;
26+
import org.elasticsearch.search.lookup.SearchLookup;
27+
import org.elasticsearch.tasks.TaskInfo;
28+
import org.elasticsearch.test.ESIntegTestCase;
29+
import org.elasticsearch.xcontent.XContentBuilder;
30+
import org.elasticsearch.xcontent.json.JsonXContent;
31+
32+
import java.util.Collection;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Set;
36+
import java.util.concurrent.Semaphore;
37+
38+
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
39+
import static org.elasticsearch.search.aggregations.AggregationBuilders.filters;
40+
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
41+
import static org.hamcrest.Matchers.empty;
42+
import static org.hamcrest.Matchers.greaterThan;
43+
import static org.hamcrest.Matchers.not;
44+
45+
@ESIntegTestCase.SuiteScopeTestCase
46+
public class FiltersCancellationIT extends ESIntegTestCase {
47+
48+
private static final String INDEX = "idx";
49+
private static final String PAUSE_FIELD = "pause";
50+
private static final String NUMERIC_FIELD = "value";
51+
52+
private static final int NUM_DOCS = 100_000;
53+
private static final int SEMAPHORE_PERMITS = NUM_DOCS - 1000;
54+
private static final Semaphore SCRIPT_SEMAPHORE = new Semaphore(0);
55+
56+
@Override
57+
protected Collection<Class<? extends Plugin>> nodePlugins() {
58+
return CollectionUtils.appendToCopy(super.nodePlugins(), pausableFieldPluginClass());
59+
}
60+
61+
protected Class<? extends Plugin> pausableFieldPluginClass() {
62+
return PauseScriptPlugin.class;
63+
}
64+
65+
@Override
66+
public void setupSuiteScopeCluster() throws Exception {
67+
try (XContentBuilder mapping = JsonXContent.contentBuilder()) {
68+
mapping.startObject();
69+
mapping.startObject("runtime");
70+
{
71+
mapping.startObject(PAUSE_FIELD);
72+
{
73+
mapping.field("type", "long");
74+
mapping.startObject("script").field("source", "").field("lang", PauseScriptPlugin.PAUSE_SCRIPT_LANG).endObject();
75+
}
76+
mapping.endObject();
77+
mapping.startObject(NUMERIC_FIELD);
78+
{
79+
mapping.field("type", "long");
80+
}
81+
mapping.endObject();
82+
}
83+
mapping.endObject();
84+
mapping.endObject();
85+
86+
client().admin().indices().prepareCreate(INDEX).setMapping(mapping).get();
87+
}
88+
89+
int DOCS_PER_BULK = 100_000;
90+
for (int i = 0; i < NUM_DOCS; i += DOCS_PER_BULK) {
91+
BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
92+
for (int j = 0; j < DOCS_PER_BULK; j++) {
93+
int docId = i + j;
94+
bulk.add(prepareIndex(INDEX).setId(Integer.toString(docId)).setSource(NUMERIC_FIELD, docId));
95+
}
96+
bulk.get();
97+
}
98+
99+
client().admin().indices().prepareForceMerge(INDEX).setMaxNumSegments(1).get();
100+
}
101+
102+
public void testFiltersCountCancellation() throws Exception {
103+
ensureProperCancellation(
104+
client().prepareSearch(INDEX)
105+
.addAggregation(
106+
filters(
107+
"filters",
108+
new KeyedFilter[] {
109+
new KeyedFilter("filter1", termQuery(PAUSE_FIELD, 1)),
110+
new KeyedFilter("filter2", termQuery(PAUSE_FIELD, 2)) }
111+
)
112+
)
113+
);
114+
}
115+
116+
public void testFiltersSubAggsCancellation() throws Exception {
117+
ensureProperCancellation(
118+
client().prepareSearch(INDEX)
119+
.addAggregation(
120+
filters(
121+
"filters",
122+
new KeyedFilter[] {
123+
new KeyedFilter("filter1", termQuery(PAUSE_FIELD, 1)),
124+
new KeyedFilter("filter2", termQuery(PAUSE_FIELD, 2)) }
125+
).subAggregation(terms("sub").field(PAUSE_FIELD))
126+
)
127+
);
128+
}
129+
130+
private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder) throws Exception {
131+
var searchRequestFuture = searchRequestBuilder.setTimeout(TimeValue.timeValueSeconds(1)).execute();
132+
assertFalse(searchRequestFuture.isCancelled());
133+
assertFalse(searchRequestFuture.isDone());
134+
135+
// Check that there are search tasks running
136+
assertThat(getSearchTasks(), not(empty()));
137+
138+
// Wait for the script field to get blocked
139+
assertBusy(() -> { assertThat(SCRIPT_SEMAPHORE.getQueueLength(), greaterThan(0)); });
140+
141+
// Cancel the tasks
142+
// Warning: Adding a waitForCompletion(true)/execute() here sometimes causes tasks to not get canceled and threads to get stuck
143+
client().admin().cluster().prepareCancelTasks().setActions(TransportSearchAction.NAME + "*").get();
144+
145+
SCRIPT_SEMAPHORE.release(SEMAPHORE_PERMITS);
146+
147+
// Ensure the search request finished and that there are no more search tasks
148+
assertBusy(() -> {
149+
assertTrue(searchRequestFuture.isDone());
150+
assertThat(getSearchTasks(), empty());
151+
});
152+
}
153+
154+
private List<TaskInfo> getSearchTasks() {
155+
return client().admin()
156+
.cluster()
157+
.prepareListTasks()
158+
.setActions(TransportSearchAction.NAME + "*")
159+
.setDetailed(true)
160+
.get()
161+
.getTasks();
162+
}
163+
164+
public static class PauseScriptPlugin extends Plugin implements ScriptPlugin {
165+
public static final String PAUSE_SCRIPT_LANG = "pause";
166+
167+
@Override
168+
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
169+
return new ScriptEngine() {
170+
@Override
171+
public String getType() {
172+
return PAUSE_SCRIPT_LANG;
173+
}
174+
175+
@Override
176+
@SuppressWarnings("unchecked")
177+
public <FactoryType> FactoryType compile(
178+
String name,
179+
String code,
180+
ScriptContext<FactoryType> context,
181+
Map<String, String> params
182+
) {
183+
if (context == LongFieldScript.CONTEXT) {
184+
return (FactoryType) new LongFieldScript.Factory() {
185+
@Override
186+
public LongFieldScript.LeafFactory newFactory(
187+
String fieldName,
188+
Map<String, Object> params,
189+
SearchLookup searchLookup,
190+
OnScriptError onScriptError
191+
) {
192+
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
193+
@Override
194+
public void execute() {
195+
try {
196+
SCRIPT_SEMAPHORE.acquire();
197+
} catch (InterruptedException e) {
198+
throw new AssertionError(e);
199+
}
200+
emit(1);
201+
}
202+
};
203+
}
204+
};
205+
}
206+
throw new IllegalStateException("unsupported type " + context);
207+
}
208+
209+
@Override
210+
public Set<ScriptContext<?>> getSupportedContexts() {
211+
return Set.of(LongFieldScript.CONTEXT);
212+
}
213+
};
214+
}
215+
}
216+
}

0 commit comments

Comments
 (0)