99import reactor .core .publisher .Flux ;
1010
1111import java .nio .ByteBuffer ;
12- import java .io .ByteArrayOutputStream ;
1312import java .nio .ByteOrder ;
13+ import java .util .ArrayList ;
1414import java .util .HashMap ;
15+ import java .util .List ;
1516import java .util .Map ;
1617
1718import static com .azure .storage .common .implementation .structuredmessage .StructuredMessageConstants .CRC64_LENGTH ;
@@ -35,7 +36,6 @@ public class StructuredMessageEncoder {
3536 private int currentContentOffset ;
3637 private int currentSegmentNumber ;
3738 private int currentSegmentOffset ;
38- private int currentMessageLength ;
3939 private long messageCRC64 ;
4040 private final Map <Integer , Long > segmentCRC64s ;
4141
@@ -66,7 +66,6 @@ public StructuredMessageEncoder(int contentLength, int segmentSize, StructuredMe
6666 this .currentSegmentOffset = 0 ;
6767 this .messageCRC64 = 0 ;
6868 this .segmentCRC64s = new HashMap <>();
69- this .currentMessageLength = 0 ;
7069
7170 if (numSegments > Short .MAX_VALUE ) {
7271 StorageImplUtils .assertInBounds ("numSegments" , numSegments , 1 , Short .MAX_VALUE );
@@ -110,117 +109,98 @@ private byte[] generateMessageHeader() {
110109 }
111110
112111 private byte [] generateSegmentHeader () {
113- int segmentHeaderSize = Math .min (segmentSize , contentLength - currentContentOffset );
112+ int segmentContentSize = Math .min (segmentSize , contentLength - currentContentOffset );
114113 // 2 byte number, 8 byte size
115114 ByteBuffer buffer = ByteBuffer .allocate (getSegmentHeaderLength ()).order (ByteOrder .LITTLE_ENDIAN );
116115 buffer .putShort ((short ) currentSegmentNumber );
117- buffer .putLong (segmentHeaderSize );
116+ buffer .putLong (segmentContentSize );
118117
119118 return buffer .array ();
120119 }
121120
122- public Flux <ByteBuffer > encode (Flux <ByteBuffer > unencodedBufferFlux ) {
123- return unencodedBufferFlux .map (this ::encode ).filter (ByteBuffer ::hasRemaining );
124- }
125-
126121 /**
127- * Encodes the given buffer into a structured message format.
122+ * Encodes the given buffer into a structured message format as a stream of ByteBuffers .
128123 *
129124 * @param unencodedBuffer The buffer to be encoded.
130- * @return The encoded buffer .
125+ * @return A Flux of encoded ByteBuffers .
131126 * @throws IllegalArgumentException If the buffer length exceeds the content length, or the content has already been
132127 * encoded.
133128 */
134- public ByteBuffer encode (ByteBuffer unencodedBuffer ) {
129+ public Flux < ByteBuffer > encode (ByteBuffer unencodedBuffer ) {
135130 StorageImplUtils .assertNotNull ("unencodedBuffer" , unencodedBuffer );
136131
137132 if (currentContentOffset == contentLength ) {
138- throw LOGGER .logExceptionAsError (new IllegalArgumentException ("Content has already been encoded." ));
133+ return Flux
134+ .error (LOGGER .logExceptionAsError (new IllegalArgumentException ("Content has already been encoded." )));
139135 }
140136
141137 if ((unencodedBuffer .remaining () + currentContentOffset ) > contentLength ) {
142- throw LOGGER .logExceptionAsError (new IllegalArgumentException ("Buffer length exceeds content length." ));
138+ return Flux .error (
139+ LOGGER .logExceptionAsError (new IllegalArgumentException ("Buffer length exceeds content length." )));
143140 }
144141
145142 if (!unencodedBuffer .hasRemaining ()) {
146- return ByteBuffer . allocate ( 0 );
143+ return Flux . empty ( );
147144 }
148145
149- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream ();
150-
151- // if we are at the beginning of the message, encode message header
152- if (currentMessageLength == 0 ) {
153- encodeMessageHeader (byteArrayOutputStream );
154- }
146+ return Flux .defer (() -> {
147+ List <ByteBuffer > buffers = new ArrayList <>();
155148
156- while (unencodedBuffer .hasRemaining ()) {
157- // if we are at the beginning of a segment's content, encode segment header
158- if (currentSegmentOffset == 0 ) {
159- encodeSegmentHeader (byteArrayOutputStream );
149+ // if we are at the beginning of the message, encode message header
150+ if (currentContentOffset == 0 ) {
151+ buffers .add (ByteBuffer .wrap (generateMessageHeader ()));
160152 }
161153
162- encodeSegmentContent (unencodedBuffer , byteArrayOutputStream );
163-
164- // if we are at the end of a segment's content, encode segment footer
165- if (currentSegmentOffset == getSegmentContentLength ()) {
166- encodeSegmentFooter (byteArrayOutputStream );
154+ while (unencodedBuffer .hasRemaining ()) {
155+ // if we are at the beginning of a segment's content, encode segment header
156+ if (currentSegmentOffset == 0 ) {
157+ incrementCurrentSegment ();
158+ buffers .add (ByteBuffer .wrap (generateSegmentHeader ()));
159+ }
160+
161+ buffers .add (encodeSegmentContent (unencodedBuffer ));
162+
163+ // if we are at the end of a segment's content, encode segment footer
164+ if (currentSegmentOffset == getSegmentContentLength ()) {
165+ byte [] footer = generateSegmentFooter ();
166+ if (footer .length > 0 ) {
167+ buffers .add (ByteBuffer .wrap (footer ));
168+ }
169+ currentSegmentOffset = 0 ;
170+ }
167171 }
168- }
169-
170- // if all content has been encoded, encode message footer
171- if (currentContentOffset == contentLength ) {
172- encodeMessageFooter (byteArrayOutputStream );
173- }
174-
175- return ByteBuffer .wrap (byteArrayOutputStream .toByteArray ());
176- }
177172
178- private void encodeMessageHeader (ByteArrayOutputStream output ) {
179- byte [] metadata = generateMessageHeader ();
180- output .write (metadata , 0 , metadata .length );
181-
182- currentMessageLength += metadata .length ;
183- }
184-
185- private void encodeSegmentHeader (ByteArrayOutputStream output ) {
186- incrementCurrentSegment ();
187- byte [] metadata = generateSegmentHeader ();
188- output .write (metadata , 0 , metadata .length );
173+ // if all content has been encoded, encode message footer
174+ if (currentContentOffset == contentLength ) {
175+ byte [] footer = generateMessageFooter ();
176+ if (footer .length > 0 ) {
177+ buffers .add (ByteBuffer .wrap (footer ));
178+ }
179+ }
189180
190- currentMessageLength += metadata .length ;
181+ return Flux .fromIterable (buffers );
182+ });
191183 }
192184
193- private void encodeSegmentFooter (ByteArrayOutputStream output ) {
194- byte [] metadata ;
185+ private byte [] generateSegmentFooter () {
195186 if (structuredMessageFlags == StructuredMessageFlags .STORAGE_CRC64 ) {
196- metadata = ByteBuffer .allocate (CRC64_LENGTH )
187+ return ByteBuffer .allocate (CRC64_LENGTH )
197188 .order (ByteOrder .LITTLE_ENDIAN )
198189 .putLong (segmentCRC64s .get (currentSegmentNumber ))
199190 .array ();
200- } else {
201- metadata = new byte [0 ];
202191 }
203- output .write (metadata , 0 , metadata .length );
204-
205- currentMessageLength += metadata .length ;
206- currentSegmentOffset = 0 ;
192+ return new byte [0 ];
207193 }
208194
209- private void encodeMessageFooter (ByteArrayOutputStream output ) {
210- byte [] metadata ;
195+ private byte [] generateMessageFooter () {
211196 if (structuredMessageFlags == StructuredMessageFlags .STORAGE_CRC64 ) {
212- metadata = ByteBuffer .allocate (CRC64_LENGTH ).order (ByteOrder .LITTLE_ENDIAN ).putLong (messageCRC64 ).array ();
213- } else {
214- metadata = new byte [0 ];
197+ return ByteBuffer .allocate (CRC64_LENGTH ).order (ByteOrder .LITTLE_ENDIAN ).putLong (messageCRC64 ).array ();
215198 }
216-
217- output .write (metadata , 0 , metadata .length );
218- currentMessageLength += metadata .length ;
199+ return new byte [0 ];
219200 }
220201
221- private void encodeSegmentContent (ByteBuffer unencodedBuffer , ByteArrayOutputStream output ) {
202+ private ByteBuffer encodeSegmentContent (ByteBuffer unencodedBuffer ) {
222203 int readSize = Math .min (unencodedBuffer .remaining (), getSegmentContentLength () - currentSegmentOffset );
223-
224204 byte [] content = new byte [readSize ];
225205 unencodedBuffer .get (content , 0 , readSize );
226206
@@ -233,8 +213,7 @@ private void encodeSegmentContent(ByteBuffer unencodedBuffer, ByteArrayOutputStr
233213 currentContentOffset += readSize ;
234214 currentSegmentOffset += readSize ;
235215
236- output .write (content , 0 , content .length );
237- currentMessageLength += readSize ;
216+ return ByteBuffer .wrap (content );
238217 }
239218
240219 private int calculateMessageLength () {
@@ -258,16 +237,7 @@ private void incrementCurrentSegment() {
258237 *
259238 * @return The length of the message.
260239 */
261- public String getEncodedMessageLength () {
262- return String . valueOf ( messageLength ) ;
240+ public long getEncodedMessageLength () {
241+ return messageLength ;
263242 }
264-
265- // /**
266- // * Returns the CRC64 of the message
267- // *
268- // * @return The CRC64 of the message
269- // */
270- // public long getMessageCRC64() {
271- // return messageCRC64;
272- // }
273243}
0 commit comments