Skip to content

Commit bf975a9

Browse files
Add comprehensive diagnostic logging for smart retry debugging
Added detailed logging to track decoder state and buffer positions: - readAndValidateSegmentLength() with hex dump of buffer contents - Enhanced resetToLastCompleteSegment() logging with all state details - Segment completion logging with offset and length details - Buffer reception logging in decodeStream with pending/new byte counts - Retry offset calculation logging with before/after decoder state This will help identify where decoder state and retry offset become misaligned. Co-authored-by: gunjansingh-msft <[email protected]>
1 parent 82ad940 commit bf975a9

File tree

2 files changed

+102
-14
lines changed

2 files changed

+102
-14
lines changed

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/structuredmessage/StructuredMessageDecoder.java

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,21 @@ public long getMessageOffset() {
7575
*/
7676
public void resetToLastCompleteSegment() {
7777
if (messageOffset != lastCompleteSegmentStart) {
78-
LOGGER.verbose("Resetting decoder from offset {} to last complete segment boundary {}", messageOffset,
79-
lastCompleteSegmentStart);
78+
LOGGER.atInfo()
79+
.addKeyValue("fromOffset", messageOffset)
80+
.addKeyValue("toOffset", lastCompleteSegmentStart)
81+
.addKeyValue("currentSegmentNum", currentSegmentNumber)
82+
.addKeyValue("currentSegmentContentOffset", currentSegmentContentOffset)
83+
.addKeyValue("currentSegmentContentLength", currentSegmentContentLength)
84+
.log("Resetting decoder to last complete segment boundary");
8085
messageOffset = lastCompleteSegmentStart;
8186
// Reset current segment state - next decode will read the segment header
8287
currentSegmentContentOffset = 0;
8388
currentSegmentContentLength = 0;
89+
} else {
90+
LOGGER.atVerbose()
91+
.addKeyValue("offset", messageOffset)
92+
.log("Decoder already at last complete segment boundary, no reset needed");
8493
}
8594
}
8695

@@ -118,6 +127,65 @@ private void readMessageHeader(ByteBuffer buffer) {
118127
messageOffset += V1_HEADER_LENGTH;
119128
}
120129

130+
/**
131+
* Converts a ByteBuffer range to hex string for diagnostic purposes.
132+
*/
133+
private static String toHex(ByteBuffer buf, int len) {
134+
int pos = buf.position();
135+
int peek = Math.min(len, buf.remaining());
136+
byte[] out = new byte[peek];
137+
buf.get(out, 0, peek);
138+
buf.position(pos);
139+
StringBuilder sb = new StringBuilder();
140+
for (int i = 0; i < out.length; i++) {
141+
sb.append(String.format("%02X", out[i]));
142+
if (i < out.length - 1)
143+
sb.append(' ');
144+
}
145+
return sb.toString();
146+
}
147+
148+
/**
149+
* Reads and validates segment length with diagnostic logging.
150+
*/
151+
private long readAndValidateSegmentLength(ByteBuffer buffer, long remaining) {
152+
final int SEGMENT_SIZE_BYTES = 8; // segment size is 8 bytes (long)
153+
if (buffer.remaining() < SEGMENT_SIZE_BYTES) {
154+
LOGGER.error("Not enough bytes to read segment size. pos={}, remaining={}", buffer.position(),
155+
buffer.remaining());
156+
throw new IllegalStateException("Not enough bytes to read segment size");
157+
}
158+
159+
// Diagnostic: dump first 16 bytes at this position so we can see what's being read
160+
LOGGER.atInfo()
161+
.addKeyValue("decoderOffset", messageOffset)
162+
.addKeyValue("bufferPos", buffer.position())
163+
.addKeyValue("bufferRemaining", buffer.remaining())
164+
.addKeyValue("peek16", toHex(buffer, 16))
165+
.addKeyValue("lastCompleteSegment", lastCompleteSegmentStart)
166+
.log("Decoder about to read segment length");
167+
168+
long segmentLength = buffer.getLong();
169+
170+
if (segmentLength < 0 || segmentLength > remaining) {
171+
// Peek next bytes for extra detail
172+
String peekNext = toHex(buffer, 16);
173+
LOGGER.error(
174+
"Invalid segment length read: segmentLength={}, remaining={}, decoderOffset={}, "
175+
+ "lastCompleteSegment={}, bufferPos={}, peek-next-bytes={}",
176+
segmentLength, remaining, messageOffset, lastCompleteSegmentStart, buffer.position(), peekNext);
177+
throw new IllegalArgumentException("Invalid segment size detected: " + segmentLength + " (remaining="
178+
+ remaining + ", decoderOffset=" + messageOffset + ")");
179+
}
180+
181+
LOGGER.atVerbose()
182+
.addKeyValue("segmentLength", segmentLength)
183+
.addKeyValue("decoderOffset", messageOffset)
184+
.log("Valid segment length read");
185+
186+
return segmentLength;
187+
}
188+
121189
/**
122190
* Reads the segment header from the given buffer.
123191
*
@@ -134,14 +202,8 @@ private void readSegmentHeader(ByteBuffer buffer) {
134202

135203
int segmentNum = Short.toUnsignedInt(buffer.getShort());
136204

137-
// Read segment size as long (8 bytes)
138-
long segmentSize = buffer.getLong();
139-
140-
// Validate segment size
141-
if (segmentSize < 0L || segmentSize > buffer.remaining()) {
142-
throw LOGGER
143-
.logExceptionAsError(new IllegalArgumentException("Invalid segment size detected: " + segmentSize));
144-
}
205+
// Read segment size with validation and diagnostics
206+
long segmentSize = readAndValidateSegmentLength(buffer, buffer.remaining());
145207

146208
if (segmentNum != currentSegmentNumber + 1) {
147209
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Unexpected segment number."));
@@ -230,7 +292,11 @@ private void readSegmentFooter(ByteBuffer buffer) {
230292
// Mark that this segment is complete - update the last complete segment boundary
231293
// This is the position where we can safely resume if a retry occurs
232294
lastCompleteSegmentStart = messageOffset;
233-
LOGGER.verbose("Segment {} complete at byte offset {}", currentSegmentNumber, lastCompleteSegmentStart);
295+
LOGGER.atInfo()
296+
.addKeyValue("segmentNum", currentSegmentNumber)
297+
.addKeyValue("offset", lastCompleteSegmentStart)
298+
.addKeyValue("segmentLength", currentSegmentContentLength)
299+
.log("Segment complete at byte offset");
234300

235301
if (currentSegmentNumber == numSegments) {
236302
readMessageFooter(buffer);

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,15 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
8989
int newBytesReceived = encodedBuffer.remaining();
9090
state.totalEncodedBytesProcessed.addAndGet(newBytesReceived);
9191

92+
int pendingSize = (state.pendingBuffer != null) ? state.pendingBuffer.remaining() : 0;
93+
LOGGER.atInfo()
94+
.addKeyValue("newBytes", newBytesReceived)
95+
.addKeyValue("pendingBytes", pendingSize)
96+
.addKeyValue("totalProcessed", state.totalEncodedBytesProcessed.get())
97+
.addKeyValue("decoderOffset", state.decoder.getMessageOffset())
98+
.addKeyValue("lastCompleteSegment", state.decoder.getLastCompleteSegmentStart())
99+
.log("Received buffer in decodeStream");
100+
92101
// Combine with pending data if any
93102
ByteBuffer dataToProcess = state.combineWithPending(encodedBuffer);
94103

@@ -309,6 +318,15 @@ public long getRetryOffset() {
309318
// Use the decoder's last complete segment start as the retry offset
310319
// This ensures we resume from a segment boundary, not mid-segment
311320
long retryOffset = decoder.getLastCompleteSegmentStart();
321+
long decoderOffsetBefore = decoder.getMessageOffset();
322+
int pendingSize = (pendingBuffer != null) ? pendingBuffer.remaining() : 0;
323+
324+
LOGGER.atInfo()
325+
.addKeyValue("retryOffset", retryOffset)
326+
.addKeyValue("decoderOffsetBefore", decoderOffsetBefore)
327+
.addKeyValue("pendingBytes", pendingSize)
328+
.addKeyValue("totalProcessed", totalEncodedBytesProcessed.get())
329+
.log("Computing retry offset");
312330

313331
// Reset decoder to the last complete segment boundary
314332
// This ensures messageOffset and segment state match the retry offset
@@ -317,12 +335,16 @@ public long getRetryOffset() {
317335
// Clear pending buffer since we're restarting from the segment boundary
318336
// Any bytes in pending are from after this boundary and will be re-fetched
319337
if (pendingBuffer != null && pendingBuffer.hasRemaining()) {
320-
LOGGER.verbose("Clearing {} pending bytes for retry from segment boundary {}",
321-
pendingBuffer.remaining(), retryOffset);
338+
LOGGER.atInfo()
339+
.addKeyValue("pendingBytes", pendingBuffer.remaining())
340+
.addKeyValue("retryOffset", retryOffset)
341+
.log("Clearing pending bytes for retry from segment boundary");
322342
pendingBuffer = null;
323343
}
324344

325-
LOGGER.verbose("Retry offset calculated: {} (last complete segment boundary)", retryOffset);
345+
LOGGER.atInfo()
346+
.addKeyValue("retryOffset", retryOffset)
347+
.log("Retry offset calculated (last complete segment boundary)");
326348
return retryOffset;
327349
}
328350

0 commit comments

Comments
 (0)