|
14 | 14 | import com.amazonaws.services.s3.model.S3Object; |
15 | 15 | import com.amazonaws.services.s3.model.S3ObjectInputStream; |
16 | 16 |
|
| 17 | +import org.apache.logging.log4j.Level; |
17 | 18 | import org.apache.logging.log4j.LogManager; |
18 | 19 | import org.apache.logging.log4j.Logger; |
19 | | -import org.apache.logging.log4j.util.Supplier; |
20 | 20 | import org.elasticsearch.Version; |
21 | 21 | import org.elasticsearch.common.blobstore.OperationPurpose; |
22 | 22 | import org.elasticsearch.core.IOUtils; |
@@ -48,7 +48,7 @@ class S3RetryingInputStream extends InputStream { |
48 | 48 | private final String blobKey; |
49 | 49 | private final long start; |
50 | 50 | private final long end; |
51 | | - private final List<IOException> failures; |
| 51 | + private final List<Exception> failures; |
52 | 52 |
|
53 | 53 | private S3ObjectInputStream currentStream; |
54 | 54 | private long currentStreamFirstOffset; |
@@ -77,29 +77,34 @@ class S3RetryingInputStream extends InputStream { |
77 | 77 | this.failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS); |
78 | 78 | this.start = start; |
79 | 79 | this.end = end; |
80 | | - openStream(); |
| 80 | + openStreamWithRetry(); |
81 | 81 | } |
82 | 82 |
|
83 | | - private void openStream() throws IOException { |
84 | | - try (AmazonS3Reference clientReference = blobStore.clientReference()) { |
85 | | - final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey); |
86 | | - getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.GET_OBJECT, purpose)); |
87 | | - if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { |
88 | | - assert start + currentOffset <= end |
89 | | - : "requesting beyond end, start = " + start + " offset=" + currentOffset + " end=" + end; |
90 | | - getObjectRequest.setRange(Math.addExact(start, currentOffset), end); |
91 | | - } |
92 | | - final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest)); |
93 | | - this.currentStreamFirstOffset = Math.addExact(start, currentOffset); |
94 | | - this.currentStreamLastOffset = Math.addExact(currentStreamFirstOffset, getStreamLength(s3Object)); |
95 | | - this.currentStream = s3Object.getObjectContent(); |
96 | | - } catch (final AmazonClientException e) { |
97 | | - if (e instanceof AmazonS3Exception amazonS3Exception) { |
98 | | - if (404 == amazonS3Exception.getStatusCode()) { |
99 | | - throw addSuppressedExceptions(new NoSuchFileException("Blob object [" + blobKey + "] not found: " + e.getMessage())); |
| 83 | + private void openStreamWithRetry() throws IOException { |
| 84 | + while (true) { |
| 85 | + try (AmazonS3Reference clientReference = blobStore.clientReference()) { |
| 86 | + final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey); |
| 87 | + getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.GET_OBJECT, purpose)); |
| 88 | + if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { |
| 89 | + assert start + currentOffset <= end |
| 90 | + : "requesting beyond end, start = " + start + " offset=" + currentOffset + " end=" + end; |
| 91 | + getObjectRequest.setRange(Math.addExact(start, currentOffset), end); |
| 92 | + } |
| 93 | + final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest)); |
| 94 | + this.currentStreamFirstOffset = Math.addExact(start, currentOffset); |
| 95 | + this.currentStreamLastOffset = Math.addExact(currentStreamFirstOffset, getStreamLength(s3Object)); |
| 96 | + this.currentStream = s3Object.getObjectContent(); |
| 97 | + return; |
| 98 | + } catch (AmazonClientException e) { |
| 99 | + if (e instanceof AmazonS3Exception amazonS3Exception && 404 == amazonS3Exception.getStatusCode()) { |
| 100 | + throw addSuppressedExceptions( |
| 101 | + new NoSuchFileException("Blob object [" + blobKey + "] not found: " + amazonS3Exception.getMessage()) |
| 102 | + ); |
100 | 103 | } |
| 104 | + |
| 105 | + final long delayInMillis = maybeLogAndComputeRetryDelay("opening", e); |
| 106 | + delayBeforeRetry(delayInMillis); |
101 | 107 | } |
102 | | - throw addSuppressedExceptions(e); |
103 | 108 | } |
104 | 109 | } |
105 | 110 |
|
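For reference, a minimal standalone sketch of the resumable ranged read that `openStreamWithRetry` performs on every attempt: it asks S3 only for the bytes that have not yet been consumed by passing `start + currentOffset` as the (inclusive) range start. The bucket, key, and offsets below are hypothetical, and the real method additionally wires up metric collection and privileged socket access.

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;

class RangedResumeSketch {
    // Re-open the blob from wherever the previous attempt stopped reading.
    static S3Object resume(AmazonS3 client, String bucket, String key, long start, long currentOffset, long end) {
        final GetObjectRequest request = new GetObjectRequest(bucket, key);
        // Both ends of the range are inclusive; addExact guards against overflow, as in the patch.
        request.setRange(Math.addExact(start, currentOffset), end);
        return client.getObject(request);
    }
}
```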
@@ -166,45 +171,104 @@ private void ensureOpen() { |
166 | 171 | } |
167 | 172 |
|
168 | 173 | private void reopenStreamOrFail(IOException e) throws IOException { |
169 | | - if (purpose == OperationPurpose.REPOSITORY_ANALYSIS) { |
170 | | - logger.warn(() -> format(""" |
171 | | - failed reading [%s/%s] at offset [%s]""", blobStore.bucket(), blobKey, start + currentOffset), e); |
172 | | - throw e; |
| 174 | + final long delayInMillis = maybeLogAndComputeRetryDelay("reading", e); |
| 175 | + maybeAbort(currentStream); |
| 176 | + IOUtils.closeWhileHandlingException(currentStream); |
| 177 | + |
| 178 | + delayBeforeRetry(delayInMillis); |
| 179 | + openStreamWithRetry(); |
| 180 | + } |
| 181 | + |
| 182 | + // The method throws if the operation should *not* be retried. Otherwise, it records the attempt and associated failure |
| 183 | + // and computes the delay before the retry. |
| 184 | + private <T extends Exception> long maybeLogAndComputeRetryDelay(String action, T e) throws T { |
| 185 | + if (shouldRetry(attempt) == false) { |
| 186 | + final var finalException = addSuppressedExceptions(e); |
| 187 | + logForFailure(action, finalException); |
| 188 | + throw finalException; |
173 | 189 | } |
174 | 190 |
|
175 | | - final int maxAttempts = blobStore.getMaxRetries() + 1; |
| 191 | + // Log at info level for the 1st retry and every ~5 minutes afterward |
| 192 | + logForRetry((attempt == 1 || attempt % 30 == 0) ? Level.INFO : Level.DEBUG, action, e); |
| 193 | + if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) { |
| 194 | + failures.add(e); |
| 195 | + } |
| 196 | + final long delayInMillis = getRetryDelayInMillis(); |
| 197 | + attempt += 1; // increment after computing delay because attempt affects the result |
| 198 | + return delayInMillis; |
| 199 | + } |
| 200 | + |
| 201 | + private void logForFailure(String action, Exception e) { |
| 202 | + logger.warn( |
| 203 | + () -> format( |
| 204 | + "failed %s [%s/%s] at offset [%s] with purpose [%s]", |
| 205 | + action, |
| 206 | + blobStore.bucket(), |
| 207 | + blobKey, |
| 208 | + start + currentOffset, |
| 209 | + purpose.getKey() |
| 210 | + ), |
| 211 | + e |
| 212 | + ); |
| 213 | + } |
176 | 214 |
|
| 215 | + private void logForRetry(Level level, String action, Exception e) { |
177 | 216 | final long meaningfulProgressSize = Math.max(1L, blobStore.bufferSizeInBytes() / 100L); |
178 | 217 | final long currentStreamProgress = Math.subtractExact(Math.addExact(start, currentOffset), currentStreamFirstOffset); |
179 | 218 | if (currentStreamProgress >= meaningfulProgressSize) { |
180 | 219 | failuresAfterMeaningfulProgress += 1; |
181 | 220 | } |
182 | | - final Supplier<String> messageSupplier = () -> format( |
183 | | - """ |
184 | | - failed reading [%s/%s] at offset [%s]; this was attempt [%s] to read this blob which yielded [%s] bytes; in total \ |
185 | | - [%s] of the attempts to read this blob have made meaningful progress and do not count towards the maximum number of \ |
186 | | - retries; the maximum number of read attempts which do not make meaningful progress is [%s]""", |
187 | | - blobStore.bucket(), |
188 | | - blobKey, |
189 | | - start + currentOffset, |
190 | | - attempt, |
191 | | - currentStreamProgress, |
192 | | - failuresAfterMeaningfulProgress, |
193 | | - maxAttempts |
| 221 | + logger.log( |
| 222 | + level, |
| 223 | + () -> format( |
| 224 | + """ |
| 225 | + failed %s [%s/%s] at offset [%s] with purpose [%s]; \ |
| 226 | + this was attempt [%s] to read this blob which yielded [%s] bytes; in total \ |
| 227 | + [%s] of the attempts to read this blob have made meaningful progress and do not count towards the maximum number of \ |
| 228 | + retries; the maximum number of read attempts which do not make meaningful progress is [%s]""", |
| 229 | + action, |
| 230 | + blobStore.bucket(), |
| 231 | + blobKey, |
| 232 | + start + currentOffset, |
| 233 | + purpose.getKey(), |
| 234 | + attempt, |
| 235 | + currentStreamProgress, |
| 236 | + failuresAfterMeaningfulProgress, |
| 237 | + maxRetriesForNoMeaningfulProgress() |
| 238 | + ), |
| 239 | + e |
194 | 240 | ); |
195 | | - if (attempt >= maxAttempts + failuresAfterMeaningfulProgress) { |
196 | | - final var finalException = addSuppressedExceptions(e); |
197 | | - logger.warn(messageSupplier, finalException); |
198 | | - throw finalException; |
| 241 | + } |
| 242 | + |
| 243 | + private boolean shouldRetry(int attempt) { |
| 244 | + if (purpose == OperationPurpose.REPOSITORY_ANALYSIS) { |
| 245 | + return false; |
199 | 246 | } |
200 | | - logger.debug(messageSupplier, e); |
201 | | - attempt += 1; |
202 | | - if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) { |
203 | | - failures.add(e); |
| 247 | + if (purpose == OperationPurpose.INDICES) { |
| 248 | + return true; |
204 | 249 | } |
205 | | - maybeAbort(currentStream); |
206 | | - IOUtils.closeWhileHandlingException(currentStream); |
207 | | - openStream(); |
| 250 | + final int maxAttempts = blobStore.getMaxRetries() + 1; |
| 251 | + return attempt < maxAttempts + failuresAfterMeaningfulProgress; |
| 252 | + } |
| 253 | + |
| 254 | + private int maxRetriesForNoMeaningfulProgress() { |
| 255 | + return purpose == OperationPurpose.INDICES ? Integer.MAX_VALUE : (blobStore.getMaxRetries() + 1); |
| 256 | + } |
| 257 | + |
| 258 | + private void delayBeforeRetry(long delayInMillis) { |
| 259 | + try { |
| 260 | + assert shouldRetry(attempt - 1) : "should not have retried"; |
| 261 | + Thread.sleep(delayInMillis); |
| 262 | + } catch (InterruptedException e) { |
| 263 | + logger.info("s3 input stream delay interrupted", e); |
| 264 | + Thread.currentThread().interrupt(); |
| 265 | + } |
| 266 | + } |
| 267 | + |
| 268 | + // protected access for testing |
| 269 | + protected long getRetryDelayInMillis() { |
| 270 | + // Initial delay is 10 ms and the max delay is capped at 10 * 1024 millis, i.e. the wait between retries never exceeds ~10 seconds |
| 271 | + return 10L << (Math.min(attempt - 1, 10)); |
208 | 272 | } |
209 | 273 |
|
210 | 274 | @Override |
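The retry cadence is easiest to see with the shift in `getRetryDelayInMillis` written out: the delay doubles from 10 ms per attempt and is capped at 10 * 1024 ms, so after roughly ten attempts the stream retries about every ten seconds, and thirty capped retries span roughly five minutes, which matches the INFO-logging cadence in `maybeLogAndComputeRetryDelay`. A minimal sketch of that schedule (the class name and `main` harness are illustrative only):

```java
// Illustrative sketch of the backoff schedule implied by getRetryDelayInMillis().
public class RetryDelaySketch {
    // Same formula as the patch: 10 ms doubled per attempt, capped at 10 * 1024 ms.
    static long retryDelayInMillis(int attempt) {
        return 10L << Math.min(attempt - 1, 10);
    }

    public static void main(String[] args) {
        // Attempts 1..12 print 10, 20, 40, ..., 5120, 10240, 10240 ms.
        for (int attempt = 1; attempt <= 12; attempt++) {
            System.out.println("attempt " + attempt + " -> " + retryDelayInMillis(attempt) + " ms");
        }
        // 30 capped retries * ~10 s is roughly 5 minutes, hence the INFO log every 30th attempt.
    }
}
```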
@@ -247,7 +311,7 @@ public void reset() { |
247 | 311 | } |
248 | 312 |
|
249 | 313 | private <T extends Exception> T addSuppressedExceptions(T e) { |
250 | | - for (IOException failure : failures) { |
| 314 | + for (Exception failure : failures) { |
251 | 315 | e.addSuppressed(failure); |
252 | 316 | } |
253 | 317 | return e; |
|
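The switch from `List<IOException>` to `List<Exception>` lets the stream record SDK runtime failures (such as `AmazonClientException`) alongside IO failures and attach them to whatever exception finally escapes. A self-contained sketch of how those recorded attempts surface to the caller (the failure messages are made up):

```java
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.List;

public class SuppressedFailuresSketch {
    // Same shape as the patched helper: earlier attempts are attached as suppressed exceptions.
    static <T extends Exception> T addSuppressedExceptions(T e, List<Exception> failures) {
        for (Exception failure : failures) {
            e.addSuppressed(failure);
        }
        return e;
    }

    public static void main(String[] args) {
        // The widened List<Exception> can hold both checked IO failures and SDK runtime failures.
        List<Exception> failures = List.of(
            new IOException("attempt 1: connection reset"),
            new RuntimeException("attempt 2: SDK client error")
        );
        NoSuchFileException finalException = addSuppressedExceptions(new NoSuchFileException("blob not found"), failures);
        // Prints the final exception followed by one "Suppressed: ..." entry per recorded attempt.
        finalException.printStackTrace();
    }
}
```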