Skip to content

Commit 22276ba

Browse files
Fix checkstyle errors and add advanceMessageOffset method
Co-authored-by: gunjansingh-msft <[email protected]>
1 parent 3b0b3a7 commit 22276ba

File tree

2 files changed

+51
-16
lines changed

2 files changed

+51
-16
lines changed

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/structuredmessage/StructuredMessageDecoder.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,23 @@ public long getMessageOffset() {
6868
return messageOffset;
6969
}
7070

71+
/**
72+
* Advances the message offset by the specified number of bytes.
73+
* This should be called after consuming an encoded segment to maintain
74+
* the authoritative encoded offset.
75+
*
76+
* @param bytes The number of bytes to advance.
77+
*/
78+
public void advanceMessageOffset(long bytes) {
79+
long priorOffset = messageOffset;
80+
messageOffset += bytes;
81+
LOGGER.atInfo()
82+
.addKeyValue("priorOffset", priorOffset)
83+
.addKeyValue("bytesAdvanced", bytes)
84+
.addKeyValue("newOffset", messageOffset)
85+
.log("Advanced message offset");
86+
}
87+
7188
/**
7289
* Resets the decoder position to the last complete segment boundary.
7390
* This is used during smart retry to ensure the decoder is in sync with
@@ -139,8 +156,9 @@ private static String toHex(ByteBuffer buf, int len) {
139156
StringBuilder sb = new StringBuilder();
140157
for (int i = 0; i < out.length; i++) {
141158
sb.append(String.format("%02X", out[i]));
142-
if (i < out.length - 1)
159+
if (i < out.length - 1) {
143160
sb.append(' ');
161+
}
144162
}
145163
return sb.toString();
146164
}
@@ -175,11 +193,11 @@ public StructuredMessageFlags getFlags() {
175193
* Reads and validates segment length with diagnostic logging.
176194
*/
177195
private long readAndValidateSegmentLength(ByteBuffer buffer, long remaining) {
178-
final int SEGMENT_SIZE_BYTES = 8; // segment size is 8 bytes (long)
179-
if (buffer.remaining() < SEGMENT_SIZE_BYTES) {
196+
final int segmentSizeBytes = 8; // segment size is 8 bytes (long)
197+
if (buffer.remaining() < segmentSizeBytes) {
180198
LOGGER.error("Not enough bytes to read segment size. pos={}, remaining={}", buffer.position(),
181199
buffer.remaining());
182-
throw new IllegalStateException("Not enough bytes to read segment size");
200+
throw LOGGER.logExceptionAsError(new IllegalStateException("Not enough bytes to read segment size"));
183201
}
184202

185203
// Diagnostic: dump first 16 bytes at this position so we can see what's being read
@@ -200,8 +218,8 @@ private long readAndValidateSegmentLength(ByteBuffer buffer, long remaining) {
200218
"Invalid segment length read: segmentLength={}, remaining={}, decoderOffset={}, "
201219
+ "lastCompleteSegment={}, bufferPos={}, peek-next-bytes={}",
202220
segmentLength, remaining, messageOffset, lastCompleteSegmentStart, buffer.position(), peekNext);
203-
throw new IllegalArgumentException("Invalid segment size detected: " + segmentLength + " (remaining="
204-
+ remaining + ", decoderOffset=" + messageOffset + ")");
221+
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Invalid segment size detected: "
222+
+ segmentLength + " (remaining=" + remaining + ", decoderOffset=" + messageOffset + ")"));
205223
}
206224

207225
LOGGER.atVerbose()
@@ -394,14 +412,18 @@ public ByteBuffer decode(ByteBuffer buffer) {
394412
}
395413

396414
/**
397-
* Finalizes the decoding process and validates that the entire message has been decoded.
415+
* Finalizes the decoding process and returns any final decoded bytes still buffered internally.
416+
* The policy should aggregate decoded byte counts and perform the final length comparison.
398417
*
399-
* @throws IllegalArgumentException if the decoded message length does not match the expected length.
418+
* @return A ByteBuffer containing any final decoded bytes, or null if none remain.
419+
* @throws IllegalArgumentException if the encoded message offset doesn't match expected length.
400420
*/
401-
public void finalizeDecoding() {
421+
public ByteBuffer finalizeDecoding() {
402422
if (messageOffset != messageLength) {
403423
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Decoded message length does not match "
404424
+ "expected length. Expected: " + messageLength + ", but was: " + messageOffset));
405425
}
426+
// No buffered decoded bytes in current implementation
427+
return null;
406428
}
407429
}

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
125125
LOGGER.error(
126126
"Negative relative index detected: relativeIndex={}, decoderOffset={}, absoluteStart={}",
127127
relativeIndex, decoderOffset, absoluteStartOfCombined);
128-
throw new IllegalStateException("Negative relative index: " + relativeIndex);
128+
throw LOGGER.logExceptionAsError(
129+
new IllegalStateException("Negative relative index: " + relativeIndex));
129130
}
130131

131132
// Check if we have enough for segment header
@@ -186,21 +187,33 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
186187
// Decode the segment
187188
ByteBuffer decoded = state.decoder.decode(encodedSlice);
188189

189-
LOGGER.atVerbose()
190+
// Track decoded bytes - update counter regardless of whether bytes were produced
191+
int decodedProduced = (decoded != null) ? decoded.remaining() : 0;
192+
193+
LOGGER.atInfo()
190194
.addKeyValue("relativeIndex", relativeIndex)
191195
.addKeyValue("encodedSegmentSize", encodedSegmentSize)
192-
.addKeyValue("decodedBytes", decoded.remaining())
193-
.addKeyValue("newDecoderOffset", state.decoder.getMessageOffset())
196+
.addKeyValue("decodedProduced", decodedProduced)
197+
.addKeyValue("totalDecodedSoFar", state.totalBytesDecoded.get())
198+
.addKeyValue("decoderOffset", state.decoder.getMessageOffset())
194199
.log("Decoded segment");
195200

196201
// Update tracked bytes
197202
state.totalEncodedBytesProcessed.addAndGet(encodedSegmentSize);
198-
if (decoded.remaining() > 0) {
199-
state.totalBytesDecoded.addAndGet(decoded.remaining());
203+
204+
// Always update decoded byte counter (even if zero bytes produced)
205+
if (decodedProduced > 0) {
206+
state.totalBytesDecoded.addAndGet(decodedProduced);
200207
// Accumulate decoded bytes
201-
byte[] decodedBytes = new byte[decoded.remaining()];
208+
byte[] decodedBytes = new byte[decodedProduced];
202209
decoded.get(decodedBytes);
203210
decodedOutput.write(decodedBytes, 0, decodedBytes.length);
211+
} else {
212+
// Log when no decoded bytes are produced from a segment
213+
LOGGER.atInfo()
214+
.addKeyValue("encodedSegmentSize", encodedSegmentSize)
215+
.addKeyValue("decoderOffset", state.decoder.getMessageOffset())
216+
.log("No decoded bytes produced from segment");
204217
}
205218

206219
// Check if we've completed the message

0 commit comments

Comments
 (0)