1616package com .bloomberg .bmq .impl .infr .io ;
1717
1818import com .bloomberg .bmq .impl .infr .util .Argument ;
19- import com .bloomberg .bmq .impl .infr .util .Limits ;
2019import java .io .DataOutput ;
2120import java .io .IOException ;
2221import java .io .OutputStream ;
2322import java .lang .invoke .MethodHandles ;
2423import java .nio .ByteBuffer ;
2524import java .nio .charset .StandardCharsets ;
2625import java .util .ArrayList ;
26+ import java .util .Arrays ;
27+ import java .util .Collection ;
28+ import java .util .Collections ;
2729import org .slf4j .Logger ;
2830import org .slf4j .LoggerFactory ;
2931
32+ /**
33+ * An output stream of ByteBuffers
34+ *
35+ * <p>invariants of this structure are listed below.
36+ *
37+ * <p>1. a stream that has never been written to will have no buffers yet.
38+ *
39+ * <p>2. buffers are added on demand.
40+ *
41+ * <p>3. writeBuffers() or big array write(), the current buffer is sliced and the remainder is
42+ * added after as the current buffer.
43+ *
44+ * <p>4. writeBuffers() appends duplicated buffers wholesale instead of copying.
45+ *
46+ * <p>5. writeBuffers() ByteBuffers from outside should always be either unflipped or a wrapped
47+ * array.
48+ *
49+ * <p>7. write() byte arrays larger than the buffer size get wrapped, smaller ones get copied in one
50+ * piece.
51+ *
52+ * <p>8. as a result of 7, byte arrays can always be read fully in one read().
53+ *
54+ * <p>9. totalBytes is kept up to date as fields / buffers are written in.
55+ *
56+ * <p>10. the current append buffer is always the last buffer in bbArray.
57+ */
3058public class ByteBufferOutputStream extends OutputStream implements DataOutput {
3159 static final Logger logger = LoggerFactory .getLogger (MethodHandles .lookup ().lookupClass ());
3260
3361 private ArrayList <ByteBuffer > bbArray ;
34- private ByteBuffer currentBuffer ;
35-
36- private int currentBufferIndex = 0 ;
3762 private int bufSize ;
38- private int prevBuffersNumBytes ;
39-
63+ private int totalBytes ;
4064 private boolean isOpen ;
4165
4266 private static final int KB = 1024 ;
4367 private static final int DEFAULT_BUF_SIZE = 4 * KB ;
68+ private static final int BIG_BUF_SIZE = DEFAULT_BUF_SIZE ;
69+
70+ // small buffer size should be adequete for many event types.
71+ private static final int SMALL_BUF_SIZE = 512 ;
72+
73+ public static ByteBufferOutputStream smallBlocks () {
74+ return new ByteBufferOutputStream (SMALL_BUF_SIZE );
75+ }
76+
77+ public static ByteBufferOutputStream bigBlocks () {
78+ return new ByteBufferOutputStream (BIG_BUF_SIZE );
79+ }
4480
4581 public ByteBufferOutputStream () {
4682 init (DEFAULT_BUF_SIZE );
@@ -53,20 +89,39 @@ public ByteBufferOutputStream(int bufSize) {
5389 private void init (int bufSize ) {
5490 bbArray = new ArrayList <>();
5591 this .bufSize = bufSize ;
56- currentBuffer = ByteBuffer .allocate (bufSize );
57- bbArray .add (currentBuffer );
58- currentBufferIndex = 0 ;
5992 isOpen = true ;
60- prevBuffersNumBytes = 0 ;
93+ totalBytes = 0 ;
94+ }
95+
96+ private int availableCapacity () {
97+ if (bbArray .isEmpty ()) {
98+ return 0 ;
99+ }
100+ return getCurrent ().remaining ();
101+ }
102+
103+ private void ensureCapacity (int size ) {
104+ int capacity = availableCapacity ();
105+ if (size > capacity ) {
106+ addBuffer (Math .max (bufSize , size ));
107+ }
108+ }
109+
110+ private void addRemainderOrNew (ByteBuffer remainder ) {
111+ if (remainder != null ) {
112+ bbArray .add (remainder );
113+ } else {
114+ addBuffer ();
115+ }
61116 }
62117
63118 @ Override
64119 public void write (int b ) throws IOException {
65120 if (!isOpen ) throw new IOException ("Stream closed" );
66121
67- if ( currentBuffer . remaining () < Limits . BYTE_SIZE ) getNewBuffer ( );
68-
69- currentBuffer . put (( byte ) b ) ;
122+ ensureCapacity ( 1 );
123+ getCurrent (). put (( byte ) b );
124+ totalBytes += 1 ;
70125 }
71126
72127 @ Override
@@ -80,16 +135,18 @@ public void write(byte[] ba, int offset, int length) throws IOException {
80135
81136 if (length <= 0 || (length > ba .length - offset )) return ;
82137
83- int numBytesLeft = length ;
84- while (true ) {
85- int numToWrite = Math .min (numBytesLeft , currentBuffer .remaining ());
86- currentBuffer .put (ba , offset , numToWrite );
87- numBytesLeft -= numToWrite ;
88- offset += numToWrite ;
89-
90- if (numBytesLeft > 0 ) getNewBuffer ();
91- else break ;
138+ // if its too big for a pre-defined buffer, just wrap it instead
139+ if (length > bufSize ) {
140+ ByteBuffer remainder = maybeSliceCurrent ();
141+ ByteBuffer buf = ByteBuffer .wrap (ba , offset , length );
142+ buf .position (buf .limit ());
143+ bbArray .add (buf );
144+ addRemainderOrNew (remainder );
145+ } else {
146+ ensureCapacity (length );
147+ getCurrent ().put (ba , offset , length );
92148 }
149+ totalBytes += length ;
93150 }
94151
95152 /**
@@ -98,9 +155,12 @@ public void write(byte[] ba, int offset, int length) throws IOException {
98155 * <p>The bbos can continue to be written to.
99156 */
100157 public ByteBuffer [] peek () {
101- return bbArray .stream ()
102- .map (bb -> (ByteBuffer ) (bb .duplicate ().flip ()))
103- .toArray (ByteBuffer []::new );
158+ ByteBuffer [] duplicates =
159+ bbArray .stream ().map (ByteBuffer ::duplicate ).toArray (ByteBuffer []::new );
160+ for (ByteBuffer b : duplicates ) {
161+ b .flip ();
162+ }
163+ return duplicates ;
104164 }
105165
106166 private ArrayList <ByteBuffer > buffers () {
@@ -116,47 +176,68 @@ public void writeBoolean(boolean v) throws IOException {
116176 public void writeByte (int v ) throws IOException {
117177 if (!isOpen ) throw new IOException ("Stream closed" );
118178
119- if ( currentBuffer . remaining () < Limits . BYTE_SIZE ) getNewBuffer ( );
120-
121- currentBuffer . put (( byte ) v ) ;
179+ ensureCapacity ( 1 );
180+ getCurrent (). put (( byte ) v );
181+ totalBytes += 1 ;
122182 }
123183
124184 @ Override
125185 public void writeBytes (String s ) throws IOException {
126186 throw new UnsupportedOperationException ();
127187 }
128188
189+ // a buffer that has never been put() to nor flipped
129190 private boolean bufferIsFresh (ByteBuffer b ) {
130- // a buffer that has never been put() to nor flipped
131191 return b .position () == 0 && b .limit () == b .capacity ();
132192 }
133193
134- public void writeBuffer (ByteBuffer b ) throws IOException {
135- if (!isOpen ) throw new IOException ("Stream closed" );
136- if (bufferIsFresh (b )) return ;
137-
138- boolean currentIsFresh = bufferIsFresh (currentBuffer );
139- // remove the currentBuffer if it is fresh
140- // we'll put it back at the end after adding other's buffers
141- // to avoid allocating a new one in that case
142- if (currentIsFresh ) {
143- bbArray .remove (currentBufferIndex );
144- }
145- ByteBuffer buf = b .duplicate ();
146- if (buf .limit () != buf .capacity ()) {
147- // it has already been flipped - unflip it
148- int newPosition = buf .limit ();
149- buf .limit (buf .capacity ());
150- buf .position (newPosition );
194+ private ByteBuffer getCurrent () {
195+ if (bbArray .size () == 0 ) return null ;
196+ return bbArray .get (bbArray .size () - 1 );
197+ }
198+
199+ private ByteBuffer maybeSliceCurrent () {
200+ ByteBuffer current = getCurrent ();
201+ if (current != null ) {
202+ if (bufferIsFresh (current )) {
203+ // current has never been written to,
204+ // remove it from bbArray and return it
205+ // whole as the remainder
206+ bbArray .remove (bbArray .size () - 1 );
207+ return current ;
208+ }
209+ // a remainder slice should be meaningfully sized - at least as big as the ByteBuffer
210+ // overhead
211+ if (current .remaining () >= 16 ) {
212+ ByteBuffer remainder = current .slice ();
213+ return remainder ;
214+ }
151215 }
152- prevBuffersNumBytes += buf .position ();
153- bbArray .add (buf );
154- if (currentIsFresh ) {
155- bbArray .add (currentBuffer );
156- } else {
157- addBuffer ();
216+ return null ;
217+ }
218+
219+ public void writeBuffer (ByteBuffer buffer ) throws IOException {
220+ writeBuffers (Collections .singletonList (buffer ));
221+ }
222+
223+ public void writeBuffers (ByteBuffer ... buffers ) throws IOException {
224+ writeBuffers (Arrays .asList (buffers ));
225+ }
226+
227+ public void writeBuffers (Collection <ByteBuffer > buffers ) throws IOException {
228+ if (!isOpen ) throw new IOException ("Stream closed" );
229+ ByteBuffer remainder = maybeSliceCurrent ();
230+ bbArray .ensureCapacity (bbArray .size () + buffers .size () + 1 /* remainder or new buffer */ );
231+ for (ByteBuffer b : buffers ) {
232+ if (bufferIsFresh (b )) continue ;
233+ ByteBuffer dup = b .duplicate ();
234+ if (dup .position () == 0 ) {
235+ dup .position (dup .limit ());
236+ }
237+ bbArray .add (dup );
238+ totalBytes += dup .position ();
158239 }
159- currentBufferIndex = bbArray . size () - 1 ;
240+ addRemainderOrNew ( remainder ) ;
160241 }
161242
162243 /**
@@ -168,18 +249,16 @@ public void writeBuffer(ByteBuffer b) throws IOException {
168249 */
169250 public void writeBuffers (ByteBufferOutputStream other ) throws IOException {
170251 if (!isOpen || !other .isOpen ) throw new IOException ("Stream closed" );
171- for (ByteBuffer b : other .buffers ()) {
172- writeBuffer (b );
173- }
252+ writeBuffers (other .buffers ());
174253 }
175254
176255 @ Override
177256 public void writeChar (int v ) throws IOException {
178257 if (!isOpen ) throw new IOException ("Stream closed" );
179258
180- if ( currentBuffer . remaining () < Limits . CHAR_SIZE ) getNewBuffer ( );
181-
182- currentBuffer . putChar (( char ) v ) ;
259+ ensureCapacity ( 2 );
260+ getCurrent (). putChar (( char ) v );
261+ totalBytes += 2 ;
183262 }
184263
185264 @ Override
@@ -191,54 +270,56 @@ public void writeChars(String s) throws IOException {
191270 public void writeDouble (double v ) throws IOException {
192271 if (!isOpen ) throw new IOException ("Stream closed" );
193272
194- if ( currentBuffer . remaining () < Limits . DOUBLE_SIZE ) getNewBuffer ( );
195-
196- currentBuffer . putDouble ( v ) ;
273+ ensureCapacity ( 8 );
274+ getCurrent (). putDouble ( v );
275+ totalBytes += 8 ;
197276 }
198277
199278 @ Override
200279 public void writeFloat (float v ) throws IOException {
201280 if (!isOpen ) throw new IOException ("Stream closed" );
202281
203- if ( currentBuffer . remaining () < Limits . FLOAT_SIZE ) getNewBuffer ( );
204-
205- currentBuffer . putFloat ( v ) ;
282+ ensureCapacity ( 4 );
283+ getCurrent (). putFloat ( v );
284+ totalBytes += 4 ;
206285 }
207286
208287 @ Override
209288 public void writeInt (int v ) throws IOException {
210289 if (!isOpen ) throw new IOException ("Stream closed" );
211290
212- if ( currentBuffer . remaining () < Limits . INT_SIZE ) getNewBuffer ( );
213-
214- currentBuffer . putInt ( v ) ;
291+ ensureCapacity ( 4 );
292+ getCurrent (). putInt ( v );
293+ totalBytes += 4 ;
215294 }
216295
217296 @ Override
218297 public void writeLong (long v ) throws IOException {
219298 if (!isOpen ) throw new IOException ("Stream closed" );
220299
221- if ( currentBuffer . remaining () < Limits . LONG_SIZE ) getNewBuffer ( );
222-
223- currentBuffer . putLong ( v ) ;
300+ ensureCapacity ( 8 );
301+ getCurrent (). putLong ( v );
302+ totalBytes += 8 ;
224303 }
225304
226305 @ Override
227306 public void writeShort (int v ) throws IOException {
228307 if (!isOpen ) throw new IOException ("Stream closed" );
229308
230- if ( currentBuffer . remaining () < Limits . SHORT_SIZE ) getNewBuffer ( );
231-
232- currentBuffer . putShort (( short ) v ) ;
309+ ensureCapacity ( 2 );
310+ getCurrent (). putShort (( short ) v );
311+ totalBytes += 2 ;
233312 }
234313
235314 @ Override
236315 public void writeUTF (String str ) throws IOException {
237- write (str .getBytes (StandardCharsets .UTF_8 ));
316+ byte [] bytes = str .getBytes (StandardCharsets .UTF_8 );
317+ write (bytes );
238318 }
239319
240320 public void writeAscii (String str ) throws IOException {
241- write (str .getBytes (StandardCharsets .US_ASCII ));
321+ byte [] bytes = str .getBytes (StandardCharsets .US_ASCII );
322+ write (bytes );
242323 }
243324
244325 /**
@@ -250,10 +331,7 @@ public ByteBuffer[] reset() {
250331 ByteBuffer [] bbArrayCopy = peek ();
251332
252333 bbArray .clear ();
253- prevBuffersNumBytes = 0 ;
254- currentBuffer = null ;
255- currentBufferIndex = 0 ;
256-
334+ totalBytes = 0 ;
257335 isOpen = false ;
258336
259337 return bbArrayCopy ;
@@ -264,11 +342,7 @@ public int numByteBuffers() {
264342 }
265343
266344 public int size () {
267- return (isOpen ? (prevBuffersNumBytes + currentBuffer .position ()) : (0 ));
268- }
269-
270- private void getNewBuffer () {
271- addBuffer ();
345+ return (isOpen ? (totalBytes ) : (0 ));
272346 }
273347
274348 private void addBuffer () {
@@ -278,10 +352,7 @@ private void addBuffer() {
278352 // allocate a buffer which is large enough to store data
279353 // of specified size
280354 private void addBuffer (int size ) {
281- prevBuffersNumBytes += currentBuffer .position ();
282- int allocationSize = Math .max (size , bufSize );
283- currentBuffer = ByteBuffer .allocate (allocationSize );
284- bbArray .add (currentBuffer );
285- currentBufferIndex = bbArray .size () - 1 ;
355+ ByteBuffer buf = ByteBuffer .allocate (size );
356+ bbArray .add (buf );
286357 }
287358}
0 commit comments