Skip to content

Commit e8669ff

Browse files
Add RETRY-START-OFFSET token to exception messages for smart retry
- Add getRetryStartOffset() to StructuredMessageDecoder - Add parseRetryStartOffset() helper to policy for parsing offset from exception - Wrap decode errors with IOException containing RETRY-START-OFFSET=N token - Update BlobAsyncClientBase to parse retry offset from exception message Co-authored-by: gunjansingh-msft <[email protected]>
1 parent a96939c commit e8669ff

File tree

3 files changed

+113
-4
lines changed

3 files changed

+113
-4
lines changed

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,8 +1421,22 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
14211421
retryContext = retryContext
14221422
.addData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY, decoderState);
14231423
} else {
1424-
// No decoder state yet, use the normal retry logic
1425-
retryRange = new BlobRange(initialOffset + offset, newCount);
1424+
// No decoder state available, try to parse retry offset from exception message
1425+
// The exception message contains RETRY-START-OFFSET=<number> token
1426+
long retryStartOffset = StorageContentValidationDecoderPolicy
1427+
.parseRetryStartOffset(throwable.getMessage());
1428+
if (retryStartOffset >= 0) {
1429+
long remainingCount = finalCount - retryStartOffset;
1430+
retryRange = new BlobRange(initialOffset + retryStartOffset, remainingCount);
1431+
1432+
LOGGER.info(
1433+
"Structured message smart retry from exception: resuming from offset {} "
1434+
+ "(initial={}, parsed={})",
1435+
initialOffset + retryStartOffset, initialOffset, retryStartOffset);
1436+
} else {
1437+
// Fallback to normal retry logic if no offset found
1438+
retryRange = new BlobRange(initialOffset + offset, newCount);
1439+
}
14261440
}
14271441
} else {
14281442
// For non-structured downloads, use smart retry from the interrupted offset

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/structuredmessage/StructuredMessageDecoder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,16 @@ public long getLastCompleteSegmentStart() {
127127
return lastCompleteSegmentStart;
128128
}
129129

130+
/**
131+
* Returns the canonical absolute byte index (0-based) that should be used to resume a failed/incomplete download.
132+
* This MUST be used directly as the Range header start value: "Range: bytes={retryStartOffset}-"
133+
*
134+
* @return The absolute byte index for the retry start offset.
135+
*/
136+
public long getRetryStartOffset() {
137+
return lastCompleteSegmentStart;
138+
}
139+
130140
/**
131141
* Gets the current message offset (total bytes consumed from the structured message).
132142
*

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
2020

21+
import java.io.IOException;
2122
import java.nio.ByteBuffer;
2223
import java.nio.charset.Charset;
2324
import java.util.concurrent.atomic.AtomicLong;
25+
import java.util.regex.Matcher;
26+
import java.util.regex.Pattern;
2427

2528
/**
2629
* This is a decoding policy in an {@link com.azure.core.http.HttpPipeline} to decode structured messages in
@@ -37,12 +40,40 @@
3740
public class StorageContentValidationDecoderPolicy implements HttpPipelinePolicy {
3841
private static final ClientLogger LOGGER = new ClientLogger(StorageContentValidationDecoderPolicy.class);
3942

43+
/**
44+
* Machine-readable token pattern for extracting retry start offset from exception messages.
45+
* Format: RETRY-START-OFFSET={number}
46+
*/
47+
private static final String RETRY_OFFSET_TOKEN = "RETRY-START-OFFSET=";
48+
private static final Pattern RETRY_OFFSET_PATTERN = Pattern.compile("RETRY-START-OFFSET=(\\d+)");
49+
4050
/**
4151
* Creates a new instance of {@link StorageContentValidationDecoderPolicy}.
4252
*/
4353
public StorageContentValidationDecoderPolicy() {
4454
}
4555

56+
/**
57+
* Parses the retry start offset from an exception message containing the RETRY-START-OFFSET token.
58+
*
59+
* @param message The exception message to parse.
60+
* @return The retry start offset, or -1 if not found.
61+
*/
62+
public static long parseRetryStartOffset(String message) {
63+
if (message == null) {
64+
return -1;
65+
}
66+
Matcher matcher = RETRY_OFFSET_PATTERN.matcher(message);
67+
if (matcher.find()) {
68+
try {
69+
return Long.parseLong(matcher.group(1));
70+
} catch (NumberFormatException e) {
71+
return -1;
72+
}
73+
}
74+
return -1;
75+
}
76+
4677
@Override
4778
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
4879
// Check if structured message decoding is enabled for this request
@@ -79,6 +110,10 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
79110
/**
80111
* Decodes a stream of byte buffers using the decoder state.
81112
* The decoder properly handles partial headers and segments split across chunks.
113+
*
114+
* <p>When an error occurs or the stream ends prematurely, an IOException is thrown with a
115+
* machine-readable token RETRY-START-OFFSET=&lt;number&gt; that can be parsed to determine
116+
* the correct offset for retry requests.</p>
82117
*
83118
* @param encodedFlux The flux of encoded byte buffers.
84119
* @param state The decoder state.
@@ -122,7 +157,7 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
122157

123158
case INVALID:
124159
LOGGER.error("Invalid data during decode: {}", result.getMessage());
125-
return Flux.error(new IllegalArgumentException(
160+
return Flux.error(createRetryableException(state,
126161
"Failed to decode structured message: " + result.getMessage()));
127162

128163
default:
@@ -131,8 +166,18 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
131166

132167
} catch (Exception e) {
133168
LOGGER.error("Failed to decode structured message chunk: " + e.getMessage(), e);
134-
return Flux.error(e);
169+
return Flux.error(createRetryableException(state, e.getMessage(), e));
170+
}
171+
}).onErrorResume(throwable -> {
172+
// Wrap any error with retry offset information
173+
if (throwable instanceof IOException) {
174+
// Check if already has retry offset token
175+
if (throwable.getMessage() != null && throwable.getMessage().contains(RETRY_OFFSET_TOKEN)) {
176+
return Flux.error(throwable);
177+
}
135178
}
179+
// Wrap the error with retry offset
180+
return Flux.error(createRetryableException(state, throwable.getMessage(), throwable));
136181
}).doOnComplete(() -> {
137182
// Finalize when stream completes
138183
if (!state.decoder.isComplete()) {
@@ -151,6 +196,46 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
151196
});
152197
}
153198

199+
/**
200+
* Creates an IOException with the retry start offset encoded in the message.
201+
*
202+
* @param state The decoder state.
203+
* @param message The error message.
204+
* @return An IOException with retry offset information.
205+
*/
206+
private IOException createRetryableException(DecoderState state, String message) {
207+
return createRetryableException(state, message, null);
208+
}
209+
210+
/**
211+
* Creates an IOException with the retry start offset encoded in the message.
212+
*
213+
* @param state The decoder state.
214+
* @param message The error message.
215+
* @param cause The original cause, may be null.
216+
* @return An IOException with retry offset information.
217+
*/
218+
private IOException createRetryableException(DecoderState state, String message, Throwable cause) {
219+
long retryOffset = state.decoder.getRetryStartOffset();
220+
long decodedSoFar = state.decoder.getTotalDecodedPayloadBytes();
221+
long expectedLength = state.decoder.getMessageLength();
222+
223+
String fullMessage = String.format("Incomplete structured message: decoded %d of %d bytes. %s%d. %s",
224+
decodedSoFar, expectedLength > 0 ? expectedLength : 0, RETRY_OFFSET_TOKEN, retryOffset,
225+
message != null ? message : "");
226+
227+
LOGGER.atInfo()
228+
.addKeyValue("retryOffset", retryOffset)
229+
.addKeyValue("decodedSoFar", decodedSoFar)
230+
.addKeyValue("expectedLength", expectedLength)
231+
.log("Creating retryable exception with offset");
232+
233+
if (cause != null) {
234+
return new IOException(fullMessage, cause);
235+
}
236+
return new IOException(fullMessage);
237+
}
238+
154239
/**
155240
* Checks if structured message decoding should be applied based on context.
156241
*

0 commit comments

Comments
 (0)