Skip to content

Commit 4c1a3fa

Browse files
Track segment boundaries for smart retry resumption
Implemented segment boundary tracking in StructuredMessageDecoder:
- Added lastCompleteSegmentStart to track where complete segments end
- Changed messageOffset and segment fields to long for proper offset tracking
- Fixed segment size reading to handle unsigned long values correctly
- Updated DecoderState.getRetryOffset() to use segment boundaries
- Ensures retries resume from complete segment boundaries, not mid-segment

This matches the decryptor pattern where retries align with block boundaries.

Co-authored-by: gunjansingh-msft <[email protected]>
1 parent 3af7a7b commit 4c1a3fa

File tree

2 files changed

+46
-15
lines changed

2 files changed

+46
-15
lines changed

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/structuredmessage/StructuredMessageDecoder.java

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,19 @@ public class StructuredMessageDecoder {
2727
private int numSegments;
2828
private final long expectedContentLength;
2929

30-
private int messageOffset = 0;
30+
private long messageOffset = 0;
3131
private int currentSegmentNumber = 0;
32-
private int currentSegmentContentLength = 0;
33-
private int currentSegmentContentOffset = 0;
32+
private long currentSegmentContentLength = 0;
33+
private long currentSegmentContentOffset = 0;
3434

3535
private long messageCrc64 = 0;
3636
private long segmentCrc64 = 0;
3737
private final Map<Integer, Long> segmentCrcs = new HashMap<>();
3838

39+
// Track the last complete segment boundary for smart retry
40+
private long lastCompleteSegmentStart = 0;
41+
private long currentSegmentStart = 0;
42+
3943
/**
4044
* Constructs a new StructuredMessageDecoder.
4145
*
@@ -45,6 +49,25 @@ public StructuredMessageDecoder(long expectedContentLength) {
4549
this.expectedContentLength = expectedContentLength;
4650
}
4751

52+
/**
53+
* Gets the byte offset immediately after the last fully decoded segment (where the next segment would start).
54+
* This is used for smart retry to resume from a segment boundary.
55+
*
56+
* @return The byte offset just past the last fully decoded segment, or 0 if no segment has completed yet.
57+
*/
58+
public long getLastCompleteSegmentStart() {
59+
return lastCompleteSegmentStart;
60+
}
61+
62+
/**
63+
* Gets the current message offset (total bytes consumed from the structured message).
64+
*
65+
* @return The current message offset.
66+
*/
67+
public long getMessageOffset() {
68+
return messageOffset;
69+
}
70+
4871
/**
4972
* Reads the message header from the given buffer.
5073
*
@@ -90,10 +113,16 @@ private void readSegmentHeader(ByteBuffer buffer) {
90113
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Segment header is incomplete."));
91114
}
92115

116+
// Mark the start of this segment (before reading the header)
117+
currentSegmentStart = messageOffset;
118+
93119
int segmentNum = Short.toUnsignedInt(buffer.getShort());
94-
int segmentSize = (int) buffer.getLong();
95120

96-
if (segmentSize < 0 || segmentSize > buffer.remaining()) {
121+
// Read segment size as long (8 bytes)
122+
long segmentSize = buffer.getLong();
123+
124+
// Validate segment size
125+
if (segmentSize < 0L || segmentSize > buffer.remaining()) {
97126
throw LOGGER
98127
.logExceptionAsError(new IllegalArgumentException("Invalid segment size detected: " + segmentSize));
99128
}
@@ -126,8 +155,8 @@ private void readSegmentHeader(ByteBuffer buffer) {
126155
* @throws IllegalArgumentException if there is a segment size mismatch.
127156
*/
128157
private void readSegmentContent(ByteBuffer buffer, ByteArrayOutputStream output, int size) {
129-
int toRead = Math.min(buffer.remaining(), currentSegmentContentLength - currentSegmentContentOffset);
130-
toRead = Math.min(toRead, size);
158+
long remaining = currentSegmentContentLength - currentSegmentContentOffset;
159+
int toRead = (int) Math.min(buffer.remaining(), Math.min(remaining, size));
131160

132161
if (toRead == 0) {
133162
return;
@@ -182,10 +211,12 @@ private void readSegmentFooter(ByteBuffer buffer) {
182211
messageOffset += CRC64_LENGTH;
183212
}
184213

214+
// Mark that this segment is complete - update the last complete segment boundary
215+
// This is the position where we can safely resume if a retry occurs
216+
lastCompleteSegmentStart = messageOffset;
217+
185218
if (currentSegmentNumber == numSegments) {
186219
readMessageFooter(buffer);
187-
} else {
188-
readSegmentHeader(buffer);
189220
}
190221
}
191222

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -297,15 +297,15 @@ public long getTotalEncodedBytesProcessed() {
297297

298298
/**
299299
* Gets the offset to use for retry requests.
300-
* This is the total encoded bytes processed minus any bytes in the pending buffer,
301-
* since pending bytes have already been counted but haven't been successfully processed yet.
300+
* This uses the decoder's last complete segment boundary to ensure retries
301+
* resume from a valid segment boundary, not mid-segment.
302302
*
303-
* @return The offset for retry requests.
303+
* @return The offset for retry requests (last complete segment boundary).
304304
*/
305305
public long getRetryOffset() {
306-
long processed = totalEncodedBytesProcessed.get();
307-
int pending = (pendingBuffer != null) ? pendingBuffer.remaining() : 0;
308-
return processed - pending;
306+
// Use the decoder's last complete segment start as the retry offset
307+
// This ensures we resume from a segment boundary, not mid-segment
308+
return decoder.getLastCompleteSegmentStart();
309309
}
310310

311311
/**

0 commit comments

Comments
 (0)