Skip to content

Commit ff9bbf4

Browse files
adding the StructuredMessageDecoder
1 parent 25c061d commit ff9bbf4

File tree

1 file changed

+267
-0
lines changed

1 file changed

+267
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.storage.common.implementation.structuredmessage;
5+
6+
import com.azure.core.util.logging.ClientLogger;
7+
import com.azure.storage.common.implementation.StorageCrc64Calculator;
8+
9+
import java.io.ByteArrayOutputStream;
10+
import java.nio.ByteBuffer;
11+
import java.nio.ByteOrder;
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
15+
import static com.azure.storage.common.implementation.structuredmessage.StructuredMessageConstants.CRC64_LENGTH;
16+
import static com.azure.storage.common.implementation.structuredmessage.StructuredMessageConstants.DEFAULT_MESSAGE_VERSION;
17+
import static com.azure.storage.common.implementation.structuredmessage.StructuredMessageConstants.V1_HEADER_LENGTH;
18+
import static com.azure.storage.common.implementation.structuredmessage.StructuredMessageConstants.V1_SEGMENT_HEADER_LENGTH;
19+
20+
/**
21+
* Decoder for structured messages with support for segmenting and CRC64 checksums.
22+
*/
23+
public class StructuredMessageDecoder {
24+
private static final ClientLogger LOGGER = new ClientLogger(StructuredMessageDecoder.class);
25+
private long messageLength;
26+
private StructuredMessageFlags flags;
27+
private int numSegments;
28+
private final long expectedContentLength;
29+
30+
private int messageOffset = 0;
31+
private int currentSegmentNumber = 0;
32+
private int currentSegmentContentLength = 0;
33+
private int currentSegmentContentOffset = 0;
34+
35+
private long messageCrc64 = 0;
36+
private long segmentCrc64 = 0;
37+
private final Map<Integer, Long> segmentCrcs = new HashMap<>();
38+
39+
/**
40+
* Constructs a new StructuredMessageDecoder.
41+
*
42+
* @param expectedContentLength The expected length of the content to be decoded.
43+
*/
44+
public StructuredMessageDecoder(long expectedContentLength) {
45+
this.expectedContentLength = expectedContentLength;
46+
}
47+
48+
/**
49+
* Reads the message header from the given buffer.
50+
*
51+
* @param buffer The buffer containing the message header.
52+
* @throws IllegalArgumentException if the buffer does not contain a valid message header.
53+
*/
54+
private void readMessageHeader(ByteBuffer buffer) {
55+
if (buffer.remaining() < V1_HEADER_LENGTH) {
56+
throw LOGGER.logExceptionAsError(
57+
new IllegalArgumentException("Content not long enough to contain a valid " + "message header."));
58+
}
59+
60+
int messageVersion = Byte.toUnsignedInt(buffer.get());
61+
if (messageVersion != DEFAULT_MESSAGE_VERSION) {
62+
throw LOGGER.logExceptionAsError(
63+
new IllegalArgumentException("Unsupported structured message version: " + messageVersion));
64+
}
65+
66+
messageLength = (int) buffer.getLong();
67+
if (messageLength < V1_HEADER_LENGTH) {
68+
throw LOGGER.logExceptionAsError(
69+
new IllegalArgumentException("Content not long enough to contain a valid " + "message header."));
70+
}
71+
if (messageLength != expectedContentLength) {
72+
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Structured message length " + messageLength
73+
+ " did not match content length " + expectedContentLength));
74+
}
75+
76+
flags = StructuredMessageFlags.fromValue(Short.toUnsignedInt(buffer.getShort()));
77+
numSegments = Short.toUnsignedInt(buffer.getShort());
78+
79+
messageOffset += V1_HEADER_LENGTH;
80+
}
81+
82+
/**
83+
* Reads the segment header from the given buffer.
84+
*
85+
* @param buffer The buffer containing the segment header.
86+
* @throws IllegalArgumentException if the buffer does not contain a valid segment header.
87+
*/
88+
private void readSegmentHeader(ByteBuffer buffer) {
89+
if (buffer.remaining() < V1_SEGMENT_HEADER_LENGTH) {
90+
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Segment header is incomplete."));
91+
}
92+
93+
int segmentNum = Short.toUnsignedInt(buffer.getShort());
94+
int segmentSize = (int) buffer.getLong();
95+
96+
if (segmentSize < 0 || segmentSize > buffer.remaining()) {
97+
throw LOGGER
98+
.logExceptionAsError(new IllegalArgumentException("Invalid segment size detected: " + segmentSize));
99+
}
100+
101+
if (segmentNum != currentSegmentNumber + 1) {
102+
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Unexpected segment number."));
103+
}
104+
105+
currentSegmentNumber = segmentNum;
106+
currentSegmentContentLength = segmentSize;
107+
currentSegmentContentOffset = 0;
108+
109+
if (segmentSize == 0) {
110+
readSegmentFooter(buffer);
111+
}
112+
113+
if (flags == StructuredMessageFlags.STORAGE_CRC64) {
114+
segmentCrc64 = 0;
115+
}
116+
117+
messageOffset += V1_SEGMENT_HEADER_LENGTH;
118+
}
119+
120+
/**
121+
* Reads the segment content from the given buffer and writes it to the output stream.
122+
*
123+
* @param buffer The buffer containing the segment content.
124+
* @param output The output stream to write the segment content to.
125+
* @param size The maximum number of bytes to read.
126+
* @throws IllegalArgumentException if there is a segment size mismatch.
127+
*/
128+
private void readSegmentContent(ByteBuffer buffer, ByteArrayOutputStream output, int size) {
129+
int toRead = Math.min(buffer.remaining(), currentSegmentContentLength - currentSegmentContentOffset);
130+
toRead = Math.min(toRead, size);
131+
132+
if (toRead == 0) {
133+
return;
134+
}
135+
136+
byte[] content = new byte[toRead];
137+
buffer.get(content);
138+
output.write(content, 0, toRead);
139+
140+
if (flags == StructuredMessageFlags.STORAGE_CRC64) {
141+
segmentCrc64 = StorageCrc64Calculator.compute(content, segmentCrc64);
142+
messageCrc64 = StorageCrc64Calculator.compute(content, messageCrc64);
143+
}
144+
145+
messageOffset += toRead;
146+
currentSegmentContentOffset += toRead;
147+
148+
if (currentSegmentContentOffset > currentSegmentContentLength) {
149+
throw LOGGER.logExceptionAsError(
150+
new IllegalArgumentException("Segment size mismatch detected in segment " + currentSegmentNumber));
151+
}
152+
153+
if (currentSegmentContentOffset == currentSegmentContentLength) {
154+
readSegmentFooter(buffer);
155+
}
156+
}
157+
158+
/**
159+
* Reads the segment footer from the given buffer.
160+
*
161+
* @param buffer The buffer containing the segment footer.
162+
* @throws IllegalArgumentException if the buffer does not contain a valid segment footer.
163+
*/
164+
private void readSegmentFooter(ByteBuffer buffer) {
165+
if (currentSegmentContentOffset != currentSegmentContentLength) {
166+
throw LOGGER.logExceptionAsError(
167+
new IllegalArgumentException("Segment content length mismatch in segment " + currentSegmentNumber
168+
+ ". Expected: " + currentSegmentContentLength + ", Read: " + currentSegmentContentOffset));
169+
}
170+
171+
if (flags == StructuredMessageFlags.STORAGE_CRC64) {
172+
if (buffer.remaining() < CRC64_LENGTH) {
173+
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Segment footer is incomplete."));
174+
}
175+
176+
long reportedCrc64 = buffer.getLong();
177+
if (segmentCrc64 != reportedCrc64) {
178+
throw LOGGER.logExceptionAsError(
179+
new IllegalArgumentException("CRC64 mismatch detected in segment " + currentSegmentNumber));
180+
}
181+
segmentCrcs.put(currentSegmentNumber, segmentCrc64);
182+
messageOffset += CRC64_LENGTH;
183+
}
184+
185+
if (currentSegmentNumber == numSegments) {
186+
readMessageFooter(buffer);
187+
} else {
188+
readSegmentHeader(buffer);
189+
}
190+
}
191+
192+
/**
193+
* Reads the segment footer from the given buffer.
194+
*
195+
* @param buffer The buffer containing the segment footer.
196+
* @throws IllegalArgumentException if the buffer does not contain a valid segment footer.
197+
*/
198+
private void readMessageFooter(ByteBuffer buffer) {
199+
if (flags == StructuredMessageFlags.STORAGE_CRC64) {
200+
if (buffer.remaining() < CRC64_LENGTH) {
201+
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Message footer is incomplete."));
202+
}
203+
204+
long reportedCrc = buffer.getLong();
205+
if (messageCrc64 != reportedCrc) {
206+
throw LOGGER.logExceptionAsError(
207+
new IllegalArgumentException("CRC64 mismatch detected in message " + "footer."));
208+
}
209+
messageOffset += CRC64_LENGTH;
210+
}
211+
212+
if (messageOffset != messageLength) {
213+
throw LOGGER.logExceptionAsError(
214+
new IllegalArgumentException("Decoded message length does not match " + "expected length."));
215+
}
216+
}
217+
218+
/**
219+
* Decodes the structured message from the given buffer up to the specified size.
220+
*
221+
* @param buffer The buffer containing the structured message.
222+
* @param size The maximum number of bytes to decode.
223+
* @return A ByteBuffer containing the decoded message content.
224+
* @throws IllegalArgumentException if the buffer does not contain a valid structured message.
225+
*/
226+
public ByteBuffer decode(ByteBuffer buffer, int size) {
227+
buffer.order(ByteOrder.LITTLE_ENDIAN);
228+
ByteArrayOutputStream decodedContent = new ByteArrayOutputStream();
229+
230+
if (messageOffset == 0) {
231+
readMessageHeader(buffer);
232+
}
233+
234+
while (buffer.hasRemaining() && decodedContent.size() < size) {
235+
if (currentSegmentContentOffset == currentSegmentContentLength) {
236+
readSegmentHeader(buffer);
237+
}
238+
239+
readSegmentContent(buffer, decodedContent, size - decodedContent.size());
240+
}
241+
242+
return ByteBuffer.wrap(decodedContent.toByteArray());
243+
}
244+
245+
/**
246+
* Decodes the entire structured message from the given buffer.
247+
*
248+
* @param buffer The buffer containing the structured message.
249+
* @return A ByteBuffer containing the decoded message content.
250+
* @throws IllegalArgumentException if the buffer does not contain a valid structured message.
251+
*/
252+
public ByteBuffer decode(ByteBuffer buffer) {
253+
return decode(buffer, buffer.remaining());
254+
}
255+
256+
/**
257+
* Finalizes the decoding process and validates that the entire message has been decoded.
258+
*
259+
* @throws IllegalArgumentException if the decoded message length does not match the expected length.
260+
*/
261+
public void finalizeDecoding() {
262+
if (messageOffset != messageLength) {
263+
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Decoded message length does not match "
264+
+ "expected length. Expected: " + messageLength + ", but was: " + messageOffset));
265+
}
266+
}
267+
}

0 commit comments

Comments
 (0)