diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 7e125df..22ab3ec 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -92,7 +92,7 @@ jobs: - name: Upload broker logs as artifacts if: failure() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: broker_logs_${{ matrix.java }} path: /tmp/bmq-broker/broker_logs.tar.gz diff --git a/README.md b/README.md index 21710d3..4c23145 100644 --- a/README.md +++ b/README.md @@ -66,19 +66,27 @@ $ mvn clean -Dmaven.test.skip=true -Dspotbugs.skip=true install ### Build and Run `bmq-examples` Producer Example +Prerequisites: +- BlazingMQ backend is built and is available. +- `bmq-sdk` JAR is installed locally. + +Java SDK producers/consumers can be tested with the local BlazingMQ broker. +For Linux build, first start the local broker: +```sh +cd $BLAZINGMQ_DIR/build/Linux/src/applications/bmqbrkr +./run +``` + +Secondly, start sample producer, that will connect to the local broker, open a queue, post messages and exits: ```sh $ cd bmq-examples $ mvn clean compile -$ mvn exec:java -Dexec.mainClass="com.bloomberg.bmq.examples.Producer" +$ mvn exec:java -Dexec.mainClass="com.bloomberg.bmq.examples.SimpleProducer" ``` -Above command expects that `bmq-sdk` JAR is installed locally. Also note that -BlazingMQ backend must be running for producer/consumer examples to run -successfully. - ### Building BlazingMQ Backend -Detailed instructions to build BlazingMQ backend (BlazingMQ message brokers, +Detailed instructions on how to build BlazingMQ backend (BlazingMQ message brokers, etc) can be found [here](https://www.github.com/bloomberg/blazingmq). ### Supported JDKs @@ -166,7 +174,7 @@ in the project, please contact us at opensource@bloomberg.net. ## Security Vulnerability Reporting If you believe you have identified a security vulnerability in this project, -please send an email to the project team at opensource@bloomberg.net, detailing +please email the project team at opensource@bloomberg.net, detailing the suspected issue and any methods you've found to reproduce it. Please do NOT open an issue in the GitHub repository, as we'd prefer to keep diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/SubscriptionExpression.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/SubscriptionExpression.java index d4f04a0..e748aae 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/SubscriptionExpression.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/SubscriptionExpression.java @@ -42,7 +42,7 @@ public SubscriptionExpression(String expression) { Argument.expectNonNull(expression, "expression"); this.expression = Optional.of(expression); this.version = - Optional.of((expression.length() > 0) ? Version.e_VERSION_1 : Version.e_UNDEFINED); + Optional.of((!expression.isEmpty()) ? Version.e_VERSION_1 : Version.e_UNDEFINED); } public SubscriptionExpression() { diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/BrokerSession.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/BrokerSession.java index dbfc23b..e88358b 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/BrokerSession.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/BrokerSession.java @@ -598,7 +598,7 @@ private CompletableFuture execQueueManagerCleanUp() { .collect(Collectors.toList()); if (!activeStrategies.isEmpty()) { // If given list is not empty, then cancel all unfinished sequences - // and set up clean up action as a continuation executed in scheduler. + // and set up cleanup action as a continuation executed in scheduler. activeStrategies.forEach(QueueControlStrategy::cancelOnStop); activeStrategies.stream() .map(QueueControlStrategy::getResultFuture) diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/ProtocolEventTcpReader.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/ProtocolEventTcpReader.java index e0860f8..a0437d8 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/ProtocolEventTcpReader.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/ProtocolEventTcpReader.java @@ -79,7 +79,7 @@ public void read( data.length, completionStatus.numNeeded()); try { - EventHeader eventHeader = null; + EventHeader eventHeader; for (ByteBuffer restData : data) { boolean done = false; while (!done) { diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/TcpBrokerConnection.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/TcpBrokerConnection.java index 47c4b9c..a761069 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/TcpBrokerConnection.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/TcpBrokerConnection.java @@ -437,6 +437,11 @@ private boolean validateBrokerResponse(BrokerResponse resp) { private class EventHandler implements ProtocolEventTcpReader.EventHandler { public void handleEvent(EventType eventType, ByteBuffer[] bbuf) { + if (eventType == null) { + logger.warn("Received null event type with buffer size: {}, skipping", bbuf.length); + return; + } + logger.debug("Handle EventImpl {} with size: {}", eventType, bbuf.length); EventImpl reportedEvent; switch (eventType) { diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/events/BrokerSessionEvent.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/events/BrokerSessionEvent.java index edbf0fb..a3e90f3 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/events/BrokerSessionEvent.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/events/BrokerSessionEvent.java @@ -52,7 +52,7 @@ public void dispatch(BrokerSessionEventHandler handler, BrokerSessionEvent event private static final int TYPE_ID = UniqId.getNumber(); - public final void dispatch(BrokerSessionEventHandler handler) { + public void dispatch(BrokerSessionEventHandler handler) { getEventType().dispatch(handler, this); } diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/events/QueueControlEvent.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/events/QueueControlEvent.java index 0337714..4e58d79 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/events/QueueControlEvent.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/events/QueueControlEvent.java @@ -78,7 +78,7 @@ public final void dispatch(QueueControlEventHandler handler) { getEventType().dispatch(handler, this); } - private QueueHandle queue; + private final QueueHandle queue; private QueueControlEvent( Type queueEventType, GenericCode result, String errorDescription, QueueHandle queue) { diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net/NettyTcpConnection.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net/NettyTcpConnection.java index 38c7d71..c080545 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net/NettyTcpConnection.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net/NettyTcpConnection.java @@ -80,7 +80,7 @@ public final class NettyTcpConnection extends ChannelInboundHandlerAdapter *

Other goals are to inject server responses and as well as to keep the threading model same * in the test code. The best candidate is {@link io.netty.channel.embedded.EmbeddedChannel} * class that lets us inject responses. The main problem with given class is the fact that it is - * "threadless" and unfortunately does not allow us to inject custom eventloop. But we can + * "threadless" and unfortunately does not allow us to inject custom event loop. But we can * inject it in our channel adapter and execute every embedded channel routine in IO thread. So, * it helps to achieve thread safety for plain embedded channel class and at the same time, * reproduce the threading model for tested class. @@ -200,7 +200,7 @@ private enum ChannelState { private boolean wasOnceConnected; private int numRemainingInitialTries; private ConnectionOptions options; - private NetResolver hostResolver; + private final NetResolver hostResolver; private ReadCompletionStatus readBytesStatus; private EventLoopGroup eventLoop; private Bootstrap bootstrap; @@ -210,7 +210,7 @@ private enum ChannelState { private DisconnectCallback disconnectCallback; private ChannelStatusHandler channelStatusHandler; private Semaphore channelWaterMarkSema; - private ClientChannelAdapter clientChannelAdapter; + private final ClientChannelAdapter clientChannelAdapter; private final long lingerTimeout; static { @@ -476,7 +476,7 @@ public int disconnect(DisconnectCallback disconnectCb) { disconnectCallback = disconnectCb; // If the callback is not null, then we are going to do full - // disconnect procedure. Otherwise we just drop the channel and + // disconnect procedure. Otherwise, we just drop the channel and // schedule reconnecting (see 'channelCloseFutureComplete' method). if (disconnectCb != null) { state = ChannelState.DISCONNECTING; diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/EventType.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/EventType.java index 53b3aff..078eb23 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/EventType.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/proto/EventType.java @@ -23,7 +23,7 @@ public enum EventType { PUSH(4), ACK(5); - private int id; + private final int id; EventType(int id) { this.id = id; diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/scm/VersionUtil.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/scm/VersionUtil.java index 49a3366..cd80200 100644 --- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/scm/VersionUtil.java +++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/scm/VersionUtil.java @@ -72,7 +72,7 @@ public static int getSdkVersion() { logger.info("BlazingMQ SDK Jar ImplementationVersion: [{}]", implVersion); if (implVersion == null) { - // When loaded outside of a JAR, just return a default value of + // When loaded outside a JAR, just return a default value of // 999999 , which is what we return in C++ when program is built // from the master instead of using 'libbmq'. return DEFAULT_VERSION; diff --git a/pom.xml b/pom.xml index 085e113..0bbb913 100644 --- a/pom.xml +++ b/pom.xml @@ -163,7 +163,7 @@ limitations under the License. --> org.junit junit-bom - 5.10.2 + 6.0.1 pom import