@@ -75,6 +75,10 @@ void deliverFrame(
7575 // effectively final. Can only be set once.
7676 private int maxOutboundMessageSize = NO_MAX_OUTBOUND_MESSAGE_SIZE ;
7777 private WritableBuffer buffer ;
78+ /**
79+ * if > 0 - the number of bytes to allocate for the current known-length message.
80+ */
81+ private int knownLengthPendingAllocation ;
7882 private Compressor compressor = Codec .Identity .NONE ;
7983 private boolean messageCompression = true ;
8084 private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter ();
@@ -120,6 +124,8 @@ public void setMaxOutboundMessageSize(int maxSize) {
120124 maxOutboundMessageSize = maxSize ;
121125 }
122126
127+ private static final int COMPRESSION_MIN_LENGTH = 20 ;
128+
123129 /**
124130 * Writes out a payload message.
125131 *
@@ -137,7 +143,7 @@ public void writePayload(InputStream message) {
137143 int messageLength = -2 ;
138144 try {
139145 messageLength = getKnownLength (message );
140- if (messageLength != 0 && compressed ) {
146+ if (messageLength >= COMPRESSION_MIN_LENGTH && compressed ) {
141147 written = writeCompressed (message , messageLength );
142148 } else {
143149 written = writeUncompressed (message , messageLength );
@@ -222,13 +228,25 @@ private int writeKnownLengthUncompressed(InputStream message, int messageLength)
222228 headerScratch .put (UNCOMPRESSED ).putInt (messageLength );
223229 // Allocate the initial buffer chunk based on frame header + payload length.
224230 // Note that the allocator may allocate a buffer larger or smaller than this length
231+ knownLengthPendingAllocation = HEADER_LENGTH + messageLength ;
225232 if (buffer == null ) {
226- buffer = bufferAllocator . allocate ( headerScratch . position () + messageLength );
233+ buffer = allocateKnownLength ( );
227234 }
228235 writeRaw (headerScratch .array (), 0 , headerScratch .position ());
229236 return writeToOutputStream (message , outputStreamAdapter );
230237 }
231238
239+ /**
240+ * Allocate buffer according to {@link #knownLengthPendingAllocation} which is decremented after
241+ * that.
242+ */
243+ private WritableBuffer allocateKnownLength () {
244+ WritableBuffer newBuffer = bufferAllocator .allocateKnownLength (knownLengthPendingAllocation );
245+ knownLengthPendingAllocation -= Math .min (knownLengthPendingAllocation ,
246+ newBuffer .writableBytes ());
247+ return newBuffer ;
248+ }
249+
232250 /**
233251 * Write a message that has been serialized to a sequence of buffers.
234252 */
@@ -243,7 +261,7 @@ private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compr
243261 }
244262 headerScratch .clear ();
245263 headerScratch .put (compressed ? COMPRESSED : UNCOMPRESSED ).putInt (messageLength );
246- WritableBuffer writeableHeader = bufferAllocator .allocate (HEADER_LENGTH );
264+ WritableBuffer writeableHeader = bufferAllocator .allocateKnownLength (HEADER_LENGTH );
247265 writeableHeader .write (headerScratch .array (), 0 , headerScratch .position ());
248266 if (messageLength == 0 ) {
249267 // the payload had 0 length so make the header the current buffer.
@@ -289,7 +307,9 @@ private void writeRaw(byte[] b, int off, int len) {
289307 }
290308 if (buffer == null ) {
291309 // Request a buffer allocation using the message length as a hint.
292- buffer = bufferAllocator .allocate (len );
310+ buffer = knownLengthPendingAllocation > 0
311+ ? allocateKnownLength ()
312+ : bufferAllocator .allocate (len );
293313 }
294314 int toWrite = min (len , buffer .writableBytes ());
295315 buffer .write (b , off , toWrite );
@@ -397,7 +417,7 @@ private final class BufferChainOutputStream extends OutputStream {
397417 * {@link #write(byte[], int, int)}.
398418 */
399419 @ Override
400- public void write (int b ) throws IOException {
420+ public void write (int b ) {
401421 if (current != null && current .writableBytes () > 0 ) {
402422 current .write ((byte )b );
403423 return ;
0 commit comments