diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 7e125df..22ab3ec 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -92,7 +92,7 @@ jobs:
- name: Upload broker logs as artifacts
if: failure()
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
with:
name: broker_logs_${{ matrix.java }}
path: /tmp/bmq-broker/broker_logs.tar.gz
diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/SessionOptions.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/SessionOptions.java
index bdb8c8e..4801984 100644
--- a/bmq-sdk/src/main/java/com/bloomberg/bmq/SessionOptions.java
+++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/SessionOptions.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022 Bloomberg Finance L.P.
+ * Copyright 2022-2025 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -62,6 +62,10 @@
* HostHealthMonitor} interface responsible for notifying the session when the health of the
* host machine has changed. A {@code hostHealthMonitor} must be specified in oder for queues
* opened through the session to suspend on unhealthy hosts.
+ *
{@code userAgentPrefix}: String to include in the user agent for broker telemetry. This
+ * string must only contain printable characters and must be less than 128 characters long.
+ * This is provided for libraries that are wrapping this SDK. Applications directly using the
+ * SDK are encouraged NOT to set this value.
*
*
* Thread Safety
@@ -215,6 +219,8 @@ public int highWaterMark() {
private final HostHealthMonitor hostHealthMonitor;
+ private final String userAgentPrefix;
+
private SessionOptions() {
brokerUri = DEFAULT_URI;
startTimeout = DEFAULT_START_TIMEOUT;
@@ -226,6 +232,7 @@ private SessionOptions() {
configureQueueTimeout = QUEUE_OPERATION_TIMEOUT;
closeQueueTimeout = QUEUE_OPERATION_TIMEOUT;
hostHealthMonitor = null;
+ userAgentPrefix = "";
}
private SessionOptions(Builder builder) {
@@ -239,6 +246,7 @@ private SessionOptions(Builder builder) {
configureQueueTimeout = builder.configureQueueTimeout;
closeQueueTimeout = builder.closeQueueTimeout;
hostHealthMonitor = builder.hostHealthMonitor;
+ userAgentPrefix = builder.userAgentPrefix;
}
/**
@@ -383,6 +391,16 @@ public HostHealthMonitor hostHealthMonitor() {
return hostHealthMonitor;
}
+ /**
+ * Returns the string that will be prefixed to the user agent used by {@link
+ * com.bloomberg.bmq.Session} to identify itself to the broker.
+ *
+ * @return String user agent string prefix
+ */
+ public String userAgentPrefix() {
+ return userAgentPrefix;
+ }
+
/** Helper class to create a {@code SesssionOptions} object with custom settings. */
public static class Builder {
private URI brokerUri;
@@ -397,6 +415,9 @@ public static class Builder {
private Duration closeQueueTimeout;
private HostHealthMonitor hostHealthMonitor;
+
+ private String userAgentPrefix;
+
/**
* Creates a {@code SesssionOptions} object based on this {@code Builder} properties.
*
@@ -417,6 +438,7 @@ private Builder(SessionOptions options) {
configureQueueTimeout = options.configureQueueTimeout;
closeQueueTimeout = options.closeQueueTimeout;
hostHealthMonitor = options.hostHealthMonitor;
+ userAgentPrefix = options.userAgentPrefix;
}
/**
@@ -558,5 +580,29 @@ public Builder setHostHealthMonitor(HostHealthMonitor value) {
hostHealthMonitor = Argument.expectNonNull(value, "host health monitor");
return this;
}
+
+ /**
+ * Sets the user agent prefix. This string is prefixed to a user agent constructed by the
+ * SDK. This is intended ONLY for other libraries that wrap this SDK to identify themselves
+ * for broker telemetry. Applications that are directly using this SDK are encouraged not to
+ * set this.
+ *
+ * @param value String user agent prefix
+ * @return Builder this object
+ * @throws NullPointerException if the specified value is null
+ * @throws IllegalArgumentException in case the specified value contains non-ASCII
+ * characters, contains non-printable characters (i.e., control characters), or is
+ * longer than 127 characters.
+ */
+ public Builder setUserAgentPrefix(String value) {
+ Argument.expectNonNull(value, "user agent prefix");
+ Argument.expectCondition(
+ value.codePoints().allMatch(c -> c < 128 && !Character.isISOControl(c)),
+ "user agent prefix must be printable ASCII");
+ Argument.expectCondition(
+ value.length() < 128, "user agent prefix must be shorter than 128 characters");
+ userAgentPrefix = value;
+ return this;
+ }
}
}
diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/TcpBrokerConnection.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/TcpBrokerConnection.java
index 47c4b9c..ca7a569 100644
--- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/TcpBrokerConnection.java
+++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/TcpBrokerConnection.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022 Bloomberg Finance L.P.
+ * Copyright 2022-2025 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -373,9 +373,20 @@ private NegotiationMessageChoice createNegoMsg() {
clientIdentity.setClusterName("");
clientIdentity.setClusterNodeId(-1);
clientIdentity.setSdkLanguage(ClientLanguage.E_JAVA);
+ clientIdentity.setUserAgent(constructUserAgent());
return negoMsgChoice;
}
+ private String constructUserAgent() {
+ String prefix = "";
+ if (connectionOptions.userAgentPrefix().length() > 0) {
+ prefix = connectionOptions.userAgentPrefix() + " ";
+ }
+ String javaVersion = "java" + SystemUtil.getJavaVersionString();
+ String sdkVersion = VersionUtil.getJarVersion();
+ return prefix + "com.bloomberg.bmq(" + javaVersion + "):" + sdkVersion;
+ }
+
private WriteStatus negotiate() throws IOException {
if (negotiationMsg == null) {
negotiationMsg = createNegoMsg();
diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/msg/ClientIdentity.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/msg/ClientIdentity.java
index 47f87c8..8f1d644 100644
--- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/msg/ClientIdentity.java
+++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/msg/ClientIdentity.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022 Bloomberg Finance L.P.
+ * Copyright 2022-2025 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@ public class ClientIdentity {
private String clusterName;
private Integer clusterNodeId;
private ClientLanguage sdkLanguage;
+ private String userAgent;
public ClientIdentity() {
init();
@@ -48,6 +49,7 @@ private void init() {
clusterName = "";
clusterNodeId = -1;
sdkLanguage = ClientLanguage.E_UNKNOWN;
+ userAgent = "";
}
public Integer protocolVersion() {
@@ -138,6 +140,14 @@ public void setSdkLanguage(ClientLanguage value) {
sdkLanguage = value;
}
+ public String userAgent() {
+ return userAgent;
+ }
+
+ public void setUserAgent(String value) {
+ userAgent = value;
+ }
+
public Object createNewInstance() {
return new ClientIdentity();
}
diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net/ConnectionOptions.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net/ConnectionOptions.java
index bc585cc..f95631a 100644
--- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net/ConnectionOptions.java
+++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/net/ConnectionOptions.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022 Bloomberg Finance L.P.
+ * Copyright 2022-2025 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -39,12 +39,14 @@ public final class ConnectionOptions {
private int startNumRetries = DEFAULT_START_NUM_RETRIES;
private Duration startRetryInterval = DEFAULT_START_RETRY_INTERVAL;
private WriteBufferWaterMark writeWaterMark = new WriteBufferWaterMark();
+ private String userAgentPrefix = "";
public ConnectionOptions() {}
public ConnectionOptions(SessionOptions sesOpts) {
brokerUri = sesOpts.brokerUri();
writeWaterMark = sesOpts.writeBufferWaterMark();
+ userAgentPrefix = sesOpts.userAgentPrefix();
}
public ConnectionOptions setBrokerUri(URI value) {
@@ -80,6 +82,17 @@ public ConnectionOptions setWriteBufferWaterMark(WriteBufferWaterMark value) {
return this;
}
+ public ConnectionOptions setUserAgentPrefix(String value) {
+ Argument.expectNonNull(value, "user agent prefix");
+ Argument.expectCondition(
+ value.codePoints().allMatch(c -> c < 128 && !Character.isISOControl(c)),
+ "user agent prefix must be printable ASCII");
+ Argument.expectCondition(
+ value.length() < 128, "user agent prefix must be shorter than 128 characters");
+ userAgentPrefix = value;
+ return this;
+ }
+
public URI brokerUri() {
return brokerUri;
}
@@ -99,4 +112,8 @@ public Duration startRetryInterval() {
public WriteBufferWaterMark writeBufferWaterMark() {
return writeWaterMark;
}
+
+ public String userAgentPrefix() {
+ return userAgentPrefix;
+ }
}
diff --git a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/util/SystemUtil.java b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/util/SystemUtil.java
index d735c86..8d4feee 100644
--- a/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/util/SystemUtil.java
+++ b/bmq-sdk/src/main/java/com/bloomberg/bmq/impl/infr/util/SystemUtil.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022 Bloomberg Finance L.P.
+ * Copyright 2022-2025 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -48,7 +48,7 @@ public static JavaVersion getJavaVersion() {
JavaVersion result = JavaVersion.JAVA_UNSUPPORTED;
try {
- String version = System.getProperty("java.version");
+ String version = getJavaVersionString();
result =
Arrays.stream(JavaVersion.values())
@@ -63,6 +63,10 @@ public static JavaVersion getJavaVersion() {
return result;
}
+ public static String getJavaVersionString() {
+ return System.getProperty("java.version");
+ }
+
public static int getProcessId() {
int pid = 0;
String vmname = null;
diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/codec/JsonDecoderUtilTest.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/codec/JsonDecoderUtilTest.java
index ba21746..dc1dc60 100644
--- a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/codec/JsonDecoderUtilTest.java
+++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/codec/JsonDecoderUtilTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022 Bloomberg Finance L.P.
+ * Copyright 2022-2025 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/SchemaEventImplBuilderTest.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/SchemaEventImplBuilderTest.java
index 44f879d..1289ae7 100644
--- a/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/SchemaEventImplBuilderTest.java
+++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/impl/infr/proto/SchemaEventImplBuilderTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022 Bloomberg Finance L.P.
+ * Copyright 2022-2025 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/it/PlainConsumerIT.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/it/PlainConsumerIT.java
index 820d064..636aa43 100644
--- a/bmq-sdk/src/test/java/com/bloomberg/bmq/it/PlainConsumerIT.java
+++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/it/PlainConsumerIT.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022 Bloomberg Finance L.P.
+ * Copyright 2022-2025 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -115,6 +115,7 @@ public static ByteBuffer[] getLastMessage(int port, Uri uri)
clientIdentity.setClusterName("");
clientIdentity.setClusterNodeId(-1);
clientIdentity.setSdkLanguage(ClientLanguage.E_JAVA);
+ clientIdentity.setUserAgent("com.bloomberg.bmq.it.PlainConsumerIT");
SchemaEventBuilder schemaBuilder = new SchemaEventBuilder();
diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/it/PlainProducerIT.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/it/PlainProducerIT.java
index 09024f5..b0e5bdf 100644
--- a/bmq-sdk/src/test/java/com/bloomberg/bmq/it/PlainProducerIT.java
+++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/it/PlainProducerIT.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022 Bloomberg Finance L.P.
+ * Copyright 2022-2025 Bloomberg Finance L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -117,6 +117,7 @@ public static void sendMessage(
clientIdentity.setClusterName("");
clientIdentity.setClusterNodeId(-1);
clientIdentity.setSdkLanguage(ClientLanguage.E_JAVA);
+ clientIdentity.setUserAgent("com.bloomberg.bmq.it.PlainProducerIT");
SchemaEventBuilder schemaBuilder = new SchemaEventBuilder();
diff --git a/bmq-sdk/src/test/java/com/bloomberg/bmq/util/TestHelpers.java b/bmq-sdk/src/test/java/com/bloomberg/bmq/util/TestHelpers.java
index 2a74eee..a8ddb51 100644
--- a/bmq-sdk/src/test/java/com/bloomberg/bmq/util/TestHelpers.java
+++ b/bmq-sdk/src/test/java/com/bloomberg/bmq/util/TestHelpers.java
@@ -82,7 +82,10 @@ public static void compareWithFileContent(
v1 = in1.read();
v2 = in2.read();
assertTrue(v1 >= 0);
- assertEquals(v1, v2);
+ assertEquals(
+ v1,
+ v2,
+ "Found mismatch at byte " + i + ": " + (char) v1 + ", " + (char) v2);
}
}
}
diff --git a/bmq-sdk/src/test/resources/data/msg_control_nego_client_a326573a-2c1e-42a2-afdc-da184456c118.bin b/bmq-sdk/src/test/resources/data/msg_control_nego_client_a326573a-2c1e-42a2-afdc-da184456c118.bin
index 212cc5f..d825560 100644
Binary files a/bmq-sdk/src/test/resources/data/msg_control_nego_client_a326573a-2c1e-42a2-afdc-da184456c118.bin and b/bmq-sdk/src/test/resources/data/msg_control_nego_client_a326573a-2c1e-42a2-afdc-da184456c118.bin differ