Skip to content

Commit 58cf53e

Browse files
smart retry changes
1 parent 0f37684 commit 58cf53e

File tree

5 files changed

+265
-77
lines changed

5 files changed

+265
-77
lines changed

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java

Lines changed: 61 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
import java.util.Map;
115115
import java.util.Set;
116116
import java.util.concurrent.TimeoutException;
117+
import java.util.concurrent.atomic.AtomicReference;
117118
import java.util.function.BiFunction;
118119
import java.util.function.Consumer;
119120

@@ -1318,6 +1319,18 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
13181319
finalCount = finalRange.getCount();
13191320
}
13201321

1322+
AtomicReference<StorageContentValidationDecoderPolicy.DecoderState> decoderStateRef
1323+
= new AtomicReference<>();
1324+
if (contentValidationOptions != null
1325+
&& contentValidationOptions.isStructuredMessageValidationEnabled()) {
1326+
Object decoderStateObj
1327+
= firstRangeContext.getData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY)
1328+
.orElse(null);
1329+
if (decoderStateObj instanceof StorageContentValidationDecoderPolicy.DecoderState) {
1330+
decoderStateRef.set((StorageContentValidationDecoderPolicy.DecoderState) decoderStateObj);
1331+
}
1332+
}
1333+
13211334
// The resume function takes throwable and offset at the destination.
13221335
// I.e. offset is relative to the starting point.
13231336
BiFunction<Throwable, Long, Mono<StreamResponse>> onDownloadErrorResume = (throwable, offset) -> {
@@ -1326,18 +1339,28 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
13261339
}
13271340

13281341
long newCount = finalCount - offset;
1342+
StorageContentValidationDecoderPolicy.DecoderState decoderState = null;
1343+
long expectedEncodedLength = finalCount;
1344+
long encodedProgress = offset;
1345+
1346+
if (contentValidationOptions != null
1347+
&& contentValidationOptions.isStructuredMessageValidationEnabled()) {
1348+
decoderState = decoderStateRef.get();
1349+
1350+
if (decoderState == null) {
1351+
Object decoderStateObj
1352+
= firstRangeContext.getData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY)
1353+
.orElse(null);
13291354

1330-
/*
1331-
* It's possible that the network stream will throw an error after emitting all data but before
1332-
* completing. Issuing a retry at this stage would leave the download in a bad state with
1333-
* incorrect count and offset values. Because we have read the intended amount of data, we can
1334-
* ignore the error at the end of the stream.
1335-
*/
1336-
if (newCount == 0) {
1337-
LOGGER.warning("Exception encountered in ReliableDownload after all data read from the network "
1338-
+ "but before stream signaled completion. Returning success as all data was downloaded. "
1339-
+ "Exception message: " + throwable.getMessage());
1340-
return Mono.empty();
1355+
if (decoderStateObj instanceof StorageContentValidationDecoderPolicy.DecoderState) {
1356+
decoderState = (StorageContentValidationDecoderPolicy.DecoderState) decoderStateObj;
1357+
}
1358+
}
1359+
1360+
if (decoderState != null) {
1361+
expectedEncodedLength = decoderState.getExpectedContentLength();
1362+
encodedProgress = decoderState.getTotalEncodedBytesProcessed();
1363+
}
13411364
}
13421365

13431366
try {
@@ -1349,53 +1372,38 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
13491372
// based on the encoded bytes processed, not the decoded bytes
13501373
if (contentValidationOptions != null
13511374
&& contentValidationOptions.isStructuredMessageValidationEnabled()) {
1352-
// Get the decoder state to determine how many encoded bytes were processed
1353-
Object decoderStateObj
1354-
= firstRangeContext.getData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY)
1355-
.orElse(null);
1356-
1357-
if (decoderStateObj instanceof StorageContentValidationDecoderPolicy.DecoderState) {
1358-
StorageContentValidationDecoderPolicy.DecoderState decoderState
1359-
= (StorageContentValidationDecoderPolicy.DecoderState) decoderStateObj;
1375+
long retryStartOffset = -1;
13601376

1361-
// Use getRetryOffset() to get the correct offset for retry
1362-
// This accounts for pending bytes that have been received but not yet consumed
1363-
long encodedOffset = decoderState.getRetryOffset();
1364-
long remainingCount = finalCount - encodedOffset;
1365-
retryRange = new BlobRange(initialOffset + encodedOffset, remainingCount);
1377+
// First try to use decoder state (authoritative)
1378+
if (decoderState != null) {
1379+
// Always rewind decoder to last validated boundary before retrying.
1380+
retryStartOffset = decoderState.resetForRetry();
13661381

1367-
LOGGER.info(
1368-
"Structured message smart retry: resuming from offset {} (initial={}, encoded={})",
1369-
initialOffset + encodedOffset, initialOffset, encodedOffset);
1370-
1371-
// Preserve the decoder state for the retry
13721382
retryContext = retryContext
13731383
.addData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY, decoderState);
1374-
} else {
1375-
// No decoder state available, try to parse retry offset from exception message
1376-
// The exception message contains RETRY-START-OFFSET=<number> token
1377-
long retryStartOffset = StorageContentValidationDecoderPolicy
1384+
decoderStateRef.set(decoderState);
1385+
}
1386+
1387+
// If no decoder state or no retry offset from state, fall back to parsed token or offset.
1388+
if (retryStartOffset < 0) {
1389+
retryStartOffset = StorageContentValidationDecoderPolicy
13781390
.parseRetryStartOffset(throwable.getMessage());
1379-
if (retryStartOffset >= 0) {
1380-
long remainingCount = finalCount - retryStartOffset;
1381-
// Validate remainingCount to avoid negative values
1382-
if (remainingCount <= 0) {
1383-
LOGGER.warning("Retry offset {} exceeds finalCount {}, using fallback",
1384-
retryStartOffset, finalCount);
1385-
retryRange = new BlobRange(initialOffset + offset, newCount);
1386-
} else {
1387-
retryRange = new BlobRange(initialOffset + retryStartOffset, remainingCount);
1388-
1389-
LOGGER.info(
1390-
"Structured message smart retry from exception: resuming from offset {} "
1391-
+ "(initial={}, parsed={})",
1392-
initialOffset + retryStartOffset, initialOffset, retryStartOffset);
1393-
}
1394-
} else {
1395-
// Fallback to normal retry logic if no offset found
1396-
retryRange = new BlobRange(initialOffset + offset, newCount);
1397-
}
13981391
}
1392+
if (retryStartOffset < 0) {
1393+
retryStartOffset = offset;
1394+
}
1395+
1396+
long remainingCount = expectedEncodedLength - retryStartOffset;
1397+
if (remainingCount < 0) {
1398+
remainingCount = expectedEncodedLength - offset;
1399+
retryStartOffset = offset;
1400+
}
1401+
1402+
retryRange = new BlobRange(initialOffset + retryStartOffset, remainingCount);
1403+
1404+
LOGGER.info(
1405+
"Structured message smart retry: resuming from offset {} (initial={}, encoded={}, remaining={})",
1406+
initialOffset + retryStartOffset, initialOffset, retryStartOffset, remainingCount);
13991407
} else {
14001408
// For non-structured downloads, use smart retry from the interrupted offset
14011409
retryRange = new BlobRange(initialOffset + offset, newCount);
@@ -1410,6 +1418,7 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
14101418
// Structured message decoding is now handled by StorageContentValidationDecoderPolicy
14111419
return BlobDownloadAsyncResponseConstructorProxy.create(response, onDownloadErrorResume, finalOptions);
14121420
});
1421+
14131422
}
14141423

14151424
Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,

sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobMessageDecoderDownloadTests.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import com.azure.core.test.utils.TestUtils;
77
import com.azure.core.util.FluxUtil;
8+
import com.azure.core.util.Context;
89
import com.azure.storage.blob.models.BlobRange;
910
import com.azure.storage.blob.models.BlobRequestConditions;
1011
import com.azure.storage.blob.models.DownloadRetryOptions;
@@ -20,6 +21,7 @@
2021
import reactor.test.StepVerifier;
2122

2223
import java.io.IOException;
24+
import java.io.ByteArrayOutputStream;
2325
import java.nio.ByteBuffer;
2426
import java.util.List;
2527

@@ -285,6 +287,83 @@ public void downloadStreamWithResponseContentValidationSmartRetry() throws IOExc
285287
}
286288
}
287289

290+
@Test
291+
public void downloadStreamWithResponseContentValidationSmartRetryVariousSizes() throws IOException {
292+
int[] dataSizes = new int[] { Constants.KB, 1500, 3 * Constants.KB + 128 };
293+
int[] segmentSizes = new int[] { 512, 700, 1024 };
294+
295+
for (int i = 0; i < dataSizes.length; i++) {
296+
byte[] randomData = getRandomByteArray(dataSizes[i]);
297+
int segmentSize = segmentSizes[i % segmentSizes.length];
298+
299+
StructuredMessageEncoder encoder
300+
= new StructuredMessageEncoder(randomData.length, segmentSize, StructuredMessageFlags.STORAGE_CRC64);
301+
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));
302+
303+
Flux<ByteBuffer> input = Flux.just(encodedData);
304+
305+
// Create a policy that will simulate 2 network interruptions for each run
306+
MockPartialResponsePolicy mockPolicy = new MockPartialResponsePolicy(2);
307+
308+
// Upload the encoded data
309+
bc.upload(input, null, true).block();
310+
311+
// Create a download client with both the mock policy AND the decoder policy
312+
StorageContentValidationDecoderPolicy decoderPolicy = new StorageContentValidationDecoderPolicy();
313+
BlobAsyncClient downloadClient = getBlobAsyncClient(ENVIRONMENT.getPrimaryAccount().getCredential(),
314+
bc.getBlobUrl(), mockPolicy, decoderPolicy);
315+
316+
DownloadContentValidationOptions validationOptions
317+
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);
318+
DownloadRetryOptions retryOptions = new DownloadRetryOptions().setMaxRetryRequests(5);
319+
320+
StepVerifier.create(downloadClient
321+
.downloadStreamWithResponse((BlobRange) null, retryOptions, (BlobRequestConditions) null, false,
322+
validationOptions)
323+
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))).assertNext(r -> {
324+
TestUtils.assertArraysEqual(r, randomData);
325+
}).verifyComplete();
326+
327+
assertEquals(0, mockPolicy.getTriesRemaining());
328+
List<String> rangeHeaders = mockPolicy.getRangeHeaders();
329+
assertTrue(rangeHeaders.size() > 0, "Expected range headers for retries");
330+
assertTrue(rangeHeaders.get(0).startsWith("bytes=0-"), "First request should start from offset 0");
331+
}
332+
}
333+
334+
@Test
335+
public void downloadStreamWithResponseContentValidationSmartRetrySync() throws IOException {
336+
byte[] randomData = getRandomByteArray(Constants.KB);
337+
StructuredMessageEncoder encoder
338+
= new StructuredMessageEncoder(randomData.length, 512, StructuredMessageFlags.STORAGE_CRC64);
339+
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));
340+
341+
Flux<ByteBuffer> input = Flux.just(encodedData);
342+
bc.upload(input, null, true).block();
343+
344+
MockPartialResponsePolicy mockPolicy = new MockPartialResponsePolicy(3);
345+
StorageContentValidationDecoderPolicy decoderPolicy = new StorageContentValidationDecoderPolicy();
346+
BlobClient downloadClient = getBlobClient(ENVIRONMENT.getPrimaryAccount().getCredential(), bc.getBlobUrl(),
347+
mockPolicy, decoderPolicy);
348+
349+
DownloadContentValidationOptions validationOptions
350+
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);
351+
DownloadRetryOptions retryOptions = new DownloadRetryOptions().setMaxRetryRequests(5);
352+
353+
ByteArrayOutputStream out = new ByteArrayOutputStream();
354+
Context ctx = new Context(Constants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true)
355+
.addData(Constants.STRUCTURED_MESSAGE_VALIDATION_OPTIONS_CONTEXT_KEY, validationOptions);
356+
357+
downloadClient.downloadStreamWithResponse(out, null, retryOptions, null, false, null, ctx);
358+
359+
TestUtils.assertArraysEqual(out.toByteArray(), randomData);
360+
361+
assertEquals(0, mockPolicy.getTriesRemaining());
362+
List<String> rangeHeaders = mockPolicy.getRangeHeaders();
363+
assertTrue(rangeHeaders.size() > 0, "Expected range headers for retries");
364+
assertTrue(rangeHeaders.get(0).startsWith("bytes=0-"), "First request should start from offset 0");
365+
}
366+
288367
@Test
289368
public void downloadStreamWithResponseContentValidationSmartRetryMultipleSegments() throws IOException {
290369
// Test smart retry with multiple segments to ensure checksum validation

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/structuredmessage/StructuredMessageDecoder.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,20 @@ public boolean isHeaderRead() {
332332
return messageLength != -1;
333333
}
334334

335+
/**
336+
* Gets the number of encoded bytes that have been seen but not yet fully
337+
* processed by the decoder (pending bytes).
338+
*
339+
* <p>This is used by smart-retry logic to determine the absolute encoded
340+
* offset that a retry request should start from while still preserving
341+
* the decoder's buffered state.</p>
342+
*
343+
* @return The number of pending encoded bytes buffered by the decoder.
344+
*/
345+
public int getPendingEncodedByteCount() {
346+
return pendingBytes.size();
347+
}
348+
335349
/**
336350
* Reads the message header if we have enough bytes.
337351
*
@@ -611,6 +625,13 @@ public DecodeResult decodeChunk(ByteBuffer buffer) {
611625

612626
// Step 2: Process segments
613627
while (messageOffset < messageLength) {
628+
// If all segments are done, proceed to message footer before attempting any new segment header.
629+
if (currentSegmentNumber == numSegments && currentSegmentContentOffset == currentSegmentContentLength) {
630+
if (!tryReadMessageFooter(buffer)) {
631+
break;
632+
}
633+
}
634+
614635
// Read segment header if needed
615636
if (currentSegmentContentOffset == currentSegmentContentLength) {
616637
if (!tryReadSegmentHeader(buffer)) {

0 commit comments

Comments
 (0)