Skip to content

Commit 5a7388d

Browse files
authored
Message encoder (#43803)
* wip * basic message encoder logic working * removing print statements and making slightly more readable * fixing several bugs * adding more tests * adding comments for building purposes and to branch off for service testing * wip * wip * working encode that takes in bytebuffer * removing redundant 'this' and making small readability changes * fixing flag bug, adding more comments, adjusting incorrect test case * adding structuredmessage package in implementation and addressing other comments * refactoring * adding more tests, adjusting test encoder, adding more validation * re-ordering empty buffer return * adding correct comments * addressing comments and fixing ci issues * trying to resolve spotbugs issue * formatting
1 parent 5416c4f commit 5a7388d

File tree

6 files changed

+605
-0
lines changed

6 files changed

+605
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.storage.common.implementation.structuredmessage;
5+
6+
import com.azure.core.util.logging.ClientLogger;
7+
import com.azure.storage.common.implementation.StorageCrc64Calculator;
8+
import com.azure.storage.common.implementation.StorageImplUtils;
9+
10+
import java.io.IOException;
11+
import java.nio.ByteBuffer;
12+
import java.io.ByteArrayOutputStream;
13+
import java.nio.ByteOrder;
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
17+
/**
18+
* Encoder for structured messages with support for segmenting and CRC64 checksums.
19+
*/
20+
public class StructuredMessageEncoder {
21+
private static final int DEFAULT_MESSAGE_VERSION = 1;
22+
private static final int V1_HEADER_LENGTH = 13;
23+
private static final int V1_SEGMENT_HEADER_LENGTH = 10;
24+
private static final int CRC64_LENGTH = 8;
25+
private static final ClientLogger LOGGER = new ClientLogger(StructuredMessageEncoder.class);
26+
27+
private final int messageVersion;
28+
private final int contentLength;
29+
private final int messageLength;
30+
private final StructuredMessageFlags structuredMessageFlags;
31+
private final int segmentSize;
32+
private final int numSegments;
33+
34+
private int currentContentOffset;
35+
private int currentSegmentNumber;
36+
private int currentSegmentOffset;
37+
private int currentMessageLength;
38+
private long messageCRC64;
39+
private final Map<Integer, Long> segmentCRC64s;
40+
41+
/**
42+
* Constructs a new StructuredMessageEncoder.
43+
* @param contentLength The length of the content to be encoded.
44+
* @param segmentSize The size of each segment.
45+
* @param structuredMessageFlags The structuredMessageFlags to be set.
46+
* @throws IllegalArgumentException If the segment size is less than 1, the content length is less than 1, or the
47+
* number of segments is greater than {@link java.lang.Short#MAX_VALUE}.
48+
*/
49+
public StructuredMessageEncoder(int contentLength, int segmentSize, StructuredMessageFlags structuredMessageFlags) {
50+
if (segmentSize < 1) {
51+
StorageImplUtils.assertInBounds("segmentSize", segmentSize, 1, Long.MAX_VALUE);
52+
}
53+
if (contentLength < 1) {
54+
StorageImplUtils.assertInBounds("contentLength", contentLength, 1, Long.MAX_VALUE);
55+
}
56+
57+
this.messageVersion = DEFAULT_MESSAGE_VERSION;
58+
this.contentLength = contentLength;
59+
this.structuredMessageFlags = structuredMessageFlags;
60+
this.segmentSize = segmentSize;
61+
this.numSegments = Math.max(1, (int) Math.ceil((double) this.contentLength / this.segmentSize));
62+
this.messageLength = calculateMessageLength();
63+
this.currentContentOffset = 0;
64+
this.currentSegmentNumber = 0;
65+
this.currentSegmentOffset = 0;
66+
this.messageCRC64 = 0;
67+
this.segmentCRC64s = new HashMap<>();
68+
this.currentMessageLength = 0;
69+
70+
if (numSegments > Short.MAX_VALUE) {
71+
StorageImplUtils.assertInBounds("numSegments", numSegments, 1, Short.MAX_VALUE);
72+
}
73+
}
74+
75+
private int getMessageHeaderLength() {
76+
return V1_HEADER_LENGTH;
77+
}
78+
79+
private int getSegmentHeaderLength() {
80+
return V1_SEGMENT_HEADER_LENGTH;
81+
}
82+
83+
private int getSegmentFooterLength() {
84+
return (structuredMessageFlags == StructuredMessageFlags.STORAGE_CRC64) ? CRC64_LENGTH : 0;
85+
}
86+
87+
private int getMessageFooterLength() {
88+
return (structuredMessageFlags == StructuredMessageFlags.STORAGE_CRC64) ? CRC64_LENGTH : 0;
89+
}
90+
91+
private int getSegmentContentLength() {
92+
// last segment size is remaining content
93+
if (currentSegmentNumber == numSegments) {
94+
return contentLength - ((currentSegmentNumber - 1) * segmentSize);
95+
} else {
96+
return segmentSize;
97+
}
98+
}
99+
100+
private byte[] generateMessageHeader() {
101+
// 1 byte version, 8 byte size, 2 byte structuredMessageFlags, 2 byte numSegments
102+
ByteBuffer buffer = ByteBuffer.allocate(getMessageHeaderLength()).order(ByteOrder.LITTLE_ENDIAN);
103+
buffer.put((byte) messageVersion);
104+
buffer.putLong(messageLength);
105+
buffer.putShort((short) structuredMessageFlags.getValue());
106+
buffer.putShort((short) numSegments);
107+
108+
return buffer.array();
109+
}
110+
111+
private byte[] generateSegmentHeader() {
112+
int segmentHeaderSize = Math.min(segmentSize, contentLength - currentContentOffset);
113+
// 2 byte number, 8 byte size
114+
ByteBuffer buffer = ByteBuffer.allocate(getSegmentHeaderLength()).order(ByteOrder.LITTLE_ENDIAN);
115+
buffer.putShort((short) currentSegmentNumber);
116+
buffer.putLong(segmentHeaderSize);
117+
118+
return buffer.array();
119+
}
120+
121+
/**
122+
* Encodes the given buffer into a structured message format.
123+
*
124+
* @param unencodedBuffer The buffer to be encoded.
125+
* @return The encoded buffer.
126+
* @throws IOException If an error occurs while encoding the buffer.
127+
* @throws IllegalArgumentException If the buffer length exceeds the content length, or the content has already been
128+
* encoded.
129+
*/
130+
public ByteBuffer encode(ByteBuffer unencodedBuffer) throws IOException {
131+
StorageImplUtils.assertNotNull("unencodedBuffer", unencodedBuffer);
132+
133+
if (currentContentOffset == contentLength) {
134+
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Content has already been encoded."));
135+
}
136+
137+
if ((unencodedBuffer.remaining() + currentContentOffset) > contentLength) {
138+
throw LOGGER.logExceptionAsError(new IllegalArgumentException("Buffer length exceeds content length."));
139+
}
140+
141+
if (!unencodedBuffer.hasRemaining()) {
142+
return ByteBuffer.allocate(0);
143+
}
144+
145+
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
146+
147+
// if we are at the beginning of the message, encode message header
148+
if (currentMessageLength == 0) {
149+
encodeMessageHeader(byteArrayOutputStream);
150+
}
151+
152+
while (unencodedBuffer.hasRemaining()) {
153+
// if we are at the beginning of a segment's content, encode segment header
154+
if (currentSegmentOffset == 0) {
155+
encodeSegmentHeader(byteArrayOutputStream);
156+
}
157+
158+
encodeSegmentContent(unencodedBuffer, byteArrayOutputStream);
159+
160+
// if we are at the end of a segment's content, encode segment footer
161+
if (currentSegmentOffset == getSegmentContentLength()) {
162+
encodeSegmentFooter(byteArrayOutputStream);
163+
}
164+
}
165+
166+
// if all content has been encoded, encode message footer
167+
if (currentContentOffset == contentLength) {
168+
encodeMessageFooter(byteArrayOutputStream);
169+
}
170+
171+
return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
172+
}
173+
174+
private void encodeMessageHeader(ByteArrayOutputStream output) {
175+
byte[] metadata = generateMessageHeader();
176+
output.write(metadata, 0, metadata.length);
177+
178+
currentMessageLength += metadata.length;
179+
}
180+
181+
private void encodeSegmentHeader(ByteArrayOutputStream output) {
182+
incrementCurrentSegment();
183+
byte[] metadata = generateSegmentHeader();
184+
output.write(metadata, 0, metadata.length);
185+
186+
currentMessageLength += metadata.length;
187+
}
188+
189+
private void encodeSegmentFooter(ByteArrayOutputStream output) {
190+
byte[] metadata;
191+
if (structuredMessageFlags == StructuredMessageFlags.STORAGE_CRC64) {
192+
metadata = ByteBuffer.allocate(CRC64_LENGTH)
193+
.order(ByteOrder.LITTLE_ENDIAN)
194+
.putLong(segmentCRC64s.get(currentSegmentNumber))
195+
.array();
196+
} else {
197+
metadata = new byte[0];
198+
}
199+
output.write(metadata, 0, metadata.length);
200+
201+
currentMessageLength += metadata.length;
202+
currentSegmentOffset = 0;
203+
}
204+
205+
private void encodeMessageFooter(ByteArrayOutputStream output) {
206+
byte[] metadata;
207+
if (structuredMessageFlags == StructuredMessageFlags.STORAGE_CRC64) {
208+
metadata = ByteBuffer.allocate(CRC64_LENGTH).order(ByteOrder.LITTLE_ENDIAN).putLong(messageCRC64).array();
209+
} else {
210+
metadata = new byte[0];
211+
}
212+
213+
output.write(metadata, 0, metadata.length);
214+
currentMessageLength += metadata.length;
215+
}
216+
217+
private void encodeSegmentContent(ByteBuffer unencodedBuffer, ByteArrayOutputStream output) {
218+
int readSize = Math.min(unencodedBuffer.remaining(), getSegmentContentLength() - currentSegmentOffset);
219+
220+
byte[] content = new byte[readSize];
221+
unencodedBuffer.get(content, 0, readSize);
222+
223+
if (structuredMessageFlags == StructuredMessageFlags.STORAGE_CRC64) {
224+
segmentCRC64s.put(currentSegmentNumber,
225+
StorageCrc64Calculator.compute(content, segmentCRC64s.get(currentSegmentNumber)));
226+
messageCRC64 = StorageCrc64Calculator.compute(content, messageCRC64);
227+
}
228+
229+
currentContentOffset += readSize;
230+
currentSegmentOffset += readSize;
231+
232+
output.write(content, 0, content.length);
233+
currentMessageLength += readSize;
234+
}
235+
236+
private int calculateMessageLength() {
237+
int length = getMessageHeaderLength();
238+
239+
length += (getSegmentHeaderLength() + getSegmentFooterLength()) * numSegments;
240+
length += contentLength;
241+
length += getMessageFooterLength();
242+
return length;
243+
}
244+
245+
private void incrementCurrentSegment() {
246+
currentSegmentNumber++;
247+
if (structuredMessageFlags == StructuredMessageFlags.STORAGE_CRC64) {
248+
segmentCRC64s.putIfAbsent(currentSegmentNumber, 0L);
249+
}
250+
}
251+
252+
/**
253+
* Returns the length of the message.
254+
*
255+
* @return The length of the message.
256+
*/
257+
public int getMessageLength() {
258+
return messageLength;
259+
}
260+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.storage.common.implementation.structuredmessage;
5+
6+
/**
 * Defines values for StructuredMessageFlags.
 */
public enum StructuredMessageFlags {
    /**
     * No flags set.
     */
    NONE(0),

    /**
     * StructuredMessageFlag indicating the use of CRC64.
     */
    STORAGE_CRC64(1);

    /**
     * The actual serialized value for a StructuredMessageFlags instance.
     */
    private final int value;

    StructuredMessageFlags(int value) {
        this.value = value;
    }

    /**
     * Parses a serialized value to a StructuredMessageFlags instance.
     *
     * @param value the serialized value to parse, expected to be the decimal integer value of a flag.
     * @return the parsed StructuredMessageFlags object, or null if unable to parse (null input, non-numeric input,
     * or a number that matches no flag).
     */
    public static StructuredMessageFlags fromString(String value) {
        if (value == null) {
            return null;
        }

        // Parse once, outside the loop. Non-numeric input is reported as null, as the contract above promises,
        // instead of leaking a NumberFormatException to the caller.
        final int parsedValue;
        try {
            parsedValue = Integer.parseInt(value);
        } catch (NumberFormatException ignored) {
            return null;
        }

        for (StructuredMessageFlags item : StructuredMessageFlags.values()) {
            if (item.getValue() == parsedValue) {
                return item;
            }
        }
        return null;
    }

    /**
     * Returns the value for a StructuredMessageFlags instance.
     *
     * @return the integer value of the StructuredMessageFlags object.
     */
    public int getValue() {
        return value;
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* Package containing classes for structured message encoding and decoding.
6+
*/
7+
package com.azure.storage.common.implementation.structuredmessage;

sdk/storage/azure-storage-common/src/main/java/module-info.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,7 @@
2525
exports com.azure.storage.common.implementation.connectionstring to // FIXME this should not be a long-term solution
2626
com.azure.data.tables, com.azure.storage.blob, com.azure.storage.blob.cryptography,
2727
com.azure.storage.file.share, com.azure.storage.file.datalake, com.azure.storage.queue;
28+
29+
exports com.azure.storage.common.implementation.structuredmessage
30+
to com.azure.storage.blob, com.azure.storage.file.share, com.azure.storage.file.datalake;
2831
}

0 commit comments

Comments
 (0)