
Commit 335fd72

Merge branch 'main' into esql_better_push
2 parents: ae1ccfd + ef7a5e6

File tree

78 files changed: +2602 −1183 lines


docs/changelog/138023.yaml

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
pr: 138023
summary: Push down COUNT(*) BY DATE_TRUNC
area: ES|QL
type: feature
issues: []

docs/changelog/138539.yaml

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
pr: 138539
summary: Handle serialization of null blocks in `AggregateMetricDoubleBlock`
area: ES|QL
type: bug
issues: []

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java

Lines changed: 162 additions & 290 deletions
Large diffs are not rendered by default.

muted-tests.yml

Lines changed: 0 additions & 3 deletions

@@ -408,9 +408,6 @@ tests:
 - class: org.elasticsearch.xpack.ml.integration.RevertModelSnapshotIT
   method: testRevertModelSnapshot_DeleteInterveningResults
   issue: https://github.com/elastic/elasticsearch/issues/132349
-- class: org.elasticsearch.xpack.security.authc.jwt.JwtRealmAuthenticateTests
-  method: testJwkUpdatesByReloadWithFile
-  issue: https://github.com/elastic/elasticsearch/issues/138397
 - class: org.elasticsearch.smoketest.MlWithSecurityIT
   method: test {yaml=ml/start_data_frame_analytics/Test start classification analysis when the dependent variable cardinality is too low}
   issue: https://github.com/elastic/elasticsearch/issues/138409
org.elasticsearch.common.blobstore.RetryingInputStream (new file)

Lines changed: 319 additions & 0 deletions

@@ -0,0 +1,319 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.common.blobstore;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.core.Strings.format;

public abstract class RetryingInputStream extends InputStream {

    private static final Logger logger = LogManager.getLogger(RetryingInputStream.class);

    public static final int MAX_SUPPRESSED_EXCEPTIONS = 10;

    private final BlobStoreServices blobStoreServices;
    private final OperationPurpose purpose;
    private final long start;
    private final long end;
    private final List<Exception> failures;

    protected SingleAttemptInputStream currentStream;
    private long offset = 0;
    private int attempt = 1;
    private int failuresAfterMeaningfulProgress = 0;
    private boolean closed = false;

    protected RetryingInputStream(BlobStoreServices blobStoreServices, OperationPurpose purpose) throws IOException {
        this(blobStoreServices, purpose, 0L, Long.MAX_VALUE - 1L);
    }

    @SuppressWarnings("this-escape") // TODO: We can do better than this but I don't want to touch the tests for the first implementation
    protected RetryingInputStream(BlobStoreServices blobStoreServices, OperationPurpose purpose, long start, long end) throws IOException {
        if (start < 0L) {
            throw new IllegalArgumentException("start must be non-negative");
        }
        if (end < start || end == Long.MAX_VALUE) {
            throw new IllegalArgumentException("end must be >= start and not Long.MAX_VALUE");
        }
        this.blobStoreServices = blobStoreServices;
        this.purpose = purpose;
        this.failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
        this.start = start;
        this.end = end;
        final int initialAttempt = attempt;
        openStreamWithRetry();
        maybeLogAndRecordMetricsForSuccess(initialAttempt, "open");
    }

    private void openStreamWithRetry() throws IOException {
        while (true) {
            if (offset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
                assert start + offset <= end : "requesting beyond end, start = " + start + " offset=" + offset + " end=" + end;
            }
            try {
                currentStream = blobStoreServices.getInputStream(Math.addExact(start, offset), end);
                return;
            } catch (NoSuchFileException | RequestedRangeNotSatisfiedException e) {
                throw e;
            } catch (RuntimeException e) {
                if (attempt == 1) {
                    blobStoreServices.onRetryStarted("open");
                }
                final long delayInMillis = maybeLogAndComputeRetryDelay("opening", e);
                delayBeforeRetry(delayInMillis);
            }
        }
    }

    @Override
    public int read() throws IOException {
        ensureOpen();
        final int initialAttempt = attempt;
        while (true) {
            try {
                final int result = currentStream.read();
                if (result != -1) {
                    offset += 1;
                }
                maybeLogAndRecordMetricsForSuccess(initialAttempt, "read");
                return result;
            } catch (IOException e) {
                if (attempt == initialAttempt) {
                    blobStoreServices.onRetryStarted("read");
                }
                reopenStreamOrFail(e);
            }
        }
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        ensureOpen();
        final int initialAttempt = attempt;
        while (true) {
            try {
                final int bytesRead = currentStream.read(b, off, len);
                if (bytesRead != -1) {
                    offset += bytesRead;
                }
                maybeLogAndRecordMetricsForSuccess(initialAttempt, "read");
                return bytesRead;
            } catch (IOException e) {
                if (attempt == initialAttempt) {
                    blobStoreServices.onRetryStarted("read");
                }
                reopenStreamOrFail(e);
            }
        }
    }

    private void ensureOpen() {
        if (closed) {
            assert false : "using RetryingInputStream after close";
            throw new IllegalStateException("Stream is closed");
        }
    }

    private void reopenStreamOrFail(IOException e) throws IOException {
        final long meaningfulProgressSize = blobStoreServices.getMeaningfulProgressSize();
        if (currentStreamProgress() >= meaningfulProgressSize) {
            failuresAfterMeaningfulProgress += 1;
        }
        final long delayInMillis = maybeLogAndComputeRetryDelay("reading", e);
        IOUtils.closeWhileHandlingException(currentStream);

        delayBeforeRetry(delayInMillis);
        openStreamWithRetry();
    }

    // This method throws if the operation should *not* be retried. Otherwise, it keeps a record of the attempt and associated failure
    // and computes the delay before the retry.
    private <T extends Exception> long maybeLogAndComputeRetryDelay(String action, T e) throws T {
        if (shouldRetry(attempt) == false) {
            final var finalException = addSuppressedExceptions(e);
            logForFailure(action, finalException);
            throw finalException;
        }

        // Log at info level for the 1st retry and then exponentially less
        logForRetry(Integer.bitCount(attempt) == 1 ? Level.INFO : Level.DEBUG, action, e);
        if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
            failures.add(e);
        }
        final long delayInMillis = getRetryDelayInMillis();
        attempt += 1; // increment after computing delay because attempt affects the result
        return delayInMillis;
    }

    private void logForFailure(String action, Exception e) {
        logger.warn(
            () -> format(
                "failed %s [%s] at offset [%s] with purpose [%s]",
                action,
                blobStoreServices.getBlobDescription(),
                start + offset,
                purpose.getKey()
            ),
            e
        );
    }

    private void logForRetry(Level level, String action, Exception e) {
        logger.log(
            level,
            () -> format(
                """
                    failed %s [%s] at offset [%s] with purpose [%s]; \
                    this was attempt [%s] to read this blob which yielded [%s] bytes; in total \
                    [%s] of the attempts to read this blob have made meaningful progress and do not count towards the maximum number of \
                    retries; the maximum number of read attempts which do not make meaningful progress is [%s]""",
                action,
                blobStoreServices.getBlobDescription(),
                start + offset,
                purpose.getKey(),
                attempt,
                currentStreamProgress(),
                failuresAfterMeaningfulProgress,
                maxRetriesForNoMeaningfulProgress()
            ),
            e
        );
    }

    private void maybeLogAndRecordMetricsForSuccess(int initialAttempt, String action) {
        if (attempt > initialAttempt) {
            final int numberOfRetries = attempt - initialAttempt;
            logger.info(
                "successfully {} input stream for [{}] with purpose [{}] after [{}] retries",
                action,
                blobStoreServices.getBlobDescription(),
                purpose.getKey(),
                numberOfRetries
            );
            blobStoreServices.onRetrySucceeded(action, numberOfRetries);
        }
    }

    private long currentStreamProgress() {
        if (currentStream == null) {
            return 0L;
        }
        return Math.subtractExact(Math.addExact(start, offset), currentStream.getFirstOffset());
    }

    private boolean shouldRetry(int attempt) {
        if (purpose == OperationPurpose.REPOSITORY_ANALYSIS) {
            return false;
        }
        if (purpose == OperationPurpose.INDICES) {
            return true;
        }
        final int maxAttempts = blobStoreServices.getMaxRetries() + 1;
        return attempt < maxAttempts + failuresAfterMeaningfulProgress;
    }

    private int maxRetriesForNoMeaningfulProgress() {
        return purpose == OperationPurpose.INDICES ? Integer.MAX_VALUE : (blobStoreServices.getMaxRetries() + 1);
    }

    private void delayBeforeRetry(long delayInMillis) {
        try {
            assert shouldRetry(attempt - 1) : "should not have retried";
            Thread.sleep(delayInMillis);
        } catch (InterruptedException e) {
            logger.info("retrying input stream delay interrupted", e);
            Thread.currentThread().interrupt();
        }
    }

    // protected access for testing
    protected long getRetryDelayInMillis() {
        // Initial delay is 10 ms and cap max delay at 10 * 1024 millis, i.e. it retries every ~10 seconds at a minimum
        return 10L << (Math.min(attempt - 1, 10));
    }

    @Override
    public void close() throws IOException {
        try {
            currentStream.close();
        } finally {
            closed = true;
        }
    }

    @Override
    public long skip(long n) throws IOException {
        ensureOpen();
        return currentStream.skip(n);
    }

    @Override
    public void reset() {
        throw new UnsupportedOperationException("RetryingInputStream does not support seeking");
    }

    private <T extends Exception> T addSuppressedExceptions(T e) {
        for (Exception failure : failures) {
            e.addSuppressed(failure);
        }
        return e;
    }

    /**
     * This implements all the behavior that is blob-store-specific
     */
    protected interface BlobStoreServices {

        /**
         * Get an input stream for the blob at the given position
         *
         * @param start The start of the range to read, inclusive
         * @param end The end of the range to read, exclusive, or {@code Long.MAX_VALUE - 1} if the end of the blob should be used
         * @return An input stream for the given version
         * @throws IOException if a retryable error occurs while opening the stream
         * @throws NoSuchFileException if the blob does not exist, this is not retry-able
         * @throws RequestedRangeNotSatisfiedException if the requested range is not valid, this is not retry-able
         */
        SingleAttemptInputStream getInputStream(long start, long end) throws IOException;

        void onRetryStarted(String action);

        void onRetrySucceeded(String action, long numberOfRetries);

        long getMeaningfulProgressSize();

        int getMaxRetries();

        String getBlobDescription();
    }

    /**
     * Represents an {@link InputStream} for a single attempt to read a blob. Each retry
     * will attempt to create a new one of these. If reading from it fails, it should not retry.
     */
    protected abstract static class SingleAttemptInputStream extends InputStream {

        /**
         * @return the offset of the first byte returned by this input stream
         */
        protected abstract long getFirstOffset();
    }
}
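
For context on how the new abstraction is meant to be consumed, here is a minimal, hypothetical subclass that serves a blob out of a byte[]. The names InMemoryRetryingInputStream and servicesFor, and the literal retry settings, are invented for this sketch and do not appear in the commit; it only illustrates the BlobStoreServices / SingleAttemptInputStream contract: every attempt asks for a fresh stream at the current absolute offset, and getFirstOffset() lets the base class measure per-attempt progress.

package org.elasticsearch.common.blobstore; // same package as RetryingInputStream, for the sketch only

import java.io.ByteArrayInputStream;
import java.io.IOException;

// Hypothetical example, not part of the commit: a RetryingInputStream over an in-memory blob.
class InMemoryRetryingInputStream extends RetryingInputStream {

    InMemoryRetryingInputStream(byte[] blob, OperationPurpose purpose) throws IOException {
        super(servicesFor(blob), purpose);
    }

    private static BlobStoreServices servicesFor(byte[] blob) {
        return new BlobStoreServices() {
            @Override
            public SingleAttemptInputStream getInputStream(long start, long end) {
                // Each attempt gets a brand-new stream positioned at the requested absolute offset;
                // the base class advances `start` by the bytes already consumed before retrying.
                final int from = Math.toIntExact(Math.min(start, blob.length));
                final int to = Math.toIntExact(Math.min(end, blob.length));
                return new SingleAttemptInputStream() {
                    private final ByteArrayInputStream delegate = new ByteArrayInputStream(blob, from, to - from);

                    @Override
                    public int read() {
                        return delegate.read();
                    }

                    @Override
                    protected long getFirstOffset() {
                        return from; // lets the base class compute per-attempt progress
                    }
                };
            }

            @Override
            public void onRetryStarted(String action) {}

            @Override
            public void onRetrySucceeded(String action, long numberOfRetries) {}

            @Override
            public long getMeaningfulProgressSize() {
                return 1; // a single byte of progress counts as meaningful in this toy example
            }

            @Override
            public int getMaxRetries() {
                return 3;
            }

            @Override
            public String getBlobDescription() {
                return "in-memory example blob";
            }
        };
    }
}

A real blob-store implementation would instead issue a ranged read per attempt and report genuine metrics from the onRetryStarted/onRetrySucceeded callbacks.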

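The backoff encoded in getRetryDelayInMillis() doubles from 10 ms per attempt and is capped at 10 * 1024 ms. A quick standalone check of the resulting schedule (illustration only; RetryDelayDemo is not part of the commit):

public class RetryDelayDemo {
    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 12; attempt++) {
            // mirrors getRetryDelayInMillis(): 10 ms << min(attempt - 1, 10)
            long delayMillis = 10L << Math.min(attempt - 1, 10);
            System.out.println("attempt " + attempt + " -> " + delayMillis + " ms");
        }
        // prints 10, 20, 40, ..., 5120, 10240, 10240 ms
    }
}
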
server/src/main/java/org/elasticsearch/common/lucene/Lucene.java

Lines changed: 11 additions & 0 deletions

@@ -87,9 +87,12 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;

 public class Lucene {

@@ -128,6 +131,14 @@ public static Iterable<String> files(SegmentInfos infos) throws IOException {
         return Iterables.flatten(list);
     }

+    /**
+     * Returns the additional files that the {@code current} index commit introduces compared to the {@code previous} one.
+     */
+    public static Set<String> additionalFileNames(IndexCommit previous, IndexCommit current) throws IOException {
+        final Set<String> previousFiles = previous != null ? new HashSet<>(previous.getFileNames()) : Set.of();
+        return current.getFileNames().stream().filter(f -> previousFiles.contains(f) == false).collect(Collectors.toUnmodifiableSet());
+    }
+
     /**
      * Returns the number of documents in the index referenced by this {@link SegmentInfos}
      */

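A hedged usage sketch for the new helper: given a Lucene Directory that retains more than one commit, the two most recent commits can be diffed to find the files the latest commit introduced. The class and method names below (CommitDiffExample, newFilesInLatestCommit) are invented for illustration and are not part of the commit.

package org.elasticsearch.example; // hypothetical location

import java.io.IOException;
import java.util.List;
import java.util.Set;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.lucene.Lucene;

public class CommitDiffExample {

    /**
     * Returns the files that the newest commit in {@code directory} adds over the commit before it.
     * Assumes the index deletion policy keeps more than one commit around; otherwise the "previous"
     * commit is null and every file of the latest commit is reported.
     */
    public static Set<String> newFilesInLatestCommit(Directory directory) throws IOException {
        List<IndexCommit> commits = DirectoryReader.listCommits(directory); // sorted oldest to newest
        IndexCommit current = commits.get(commits.size() - 1);
        IndexCommit previous = commits.size() > 1 ? commits.get(commits.size() - 2) : null;
        return Lucene.additionalFileNames(previous, current);
    }
}

When previous is null (the very first commit), the helper falls back to Set.of() and therefore reports every file of the current commit, matching the null check in the diff above.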