|
85 | 85 | import com.azure.storage.common.implementation.Constants; |
86 | 86 | import com.azure.storage.common.implementation.SasImplUtils; |
87 | 87 | import com.azure.storage.common.implementation.StorageImplUtils; |
| 88 | +import com.azure.storage.common.policy.StorageContentValidationDecoderPolicy; |
88 | 89 | import reactor.core.publisher.Flux; |
89 | 90 | import reactor.core.publisher.Mono; |
90 | 91 | import reactor.core.publisher.SignalType; |
@@ -1342,24 +1343,65 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down |
1342 | 1343 | try { |
1343 | 1344 | // For retry context, preserve decoder state if structured message validation is enabled |
1344 | 1345 | Context retryContext = firstRangeContext; |
| 1346 | + BlobRange retryRange; |
1345 | 1347 |
|
1346 | | - // If structured message decoding is enabled, we need to include the decoder state |
1347 | | - // so the retry can continue from where we left off |
| 1348 | + // If structured message decoding is enabled, we need to calculate the retry offset |
| 1349 | + // based on the encoded bytes processed, not the decoded bytes |
1348 | 1350 | if (contentValidationOptions != null |
1349 | 1351 | && contentValidationOptions.isStructuredMessageValidationEnabled()) { |
1350 | | - // The decoder state will be set by the policy during processing |
1351 | | - // We preserve it in the context for the retry request |
1352 | | - Object decoderState |
| 1352 | + // Get the decoder state to determine how many encoded bytes were processed |
| 1353 | + Object decoderStateObj |
1353 | 1354 | = firstRangeContext.getData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY) |
1354 | 1355 | .orElse(null); |
1355 | | - if (decoderState != null) { |
| 1356 | + |
| 1357 | + if (decoderStateObj instanceof StorageContentValidationDecoderPolicy.DecoderState) { |
| 1358 | + StorageContentValidationDecoderPolicy.DecoderState decoderState |
| 1359 | + = (StorageContentValidationDecoderPolicy.DecoderState) decoderStateObj; |
| 1360 | + |
| 1361 | + // Use getRetryOffset() to get the correct offset for retry |
| 1362 | + // This accounts for pending bytes that have been received but not yet consumed |
| 1363 | + long encodedOffset = decoderState.getRetryOffset(); |
| 1364 | + long remainingCount = finalCount - encodedOffset; |
| 1365 | + retryRange = new BlobRange(initialOffset + encodedOffset, remainingCount); |
| 1366 | + |
| 1367 | + LOGGER.info( |
| 1368 | + "Structured message smart retry: resuming from offset {} (initial={}, encoded={})", |
| 1369 | + initialOffset + encodedOffset, initialOffset, encodedOffset); |
| 1370 | + |
| 1371 | + // Preserve the decoder state for the retry |
1356 | 1372 | retryContext = retryContext |
1357 | 1373 | .addData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY, decoderState); |
| 1374 | + } else { |
| 1375 | + // No decoder state available, try to parse retry offset from exception message |
| 1376 | + // The exception message contains RETRY-START-OFFSET=<number> token |
| 1377 | + long retryStartOffset = StorageContentValidationDecoderPolicy |
| 1378 | + .parseRetryStartOffset(throwable.getMessage()); |
| 1379 | + if (retryStartOffset >= 0) { |
| 1380 | + long remainingCount = finalCount - retryStartOffset; |
| 1381 | + // Validate remainingCount to avoid negative values |
| 1382 | + if (remainingCount <= 0) { |
| 1383 | + LOGGER.warning("Retry offset {} exceeds finalCount {}, using fallback", |
| 1384 | + retryStartOffset, finalCount); |
| 1385 | + retryRange = new BlobRange(initialOffset + offset, newCount); |
| 1386 | + } else { |
| 1387 | + retryRange = new BlobRange(initialOffset + retryStartOffset, remainingCount); |
| 1388 | + |
| 1389 | + LOGGER.info( |
| 1390 | + "Structured message smart retry from exception: resuming from offset {} " |
| 1391 | + + "(initial={}, parsed={})", |
| 1392 | + initialOffset + retryStartOffset, initialOffset, retryStartOffset); |
| 1393 | + } |
| 1394 | + } else { |
| 1395 | + // Fallback to normal retry logic if no offset found |
| 1396 | + retryRange = new BlobRange(initialOffset + offset, newCount); |
| 1397 | + } |
1358 | 1398 | } |
| 1399 | + } else { |
| 1400 | + // For non-structured downloads, use smart retry from the interrupted offset |
| 1401 | + retryRange = new BlobRange(initialOffset + offset, newCount); |
1359 | 1402 | } |
1360 | 1403 |
|
1361 | | - return downloadRange(new BlobRange(initialOffset + offset, newCount), finalRequestConditions, |
1362 | | - eTag, finalGetMD5, retryContext); |
| 1404 | + return downloadRange(retryRange, finalRequestConditions, eTag, finalGetMD5, retryContext); |
1363 | 1405 | } catch (Exception e) { |
1364 | 1406 | return Mono.error(e); |
1365 | 1407 | } |
|
0 commit comments