Skip to content

Commit 5ce3a1c

Browse files
smart retry changes
1 parent 58cf53e commit 5ce3a1c

File tree

7 files changed

+427
-387
lines changed

7 files changed

+427
-387
lines changed

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BuilderHelper.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,12 @@ public static HttpPipeline buildPipeline(StorageSharedKeyCredential storageShare
141141

142142
HttpPolicyProviders.addAfterRetryPolicies(policies);
143143

144-
policies.add(new StorageContentValidationDecoderPolicy());
144+
// Only add the structured message decoder once; allow callers to inject their own for ordering with test
145+
// policies (e.g., fault injection before decoding).
146+
boolean hasDecoder = policies.stream().anyMatch(p -> p instanceof StorageContentValidationDecoderPolicy);
147+
if (!hasDecoder) {
148+
policies.add(new StorageContentValidationDecoderPolicy());
149+
}
145150

146151
policies.add(getResponseValidationPolicy());
147152

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1290,11 +1290,14 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
12901290
? new Context("azure-eagerly-convert-headers", true)
12911291
: context.addData("azure-eagerly-convert-headers", true);
12921292

1293+
AtomicReference<StorageContentValidationDecoderPolicy.DecoderState> decoderStateRef = new AtomicReference<>();
1294+
12931295
// Add structured message decoding context if enabled
12941296
final Context firstRangeContext;
12951297
if (contentValidationOptions != null && contentValidationOptions.isStructuredMessageValidationEnabled()) {
12961298
firstRangeContext = initialContext.addData(Constants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true)
1297-
.addData(Constants.STRUCTURED_MESSAGE_VALIDATION_OPTIONS_CONTEXT_KEY, contentValidationOptions);
1299+
.addData(Constants.STRUCTURED_MESSAGE_VALIDATION_OPTIONS_CONTEXT_KEY, contentValidationOptions)
1300+
.addData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_REF_CONTEXT_KEY, decoderStateRef);
12981301
} else {
12991302
firstRangeContext = initialContext;
13001303
}
@@ -1319,18 +1322,6 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
13191322
finalCount = finalRange.getCount();
13201323
}
13211324

1322-
AtomicReference<StorageContentValidationDecoderPolicy.DecoderState> decoderStateRef
1323-
= new AtomicReference<>();
1324-
if (contentValidationOptions != null
1325-
&& contentValidationOptions.isStructuredMessageValidationEnabled()) {
1326-
Object decoderStateObj
1327-
= firstRangeContext.getData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY)
1328-
.orElse(null);
1329-
if (decoderStateObj instanceof StorageContentValidationDecoderPolicy.DecoderState) {
1330-
decoderStateRef.set((StorageContentValidationDecoderPolicy.DecoderState) decoderStateObj);
1331-
}
1332-
}
1333-
13341325
// The resume function takes throwable and offset at the destination.
13351326
// I.e. offset is relative to the starting point.
13361327
BiFunction<Throwable, Long, Mono<StreamResponse>> onDownloadErrorResume = (throwable, offset) -> {
@@ -1342,6 +1333,8 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
13421333
StorageContentValidationDecoderPolicy.DecoderState decoderState = null;
13431334
long expectedEncodedLength = finalCount;
13441335
long encodedProgress = offset;
1336+
long retryStartOffset = -1;
1337+
boolean noBytesEmitted = offset == 0;
13451338

13461339
if (contentValidationOptions != null
13471340
&& contentValidationOptions.isStructuredMessageValidationEnabled()) {
@@ -1357,12 +1350,37 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
13571350
}
13581351
}
13591352

1353+
// If the decoder has already finalized, discard it and restart from the beginning.
1354+
if (decoderState != null && decoderState.isFinalized()) {
1355+
decoderState = null;
1356+
}
1357+
13601358
if (decoderState != null) {
13611359
expectedEncodedLength = decoderState.getExpectedContentLength();
13621360
encodedProgress = decoderState.getTotalEncodedBytesProcessed();
1361+
1362+
long boundaryDecoded = decoderState.getDecodedBytesAtLastCompleteSegment();
1363+
if (offset < boundaryDecoded || noBytesEmitted) {
1364+
// We haven't emitted the bytes represented by this decoder boundary; restart clean.
1365+
decoderState = null;
1366+
} else {
1367+
long bytesToSkip = Math.max(0, offset - boundaryDecoded);
1368+
decoderState.setDecodedBytesToSkip(bytesToSkip);
1369+
1370+
// Always rewind decoder to last validated boundary before retrying.
1371+
retryStartOffset = decoderState.resetForRetry();
1372+
}
13631373
}
13641374
}
13651375

1376+
if (decoderState == null) {
1377+
// No decoder state available (likely failed before policy captured it) or no bytes emitted;
1378+
// restart from beginning to avoid skipping data.
1379+
retryStartOffset = noBytesEmitted ? 0 : -1;
1380+
encodedProgress = noBytesEmitted ? 0 : encodedProgress;
1381+
decoderStateRef.set(null);
1382+
}
1383+
13661384
try {
13671385
// For retry context, preserve decoder state if structured message validation is enabled
13681386
Context retryContext = firstRangeContext;
@@ -1372,25 +1390,23 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
13721390
// based on the encoded bytes processed, not the decoded bytes
13731391
if (contentValidationOptions != null
13741392
&& contentValidationOptions.isStructuredMessageValidationEnabled()) {
1375-
long retryStartOffset = -1;
1376-
1377-
// First try to use decoder state (authoritative)
1378-
if (decoderState != null) {
1379-
// Always rewind decoder to last validated boundary before retrying.
1380-
retryStartOffset = decoderState.resetForRetry();
1381-
1382-
retryContext = retryContext
1383-
.addData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY, decoderState);
1384-
decoderStateRef.set(decoderState);
1385-
}
1386-
13871393
// If no decoder state or no retry offset from state, fall back to parsed token or offset.
1388-
if (retryStartOffset < 0) {
1394+
if (retryStartOffset < 0 && !noBytesEmitted) {
13891395
retryStartOffset = StorageContentValidationDecoderPolicy
13901396
.parseRetryStartOffset(throwable.getMessage());
13911397
}
13921398
if (retryStartOffset < 0) {
1393-
retryStartOffset = offset;
1399+
retryStartOffset = noBytesEmitted ? 0 : offset;
1400+
}
1401+
1402+
if (decoderState != null) {
1403+
retryContext = retryContext
1404+
.addData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY, decoderState);
1405+
decoderStateRef.set(decoderState);
1406+
} else {
1407+
// Ensure we don't carry a stale decoder state into a full restart.
1408+
retryContext = retryContext
1409+
.addData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY, null);
13941410
}
13951411

13961412
long remainingCount = expectedEncodedLength - retryStartOffset;

0 commit comments

Comments
 (0)