Skip to content

Commit fd765a9

Browse files
authored
Merge branch 'main' into mandatory_query_field
2 parents 6a52408 + 6140b02 commit fd765a9

File tree

22 files changed

+880
-66
lines changed

22 files changed

+880
-66
lines changed

docs/changelog/137558.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 137558
2+
summary: Improve security migration resilience by handling version conflicts
3+
area: Security
4+
type: enhancement
5+
issues: []

docs/changelog/138084.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138084
2+
summary: Handle Query Timeouts During Collector Initialization in `QueryPhase`
3+
area: Search
4+
type: bug
5+
issues: []

docs/changelog/138094.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 138094
2+
summary: "[IRONSCALES] Add `manage`, `create_index`, `read`, `index`, `write`, `delete`, permission for third party agent indices `kibana_system`"
3+
area: Authorization
4+
type: enhancement
5+
issues:
6+
- 138093

muted-tests.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,9 @@ tests:
444444
- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT
445445
method: test {yaml=index/100_field_name_length_limit/Test field name length limit synthetic source}
446446
issue: https://github.com/elastic/elasticsearch/issues/138471
447+
- class: org.elasticsearch.xpack.inference.integration.AuthorizationTaskExecutorIT
448+
method: testCreatesEisChatCompletion_DoesNotRemoveEndpointWhenNoLongerAuthorized
449+
issue: https://github.com/elastic/elasticsearch/issues/138480
447450

448451
# Examples:
449452
#
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.search.aggregations;
11+
12+
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.action.index.IndexRequest;
14+
import org.elasticsearch.action.search.SearchPhaseExecutionException;
15+
import org.elasticsearch.action.search.SearchResponse;
16+
import org.elasticsearch.common.io.stream.StreamInput;
17+
import org.elasticsearch.common.io.stream.StreamOutput;
18+
import org.elasticsearch.index.query.QueryBuilders;
19+
import org.elasticsearch.plugins.Plugin;
20+
import org.elasticsearch.plugins.SearchPlugin;
21+
import org.elasticsearch.search.aggregations.support.AggregationContext;
22+
import org.elasticsearch.search.internal.ContextIndexSearcher;
23+
import org.elasticsearch.search.query.SearchTimeoutException;
24+
import org.elasticsearch.test.ESIntegTestCase;
25+
import org.elasticsearch.xcontent.XContentBuilder;
26+
import org.elasticsearch.xcontent.XContentParser;
27+
import org.junit.After;
28+
import org.junit.Before;
29+
30+
import java.io.IOException;
31+
import java.util.Collection;
32+
import java.util.List;
33+
import java.util.Map;
34+
35+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
36+
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
37+
import static org.hamcrest.Matchers.equalTo;
38+
import static org.hamcrest.Matchers.instanceOf;
39+
import static org.hamcrest.Matchers.is;
40+
import static org.hamcrest.Matchers.notNullValue;
41+
42+
/**
43+
* Integration test verifying that a TimeExceededException thrown during collector
44+
* preparation (before the actual search executes) is caught in QueryPhase and
45+
* correctly transformed into a partial response with `timed_out=true`, empty hits,
46+
* and empty aggregations rather than an exception.
47+
*/
48+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
49+
public class QueryPhaseForcedTimeoutIT extends ESIntegTestCase {
50+
51+
private static final String INDEX = "index";
52+
53+
@Override
54+
protected Collection<Class<? extends Plugin>> nodePlugins() {
55+
return List.of(ForceTimeoutAggPlugin.class);
56+
}
57+
58+
@Before
59+
public void setupIndex() throws Exception {
60+
assertAcked(prepareCreate(INDEX).setMapping("""
61+
{
62+
"properties": {
63+
"kwd": { "type": "keyword" },
64+
"txt": { "type": "text" }
65+
}
66+
}
67+
"""));
68+
69+
for (int i = 0; i < 10; i++) {
70+
IndexRequest ir = new IndexRequest(INDEX).source(
71+
jsonBuilder().startObject().field("kwd", "value" + i).field("txt", "text " + i).endObject()
72+
);
73+
client().index(ir).actionGet();
74+
}
75+
indicesAdmin().prepareRefresh(INDEX).get();
76+
ensureGreen(INDEX);
77+
}
78+
79+
@After
80+
public void cleanup() {
81+
indicesAdmin().prepareDelete(INDEX).get();
82+
}
83+
84+
/**
85+
* Executes a search using the ForceTimeoutAggPlugin aggregation which throws
86+
* TimeExceededException during collector preparation, and asserts that:
87+
* - the response is returned without failure,
88+
* - the `timed_out` flag is true,
89+
* - hits are empty, and
90+
* - aggregations are non-null but empty.
91+
*/
92+
public void testTimeoutDuringCollectorPreparationReturnsTimedOutEmptyResult() {
93+
SearchResponse resp = null;
94+
try {
95+
resp = client().prepareSearch(INDEX)
96+
.setQuery(QueryBuilders.matchAllQuery())
97+
.setSize(10)
98+
.setAllowPartialSearchResults(true)
99+
.addAggregation(new ForceTimeoutAggregationBuilder("force_timeout"))
100+
.get();
101+
102+
assertThat(resp, notNullValue());
103+
assertThat("search should be marked timed_out", resp.isTimedOut(), is(true));
104+
assertThat("no hits returned", resp.getHits().getHits().length, equalTo(0));
105+
assertThat(resp.getAggregations(), notNullValue());
106+
assertThat("no aggr returned", resp.getAggregations().asList().isEmpty(), is(true));
107+
assertThat("no shard failures expected", resp.getShardFailures() == null || resp.getShardFailures().length == 0, is(true));
108+
} finally {
109+
if (resp != null) {
110+
resp.decRef();
111+
}
112+
}
113+
}
114+
115+
/**
116+
* In this test we explicitly set allow_partial_search_results=false. Under this
117+
* setting, any shard-level failure in the query phase (including a timeout) is treated as
118+
* a hard failure for the whole search. The coordinating node does not return a response
119+
* with timed_out=true, instead it fails the phase and throws a
120+
* {@link SearchPhaseExecutionException} whose cause is the underlying
121+
* {@link SearchTimeoutException}. This test asserts that behavior.
122+
*/
123+
public void testTimeoutDuringCollectorPreparationDisallowPartialsThrowsException() {
124+
SearchPhaseExecutionException ex = expectThrows(
125+
SearchPhaseExecutionException.class,
126+
() -> client().prepareSearch(INDEX)
127+
.setQuery(QueryBuilders.matchAllQuery())
128+
.setSize(10)
129+
.setAllowPartialSearchResults(false)
130+
.addAggregation(new ForceTimeoutAggregationBuilder("force_timeout"))
131+
.get()
132+
);
133+
134+
assertNotNull("expected a cause on SearchPhaseExecutionException", ex.getCause());
135+
assertThat("expected inner cause to be SearchTimeoutException", ex.getCause(), instanceOf(SearchTimeoutException.class));
136+
}
137+
138+
/**
139+
* A minimal plugin registering a custom aggregation (ForceTimeoutAggregationBuilder)
140+
* whose factory simulates a timeout during collector setup to test QueryPhase handling.
141+
*/
142+
public static class ForceTimeoutAggPlugin extends Plugin implements SearchPlugin {
143+
public static final String NAME = "force_timeout_plugin";
144+
145+
@Override
146+
public List<AggregationSpec> getAggregations() {
147+
return List.of(new AggregationSpec(NAME, ForceTimeoutAggregationBuilder::new, ForceTimeoutAggregationBuilder::parse));
148+
}
149+
}
150+
151+
/**
152+
* Aggregation builder for the ForceTimeoutAggPlugin aggregation.
153+
* It has no parameters and its factory immediately triggers a timeout exception
154+
* when the search collectors are being prepared.
155+
*/
156+
static class ForceTimeoutAggregationBuilder extends AbstractAggregationBuilder<ForceTimeoutAggregationBuilder> {
157+
158+
public static final String TYPE = ForceTimeoutAggPlugin.NAME;
159+
160+
private Map<String, Object> metadata;
161+
162+
ForceTimeoutAggregationBuilder(String name) {
163+
super(name);
164+
}
165+
166+
ForceTimeoutAggregationBuilder(StreamInput in) throws IOException {
167+
super(in);
168+
}
169+
170+
static ForceTimeoutAggregationBuilder parse(XContentParser parser, String name) {
171+
return new ForceTimeoutAggregationBuilder(name);
172+
}
173+
174+
@Override
175+
protected AggregatorFactory doBuild(
176+
AggregationContext context,
177+
AggregatorFactory parent,
178+
AggregatorFactories.Builder subfactoriesBuilder
179+
) throws IOException {
180+
return new ForceTimeoutAggregatorFactory(getName(), context, parent, factoriesBuilder, getMetadata());
181+
}
182+
183+
@Override
184+
protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metadata) {
185+
ForceTimeoutAggregationBuilder copy = new ForceTimeoutAggregationBuilder(getName());
186+
copy.factoriesBuilder = factoriesBuilder;
187+
copy.setMetadata(metadata);
188+
return copy;
189+
}
190+
191+
@Override
192+
public Map<String, Object> getMetadata() {
193+
return metadata;
194+
}
195+
196+
@Override
197+
public BucketCardinality bucketCardinality() {
198+
return BucketCardinality.ONE;
199+
}
200+
201+
@Override
202+
public TransportVersion getMinimalSupportedVersion() {
203+
return TransportVersion.zero();
204+
}
205+
206+
@Override
207+
protected void doWriteTo(StreamOutput out) {
208+
// Empty
209+
}
210+
211+
@Override
212+
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) {
213+
return builder;
214+
}
215+
216+
@Override
217+
public String getType() {
218+
return TYPE;
219+
}
220+
221+
/**
222+
* Factory implementation for ForceTimeoutAggregationBuilder.
223+
* Its createInternal() method throws a TimeExceededException
224+
* before any actual collection occurs, simulating a timeout during setup.
225+
*/
226+
static class ForceTimeoutAggregatorFactory extends AggregatorFactory {
227+
228+
ForceTimeoutAggregatorFactory(
229+
String name,
230+
AggregationContext context,
231+
AggregatorFactory parent,
232+
AggregatorFactories.Builder subFactoriesBuilder,
233+
Map<String, Object> metadata
234+
) throws IOException {
235+
super(name, context, parent, subFactoriesBuilder, metadata);
236+
}
237+
238+
@Override
239+
protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata) {
240+
if (context.searcher() instanceof ContextIndexSearcher cis) {
241+
cis.throwTimeExceededException();
242+
}
243+
throw new AssertionError("unreachable");
244+
}
245+
}
246+
}
247+
}

server/src/main/java/org/elasticsearch/index/IndexVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ private static Version parseUnchecked(String version) {
196196
public static final IndexVersion TIME_SERIES_DIMENSIONS_USE_SKIPPERS = def(9_045_0_00, Version.LUCENE_10_3_1);
197197
public static final IndexVersion TIME_SERIES_ALL_FIELDS_USE_SKIPPERS = def(9_046_0_00, Version.LUCENE_10_3_1);
198198
public static final IndexVersion UPGRADE_TO_LUCENE_10_3_2 = def(9_047_0_00, Version.LUCENE_10_3_2);
199+
public static final IndexVersion SECURITY_MIGRATIONS_METADATA_FLATTENED_UPDATE = def(9_048_0_00, Version.LUCENE_10_3_2);
199200

200201
/*
201202
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/search/query/QueryPhase.java

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.search.SearchContextSourcePrinter;
3434
import org.elasticsearch.search.SearchService;
3535
import org.elasticsearch.search.aggregations.AggregationPhase;
36+
import org.elasticsearch.search.aggregations.InternalAggregations;
3637
import org.elasticsearch.search.internal.ContextIndexSearcher;
3738
import org.elasticsearch.search.internal.ScrollContext;
3839
import org.elasticsearch.search.internal.SearchContext;
@@ -198,47 +199,71 @@ static void addCollectorsAndSearch(SearchContext searchContext, Long timeRangeFi
198199
);
199200
}
200201

201-
CollectorManager<Collector, QueryPhaseResult> collectorManager = QueryPhaseCollectorManager.createQueryPhaseCollectorManager(
202-
postFilterWeight,
203-
searchContext.aggregations() == null ? null : searchContext.aggregations().getAggsCollectorManager(),
204-
searchContext,
205-
hasFilterCollector
206-
);
207-
208202
final Runnable timeoutRunnable = getTimeoutCheck(searchContext);
209203
if (timeoutRunnable != null) {
210204
searcher.addQueryCancellation(timeoutRunnable);
211205
}
212206

213-
QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager);
214-
if (searchContext.getProfilers() != null) {
215-
searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult());
216-
}
217-
queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats());
218-
if (searcher.timeExceeded()) {
219-
assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set";
220-
SearchTimeoutException.handleTimeout(
221-
searchContext.request().allowPartialSearchResults(),
222-
searchContext.shardTarget(),
223-
searchContext.queryResult()
224-
);
225-
}
226-
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
227-
queryResult.terminatedEarly(queryPhaseResult.terminatedAfter());
228-
}
229-
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
230-
assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor
231-
|| (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */)
232-
: "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass();
233-
if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) {
234-
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
235-
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
207+
try {
208+
CollectorManager<Collector, QueryPhaseResult> collectorManager = QueryPhaseCollectorManager
209+
.createQueryPhaseCollectorManager(
210+
postFilterWeight,
211+
searchContext.aggregations() == null ? null : searchContext.aggregations().getAggsCollectorManager(),
212+
searchContext,
213+
hasFilterCollector
214+
);
215+
216+
QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager);
217+
218+
if (searchContext.getProfilers() != null) {
219+
searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult());
220+
}
221+
queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats());
222+
223+
if (searcher.timeExceeded()) {
224+
assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set";
225+
SearchTimeoutException.handleTimeout(
226+
searchContext.request().allowPartialSearchResults(),
227+
searchContext.shardTarget(),
228+
searchContext.queryResult()
229+
);
230+
}
231+
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
232+
queryResult.terminatedEarly(queryPhaseResult.terminatedAfter());
233+
}
234+
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
235+
assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor
236+
|| (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */)
237+
: "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass();
238+
if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) {
239+
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
240+
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
241+
}
242+
} catch (ContextIndexSearcher.TimeExceededException tee) {
243+
finalizeAsTimedOutResult(searchContext);
236244
}
237245
} catch (Exception e) {
238246
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute main query", e);
239247
}
240248
}
241249

250+
/**
251+
* Marks the current search as timed out and finalizes the {@link QuerySearchResult}
252+
* with a well-formed empty response. This ensures that even when a timeout occurs
253+
* (e.g., during collector setup or search execution), the shard still returns a
254+
* valid result object with empty top docs and aggregations instead of throwing.
255+
*/
256+
private static void finalizeAsTimedOutResult(SearchContext searchContext) {
257+
QuerySearchResult queryResult = searchContext.queryResult();
258+
SearchTimeoutException.handleTimeout(searchContext.request().allowPartialSearchResults(), searchContext.shardTarget(), queryResult);
259+
260+
queryResult.topDocs(new TopDocsAndMaxScore(Lucene.EMPTY_TOP_DOCS, Float.NaN), new DocValueFormat[0]);
261+
262+
if (searchContext.aggregations() != null) {
263+
queryResult.aggregations(InternalAggregations.EMPTY);
264+
}
265+
}
266+
242267
/**
243268
* Returns whether collection within the provided <code>reader</code> can be early-terminated if it sorts
244269
* with <code>sortAndFormats</code>.

0 commit comments

Comments
 (0)