Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ jobs:

- name: Upload broker logs as artifacts
if: failure()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: broker_logs_${{ matrix.java }}
path: /tmp/bmq-broker/broker_logs.tar.gz
Expand Down
22 changes: 15 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,27 @@ $ mvn clean -Dmaven.test.skip=true -Dspotbugs.skip=true install

### Build and Run `bmq-examples` Producer Example

Prerequisites:
- BlazingMQ backend is built and is available.
- `bmq-sdk` JAR is installed locally.

Java SDK producers/consumers can be tested with the local BlazingMQ broker.
For Linux build, first start the local broker:
```sh
cd $BLAZINGMQ_DIR/build/Linux/src/applications/bmqbrkr
./run
```

Secondly, start sample producer, that will connect to the local broker, open a queue, post messages and exits:
```sh
$ cd bmq-examples
$ mvn clean compile
$ mvn exec:java -Dexec.mainClass="com.bloomberg.bmq.examples.Producer"
$ mvn exec:java -Dexec.mainClass="com.bloomberg.bmq.examples.SimpleProducer"
```

Above command expects that `bmq-sdk` JAR is installed locally. Also note that
BlazingMQ backend must be running for producer/consumer examples to run
successfully.

### Building BlazingMQ Backend

Detailed instructions to build BlazingMQ backend (BlazingMQ message brokers,
Detailed instructions on how to build BlazingMQ backend (BlazingMQ message brokers,
etc) can be found [here](https://www.github.com/bloomberg/blazingmq).

### Supported JDKs
Expand Down Expand Up @@ -166,7 +174,7 @@ in the project, please contact us at [email protected].
## Security Vulnerability Reporting

If you believe you have identified a security vulnerability in this project,
please send an email to the project team at [email protected], detailing
please email the project team at [email protected], detailing
the suspected issue and any methods you've found to reproduce it.

Please do NOT open an issue in the GitHub repository, as we'd prefer to keep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public SubscriptionExpression(String expression) {
Argument.expectNonNull(expression, "expression");
this.expression = Optional.of(expression);
this.version =
Optional.of((expression.length() > 0) ? Version.e_VERSION_1 : Version.e_UNDEFINED);
Optional.of((!expression.isEmpty()) ? Version.e_VERSION_1 : Version.e_UNDEFINED);
}

public SubscriptionExpression() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ private CompletableFuture<Void> execQueueManagerCleanUp() {
.collect(Collectors.toList());
if (!activeStrategies.isEmpty()) {
// If given list is not empty, then cancel all unfinished sequences
// and set up clean up action as a continuation executed in scheduler.
// and set up cleanup action as a continuation executed in scheduler.
activeStrategies.forEach(QueueControlStrategy::cancelOnStop);
activeStrategies.stream()
.map(QueueControlStrategy::getResultFuture)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void read(
data.length,
completionStatus.numNeeded());
try {
EventHeader eventHeader = null;
EventHeader eventHeader;
for (ByteBuffer restData : data) {
boolean done = false;
while (!done) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,11 @@ private boolean validateBrokerResponse(BrokerResponse resp) {
private class EventHandler implements ProtocolEventTcpReader.EventHandler {

public void handleEvent(EventType eventType, ByteBuffer[] bbuf) {
if (eventType == null) {
logger.warn("Received null event type with buffer size: {}, skipping", bbuf.length);
return;
}

logger.debug("Handle EventImpl {} with size: {}", eventType, bbuf.length);
EventImpl reportedEvent;
switch (eventType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void dispatch(BrokerSessionEventHandler handler, BrokerSessionEvent event

private static final int TYPE_ID = UniqId.getNumber();

public final void dispatch(BrokerSessionEventHandler handler) {
public void dispatch(BrokerSessionEventHandler handler) {
getEventType().dispatch(handler, this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public final void dispatch(QueueControlEventHandler handler) {
getEventType().dispatch(handler, this);
}

private QueueHandle queue;
private final QueueHandle queue;

private QueueControlEvent(
Type queueEventType, GenericCode result, String errorDescription, QueueHandle queue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public final class NettyTcpConnection extends ChannelInboundHandlerAdapter
* <p>Other goals are to inject server responses and as well as to keep the threading model same
* in the test code. The best candidate is {@link io.netty.channel.embedded.EmbeddedChannel}
* class that lets us inject responses. The main problem with given class is the fact that it is
* "threadless" and unfortunately does not allow us to inject custom eventloop. But we can
* "threadless" and unfortunately does not allow us to inject custom event loop. But we can
* inject it in our channel adapter and execute every embedded channel routine in IO thread. So,
* it helps to achieve thread safety for plain embedded channel class and at the same time,
* reproduce the threading model for tested class.
Expand Down Expand Up @@ -200,7 +200,7 @@ private enum ChannelState {
private boolean wasOnceConnected;
private int numRemainingInitialTries;
private ConnectionOptions options;
private NetResolver hostResolver;
private final NetResolver hostResolver;
private ReadCompletionStatus readBytesStatus;
private EventLoopGroup eventLoop;
private Bootstrap bootstrap;
Expand All @@ -210,7 +210,7 @@ private enum ChannelState {
private DisconnectCallback disconnectCallback;
private ChannelStatusHandler channelStatusHandler;
private Semaphore channelWaterMarkSema;
private ClientChannelAdapter clientChannelAdapter;
private final ClientChannelAdapter clientChannelAdapter;
private final long lingerTimeout;

static {
Expand Down Expand Up @@ -476,7 +476,7 @@ public int disconnect(DisconnectCallback disconnectCb) {
disconnectCallback = disconnectCb;

// If the callback is not null, then we are going to do full
// disconnect procedure. Otherwise we just drop the channel and
// disconnect procedure. Otherwise, we just drop the channel and
// schedule reconnecting (see 'channelCloseFutureComplete' method).
if (disconnectCb != null) {
state = ChannelState.DISCONNECTING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public enum EventType {
PUSH(4),
ACK(5);

private int id;
private final int id;

EventType(int id) {
this.id = id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static int getSdkVersion() {
logger.info("BlazingMQ SDK Jar ImplementationVersion: [{}]", implVersion);

if (implVersion == null) {
// When loaded outside of a JAR, just return a default value of
// When loaded outside a JAR, just return a default value of
// 999999 , which is what we return in C++ when program is built
// from the master instead of using 'libbmq'.
return DEFAULT_VERSION;
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ limitations under the License. -->
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
<version>5.10.2</version>
<version>6.0.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
Loading