66import com .azure .core .util .logging .ClientLogger ;
77import com .azure .storage .common .implementation .StorageCrc64Calculator ;
88import com .azure .storage .common .implementation .StorageImplUtils ;
9+ import reactor .core .publisher .Flux ;
910
10- import java .io .IOException ;
1111import java .nio .ByteBuffer ;
12- import java .io .ByteArrayOutputStream ;
1312import java .nio .ByteOrder ;
13+ import java .util .ArrayList ;
1414import java .util .HashMap ;
15+ import java .util .List ;
1516import java .util .Map ;
1617
1718import static com .azure .storage .common .implementation .structuredmessage .StructuredMessageConstants .CRC64_LENGTH ;
@@ -35,7 +36,6 @@ public class StructuredMessageEncoder {
3536 private int currentContentOffset ;
3637 private int currentSegmentNumber ;
3738 private int currentSegmentOffset ;
38- private int currentMessageLength ;
3939 private long messageCRC64 ;
4040 private final Map <Integer , Long > segmentCRC64s ;
4141
@@ -66,7 +66,6 @@ public StructuredMessageEncoder(int contentLength, int segmentSize, StructuredMe
6666 this .currentSegmentOffset = 0 ;
6767 this .messageCRC64 = 0 ;
6868 this .segmentCRC64s = new HashMap <>();
69- this .currentMessageLength = 0 ;
7069
7170 if (numSegments > Short .MAX_VALUE ) {
7271 StorageImplUtils .assertInBounds ("numSegments" , numSegments , 1 , Short .MAX_VALUE );
@@ -109,115 +108,99 @@ private byte[] generateMessageHeader() {
109108 return buffer .array ();
110109 }
111110
112- private byte [] generateSegmentHeader () {
113- int segmentHeaderSize = Math .min (segmentSize , contentLength - currentContentOffset );
114- // 2 byte number, 8 byte size
111+ private byte [] generateSegmentHeader (int segmentContentSize ) {
115112 ByteBuffer buffer = ByteBuffer .allocate (getSegmentHeaderLength ()).order (ByteOrder .LITTLE_ENDIAN );
116113 buffer .putShort ((short ) currentSegmentNumber );
117- buffer .putLong (segmentHeaderSize );
114+ buffer .putLong (segmentContentSize );
118115
119116 return buffer .array ();
120117 }
121118
122119 /**
123- * Encodes the given buffer into a structured message format.
120+ * Encodes the given buffer into a structured message format as a stream of ByteBuffers .
124121 *
125122 * @param unencodedBuffer The buffer to be encoded.
126- * @return The encoded buffer.
127- * @throws IOException If an error occurs while encoding the buffer.
123+ * @return A Flux of encoded ByteBuffers.
128124 * @throws IllegalArgumentException If the buffer length exceeds the content length, or the content has already been
129125 * encoded.
130126 */
131- public ByteBuffer encode (ByteBuffer unencodedBuffer ) throws IOException {
127+ public Flux < ByteBuffer > encode (ByteBuffer unencodedBuffer ) {
132128 StorageImplUtils .assertNotNull ("unencodedBuffer" , unencodedBuffer );
133129
134130 if (currentContentOffset == contentLength ) {
135- throw LOGGER .logExceptionAsError (new IllegalArgumentException ("Content has already been encoded." ));
131+ return Flux
132+ .error (LOGGER .logExceptionAsError (new IllegalArgumentException ("Content has already been encoded." )));
136133 }
137134
138135 if ((unencodedBuffer .remaining () + currentContentOffset ) > contentLength ) {
139- throw LOGGER .logExceptionAsError (new IllegalArgumentException ("Buffer length exceeds content length." ));
136+ return Flux .error (
137+ LOGGER .logExceptionAsError (new IllegalArgumentException ("Buffer length exceeds content length." )));
140138 }
141139
142140 if (!unencodedBuffer .hasRemaining ()) {
143- return ByteBuffer . allocate ( 0 );
141+ return Flux . empty ( );
144142 }
145143
146- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream ();
144+ return Flux .defer (() -> {
145+ List <ByteBuffer > buffers = new ArrayList <>();
147146
148- // if we are at the beginning of the message, encode message header
149- if (currentMessageLength == 0 ) {
150- encodeMessageHeader (byteArrayOutputStream );
151- }
152-
153- while (unencodedBuffer .hasRemaining ()) {
154- // if we are at the beginning of a segment's content, encode segment header
155- if (currentSegmentOffset == 0 ) {
156- encodeSegmentHeader (byteArrayOutputStream );
147+ // if we are at the beginning of the message, encode message header
148+ if (currentContentOffset == 0 ) {
149+ buffers .add (ByteBuffer .wrap (generateMessageHeader ()));
157150 }
158151
159- encodeSegmentContent (unencodedBuffer , byteArrayOutputStream );
160-
161- // if we are at the end of a segment's content, encode segment footer
162- if (currentSegmentOffset == getSegmentContentLength ()) {
163- encodeSegmentFooter (byteArrayOutputStream );
152+ while (unencodedBuffer .hasRemaining ()) {
153+ // if we are at the beginning of a segment's content, encode segment header
154+ if (currentSegmentOffset == 0 ) {
155+ incrementCurrentSegment ();
156+ // Calculate actual segment size based on remaining content
157+ int actualSegmentSize = Math .min (segmentSize , contentLength - currentContentOffset );
158+ buffers .add (ByteBuffer .wrap (generateSegmentHeader (actualSegmentSize )));
159+ }
160+
161+ buffers .add (encodeSegmentContent (unencodedBuffer ));
162+
163+ // if we are at the end of a segment's content, encode segment footer
164+ if (currentSegmentOffset == getSegmentContentLength ()) {
165+ byte [] footer = generateSegmentFooter ();
166+ if (footer .length > 0 ) {
167+ buffers .add (ByteBuffer .wrap (footer ));
168+ }
169+ currentSegmentOffset = 0 ;
170+ }
164171 }
165- }
166-
167- // if all content has been encoded, encode message footer
168- if (currentContentOffset == contentLength ) {
169- encodeMessageFooter (byteArrayOutputStream );
170- }
171-
172- return ByteBuffer .wrap (byteArrayOutputStream .toByteArray ());
173- }
174172
175- private void encodeMessageHeader (ByteArrayOutputStream output ) {
176- byte [] metadata = generateMessageHeader ();
177- output .write (metadata , 0 , metadata .length );
178-
179- currentMessageLength += metadata .length ;
180- }
181-
182- private void encodeSegmentHeader (ByteArrayOutputStream output ) {
183- incrementCurrentSegment ();
184- byte [] metadata = generateSegmentHeader ();
185- output .write (metadata , 0 , metadata .length );
173+ // if all content has been encoded, encode message footer
174+ if (currentContentOffset == contentLength ) {
175+ byte [] footer = generateMessageFooter ();
176+ if (footer .length > 0 ) {
177+ buffers .add (ByteBuffer .wrap (footer ));
178+ }
179+ }
186180
187- currentMessageLength += metadata .length ;
181+ return Flux .fromIterable (buffers );
182+ });
188183 }
189184
190- private void encodeSegmentFooter (ByteArrayOutputStream output ) {
191- byte [] metadata ;
185+ private byte [] generateSegmentFooter () {
192186 if (structuredMessageFlags == StructuredMessageFlags .STORAGE_CRC64 ) {
193- metadata = ByteBuffer .allocate (CRC64_LENGTH )
187+ return ByteBuffer .allocate (CRC64_LENGTH )
194188 .order (ByteOrder .LITTLE_ENDIAN )
195189 .putLong (segmentCRC64s .get (currentSegmentNumber ))
196190 .array ();
197- } else {
198- metadata = new byte [0 ];
199191 }
200- output .write (metadata , 0 , metadata .length );
201-
202- currentMessageLength += metadata .length ;
203- currentSegmentOffset = 0 ;
192+ return new byte [0 ];
204193 }
205194
206- private void encodeMessageFooter (ByteArrayOutputStream output ) {
207- byte [] metadata ;
195+ private byte [] generateMessageFooter () {
208196 if (structuredMessageFlags == StructuredMessageFlags .STORAGE_CRC64 ) {
209- metadata = ByteBuffer .allocate (CRC64_LENGTH ).order (ByteOrder .LITTLE_ENDIAN ).putLong (messageCRC64 ).array ();
210- } else {
211- metadata = new byte [0 ];
197+ return ByteBuffer .allocate (CRC64_LENGTH ).order (ByteOrder .LITTLE_ENDIAN ).putLong (messageCRC64 ).array ();
212198 }
213-
214- output .write (metadata , 0 , metadata .length );
215- currentMessageLength += metadata .length ;
199+ return new byte [0 ];
216200 }
217201
218- private void encodeSegmentContent (ByteBuffer unencodedBuffer , ByteArrayOutputStream output ) {
202+ private ByteBuffer encodeSegmentContent (ByteBuffer unencodedBuffer ) {
219203 int readSize = Math .min (unencodedBuffer .remaining (), getSegmentContentLength () - currentSegmentOffset );
220-
221204 byte [] content = new byte [readSize ];
222205 unencodedBuffer .get (content , 0 , readSize );
223206
@@ -230,8 +213,7 @@ private void encodeSegmentContent(ByteBuffer unencodedBuffer, ByteArrayOutputStr
230213 currentContentOffset += readSize ;
231214 currentSegmentOffset += readSize ;
232215
233- output .write (content , 0 , content .length );
234- currentMessageLength += readSize ;
216+ return ByteBuffer .wrap (content );
235217 }
236218
237219 private int calculateMessageLength () {
@@ -255,7 +237,7 @@ private void incrementCurrentSegment() {
255237 *
256238 * @return The length of the message.
257239 */
258- public int getMessageLength () {
240+ public long getEncodedMessageLength () {
259241 return messageLength ;
260242 }
261243}
0 commit comments