Skip to content

Commit c0147c7

Browse files
authored
Merge branch 'main' into promql-with-series-dead-code
2 parents 8949308 + d2b4355 commit c0147c7

File tree

36 files changed

+431
-667
lines changed

36 files changed

+431
-667
lines changed

docs/changelog/138002.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138002
2+
summary: Fix `SearchContext` CB memory accounting
3+
area: Aggregations
4+
type: bug
5+
issues: []

docs/changelog/138438.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138438
2+
summary: Change `DatabaseNodeService` error logs to warnings
3+
area: Ingest Node
4+
type: bug
5+
issues: []

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseNodeService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ void checkDatabases(ProjectState projectState) {
371371
try {
372372
retrieveAndUpdateDatabase(projectId, name, metadata);
373373
} catch (Exception ex) {
374-
logger.error(() -> "failed to retrieve database [" + name + "]", ex);
374+
logger.warn(() -> "failed to retrieve database [" + name + "]", ex);
375375
}
376376
});
377377

@@ -468,13 +468,13 @@ void retrieveAndUpdateDatabase(ProjectId projectId, String databaseName, GeoIpTa
468468
Files.delete(retrievedFile);
469469
},
470470
failure -> {
471-
logger.error(() -> "failed to retrieve database [" + databaseName + "]", failure);
471+
logger.warn(() -> "failed to retrieve database [" + databaseName + "]", failure);
472472
try {
473473
Files.deleteIfExists(databaseTmpFile);
474474
Files.deleteIfExists(retrievedFile);
475475
} catch (IOException ioe) {
476476
ioe.addSuppressed(failure);
477-
logger.error("unable to delete tmp database file after failure", ioe);
477+
logger.warn("unable to delete tmp database file after failure", ioe);
478478
}
479479
}
480480
);
@@ -519,7 +519,7 @@ void updateDatabase(ProjectId projectId, String databaseFileName, String recorde
519519
}
520520
logger.info("successfully loaded database file [{}]", file.getFileName());
521521
} catch (Exception e) {
522-
logger.error(() -> "failed to update database [" + databaseFileName + "]", e);
522+
logger.warn(() -> "failed to update database [" + databaseFileName + "]", e);
523523
}
524524
}
525525

@@ -533,7 +533,7 @@ void removeStaleEntries(ProjectId projectId, Collection<String> staleEntries) {
533533
assert existing != null;
534534
existing.shutdown(true);
535535
} catch (Exception e) {
536-
logger.error(() -> "failed to clean database [" + staleEntry + "] for project [" + projectId + "]", e);
536+
logger.warn(() -> "failed to clean database [" + staleEntry + "] for project [" + projectId + "]", e);
537537
}
538538
}
539539
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -810,12 +810,12 @@ private class MultipartUploadCompareAndExchangeOperation {
810810
this.threadPool = threadPool;
811811
}
812812

813-
void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
814-
innerRun(expected, updated, listener.delegateResponse((delegate, e) -> {
813+
void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) {
814+
ActionListener.run(listener.delegateResponse((delegate, e) -> {
815815
logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e);
816-
if (e instanceof AwsServiceException awsServiceException
817-
&& (awsServiceException.statusCode() == 404
818-
|| awsServiceException.statusCode() == 200
816+
if ((e instanceof AwsServiceException awsServiceException)
817+
&& (awsServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()
818+
|| awsServiceException.statusCode() == RestStatus.OK.getStatus()
819819
&& "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) {
820820
// An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it.
821821
// Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an
@@ -824,7 +824,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
824824
} else {
825825
delegate.onFailure(e);
826826
}
827-
}));
827+
}), l -> innerRun(expected, updated, l));
828828
}
829829

830830
void innerRun(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
@@ -1120,16 +1120,13 @@ public void compareAndExchangeRegister(
11201120
ActionListener<OptionalBytesReference> listener
11211121
) {
11221122
final var clientReference = blobStore.clientReference();
1123-
ActionListener.run(
1124-
ActionListener.releaseBefore(clientReference, listener),
1125-
l -> new MultipartUploadCompareAndExchangeOperation(
1126-
purpose,
1127-
clientReference.client(),
1128-
blobStore.bucket(),
1129-
key,
1130-
blobStore.getThreadPool()
1131-
).run(expected, updated, l)
1132-
);
1123+
new MultipartUploadCompareAndExchangeOperation(
1124+
purpose,
1125+
clientReference.client(),
1126+
blobStore.bucket(),
1127+
key,
1128+
blobStore.getThreadPool()
1129+
).run(expected, updated, ActionListener.releaseBefore(clientReference, listener));
11331130
}
11341131

11351132
@Override

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1346,6 +1346,42 @@ public void testRetryOn403InStateless() {
13461346
assertEquals(denyAccessAfterAttempt <= maxRetries ? 1 : 0, accessDeniedResponseCount.get());
13471347
}
13481348

1349+
public void testUploadNotFoundInCompareAndExchange() {
1350+
final var blobContainerPath = BlobPath.EMPTY.add(getTestName());
1351+
final var statefulBlobContainer = createBlobContainer(1, null, null, null, null, null, blobContainerPath);
1352+
1353+
@SuppressForbidden(reason = "use a http server")
1354+
class RejectsUploadPartRequests extends S3HttpHandler {
1355+
RejectsUploadPartRequests() {
1356+
super("bucket");
1357+
}
1358+
1359+
@Override
1360+
public void handle(HttpExchange exchange) throws IOException {
1361+
if (parseRequest(exchange).isUploadPartRequest()) {
1362+
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
1363+
} else {
1364+
super.handle(exchange);
1365+
}
1366+
}
1367+
}
1368+
1369+
httpServer.createContext("/", new RejectsUploadPartRequests());
1370+
1371+
safeAwait(
1372+
l -> statefulBlobContainer.compareAndExchangeRegister(
1373+
randomPurpose(),
1374+
"not_found_register",
1375+
BytesArray.EMPTY,
1376+
new BytesArray(new byte[1]),
1377+
l.map(result -> {
1378+
assertFalse(result.isPresent());
1379+
return null;
1380+
})
1381+
)
1382+
);
1383+
}
1384+
13491385
private static String getBase16MD5Digest(BytesReference bytesReference) {
13501386
return MessageDigests.toHexString(MessageDigests.digest(bytesReference, MessageDigests.md5()));
13511387
}

muted-tests.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,18 @@ tests:
438438
- class: org.elasticsearch.xpack.inference.integration.AuthorizationTaskExecutorIT
439439
method: testCreatesEisChatCompletion_DoesNotRemoveEndpointWhenNoLongerAuthorized
440440
issue: https://github.com/elastic/elasticsearch/issues/138480
441+
- class: org.elasticsearch.ingest.geoip.IngestGeoIpClientYamlTestSuiteIT
442+
method: test {yaml=ingest_geoip/60_ip_location_databases/Test adding, getting, and removing ip location databases}
443+
issue: https://github.com/elastic/elasticsearch/issues/138502
444+
- class: org.elasticsearch.xpack.exponentialhistogram.ExponentialHistogramFieldMapperTests
445+
method: testFormattedDocValues
446+
issue: https://github.com/elastic/elasticsearch/issues/138504
447+
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackLookupJoinIT
448+
method: testLookupExplosionBigString
449+
issue: https://github.com/elastic/elasticsearch/issues/138510
450+
- class: org.elasticsearch.xpack.inference.integration.SemanticTextIndexOptionsIT
451+
method: testValidateIndexOptionsWithBasicLicense
452+
issue: https://github.com/elastic/elasticsearch/issues/138513
441453

442454
# Examples:
443455
#
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
package org.elasticsearch.search.aggregations.metrics;
10+
11+
import org.apache.logging.log4j.util.Strings;
12+
import org.elasticsearch.action.index.IndexRequestBuilder;
13+
import org.elasticsearch.action.search.SearchRequestBuilder;
14+
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.rest.RestStatus;
16+
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
17+
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
18+
import org.elasticsearch.search.sort.SortBuilders;
19+
import org.elasticsearch.search.sort.SortOrder;
20+
import org.elasticsearch.test.ESIntegTestCase;
21+
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
26+
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
27+
import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits;
28+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
29+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
30+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
31+
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
32+
import static org.hamcrest.Matchers.containsString;
33+
import static org.hamcrest.Matchers.notNullValue;
34+
35+
@ESIntegTestCase.SuiteScopeTestCase()
36+
public class LargeTopHitsIT extends ESIntegTestCase {
37+
38+
private static final String TERMS_AGGS_FIELD_1 = "terms1";
39+
private static final String TERMS_AGGS_FIELD_2 = "terms2";
40+
private static final String TERMS_AGGS_FIELD_3 = "terms3";
41+
private static final String SORT_FIELD = "sort";
42+
43+
@Override
44+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
45+
return Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)).put("indices.breaker.request.type", "memory").build();
46+
}
47+
48+
public static String randomExecutionHint() {
49+
return randomBoolean() ? null : randomFrom(ExecutionMode.values()).toString();
50+
}
51+
52+
@Override
53+
public void setupSuiteScopeCluster() throws Exception {
54+
initSmallIdx();
55+
ensureSearchable();
56+
}
57+
58+
private void initSmallIdx() throws IOException {
59+
createIndex("small_idx");
60+
ensureGreen("small_idx");
61+
populateIndex("small_idx", 5, 40_000);
62+
}
63+
64+
private void initLargeIdx() throws IOException {
65+
createIndex("large_idx");
66+
ensureGreen("large_idx");
67+
populateIndex("large_idx", 70, 50_000);
68+
}
69+
70+
public void testSimple() {
71+
assertNoFailuresAndResponse(query("small_idx"), response -> {
72+
Terms terms = response.getAggregations().get("terms");
73+
assertThat(terms, notNullValue());
74+
});
75+
}
76+
77+
public void test500Queries() {
78+
for (int i = 0; i < 500; i++) {
79+
// make sure we are not leaking memory over multiple queries
80+
assertNoFailuresAndResponse(query("small_idx"), response -> {
81+
Terms terms = response.getAggregations().get("terms");
82+
assertThat(terms, notNullValue());
83+
});
84+
}
85+
}
86+
87+
// This works most of the time, but it's not consistent: it still triggers OOM sometimes.
88+
// The test env is too small and non-deterministic to hold all these data and results.
89+
@AwaitsFix(bugUrl = "see comment above")
90+
public void testBreakAndRecover() throws IOException {
91+
initLargeIdx();
92+
assertNoFailuresAndResponse(query("small_idx"), response -> {
93+
Terms terms = response.getAggregations().get("terms");
94+
assertThat(terms, notNullValue());
95+
});
96+
97+
assertFailures(query("large_idx"), RestStatus.TOO_MANY_REQUESTS, containsString("Data too large"));
98+
99+
assertNoFailuresAndResponse(query("small_idx"), response -> {
100+
Terms terms = response.getAggregations().get("terms");
101+
assertThat(terms, notNullValue());
102+
});
103+
}
104+
105+
private void createIndex(String idxName) {
106+
assertAcked(
107+
prepareCreate(idxName).setMapping(
108+
TERMS_AGGS_FIELD_1,
109+
"type=keyword",
110+
TERMS_AGGS_FIELD_2,
111+
"type=keyword",
112+
TERMS_AGGS_FIELD_3,
113+
"type=keyword",
114+
"text",
115+
"type=text,store=true",
116+
"large_text_1",
117+
"type=text,store=false",
118+
"large_text_2",
119+
"type=text,store=false",
120+
"large_text_3",
121+
"type=text,store=false",
122+
"large_text_4",
123+
"type=text,store=false",
124+
"large_text_5",
125+
"type=text,store=false"
126+
)
127+
);
128+
}
129+
130+
private void populateIndex(String idxName, int nDocs, int size) throws IOException {
131+
for (int i = 0; i < nDocs; i++) {
132+
List<IndexRequestBuilder> builders = new ArrayList<>();
133+
builders.add(
134+
prepareIndex(idxName).setId(Integer.toString(i))
135+
.setSource(
136+
jsonBuilder().startObject()
137+
.field(TERMS_AGGS_FIELD_1, "val" + i % 53)
138+
.field(TERMS_AGGS_FIELD_2, "val" + i % 23)
139+
.field(TERMS_AGGS_FIELD_3, "val" + i % 10)
140+
.field(SORT_FIELD, i)
141+
.field("text", "some text to entertain")
142+
.field("large_text_1", Strings.repeat("this is a text field 1 ", size))
143+
.field("large_text_2", Strings.repeat("this is a text field 2 ", size))
144+
.field("large_text_3", Strings.repeat("this is a text field 3 ", size))
145+
.field("large_text_4", Strings.repeat("this is a text field 4 ", size))
146+
.field("large_text_5", Strings.repeat("this is a text field 5 ", size))
147+
.field("field1", 5)
148+
.field("field2", 2.71)
149+
.endObject()
150+
)
151+
);
152+
153+
indexRandom(true, builders);
154+
}
155+
}
156+
157+
private static SearchRequestBuilder query(String indexName) {
158+
return prepareSearch(indexName).addAggregation(
159+
terms("terms").executionHint(randomExecutionHint())
160+
.field(TERMS_AGGS_FIELD_1)
161+
.subAggregation(
162+
terms("terms").executionHint(randomExecutionHint())
163+
.field(TERMS_AGGS_FIELD_2)
164+
.subAggregation(
165+
terms("terms").executionHint(randomExecutionHint())
166+
.field(TERMS_AGGS_FIELD_2)
167+
.subAggregation(topHits("hits").sort(SortBuilders.fieldSort(SORT_FIELD).order(SortOrder.DESC)))
168+
)
169+
)
170+
);
171+
}
172+
}

server/src/main/java/org/elasticsearch/search/aggregations/metrics/TopHitsAggregator.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.util.List;
5555
import java.util.Map;
5656
import java.util.function.BiConsumer;
57+
import java.util.function.IntConsumer;
5758

5859
class TopHitsAggregator extends MetricsAggregator {
5960

@@ -198,7 +199,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
198199
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
199200
docIdsToLoad[i] = topDocs.scoreDocs[i].doc;
200201
}
201-
FetchSearchResult fetchResult = runFetchPhase(subSearchContext, docIdsToLoad);
202+
FetchSearchResult fetchResult = runFetchPhase(subSearchContext, docIdsToLoad, this::addRequestCircuitBreakerBytes);
202203
if (fetchProfiles != null) {
203204
fetchProfiles.add(fetchResult.profileResult());
204205
}
@@ -222,7 +223,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
222223
);
223224
}
224225

225-
private static FetchSearchResult runFetchPhase(SubSearchContext subSearchContext, int[] docIdsToLoad) {
226+
private static FetchSearchResult runFetchPhase(SubSearchContext subSearchContext, int[] docIdsToLoad, IntConsumer memoryChecker) {
226227
// Fork the search execution context for each slice, because the fetch phase does not support concurrent execution yet.
227228
SearchExecutionContext searchExecutionContext = new SearchExecutionContext(subSearchContext.getSearchExecutionContext());
228229
// InnerHitSubContext is not thread-safe, so we fork it as well to support concurrent execution
@@ -242,7 +243,7 @@ public InnerHitsContext innerHits() {
242243
}
243244
};
244245

245-
fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad, null);
246+
fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad, null, memoryChecker);
246247
return fetchSubSearchContext.fetchResult();
247248
}
248249

0 commit comments

Comments
 (0)