diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/Session.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/Session.java index 03dfc2b..bace43f 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/Session.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/Session.java @@ -26,6 +26,7 @@ import com.bloomberg.bmq.impl.events.PushMessageEvent; import com.bloomberg.bmq.impl.events.QueueControlEvent; import com.bloomberg.bmq.impl.events.QueueControlEventHandler; +import com.bloomberg.bmq.impl.infr.io.ByteBufferOutputStream; import com.bloomberg.bmq.impl.infr.net.NettyTcpConnectionFactory; import com.bloomberg.bmq.impl.infr.proto.AckMessageImpl; import com.bloomberg.bmq.impl.infr.proto.BinaryMessageProperty; @@ -1019,8 +1020,9 @@ public void closeAsync(Duration timeout) throws BMQException { public PutMessage createPutMessage(ByteBuffer... payload) { PutMessageImpl msg = new PutMessageImpl(); - try { - msg.appData().setPayload(payload); + try (ByteBufferOutputStream bbos = new ByteBufferOutputStream()) { + bbos.writeBuffers(payload); + msg.appData().setPayload(bbos.peekUnflipped()); } catch (IOException e) { throw new BMQException("Failed to set payload", e); } diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/BrokerSession.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/BrokerSession.java index cda2260..6f76e02 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/BrokerSession.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/BrokerSession.java @@ -59,7 +59,9 @@ import java.lang.invoke.MethodHandles; import java.nio.ByteBuffer; import java.time.Duration; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -1219,10 +1221,12 @@ public QueueHandle lookupQueue(QueueId queueId) { return queueStateManager.findByQueueId(queueId); } - public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQException { + public void post(QueueHandle queueHandle, Collection msgs) throws BMQException { Argument.expectNonNull(queueHandle, "queueHandle"); Argument.expectNonNull(msgs, "msgs"); - Argument.expectPositive(msgs.length, "message array length"); + if (msgs.isEmpty()) { + return; + } // Queue state guard QueueState state = queueHandle.getState(); @@ -1249,6 +1253,14 @@ public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQExce } } + public void post(QueueHandle queueHandle, PutMessageImpl msg) throws BMQException { + post(queueHandle, Collections.singletonList(msg)); + } + + public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQException { + post(queueHandle, Arrays.asList(msgs)); + } + public GenericResult confirm(QueueHandle queueHandle, PushMessageImpl... messages) { Argument.expectNonNull(queueHandle, "queueHandle"); Argument.expectNonNull(messages, "messages"); diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/ProtocolEventTcpReader.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/ProtocolEventTcpReader.java index e0860f8..8a01988 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/ProtocolEventTcpReader.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/ProtocolEventTcpReader.java @@ -74,10 +74,17 @@ private void addPayload(ByteBuffer b) { public void read( TcpConnection.ReadCallback.ReadCompletionStatus completionStatus, ByteBuffer[] data) throws IOException { - logger.debug( - "Read from buffers {}, completion status needed bytes {}", - data.length, - completionStatus.numNeeded()); + if (logger.isDebugEnabled()) { + int totalRead = 0; + for (ByteBuffer b : data) { + totalRead += b.remaining(); + } + logger.debug( + "Read {} bytes from {} buffers, completion status needed bytes {}", + totalRead, + data.length, + completionStatus.numNeeded()); + } try { EventHeader eventHeader = null; for (ByteBuffer restData : data) { @@ -134,12 +141,18 @@ public void read( if (expectedDataSize <= 0) { throw new IOException("Wrong event size: " + expectedDataSize); } - byte[] b = new byte[expectedDataSize]; - restData.get(b, 0, expectedDataSize); - addPayload(ByteBuffer.wrap(b)); + ByteBuffer payload = restData.slice(); + payload.limit(expectedDataSize); + addPayload(payload); + restData.position(restData.position() + expectedDataSize); restData = restData.slice(); - ByteBuffer[] bb = new ByteBuffer[receivedPayloads.size()]; + + ByteBuffer[] bb = new ByteBuffer[0]; bb = receivedPayloads.toArray(bb); + logger.debug( + "dispatching event handler for {} with {} buffers", + expectedEventType, + receivedPayloads.size()); eventHandler.handleEvent(expectedEventType, bb); reset(); break; diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/PutPoster.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/PutPoster.java index 9969d7a..30cb0a6 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/PutPoster.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/PutPoster.java @@ -33,7 +33,9 @@ import java.lang.invoke.MethodHandles; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -68,9 +70,9 @@ void setMaxEventSize(int val) { maxEventSize = Argument.expectNotGreater(val, EventHeader.MAX_SIZE_SOFT, "max event size"); } - public void pack(PutMessageImpl... msgs) { + public void pack(Collection msgs) { Argument.expectNonNull(msgs, "msgs"); - Argument.expectPositive(msgs.length, "message array length"); + Argument.expectPositive(msgs.size(), "message array length"); for (PutMessageImpl m : msgs) { Argument.expectNonNull(m, "put message"); @@ -87,11 +89,19 @@ public void flush() { } } - public void post(PutMessageImpl... msgs) { + public void post(Collection msgs) { pack(msgs); flush(); } + public void post(PutMessageImpl msg) { + post(Collections.singletonList(msg)); + } + + public void post(PutMessageImpl... msgs) { + post(Arrays.asList(msgs)); + } + private void sendEvent() { PutEventBuilder putBuilder = new PutEventBuilder(); putBuilder.setMaxEventSize(maxEventSize); diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/QueueImpl.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/QueueImpl.java index e1909c4..e443196 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/QueueImpl.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/QueueImpl.java @@ -37,6 +37,8 @@ import java.lang.invoke.MethodHandles; import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +50,7 @@ public class QueueImpl implements QueueHandle { static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); static final int INVALID_QUEUE_ID = -1; + private static final int INITIAL_PUTMESSAGES_SIZE = 100; // Immutable fields private final BrokerSession brokerSession; @@ -61,7 +64,8 @@ public class QueueImpl implements QueueHandle { // Fields exposed to user thread private final QueueHandleParameters parameters; // mutable object and final field private volatile QueueState state; - private final ArrayList putMessages = new ArrayList<>(); + private final AtomicReference> putMessages = + new AtomicReference<>(new ArrayList<>(INITIAL_PUTMESSAGES_SIZE)); private volatile boolean isSuspended = false; // Whether the queue is suspended. // While suspended, a queue receives no @@ -262,19 +266,16 @@ public BmqFuture closeAsync(Duration timeout) { public void pack(PutMessageImpl message) throws BMQException { synchronized (lock) { - putMessages.add(message); + putMessages.get().add(message); } } - public PutMessageImpl[] flush() throws BMQException { - PutMessageImpl[] msgs; + public void flush() throws BMQException { + Collection messages = null; synchronized (lock) { - msgs = new PutMessageImpl[putMessages.size()]; - msgs = putMessages.toArray(msgs); - putMessages.clear(); + messages = putMessages.getAndSet(new ArrayList<>(INITIAL_PUTMESSAGES_SIZE)); } - brokerSession.post(this, msgs); - return msgs; + brokerSession.post(this, messages); } @Override diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/io/ByteBufferInputStream.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/io/ByteBufferInputStream.java index d989cd1..7e2565f 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/io/ByteBufferInputStream.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/io/ByteBufferInputStream.java @@ -91,6 +91,31 @@ public int read(byte[] b, int off, int len) throws IOException { } } + public int read(ByteBufferOutputStream bbos, int len) throws IOException { + if (len == 0) { + return 0; + } + int needed = len; + while (needed > 0 && currentBuffer < byteBuffers.length) { + ByteBuffer buffer = getBuffer(); + ByteBuffer readable = buffer.slice(); + int remaining = readable.remaining(); + if (needed > remaining) { + readable.position(remaining); + bbos.writeBuffer(false, readable); + buffer.position(buffer.limit()); + needed -= remaining; + } else { + readable.limit(needed); + readable.position(needed); + bbos.writeBuffer(false, readable); + buffer.position(buffer.position() + needed); + needed = 0; + } + } + return len - needed; + } + private ByteBuffer getBuffer(int length) throws IOException { if (length == 0) { return ByteBuffer.allocate(0); diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/io/ByteBufferOutputStream.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/io/ByteBufferOutputStream.java index 7e85578..b51a844 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/io/ByteBufferOutputStream.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/io/ByteBufferOutputStream.java @@ -15,8 +15,6 @@ */ package com.bloomberg.bmq.impl.infr.io; -import com.bloomberg.bmq.impl.infr.util.Argument; -import com.bloomberg.bmq.impl.infr.util.Limits; import java.io.DataOutput; import java.io.IOException; import java.io.OutputStream; @@ -24,49 +22,153 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * An output stream of ByteBuffers + * + *

invariants of this structure are listed below. + * + *

1. a stream that has never been written to will have no buffers yet. + * + *

2. buffers are added on demand. + * + *

3. writeBuffers() or big array write(), the current buffer is sliced and the remainder is + * added after as the current buffer. + * + *

4. writeBuffers() appends duplicated buffers wholesale instead of copying. + * + *

5. writeBuffers() ByteBuffers from outside should always be either unflipped or a wrapped + * array. + * + *

7. write() byte arrays larger than the buffer size get wrapped, smaller ones get copied in one + * piece. + * + *

8. as a result of 7, byte arrays can always be read fully in one read(). + * + *

9. totalBytes is kept up to date as fields / buffers are written in. + * + *

10. the current append buffer is always the last buffer in bbArray. + */ public class ByteBufferOutputStream extends OutputStream implements DataOutput { static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private ArrayList bbArray; - private ByteBuffer currentBuffer; - - private int currentBufferIndex = 0; - private int bufSize; - private int prevBuffersNumBytes; - + private final ArrayList bbArray; + private final int bufSize; + private final int minSliceSize; + private int totalBytes; + private int currentBufferIndex; private boolean isOpen; private static final int KB = 1024; private static final int DEFAULT_BUF_SIZE = 4 * KB; + private static final int BIG_BUF_SIZE = 4 * KB; + + // small buffer size should be adequete for many event types. + private static final int SMALL_BUF_SIZE = 512; + + private static final int DEFAULT_MIN_SLICE_SIZE = 128; + + public static ByteBufferOutputStream smallBlocks() { + return new ByteBufferOutputStream(SMALL_BUF_SIZE); + } + + public static ByteBufferOutputStream bigBlocks() { + return new ByteBufferOutputStream(BIG_BUF_SIZE); + } public ByteBufferOutputStream() { - init(DEFAULT_BUF_SIZE); + this(DEFAULT_BUF_SIZE, DEFAULT_MIN_SLICE_SIZE); } public ByteBufferOutputStream(int bufSize) { - init(Argument.expectPositive(bufSize, "bufSize")); + this(bufSize, DEFAULT_MIN_SLICE_SIZE); } - private void init(int bufSize) { + public ByteBufferOutputStream(int bufSize, int minSliceSize) { bbArray = new ArrayList<>(); this.bufSize = bufSize; - currentBuffer = ByteBuffer.allocate(bufSize); - bbArray.add(currentBuffer); - currentBufferIndex = 0; + this.minSliceSize = minSliceSize; isOpen = true; - prevBuffersNumBytes = 0; + totalBytes = 0; + currentBufferIndex = -1; + } + + private int availableCapacity() { + if (bbArray.isEmpty() || currentBufferIndex >= bbArray.size()) { + return 0; + } + return getCurrent().remaining(); + } + + private void swapBuffers(int aIndex, int bIndex) { + if (aIndex == bIndex) return; + if (aIndex >= bbArray.size() || bIndex >= bbArray.size()) { + logger.error( + "tried to swap indexes " + + aIndex + + " and " + + bIndex + + " in array of size " + + bbArray.size()); + return; + } + ByteBuffer a = bbArray.get(aIndex); + ByteBuffer b = bbArray.get(bIndex); + bbArray.set(aIndex, b); + bbArray.set(bIndex, a); + } + + private void ensureCapacity(int size) { + int currentBufferCapacity = availableCapacity(); + if (size > currentBufferCapacity - 1) { + // why -1 ? if we write all the way to to end of a + // ByteBuffer, then we cannot tell if is is cleared + // or flipped. + // need more space, either find a slice with enough or allocate a new one + if (currentBufferCapacity >= minSliceSize) { + ByteBuffer remainder = maybeSliceCurrent(); + if (remainder != null) { + bbArray.add(remainder); + } + } + currentBufferIndex++; + int candidateIndex = currentBufferIndex; + while (bbArray.size() > candidateIndex + && bbArray.get(candidateIndex).remaining() < size) { + candidateIndex++; + } + + if (candidateIndex == bbArray.size()) { + // couldn't find a big enough existing slice - have to allocate a new buffer + addBuffer(Math.max(bufSize, size)); + } + // put it in place + swapBuffers(currentBufferIndex, candidateIndex); + } + } + + private void addRemainderOrNew(ByteBuffer remainder) { + if (remainder != null) { + bbArray.add(remainder); + } else { + if (currentBufferIndex >= bbArray.size()) { + addBuffer(); + } + } } @Override public void write(int b) throws IOException { if (!isOpen) throw new IOException("Stream closed"); - if (currentBuffer.remaining() < Limits.BYTE_SIZE) getNewBuffer(); - - currentBuffer.put((byte) b); + ensureCapacity(1); + getCurrent().put((byte) b); + totalBytes += 1; } @Override @@ -80,50 +182,110 @@ public void write(byte[] ba, int offset, int length) throws IOException { if (length <= 0 || (length > ba.length - offset)) return; - int numBytesLeft = length; - while (true) { - int numToWrite = Math.min(numBytesLeft, currentBuffer.remaining()); - currentBuffer.put(ba, offset, numToWrite); - numBytesLeft -= numToWrite; - offset += numToWrite; + ensureCapacity(length); + getCurrent().put(ba, offset, length); + totalBytes += length; + } + + public ByteBuffer[] peekUnflipped() { + return bbArray.stream() + .limit(currentBufferIndex + 1) + .map(ByteBuffer::duplicate) + .toArray(ByteBuffer[]::new); + } - if (numBytesLeft > 0) getNewBuffer(); - else break; + /** + * Make a readable view of the underlying data without copying it. + * + *

The bbos can continue to be written to. + */ + public ByteBuffer[] peek() { + ByteBuffer[] duplicates = peekUnflipped(); + for (ByteBuffer b : duplicates) { + b.flip(); } + return duplicates; } + @Override public void writeBoolean(boolean v) throws IOException { throw new UnsupportedOperationException(); } + @Override public void writeByte(int v) throws IOException { if (!isOpen) throw new IOException("Stream closed"); - if (currentBuffer.remaining() < Limits.BYTE_SIZE) getNewBuffer(); - - currentBuffer.put((byte) v); + ensureCapacity(1); + getCurrent().put((byte) v); + totalBytes += 1; } + @Override public void writeBytes(String s) throws IOException { throw new UnsupportedOperationException(); } - public void writeBytes(ByteBuffer b) throws IOException { + // a buffer that has never been put() to nor flipped + private boolean bufferIsClear(ByteBuffer b) { + return b.position() == 0 && b.limit() == b.capacity(); + } + + private ByteBuffer getCurrent() { + if (currentBufferIndex < 0 || currentBufferIndex >= bbArray.size()) return null; + return bbArray.get(currentBufferIndex); + } + + private ByteBuffer maybeSliceCurrent() { + ByteBuffer current = getCurrent(); + if (current != null) { + // a remainder slice should be meaningfully sized - at least as big as the ByteBuffer + // overhead + if (current.remaining() >= minSliceSize) { + ByteBuffer remainder = current.slice(); + return remainder; + } + } + return null; + } + + public void writeBuffer(ByteBuffer buffer) throws IOException { + writeBuffer(true, buffer); + } + + public void writeBuffer(boolean skipCleared, ByteBuffer buffer) throws IOException { + writeBuffers(skipCleared, Collections.singletonList(buffer)); + } + + public void writeBuffers(ByteBuffer... buffers) throws IOException { + writeBuffers(true, Arrays.asList(buffers)); + } + + public void writeBuffers(boolean skipCleared, ByteBuffer... buffers) throws IOException { + writeBuffers(skipCleared, Arrays.asList(buffers)); + } + + public void writeBuffers(Collection buffers) throws IOException { + writeBuffers(true, buffers); + } + + public void writeBuffers(boolean skipCleared, Collection buffers) + throws IOException { if (!isOpen) throw new IOException("Stream closed"); - b.rewind(); - while (b.hasRemaining()) { - if (currentBuffer.hasRemaining()) { - if (b.remaining() > currentBuffer.remaining()) { - // Read one byte - currentBuffer.put(b.get()); - } else { - // Read the whole buffer - currentBuffer.put(b); - } - continue; + ByteBuffer remainder = maybeSliceCurrent(); + bbArray.ensureCapacity(bbArray.size() + buffers.size() + 1 /* remainder or new buffer */); + for (ByteBuffer b : buffers) { + if (skipCleared && bufferIsClear(b)) continue; + ByteBuffer dup = b.duplicate(); + if (dup.position() == 0) { + dup.position(dup.limit()); } - getNewBuffer(); + currentBufferIndex++; + bbArray.add(dup); + swapBuffers(currentBufferIndex, bbArray.size() - 1); + totalBytes += dup.position(); } + addRemainderOrNew(remainder); } /** @@ -133,75 +295,79 @@ public void writeBytes(ByteBuffer b) throws IOException { * @param bbos stream to write bytes to * @throws IOException if the stream is not open */ - public void writeBytes(ByteBufferOutputStream bbos) throws IOException { - - if (!isOpen || !bbos.isOpen) throw new IOException("Stream closed"); - - for (int i = 0; i < bbos.bbArray.size(); i++) { - ByteBuffer data = bbos.bbArray.get(i); - data.flip(); - writeBytes(data); - } + public void writeBuffers(ByteBufferOutputStream other) throws IOException { + if (!isOpen || !other.isOpen) throw new IOException("Stream closed"); + writeBuffers(other.bbArray); } + @Override public void writeChar(int v) throws IOException { if (!isOpen) throw new IOException("Stream closed"); - if (currentBuffer.remaining() < Limits.CHAR_SIZE) getNewBuffer(); - - currentBuffer.putChar((char) v); + ensureCapacity(2); + getCurrent().putChar((char) v); + totalBytes += 2; } + @Override public void writeChars(String s) throws IOException { throw new UnsupportedOperationException(); } + @Override public void writeDouble(double v) throws IOException { if (!isOpen) throw new IOException("Stream closed"); - if (currentBuffer.remaining() < Limits.DOUBLE_SIZE) getNewBuffer(); - - currentBuffer.putDouble(v); + ensureCapacity(8); + getCurrent().putDouble(v); + totalBytes += 8; } + @Override public void writeFloat(float v) throws IOException { if (!isOpen) throw new IOException("Stream closed"); - if (currentBuffer.remaining() < Limits.FLOAT_SIZE) getNewBuffer(); - - currentBuffer.putFloat(v); + ensureCapacity(4); + getCurrent().putFloat(v); + totalBytes += 4; } + @Override public void writeInt(int v) throws IOException { if (!isOpen) throw new IOException("Stream closed"); - if (currentBuffer.remaining() < Limits.INT_SIZE) getNewBuffer(); - - currentBuffer.putInt(v); + ensureCapacity(4); + getCurrent().putInt(v); + totalBytes += 4; } + @Override public void writeLong(long v) throws IOException { if (!isOpen) throw new IOException("Stream closed"); - if (currentBuffer.remaining() < Limits.LONG_SIZE) getNewBuffer(); - - currentBuffer.putLong(v); + ensureCapacity(8); + getCurrent().putLong(v); + totalBytes += 8; } + @Override public void writeShort(int v) throws IOException { if (!isOpen) throw new IOException("Stream closed"); - if (currentBuffer.remaining() < Limits.SHORT_SIZE) getNewBuffer(); - - currentBuffer.putShort((short) v); + ensureCapacity(2); + getCurrent().putShort((short) v); + totalBytes += 2; } + @Override public void writeUTF(String str) throws IOException { - write(str.getBytes(StandardCharsets.UTF_8)); + byte[] bytes = str.getBytes(StandardCharsets.UTF_8); + write(bytes); } public void writeAscii(String str) throws IOException { - write(str.getBytes(StandardCharsets.US_ASCII)); + byte[] bytes = str.getBytes(StandardCharsets.US_ASCII); + write(bytes); } /** @@ -210,55 +376,21 @@ public void writeAscii(String str) throws IOException { * @return ByteBuffer[] previously allocated buffers flipped to the read mode. */ public ByteBuffer[] reset() { - ByteBuffer[] bbArrayCopy = new ByteBuffer[currentBufferIndex + 1]; - - for (int i = 0; i <= currentBufferIndex; ++i) { - bbArrayCopy[i] = bbArray.get(i); - bbArrayCopy[i].flip(); - } + ByteBuffer[] bbArrayCopy = peek(); bbArray.clear(); - prevBuffersNumBytes = 0; - currentBuffer = null; - currentBufferIndex = 0; - + totalBytes = 0; isOpen = false; return bbArrayCopy; } - /** - * Clears the Stream for reuse. Clears the previously stored data but leaves the stream in its - * previous state. If the stream is still open, then it can be reused to insert new data. Reuses - * previously allocated Buffers if possible. - */ - public void clear() { - if (!isOpen) return; - - for (int i = 0; i <= currentBufferIndex; i++) { - bbArray.get(i).clear(); - } - prevBuffersNumBytes = 0; - currentBuffer = bbArray.get(0); - currentBufferIndex = 0; - } - public int numByteBuffers() { - return bbArray.size(); + return currentBufferIndex + 1; } public int size() { - return (isOpen ? (prevBuffersNumBytes + currentBuffer.position()) : (0)); - } - - private void getNewBuffer() { - // Try to reuse a previously allocated buffer before allocating new. - if (currentBufferIndex < bbArray.size() - 1) { - prevBuffersNumBytes += currentBuffer.position(); - currentBuffer = bbArray.get(++currentBufferIndex); - } else { - addBuffer(); - } + return (isOpen ? (totalBytes) : (0)); } private void addBuffer() { @@ -268,10 +400,7 @@ private void addBuffer() { // allocate a buffer which is large enough to store data // of specified size private void addBuffer(int size) { - prevBuffersNumBytes += currentBuffer.position(); - int allocationSize = Math.max(size, bufSize); - currentBuffer = ByteBuffer.allocate(allocationSize); - bbArray.add(currentBuffer); - currentBufferIndex++; + ByteBuffer buf = ByteBuffer.allocate(size); + bbArray.add(buf); } } diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net/NettyTcpConnection.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net/NettyTcpConnection.java index 38c7d71..8e9f7f0 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net/NettyTcpConnection.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net/NettyTcpConnection.java @@ -167,6 +167,13 @@ public void reset() { @Override public void write(ChannelHandlerContext ctx, ByteBuffer[] data) { + if (logger.isDebugEnabled()) { + int size = 0; + for (ByteBuffer b : data) { + size += b.remaining(); + } + logger.debug("Writing {} bytes from {} buffers", size, data.length); + } ctx.writeAndFlush(Unpooled.wrappedBuffer(data)); } } @@ -314,10 +321,10 @@ private void dump(ByteBuffer[] byteBuffers) { blackBoxIndex.write(Integer.toString(byteBuffers.length)); for (ByteBuffer b : byteBuffers) { blackBoxIndex.write(" " + b.remaining()); - while (b.hasRemaining()) { - blackBox.write(b.get()); + ByteBuffer dup = b.duplicate(); + while (dup.hasRemaining()) { + blackBox.write(dup.get()); } - b.rewind(); } blackBoxIndex.write("\n"); } catch (IOException e) { @@ -522,10 +529,7 @@ public int linger() { options = null; connectCallback = null; readCallback = null; - if (readBuffer != null) { - readBuffer.reset(); - readBuffer = null; - } + readBuffer = null; readBytesStatus = null; disconnectCallback = null; channelContext = null; @@ -604,7 +608,6 @@ public WriteStatus write(ByteBuffer[] data) { logger.error("Buffer is full"); return WriteStatus.WRITE_BUFFER_FULL; } - clientChannelAdapter.write(this.channelContext, data); return WriteStatus.SUCCESS; } @@ -747,7 +750,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Make a copy of bytes present in 'byteBuf'. This is needed // so that the lifetime of 'readBuffer' (and ByteBuffer[] - // obtained from it via 'readBuffer.reset()') can be + // obtained from it via 'readBuffer.peek()') can be // decoupled with the associated 'byteBuf'. As per netty docs, // a ByteBuf object is refcounted, and user must invoke // 'ByteBuf.release' to decrement the counter when done. Since @@ -757,15 +760,24 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { // invoke 'ByteBuf.release' right away. This copying is // unfortunate, and we will investigate ways to avoid this // copy later. - readBuffer.writeBytes(byteBuf.nioBuffer()); + // + // The ByteBuffer allocated here will receive a copy of the bytes + // from the netty ByteBuf, and then transferred into readBuffer without copying. + // this ByteBuffer might get sliced and diced, but will remain as the message + // contents for the lifetime of the message, UNLESS it has a compressed payload + // in which case it will be decompressed into a new ByteBuffer. + ByteBuffer nioBuf = ByteBuffer.allocate(byteBuf.readableBytes()); + byteBuf.readBytes(nioBuf); + readBuffer.writeBuffer(nioBuf); } catch (IOException e) { logger.error("Failed to write data: ", e); } byteBuf.release(); // Check if there are enough bytes in 'readBuffer'. - final int numNeeded = readBytesStatus.numNeeded(); + logger.debug("Read {} bytes, need at least {}", readBuffer.size(), numNeeded); + if (readBuffer.size() >= numNeeded) { // There are enough bytes. Notify client. final ByteBuffer[] byteBuffers = readBuffer.reset(); diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/AckMessageImpl.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/AckMessageImpl.java index c86bca4..b4c8377 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/AckMessageImpl.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/AckMessageImpl.java @@ -154,9 +154,7 @@ public void streamIn(ByteBufferInputStream bbis) throws IOException { public void streamOut(ByteBufferOutputStream bbos) throws IOException { bbos.writeInt(statusAndCorrelationId); - for (int i = 0; i < MessageGUID.SIZE_BINARY; i++) { - bbos.writeByte(messageGUID[i]); - } + bbos.write(messageGUID); bbos.writeInt(queueId); } diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/ApplicationData.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/ApplicationData.java index b4e2815..515030e 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/ApplicationData.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/ApplicationData.java @@ -19,7 +19,6 @@ import com.bloomberg.bmq.impl.infr.io.ByteBufferOutputStream; import com.bloomberg.bmq.impl.infr.util.Compression; import com.bloomberg.bmq.impl.infr.util.PrintUtil; -import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -34,8 +33,10 @@ public class ApplicationData { static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final ThreadLocal decompressionBuffer = + ThreadLocal.withInitial(() -> new byte[1024]); - private byte[] payload; + private ByteBufferOutputStream payload; private MessagePropertiesImpl properties; private CompressionAlgorithmType compressionType = CompressionAlgorithmType.E_NONE; @@ -45,6 +46,9 @@ public class ApplicationData { private boolean isOldStyleProperties = false; private boolean arePropertiesCompressed; + private ByteBufferOutputStream outputBuffer; + private long crc; + private void resetCompressedData() { compressionType = CompressionAlgorithmType.E_NONE; compressedData = null; @@ -52,23 +56,16 @@ private void resetCompressedData() { } public final void setPayload(ByteBuffer... data) throws IOException { - try (ByteBufferInputStream bbis = new ByteBufferInputStream(data)) { - bbis.reset(); - payload = new byte[bbis.available()]; - - final int numRead = bbis.read(payload); - if (numRead != payload.length) { - throw new RuntimeException( - "Unexpected error in ApplicationData::setPayload: " - + " expected to read " - + payload.length - + " bytes, but read " - + numRead - + " bytes."); - } + setPayload(true, data); + } - resetCompressedData(); + public final void setPayload(boolean skipCleared, ByteBuffer... data) throws IOException { + if (data == null) { + throw new IllegalArgumentException("'buffer array' must be non-null"); } + payload = new ByteBufferOutputStream(); + payload.writeBuffers(skipCleared, data); + resetCompressedData(); } public final void setProperties(MessagePropertiesImpl props) { @@ -86,23 +83,17 @@ public boolean isOldStyleProperties() { } public ByteBuffer[] applicationData() throws IOException { - // TODO: used only to calculate CRC32. Can we avoid creating a copy? - - try (ByteBufferOutputStream bbos = new ByteBufferOutputStream()) { - streamOut(bbos, false); - return bbos.reset(); - } + // used only in tests + serializeToBuffer(); + return outputBuffer.peek(); } public ByteBuffer[] payload() throws IOException { decompressData(); - - try (ByteBufferOutputStream bbos = new ByteBufferOutputStream()) { - if (payload != null) { - bbos.write(payload); - } - return bbos.reset(); + if (payload == null) { + return new ByteBuffer[0]; } + return payload.peek(); } public MessagePropertiesImpl properties() { @@ -122,7 +113,7 @@ public boolean isCompressed() { } public int payloadSize() { - return payload == null ? 0 : payload.length; + return payload == null ? 0 : payload.size(); } boolean hasProperties() { @@ -212,12 +203,9 @@ public void streamIn( } } else { // or buffer compressed data try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(size)) { - byte[] buffer = new byte[size]; - - if (bbis.read(buffer) != size) { + if (bbis.read(bbos, size) != size) { throw new IOException("failed to read compressed payload into buffer"); } - bbos.write(buffer); compressedData = bbos; this.compressionType = compressionType; @@ -241,7 +229,7 @@ private void decompressData() throws IOException { } logger.debug("Decompressing application data with algorithm={}", compressionType); - ByteBuffer[] data = compressedData.reset(); + ByteBuffer[] data = compressedData.peek(); ByteBufferInputStream bbis = new ByteBufferInputStream(data); InputStream decompressedStream = compressionType.getCompression().decompress(bbis); @@ -254,7 +242,7 @@ private void decompressData() throws IOException { } // Stream in payload - streamInPayload(inputStream); + streamInCompressedPayload(inputStream); // Check if all data has been read if (bbis.available() > 0) { @@ -284,9 +272,11 @@ private int streamInPropertiesOld(T input) } private int streamInPayload(int size, ByteBufferInputStream bbis) throws IOException { - payload = new byte[size]; - - int read = bbis.read(payload); + payload = new ByteBufferOutputStream(size); + int read = bbis.read(payload, size); + if (payload.size() != size) { + throw new IOException("payload size doesn't match: " + payload.size() + " != " + size); + } if (read != size) { throw new IOException("Failed to read payload from input stream"); } @@ -294,21 +284,15 @@ private int streamInPayload(int size, ByteBufferInputStream bbis) throws IOExcep return read; } - private void streamInPayload(InputStream input) throws IOException { - // When compressed, payload size is unknown. - // So we need to use ByteArrayOutputStream to accumulate all data - // and after that get array of bytes - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); + private void streamInCompressedPayload(InputStream input) throws IOException { + payload = new ByteBufferOutputStream(); - byte[] buf = new byte[1024]; + byte[] buf = decompressionBuffer.get(); int read; while ((read = input.read(buf)) > 0) { - baos.write(buf, 0, read); + payload.write(buf, 0, read); } - - payload = baos.toByteArray(); } public void compressData(CompressionAlgorithmType compressionType) throws IOException { @@ -320,7 +304,7 @@ public void compressData(CompressionAlgorithmType compressionType) throws IOExce logger.debug("Compressing application data with algorithm={}", compressionType); - ByteBufferOutputStream bbos = new ByteBufferOutputStream(); + ByteBufferOutputStream bbos = ByteBufferOutputStream.smallBlocks(); Compression compression = compressionType.getCompression(); // We need to close compressed stream in order to flush all compressed bytes @@ -342,7 +326,9 @@ public void compressData(CompressionAlgorithmType compressionType) throws IOExce } if (payload != null) { - compressedOutput.write(payload); + for (ByteBuffer b : payload.peek()) { + compressedOutput.write(b.array(), b.arrayOffset(), b.remaining()); + } } } @@ -351,46 +337,44 @@ public void compressData(CompressionAlgorithmType compressionType) throws IOExce arePropertiesCompressed = hasProperties() && isOldStyleProperties; } + public long calculateCrc32c() throws IOException { + serializeToBuffer(); + return crc; + } + public void streamOut(ByteBufferOutputStream bbos) throws IOException { - streamOut(bbos, true); + streamOut(bbos, true /* addPadding */); } - private void streamOut(ByteBufferOutputStream bbos, boolean addPadding) throws IOException { - int startPosition = bbos.size(); + private void serializeToBuffer() throws IOException { + if (outputBuffer != null) { + return; + } + outputBuffer = ByteBufferOutputStream.smallBlocks(); // Stream out properties if they are not compressed (no compression or // new style properties). if (hasProperties() && !arePropertiesCompressed) { - if (isOldStyleProperties) { - properties.streamOutOld(bbos); - } else { - properties.streamOut(bbos); - } + properties.streamOut(outputBuffer, isOldStyleProperties); } if (compressionType == CompressionAlgorithmType.E_NONE) { if (payload != null) { - bbos.write(payload); + outputBuffer.writeBuffers(payload.peekUnflipped()); } } else { - bbos.writeBytes(compressedData); + outputBuffer.writeBuffers(compressedData); } + // calculate crc32c without padding + crc = Crc32c.calculate(outputBuffer.peek()); + } - int endPosition = bbos.size(); - + private void streamOut(ByteBufferOutputStream bbos, boolean addPadding) throws IOException { + serializeToBuffer(); + bbos.writeBuffers(outputBuffer); if (addPadding) { - final int length = endPosition - startPosition; - - if (length != unpackedSize()) { - throw new IOException( - "Invalid output stream length: " - + length - + ", expected: " - + unpackedSize()); - } - final int numPaddingBytes = ProtocolUtil.calculatePadding(length); - - bbos.write(ProtocolUtil.getPaddingBytes(numPaddingBytes), 0, numPaddingBytes); + final int paddingSize = ProtocolUtil.calculatePadding(outputBuffer.size()); + bbos.write(ProtocolUtil.getPaddingBytes(paddingSize), 0, paddingSize); } } @@ -404,7 +388,7 @@ public String toString() { sb.append("[ Payload ["); if (payload != null) { sb.append("\""); - PrintUtil.hexAppend(sb, payload); + PrintUtil.hexAppend(sb, payload.peek()); sb.append("\" ]"); } else { sb.append(" EMPTY ]"); diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/EventBuilder.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/EventBuilder.java index c7eb692..d3ef709 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/EventBuilder.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/EventBuilder.java @@ -33,7 +33,7 @@ public void reset(EventType type) { eventHeader = new EventHeader(); eventHeader.setType(type); msgCount = 0; - bbos = new ByteBufferOutputStream(); + bbos = ByteBufferOutputStream.smallBlocks(); } public EventHeader header() { @@ -41,32 +41,30 @@ public EventHeader header() { } public ByteBuffer[] build() { - if (0 == bbos.numByteBuffers()) { + if (0 == bbos.size()) { throw new IllegalStateException("Nothing to build."); } int payloadLen = bbos.size(); - ByteBuffer[] payload = bbos.reset(); + ByteBuffer[] payload = bbos.peekUnflipped(); eventHeader.setLength(EventHeader.HEADER_SIZE + payloadLen); - ByteBufferOutputStream headerStream = new ByteBufferOutputStream(); + ByteBufferOutputStream stream = ByteBufferOutputStream.smallBlocks(); try { - eventHeader.streamOut(headerStream); + eventHeader.streamOut(stream); } catch (IOException ex) { // Should never happen throw new IllegalStateException(ex); } + try { + stream.writeBuffers(payload); + } catch (IOException e) { + // Should never happen + throw new RuntimeException(e); + } - ByteBuffer[] headerBuffers = headerStream.reset(); - int numBuffers = headerBuffers.length + payload.length; - - ByteBuffer[] outputBuffers = new ByteBuffer[numBuffers]; - System.arraycopy(headerBuffers, 0, outputBuffers, 0, headerBuffers.length); - - System.arraycopy(payload, 0, outputBuffers, headerBuffers.length, payload.length); - - return outputBuffers; + return stream.peek(); } public int messageCount() { diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/MessageIterator.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/MessageIterator.java index 1a8140c..667cdd1 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/MessageIterator.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/MessageIterator.java @@ -60,7 +60,7 @@ protected T fetchNextMessage(T message) { return null; } } catch (IOException e) { - logger.debug("Fails to decode Message: ", e); + logger.warn("Fails to decode Message: ", e); return null; } } diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/MessagePropertiesImpl.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/MessagePropertiesImpl.java index bc5f634..41240f0 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/MessagePropertiesImpl.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/MessagePropertiesImpl.java @@ -396,7 +396,7 @@ public void streamOut(DataOutput output) throws IOException { } // TODO: remove boolean after 2nd rollout of "new style" brokers - private void streamOut(DataOutput output, boolean isOldStyleProperties) throws IOException { + void streamOut(DataOutput output, boolean isOldStyleProperties) throws IOException { final int numProps = propertyMap.size(); if (numProps == 0) { logger.info("No message properties to stream out"); diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/PushMessageImpl.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/PushMessageImpl.java index 26c4dc4..2c1b8fa 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/PushMessageImpl.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/PushMessageImpl.java @@ -126,7 +126,7 @@ public void streamIn(ByteBufferInputStream bbis) throws IOException { int dataSize = appDataSize - optionSize; logger.debug( - "Total: {}, header: {}, appData: {}, option: {}, data: {}", + "PushMessage Total: {}, header: {}, appData: {}, option: {}, data: {}", totalSize, headerSize, appDataSize, diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/PutMessageImpl.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/PutMessageImpl.java index 07c2867..64e7b22 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/PutMessageImpl.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/PutMessageImpl.java @@ -24,7 +24,6 @@ import com.bloomberg.bmq.impl.infr.proto.intf.Streamable; import java.io.IOException; import java.lang.invoke.MethodHandles; -import java.nio.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,10 +114,7 @@ public void compressData() throws IOException { } public void calculateAndSetCrc32c() throws IOException { - // Calculate CRC32c - ByteBuffer[] bb = appData.applicationData(); - final long CRC32C = Crc32c.calculate(bb); - header.setCrc32c(CRC32C); + header.setCrc32c(appData.calculateCrc32c()); } public long crc32c() { diff --git a/bmq-sdk/src/test/docker/config/domains.json b/bmq-sdk/src/test/docker/config/domains.json new file mode 100644 index 0000000..f8f468e --- /dev/null +++ b/bmq-sdk/src/test/docker/config/domains.json @@ -0,0 +1,11 @@ +{ + "bmq.test.mem.fanout": { + "mode": "fanout", + "limit.messages": 100000, + "fanout.appids": ["foo", "bar", "baz"] + }, + "bmq.test.mem.priority": { + "mode": "priority", + "limit.messages": 100000 + } +} diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/benchmark/SessionBenchmark.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/benchmark/SessionBenchmark.java index c844e46..7ab60d1 100644 --- a/bmq-sdk/src/test/java/com/bloomberg/bmq/benchmark/SessionBenchmark.java +++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/benchmark/SessionBenchmark.java @@ -118,7 +118,7 @@ @BenchmarkMode(Mode.Throughput) @Fork(value = 1, warmups = 1) @Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.MINUTES) -@Measurement(iterations = 30, time = 1, timeUnit = TimeUnit.MINUTES) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.MINUTES) @Timeout(time = 2, timeUnit = TimeUnit.MINUTES) public class SessionBenchmark { @@ -152,13 +152,13 @@ public TestSession(Consumer pushConsumer, Consumer ackC @Override public void handleSessionEvent(SessionEvent event) { // Uncomment to see additional information in log - // logger.info("handleSessionEvent: {}", event); + logger.info("handleSessionEvent: {}", event); } @Override public void handleQueueEvent(QueueControlEvent event) { // Uncomment to see additional information in log - // logger.info("handleQueueEvent: {}", event); + logger.info("handleQueueEvent: {}", event); } @Override @@ -289,8 +289,10 @@ public void closeQueue() { public void post(int size, CompressionAlgorithm compression) { counter++; - ByteBuffer data = ByteBuffer.allocate(size); + ByteBuffer data = ByteBuffer.allocate(size + 1); + data.putLong(counter); data.putLong(size / 2, counter); + data.position(size); PutMessage msg = queue.createPutMessage(data); msg.setCorrelationId(); @@ -317,9 +319,9 @@ private static Uri createUniqueUri() { private static final int SIZE_5_MIB = 5 * 1024 * 1024; private static final int SIZE_60_MIB = 60 * 1024 * 1024; - private static final int PUSH_PROCESS_TIMEOUT = 100; // ms + private static final int PUSH_PROCESS_TIMEOUT = 1; // seconds - private final Semaphore batchSema = new Semaphore(0); + private Semaphore batchSema; private final TestSession session; private Blackhole blackhole; @@ -332,6 +334,7 @@ public SessionBenchmark() { // "Setup" the state object before each benchmark @Setup(Level.Trial) public void setupTrial(final Blackhole bh) throws IOException { + batchSema = new Semaphore(0); session.startBroker(); session.startSession(); session.openReaderWriterAckQueue(); @@ -463,7 +466,6 @@ private void sendReceiveBatch( // Post PUT messages for (int i = 0; i < batchSize; i++) { session.post(rawMsgSize, compression); - TestTools.sleepForMilliSeconds(10); } logger.info("Sent {} PUT messages", batchSize); @@ -487,7 +489,7 @@ private void sendReceiveBatch( private void confirm(PushMessage msg) { msg.confirm(); // Uncomment to see additional information in log - // logger.info("Confirm status: {}", confirmResult); + // logger.info("Confirm status: {}", msg); blackhole.consume(msg.payload()); } @@ -504,6 +506,10 @@ private void onPushMessage(PushMessage msg) { // Notify 'post' method batchSema.release(); + logger.debug( + "Releaseing sema, now {} permits and {} threads waiting", + batchSema.availablePermits(), + batchSema.getQueueLength()); } private void onAckMessage(AckMessage msg) { diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/PutPosterTest.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/PutPosterTest.java index 363102b..9650eaa 100644 --- a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/PutPosterTest.java +++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/PutPosterTest.java @@ -40,9 +40,11 @@ import com.bloomberg.bmq.impl.infr.stat.EventsStats; import com.bloomberg.bmq.impl.infr.stat.EventsStatsTest; import com.bloomberg.bmq.impl.intf.BrokerConnection; +import com.bloomberg.bmq.util.TestHelpers; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -92,7 +94,7 @@ public void testPostEmptyArray() { // NULL array try { - PutMessageImpl[] m = null; + List m = null; poster.post(m); fail(); // Should not get here } catch (IllegalArgumentException e) { @@ -133,7 +135,7 @@ public void testPostFailed() throws IOException { // Payload too big msg = new PutMessageImpl(); - msg.appData().setPayload(ByteBuffer.allocate(PutHeader.MAX_PAYLOAD_SIZE_SOFT + 1)); + msg.appData().setPayload(TestHelpers.filledBuffer(PutHeader.MAX_PAYLOAD_SIZE_SOFT + 1)); // Set compression to none in order to get too big payload msg.setCompressionType(CompressionAlgorithmType.E_NONE); @@ -147,7 +149,7 @@ public void testPostFailed() throws IOException { // Missing correlation ID msg = new PutMessageImpl(); - msg.appData().setPayload(ByteBuffer.allocate(10)); + msg.appData().setPayload(TestHelpers.filledBuffer(10)); msg.setFlags(PutHeaderFlags.setFlag(0, PutHeaderFlags.ACK_REQUESTED)); try { @@ -174,29 +176,29 @@ public void testPostValidMessages() throws IOException { props.setPropertyAsBinary("data", new byte[] {1, 2, 3, 4, 5}); PutMessageImpl bigMsg1 = new PutMessageImpl(); - bigMsg1.appData().setPayload(ByteBuffer.allocate(PutHeader.MAX_PAYLOAD_SIZE_SOFT)); + bigMsg1.appData().setPayload(TestHelpers.filledBuffer(PutHeader.MAX_PAYLOAD_SIZE_SOFT)); bigMsg1.setCompressionType(CompressionAlgorithmType.E_NONE); PutMessageImpl smallMsg1 = new PutMessageImpl(); - smallMsg1.appData().setPayload(ByteBuffer.allocate(10000)); + smallMsg1.appData().setPayload(TestHelpers.filledBuffer(10000)); smallMsg1.appData().setProperties(props); PutMessageImpl bigMsg2 = new PutMessageImpl(); - bigMsg2.appData().setPayload(ByteBuffer.allocate(PutHeader.MAX_PAYLOAD_SIZE_SOFT)); + bigMsg2.appData().setPayload(TestHelpers.filledBuffer(PutHeader.MAX_PAYLOAD_SIZE_SOFT)); bigMsg2.setCompressionType(CompressionAlgorithmType.E_NONE); PutMessageImpl smallMsg2 = new PutMessageImpl(); smallMsg2.appData().setProperties(props); - smallMsg2.appData().setPayload(ByteBuffer.allocate(10001)); + smallMsg2.appData().setPayload(TestHelpers.filledBuffer(10001)); PutMessageImpl compressedMsg = new PutMessageImpl(); compressedMsg .appData() - .setPayload(ByteBuffer.allocate(PutHeader.MAX_PAYLOAD_SIZE_SOFT)); + .setPayload(TestHelpers.filledBuffer(PutHeader.MAX_PAYLOAD_SIZE_SOFT)); compressedMsg.appData().setProperties(props); compressedMsg.setCompressionType(CompressionAlgorithmType.E_ZLIB); - poster.post(bigMsg1, smallMsg1, bigMsg2, smallMsg2, compressedMsg); + poster.post(Arrays.asList(bigMsg1, smallMsg1, bigMsg2, smallMsg2, compressedMsg)); assertEquals(isOldStyleProperties, bigMsg1.appData().isOldStyleProperties()); assertEquals(isOldStyleProperties, smallMsg1.appData().isOldStyleProperties()); @@ -284,7 +286,7 @@ public void testPostNoInfiniteLoop() // Create a msg with payload = max event size PutMessageImpl msg1 = new PutMessageImpl(); - msg1.appData().setPayload(ByteBuffer.allocate(MAX_EVENT_SIZE)); + msg1.appData().setPayload(TestHelpers.filledBuffer(MAX_EVENT_SIZE)); // Post async ExecutorService es = Executors.newSingleThreadExecutor(); @@ -305,7 +307,8 @@ public void testPostNoInfiniteLoop() // Post a msg with payload = max payload size PutMessageImpl msg2 = new PutMessageImpl(); msg2.appData() - .setPayload(ByteBuffer.allocate(MAX_EVENT_SIZE - PutHeader.HEADER_SIZE - 4)); + .setPayload( + TestHelpers.filledBuffer(MAX_EVENT_SIZE - PutHeader.HEADER_SIZE - 4)); poster.post(msg2); } @@ -348,7 +351,7 @@ public void testRegisterAck() throws Exception { CorrelationIdImpl cId = CorrelationIdImpl.nextId(userData); PutMessageImpl msg = new PutMessageImpl(); - msg.appData().setPayload(ByteBuffer.allocate(10)); + msg.appData().setPayload(TestHelpers.filledBuffer(10)); msg.setupCorrelationId(cId); poster.post(msg); diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/io/ByteBufferOutputStreamTest.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/io/ByteBufferOutputStreamTest.java new file mode 100644 index 0000000..477019e --- /dev/null +++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/io/ByteBufferOutputStreamTest.java @@ -0,0 +1,205 @@ +/* + * Copyright 2022 Bloomberg Finance L.P. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.bloomberg.bmq.impl.infr.io; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.ByteBuffer; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ByteBufferOutputStreamTest { + + static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private byte[] readAsBytes(ByteBufferOutputStream bbos) throws IOException { + int total = bbos.size(); + byte[] actual = new byte[total]; + int pos = 0; + ByteBufferInputStream bbis = new ByteBufferInputStream(bbos.peek()); + while (pos < total) { + int qty = bbis.read(actual, pos, total - pos); + if (qty == 0) { + break; + } + pos += qty; + } + return actual; + } + + @Test + public void testFresh() { + ByteBufferOutputStream bbos = new ByteBufferOutputStream(); + assertEquals(0, bbos.size()); + ByteBuffer[] buffers = bbos.peek(); + assertEquals(0, buffers.length); + } + + @Test + public void testWriteEmptyBuffer() throws IOException { + ByteBuffer buf = ByteBuffer.allocate(0); + ByteBufferOutputStream bbos = new ByteBufferOutputStream(); + bbos.writeBuffer(buf); + assertEquals(0, bbos.size()); + + byte[] actual = readAsBytes(bbos); + assertEquals(0, actual.length); + } + + @Test + public void testWriteBuffers() throws IOException { + String payload1 = "The Quick Brown Fox jumps over the Lazy Dog"; + String payload2 = "All work and no play makes Jack a dull boy"; + String payload3 = "Lorem ipsum dolor sit amet, consectetur adipiscing elit"; + String payload4 = "Once upon a midnight dreary, while I pondered weak and weary"; + + String expected = payload1 + payload2 + payload3 + payload4; + ByteBuffer[] bufs = + new ByteBuffer[] { + ByteBuffer.allocate(100), + ByteBuffer.allocate(200), + ByteBuffer.allocate(100), + ByteBuffer.allocate(100) + }; + bufs[0].put(payload1.getBytes()); + bufs[1].put(payload2.getBytes()).put(payload3.getBytes()); + bufs[3].put(payload4.getBytes()); + + ByteBufferOutputStream bbos = new ByteBufferOutputStream(); + bbos.writeBuffers(bufs); + assertEquals("bytes written", expected.getBytes().length, bbos.size()); + + String actual = new String(readAsBytes(bbos)); + assertEquals("contents", expected, actual); + } + + @Test + public void testWritePrimitives() throws IOException { + ByteBufferOutputStream bbos = new ByteBufferOutputStream(); + bbos.write(42); + bbos.writeShort(1000); + bbos.writeInt(31337); + bbos.writeLong(1234567890L); + bbos.writeChar('Z'); + bbos.writeFloat(0.0001F); + bbos.writeDouble(1234.5678); + + assertEquals("bytes written", 1 + 2 + 4 + 8 + 2 + 4 + 8, bbos.size()); + ByteBufferInputStream bbis = new ByteBufferInputStream(bbos.peek()); + + assertEquals("byte value", 42, bbis.readByte()); + assertEquals("short value", 1000, bbis.readShort()); + assertEquals("int value", 31337, bbis.readInt()); + assertEquals("long value", 1234567890L, bbis.readLong()); + assertEquals("char value", 'Z', bbis.readChar()); + assertEquals("float value", 0.0001F, bbis.readFloat(), 0.000001); + assertEquals("double value", 1234.5678, bbis.readDouble(), 0.000001); + assertEquals("bytes remaining", 0, bbis.available()); + } + + @Test + public void testWritePrimitivesAndBuffers() throws IOException { + String payload1 = "The Quick Brown Fox jumps over the Lazy Dog"; + String payload2 = "All work and no play makes Jack a dull boy"; + String payload3 = "Lorem ipsum dolor sit amet, consectetur adipiscing elit"; + ByteBuffer[] bufs = new ByteBuffer[] {ByteBuffer.allocate(100), ByteBuffer.allocate(200)}; + bufs[0].put(payload1.getBytes()); + bufs[1].put(payload2.getBytes()).put(payload3.getBytes()); + + ByteBufferOutputStream bbos = new ByteBufferOutputStream(); + bbos.write(42); + bbos.writeShort(1000); + bbos.writeInt(31337); + bbos.writeLong(1234567890L); + bbos.writeBuffers(bufs); + bbos.writeChar('Z'); + bbos.writeFloat(0.0001F); + bbos.writeDouble(1234.5678); + + assertEquals( + "bytes written", + 1 + + 2 + + 4 + + 8 + + payload1.length() + + payload2.length() + + payload3.length() + + 2 + + 4 + + 8, + bbos.size()); + + ByteBufferInputStream bbis = new ByteBufferInputStream(bbos.peek()); + + assertEquals("byte value", 42, bbis.readByte()); + assertEquals("short value", 1000, bbis.readShort()); + assertEquals("int value", 31337, bbis.readInt()); + assertEquals("long value", 1234567890L, bbis.readLong()); + + byte[] actual1 = new byte[payload1.length()]; + byte[] actual2 = new byte[payload2.length()]; + byte[] actual3 = new byte[payload3.length()]; + assertEquals("payload 1 length", payload1.length(), bbis.read(actual1)); + assertEquals("payload 1 contents", payload1, new String(actual1)); + assertEquals("payload 2 length", payload2.length(), bbis.read(actual2)); + assertEquals("payload 2 contents", payload2, new String(actual2)); + assertEquals("payload 3 length", payload3.length(), bbis.read(actual3)); + assertEquals("payload 3 contents", payload3, new String(actual3)); + + assertEquals("char value", 'Z', bbis.readChar()); + assertEquals("float value", 0.0001F, bbis.readFloat(), 0.000001); + assertEquals("double value", 1234.5678, bbis.readDouble(), 0.000001); + assertEquals("bytes remaining", 0, bbis.available()); + } + + @Test + public void testWriteTooMuchForOneBuffer() throws IOException { + String payload = "Lorem ipsum dolor sit amet, consectetur adipiscing elit."; + + // make small buffers + ByteBufferOutputStream bbos = new ByteBufferOutputStream(32, 8); + bbos.write(42); + bbos.writeShort(1000); + bbos.writeInt(31337); + bbos.writeLong(1234567890L); + bbos.write(payload.getBytes()); + bbos.writeChar('Z'); + bbos.writeFloat(0.0001F); + bbos.writeDouble(1234.5678); + + assertEquals("bytes written", 1 + 2 + 4 + 8 + payload.length() + 2 + 4 + 8, bbos.size()); + + ByteBufferInputStream bbis = new ByteBufferInputStream(bbos.peek()); + + assertEquals("byte value", 42, bbis.readByte()); + assertEquals("short value", 1000, bbis.readShort()); + assertEquals("int value", 31337, bbis.readInt()); + assertEquals("long value", 1234567890L, bbis.readLong()); + + byte[] actual = new byte[payload.length()]; + assertEquals("payload length", payload.length(), bbis.read(actual)); + assertEquals("payload contents", payload, new String(actual)); + + assertEquals("char value", 'Z', bbis.readChar()); + assertEquals("float value", 0.0001F, bbis.readFloat(), 0.000001); + assertEquals("double value", 1234.5678, bbis.readDouble(), 0.000001); + assertEquals("bytes remaining", 0, bbis.available()); + } +} diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/net/NettyTcpConnectionTest.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/net/NettyTcpConnectionTest.java index d8e1341..38083a6 100644 --- a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/net/NettyTcpConnectionTest.java +++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/net/NettyTcpConnectionTest.java @@ -15,6 +15,7 @@ */ package com.bloomberg.bmq.impl.infr.net; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -363,8 +364,8 @@ public void readWriteTest(Boolean isAsync) { logger.info("Copied : {}", copiedStr); logger.info("Raw : {}", rawStr); - assertEquals(expectedStr, copiedStr); - assertEquals(expectedStr, rawStr); + assertArrayEquals("copied is expected", expectedStr.getBytes(), copiedStr.getBytes()); + assertArrayEquals("raw is expected", expectedStr.getBytes(), rawStr.getBytes()); } // 4) Invoke 'disconnect' and ensure that it succeeds. diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/AckEventImplBuilderTest.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/AckEventImplBuilderTest.java index ba7f350..d1f14b2 100644 --- a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/AckEventImplBuilderTest.java +++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/AckEventImplBuilderTest.java @@ -74,8 +74,7 @@ public void testBuildAckMessage() throws IOException { builder.packMessage(m); } - ByteBuffer[] message; - message = builder.build(); + ByteBuffer[] message = builder.build(); TestHelpers.compareWithFileContent(message, MessagesTestSamples.ACK_MSG); } diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/ApplicationDataTest.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/ApplicationDataTest.java index e40aa1f..cf74326 100644 --- a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/ApplicationDataTest.java +++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/ApplicationDataTest.java @@ -26,6 +26,7 @@ import com.bloomberg.bmq.impl.infr.io.ByteBufferInputStream; import com.bloomberg.bmq.impl.infr.io.ByteBufferOutputStream; +import com.bloomberg.bmq.util.TestHelpers; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -107,14 +108,14 @@ public void testStreamInInvalidCompression() throws IOException { ByteBufferOutputStream bbos = new ByteBufferOutputStream(); ByteBuffer[] payload = generatePayload(1024); - for (ByteBuffer b : duplicate(payload)) { - bbos.writeBytes(b); + for (ByteBuffer b : payload) { + bbos.writeBuffer(b); } int numPaddingBytes = ProtocolUtil.calculatePadding(bbos.size()); bbos.write(ProtocolUtil.getPaddingBytes(numPaddingBytes), 0, numPaddingBytes); - ByteBufferInputStream bbis = new ByteBufferInputStream(bbos.reset()); + ByteBufferInputStream bbis = new ByteBufferInputStream(bbos.peek()); ApplicationData data = new ApplicationData(); @@ -123,7 +124,6 @@ public void testStreamInInvalidCompression() throws IOException { // "Compressed" data should be buffered assertTrue(data.isCompressed()); - assertArrayEquals(payload, data.applicationData()); // Try to decompress payload - fail try { @@ -135,13 +135,17 @@ public void testStreamInInvalidCompression() throws IOException { } public ByteBuffer[] generatePayload(int size) throws IOException { + return generatePayload(size, true); + } + + public ByteBuffer[] generatePayload(int size, boolean flipped) throws IOException { ByteBufferOutputStream bbos = new ByteBufferOutputStream(); for (int i = 0; i < size; i++) { bbos.writeByte(i % 10); } - return bbos.reset(); + return flipped ? bbos.peek() : bbos.peekUnflipped(); } public MessagePropertiesImpl generateProps() { @@ -174,7 +178,9 @@ private ByteBuffer[] generateOutput( if (payload != null) { for (ByteBuffer b : payload) { - bbos.writeBytes(b); + ByteBuffer dup = b.duplicate(); + dup.position(dup.limit()); + bbos.writeBuffer(dup); } } } else { @@ -246,8 +252,6 @@ public void verifyStreamOut( if (payload != null) { appData.setPayload(duplicate(payload)); verifyPayload(payload, appData.payload()); - - assertEquals(getSize(payload), appData.payloadSize()); } else { try { appData.setPayload(payload); @@ -278,7 +282,7 @@ public void verifyStreamOut( // Check streamOut output appData.streamOut(bbos); - assertArrayEquals(expected, bbos.reset()); + verifyPayload(expected, bbos.peek()); // Skip padding bytes int lastBufferSize = expected[expected.length - 1].limit(); @@ -288,7 +292,7 @@ public void verifyStreamOut( expected[expected.length - 1].limit(lastBufferNoPaddingSize); // Check applicationData() output - assertArrayEquals(expected, appData.applicationData()); + verifyPayload(expected, appData.applicationData()); // Revert limit expected[expected.length - 1].limit(lastBufferSize); @@ -355,7 +359,7 @@ public void verifyStreamIn( assertTrue(appData.isCompressed()); } - // Check payload + // Check payload has been populated verifyPayload(payload, appData.payload()); // `isCompressed()` should be reset @@ -364,7 +368,7 @@ public void verifyStreamIn( // Double check. Stream out data and compare to original input appData = new ApplicationData(); if (payload != null) { - appData.setPayload(duplicate(payload)); + appData.setPayload(payload); } if (props != null) { appData.setProperties(props); @@ -381,15 +385,20 @@ public void verifyStreamIn( verifyCompressionRatio(appData); // Check the data - assertArrayEquals(data, bbos.reset()); + assertArrayEquals( + TestHelpers.buffersContents(data), TestHelpers.buffersContents(bbos.reset())); } - private void verifyPayload(ByteBuffer[] expected, ByteBuffer[] actual) { + private void verifyPayload(ByteBuffer[] expected, ByteBuffer[] actual) throws IOException { if (expected == null) { expected = new ByteBufferOutputStream().reset(); } + if (actual == null) { + actual = new ByteBufferOutputStream().reset(); + } - assertArrayEquals(expected, actual); + assertArrayEquals( + TestHelpers.buffersContents(expected), TestHelpers.buffersContents(actual)); } private void verifyProperties(MessagePropertiesImpl expected, MessagePropertiesImpl actual) { diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PushEventImplBuilderTest.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PushEventImplBuilderTest.java index 2bdc517..fd26aef 100644 --- a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PushEventImplBuilderTest.java +++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PushEventImplBuilderTest.java @@ -46,8 +46,8 @@ public void testPackPushMessage() throws IOException { pushMsg.reset(); - ByteBuffer buffer = ByteBuffer.allocate(PushHeader.MAX_PAYLOAD_SIZE_SOFT + 1); - pushMsg.appData().setPayload(buffer); + pushMsg.appData() + .setPayload(TestHelpers.filledBuffer(PushHeader.MAX_PAYLOAD_SIZE_SOFT + 1)); res = builder.packMessage(pushMsg, isOldStyleProperties); assertEquals(EventBuilderResult.PAYLOAD_TOO_BIG, res); @@ -56,17 +56,18 @@ public void testPackPushMessage() throws IOException { builder.reset(); final int numMsgs = EventHeader.MAX_SIZE_SOFT / PushHeader.MAX_PAYLOAD_SIZE_SOFT; - // Cannot pack more than 'numMsgs' having a unpackedSize of - // 'PushHeader.MAX_PAYLOAD_SIZE_SOFT' in 1 bmqp event. - - buffer = ByteBuffer.allocate(PushHeader.MAX_PAYLOAD_SIZE_SOFT); - pushMsg.appData().setPayload(buffer); for (int i = 0; i < numMsgs; i++) { + // Cannot pack more than 'numMsgs' having a unpackedSize of + // 'PushHeader.MAX_PAYLOAD_SIZE_SOFT' in 1 bmqp event. + pushMsg.appData() + .setPayload(TestHelpers.filledBuffer(PushHeader.MAX_PAYLOAD_SIZE_SOFT)); res = builder.packMessage(pushMsg, isOldStyleProperties); assertEquals(EventBuilderResult.SUCCESS, res); } + pushMsg.appData() + .setPayload(TestHelpers.filledBuffer(PushHeader.MAX_PAYLOAD_SIZE_SOFT)); // Try to add one more message, which must fail with event_too_big. res = builder.packMessage(pushMsg, isOldStyleProperties); assertEquals(EventBuilderResult.EVENT_TOO_BIG, res); @@ -74,7 +75,8 @@ public void testPackPushMessage() throws IOException { pushMsg.reset(); builder.reset(); - pushMsg.appData().setPayload(buffer); + pushMsg.appData() + .setPayload(TestHelpers.filledBuffer(PushHeader.MAX_PAYLOAD_SIZE_SOFT)); res = builder.packMessage(pushMsg, isOldStyleProperties); assertEquals(EventBuilderResult.SUCCESS, res); @@ -103,7 +105,9 @@ public void testBuildPushMessage() throws IOException { PushMessageImpl pushMsg = new PushMessageImpl(); pushMsg.setQueueId(9876); pushMsg.setMessageGUID(guid); - pushMsg.appData().setPayload(ByteBuffer.wrap(PAYLOAD.getBytes())); + ByteBuffer payload = ByteBuffer.allocate(PAYLOAD.getBytes().length); + payload.put(PAYLOAD.getBytes()); + pushMsg.appData().setPayload(payload); pushMsg.appData().setProperties(props); // set compression to none in order to match file content diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PushEventImplTest.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PushEventImplTest.java index 549044a..1f2428c 100644 --- a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PushEventImplTest.java +++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PushEventImplTest.java @@ -21,6 +21,7 @@ import com.bloomberg.bmq.impl.infr.io.ByteBufferOutputStream; import com.bloomberg.bmq.impl.intf.SessionEventHandler; +import com.bloomberg.bmq.util.TestHelpers; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.nio.ByteBuffer; @@ -39,7 +40,7 @@ public void testDispatchUnknownCompression() throws IOException { final byte[] bytes = new byte[Protocol.COMPRESSION_MIN_APPDATA_SIZE + 1]; bytes[0] = 1; - bytes[Protocol.COMPRESSION_MIN_APPDATA_SIZE - 1] = 1; + bytes[Protocol.COMPRESSION_MIN_APPDATA_SIZE] = 1; final int NUM = 4; @@ -73,8 +74,9 @@ public void testDispatchUnknownCompression() throws IOException { for (int i = 0; i < NUM; i++) { PushMessageImpl pushMsg = new PushMessageImpl(); - - pushMsg.appData().setPayload(ByteBuffer.wrap(bytes)); + ByteBuffer payload = ByteBuffer.allocate(bytes.length); + payload.put(bytes); + pushMsg.appData().setPayload(payload); pushMsg.appData().setProperties(props); pushMsg.appData().setIsOldStyleProperties(isOldStyleProperties); @@ -92,7 +94,7 @@ public void testDispatchUnknownCompression() throws IOException { pushMsg.streamOut(bbos); } - PushEventImpl pushEvent = new PushEventImpl(bbos.reset()); + PushEventImpl pushEvent = new PushEventImpl(bbos.peek()); SessionEventHandler handler = new SessionEventHandler() { @@ -107,8 +109,8 @@ public void handleAckMessage(AckMessageImpl ackMsg) { public void handlePushMessage(PushMessageImpl pushMsg) { try { assertArrayEquals( - new ByteBuffer[] {ByteBuffer.wrap(bytes)}, - pushMsg.appData().payload()); + bytes, + TestHelpers.buffersContents(pushMsg.appData().payload())); return; } catch (IOException e) { logger.error("IOException has been thrown", e); diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PushMessageIteratorTest.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PushMessageIteratorTest.java index 56de6a3..97bc103 100644 --- a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PushMessageIteratorTest.java +++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PushMessageIteratorTest.java @@ -207,7 +207,7 @@ public void testWithBuilder() throws IOException { msg.setQueueId(i); msg.setMessageGUID(GUID); msg.appData().setProperties(props); - msg.appData().setPayload(payload); + msg.appData().setPayload(payload.duplicate()); EventBuilderResult rc = builder.packMessage(msg, isOldStyleProperties); assertEquals(EventBuilderResult.SUCCESS, rc); @@ -316,7 +316,9 @@ public void testMultipleCompressedMessages() throws IOException { PushMessageImpl pushMsg = new PushMessageImpl(); - pushMsg.appData().setPayload(ByteBuffer.wrap(bytes)); + ByteBuffer payload = ByteBuffer.allocate(bytes.length); + payload.put(bytes); + pushMsg.appData().setPayload(payload); MessagePropertiesImpl props = new MessagePropertiesImpl(); props.setPropertyAsInt32("routingId", 42); @@ -336,8 +338,7 @@ public void testMultipleCompressedMessages() throws IOException { while (pushIt.hasNext()) { PushMessageImpl pushMsg = pushIt.next(); - assertArrayEquals( - new ByteBuffer[] {ByteBuffer.wrap(bytes)}, pushMsg.appData().payload()); + assertArrayEquals(bytes, TestHelpers.buffersContents(pushMsg.appData().payload())); final MessagePropertiesImpl props = pushMsg.appData().properties(); assertEquals(2, props.numProperties()); @@ -403,7 +404,9 @@ public void testUnknownCompression() throws IOException { for (int i = 0; i < NUM; i++) { PushMessageImpl pushMsg = new PushMessageImpl(); - pushMsg.appData().setPayload(ByteBuffer.wrap(bytes)); + ByteBuffer payload = ByteBuffer.allocate(bytes.length); + payload.put(bytes); + pushMsg.appData().setPayload(payload); pushMsg.appData().setProperties(props); pushMsg.appData().setIsOldStyleProperties(isOldStyleProperties); diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PutEventImplBuilderTest.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PutEventImplBuilderTest.java index a423056..e15a3c3 100644 --- a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PutEventImplBuilderTest.java +++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/PutEventImplBuilderTest.java @@ -51,8 +51,8 @@ public void testErrorPutMessage() throws IOException { assertEquals(EventBuilderResult.PAYLOAD_EMPTY, res); putMsg = new PutMessageImpl(); - buffer = ByteBuffer.allocate(PutHeader.MAX_PAYLOAD_SIZE_SOFT + 1); - putMsg.appData().setPayload(buffer); + putMsg.appData() + .setPayload(TestHelpers.filledBuffer(PutHeader.MAX_PAYLOAD_SIZE_SOFT + 1)); // set compression to none in order to get PAYLOAD_TOO_BIG result putMsg.setCompressionType(CompressionAlgorithmType.E_NONE); @@ -60,30 +60,31 @@ public void testErrorPutMessage() throws IOException { res = builder.packMessage(putMsg, isOldStyleProperties); assertEquals(EventBuilderResult.PAYLOAD_TOO_BIG, res); - putMsg = new PutMessageImpl(); - final int numMsgs = EventHeader.MAX_SIZE_SOFT / PutHeader.MAX_PAYLOAD_SIZE_SOFT; // Cannot pack more than 'numMsgs' having a unpackedSize of // 'PutHeader.MAX_PAYLOAD_SIZE_SOFT' in 1 bmqp event. - buffer = ByteBuffer.allocate(PutHeader.MAX_PAYLOAD_SIZE_SOFT); - putMsg.appData().setPayload(buffer); - // set compression to none in order to get EVENT_TOO_BIG result - putMsg.setCompressionType(CompressionAlgorithmType.E_NONE); for (int i = 0; i < numMsgs; i++) { + putMsg = new PutMessageImpl(); + putMsg.appData() + .setPayload(TestHelpers.filledBuffer(PutHeader.MAX_PAYLOAD_SIZE_SOFT)); + putMsg.setCompressionType(CompressionAlgorithmType.E_NONE); res = builder.packMessage(putMsg, isOldStyleProperties); assertEquals(EventBuilderResult.SUCCESS, res); } // Try to add one more message, which must fail with event_too_big. + putMsg = new PutMessageImpl(); + putMsg.appData().setPayload(TestHelpers.filledBuffer(PutHeader.MAX_PAYLOAD_SIZE_SOFT)); + putMsg.setCompressionType(CompressionAlgorithmType.E_NONE); res = builder.packMessage(putMsg, isOldStyleProperties); assertEquals(EventBuilderResult.EVENT_TOO_BIG, res); putMsg = new PutMessageImpl(); - putMsg.appData().setPayload(buffer); + putMsg.appData().setPayload(TestHelpers.filledBuffer(PutHeader.MAX_PAYLOAD_SIZE_SOFT)); putMsg.setFlags(PutHeaderFlags.ACK_REQUESTED.toInt()); CorrelationId corId = putMsg.correlationId(); @@ -103,7 +104,8 @@ public void testBigPutEvent() throws IOException { PutMessageImpl putMsg = new PutMessageImpl(); PutEventBuilder builder = new PutEventBuilder(); - ByteBuffer buffer = ByteBuffer.allocate(EventHeader.MAX_SIZE_SOFT + 1024); + ByteBuffer buffer = TestHelpers.filledBuffer(EventHeader.MAX_SIZE_SOFT + 1024); + buffer.position(PutHeader.MAX_PAYLOAD_SIZE_SOFT); buffer.limit(PutHeader.MAX_PAYLOAD_SIZE_SOFT); putMsg.appData().setPayload(buffer); @@ -134,7 +136,8 @@ public void testBigPutEvent() throws IOException { @Test public void testBuildPutMessageWithProperties() throws IOException { final String k = "abcdefghijklmnopqrstuvwxyz"; - final ByteBuffer b = ByteBuffer.wrap(k.getBytes()); + final ByteBuffer b = ByteBuffer.allocate(k.getBytes().length); + b.put(k.getBytes()); final MessagePropertiesImpl props = new MessagePropertiesImpl(); props.setPropertyAsInt32("encoding", 3); @@ -152,7 +155,7 @@ public void testBuildPutMessageWithProperties() throws IOException { PutMessageImpl putMsg = new PutMessageImpl(); putMsg.setQueueId(9876); putMsg.setupCorrelationId(CorrelationIdImpl.restoreId(1234)); - putMsg.appData().setPayload(b); + putMsg.appData().setPayload(b.duplicate()); putMsg.setFlags(flags); putMsg.appData().setProperties(props); diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/it/util/TestTools.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/it/util/TestTools.java index 3caeebc..45c394a 100644 --- a/bmq-sdk/src/test/java/com/bloomberg/bmq/it/util/TestTools.java +++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/it/util/TestTools.java @@ -68,7 +68,10 @@ public static void acquireSema(Semaphore sema, int sec) { public static void acquireSema(Semaphore sema, int permits, int sec) { Argument.expectPositive(permits, "permits"); Argument.expectPositive(sec, "sec"); - + logger.debug( + "Trying to acquire from sema with {} permits and {} other waiting threads", + sema.availablePermits(), + sema.getQueueLength()); try { if (!sema.tryAcquire(permits, sec, TimeUnit.SECONDS)) { logger.error("Semaphore timeout"); diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/util/TestHelpers.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/util/TestHelpers.java index 3653c84..b5750bb 100644 --- a/bmq-sdk/src/test/java/com/bloomberg/bmq/util/TestHelpers.java +++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/util/TestHelpers.java @@ -15,15 +15,12 @@ */ package com.bloomberg.bmq.util; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.fail; import com.bloomberg.bmq.BMQException; import com.bloomberg.bmq.impl.infr.msg.MessagesTestSamples.SampleFileMetadata; import com.bloomberg.bmq.impl.infr.util.Argument; -import java.io.BufferedInputStream; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -41,9 +38,16 @@ public class TestHelpers { static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static ByteBuffer filledBuffer(int size) { + byte[] buf = new byte[size]; + ByteBuffer filled = ByteBuffer.allocate(size); + filled.put(buf); + return filled; + } + public static ByteBuffer readFile(final String fileName) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try (InputStream ins = TestHelpers.class.getResourceAsStream(fileName); ) { + try (InputStream ins = TestHelpers.class.getResourceAsStream(fileName)) { int value; do { value = ins.read(); @@ -58,6 +62,33 @@ public static ByteBuffer readFile(final String fileName) throws IOException { return res; } + public static byte[] buffersContents(ByteBuffer[] buffers) throws IOException { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + for (ByteBuffer buf : buffers) { + ByteBuffer dup = buf.duplicate(); + if (dup.position() != 0 && dup.limit() == dup.capacity()) { + dup.flip(); + } + byte[] temp = new byte[dup.remaining()]; + dup.get(temp, 0, temp.length); + os.write(temp); + } + return os.toByteArray(); + } + + public static byte[] resourceFileContents(String path, int size) throws IOException { + try (InputStream in = TestHelpers.class.getResourceAsStream(path)) { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + int pos = 0; + while (pos++ < size) { + int b = in.read(); + if (b < 0) break; + os.write(b); + } + return os.toByteArray(); + } + } + public static void compareWithFileContent( ByteBuffer[] message, SampleFileMetadata messageSample) throws IOException { @@ -66,25 +97,9 @@ public static void compareWithFileContent( // Read 'contentLength' bytes from the file at 'filePath', and compare // those bytes with 'message'. - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - for (ByteBuffer b : message) { - baos.write(b.array(), 0, b.limit()); - } - - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - - try (InputStream in1 = new BufferedInputStream(bais); - InputStream in2 = TestHelpers.class.getResourceAsStream(filePath)) { - int v1, v2; - for (int i = 0; i < contentLength; i++) { - v1 = in1.read(); - v2 = in2.read(); - assertTrue(v1 >= 0); - assertEquals(v1, v2); - } - } + byte[] messageContent = buffersContents(message); + byte[] fileContent = resourceFileContents(filePath, contentLength); + assertArrayEquals(fileContent, messageContent); } public static void acquireSema(Semaphore sema, int sec) {