diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java
index 6848308bb..86945a9a0 100644
--- a/src/main/java/io/nats/client/Options.java
+++ b/src/main/java/io/nats/client/Options.java
@@ -268,6 +268,12 @@ public class Options {
* {@link Builder#statisticsCollector(StatisticsCollector) statisticsCollector}.
*/
public static final String PROP_STATISTICS_COLLECTOR = PFX + "statisticscollector";
+
+ /**
+ * Property used to configure a builder from a Properties object. {@value}, see
+ * {@link Builder#writeListener(WriteListener) writeListener}.
+ */
+ public static final String PROP_WRITE_LISTENER = PFX + "write.listener";
/**
* Property used to configure a builder from a Properties object. {@value}, see {@link Builder#maxPingsOut(int) maxPingsOut}.
*/
@@ -699,6 +705,7 @@ public class Options {
private final ConnectionListener connectionListener;
private final ReadListener readListener;
private final StatisticsCollector statisticsCollector;
+ private final WriteListener writeListener;
private final String dataPortType;
private final boolean trackAdvancedStats;
@@ -848,6 +855,7 @@ public static class Builder {
private ConnectionListener connectionListener = null;
private ReadListener readListener = null;
private StatisticsCollector statisticsCollector = null;
+ private WriteListener writeListener = null;
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
private ExecutorService executor;
private ScheduledExecutorService scheduledExecutor;
@@ -972,6 +980,7 @@ public Builder properties(Properties props) {
classnameProperty(props, PROP_CONNECTION_CB, o -> this.connectionListener = (ConnectionListener) o);
classnameProperty(props, PROP_READ_LISTENER_CLASS, o -> this.readListener = (ReadListener) o);
classnameProperty(props, PROP_STATISTICS_COLLECTOR, o -> this.statisticsCollector = (StatisticsCollector) o);
+ classnameProperty(props, PROP_WRITE_LISTENER, o -> this.writeListener = (WriteListener) o);
stringProperty(props, PROP_DATA_PORT_TYPE, s -> this.dataPortType = s);
stringProperty(props, PROP_INBOX_PREFIX, this::inboxPrefix);
@@ -1687,6 +1696,19 @@ public Builder statisticsCollector(StatisticsCollector collector) {
return this;
}
+ /**
+ * Set the {@link WriteListener WriteListener} to track messages buffered from the queue to the socket buffer.
+ *
+ * If not set, no implementation will be used
+ *
+ * @param listener the new WriteListener for this connection.
+ * @return the Builder for chaining
+ */
+ public Builder writeListener(WriteListener listener) {
+ this.writeListener = listener;
+ return this;
+ }
+
/**
* Set the {@link ExecutorService ExecutorService} used to run threaded tasks. The default is a
* cached thread pool that names threads after the connection name (or a default). This executor
@@ -2096,6 +2118,7 @@ public Builder(Options o) {
this.connectionListener = o.connectionListener;
this.readListener = o.readListener;
this.statisticsCollector = o.statisticsCollector;
+ this.writeListener = o.writeListener;
this.dataPortType = o.dataPortType;
this.trackAdvancedStats = o.trackAdvancedStats;
this.executor = o.executor;
@@ -2168,6 +2191,7 @@ private Options(Builder b) {
this.connectionListener = b.connectionListener;
this.readListener = b.readListener;
this.statisticsCollector = b.statisticsCollector;
+ this.writeListener = b.writeListener;
this.dataPortType = b.dataPortType;
this.trackAdvancedStats = b.trackAdvancedStats;
this.executor = b.executor;
@@ -2302,6 +2326,13 @@ public StatisticsCollector getStatisticsCollector() {
return this.statisticsCollector;
}
+ /**
+ * @return the WriteListener, or null, see {@link Builder#writeListener(WriteListener) writeListener()} in the builder doc
+ */
+ public WriteListener getWriteListener() {
+ return this.writeListener;
+ }
+
/**
* @return the auth handler, or null, see {@link Builder#authHandler(AuthHandler) authHandler()} in the builder doc
*/
diff --git a/src/main/java/io/nats/client/WriteListener.java b/src/main/java/io/nats/client/WriteListener.java
new file mode 100644
index 000000000..3465778a0
--- /dev/null
+++ b/src/main/java/io/nats/client/WriteListener.java
@@ -0,0 +1,40 @@
+// Copyright 2023 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package io.nats.client;
+
+import io.nats.client.impl.NatsMessage;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public abstract class WriteListener {
+ public final ExecutorService executorService;
+
+ public WriteListener() {
+ this.executorService = Executors.newSingleThreadExecutor();
+ }
+
+ public WriteListener(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ public final void submit(Runnable runnable) {
+ executorService.submit(runnable);
+ }
+
+ public void runStarted(int instanceHashCode) {}
+ public void runEnded(int instanceHashCode) {}
+
+ public abstract void buffered(NatsMessage msg);
+}
diff --git a/src/main/java/io/nats/client/impl/MessageQueue.java b/src/main/java/io/nats/client/impl/MessageQueue.java
index 4366eb41e..78b5defd5 100644
--- a/src/main/java/io/nats/client/impl/MessageQueue.java
+++ b/src/main/java/io/nats/client/impl/MessageQueue.java
@@ -1,364 +1,365 @@
-// Copyright 2015-2018 The NATS Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at:
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package io.nats.client.impl;
-
-import io.nats.client.NatsSystemClock;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Predicate;
-
-import static io.nats.client.support.NatsConstants.*;
-
-class MessageQueue {
- protected static final int STOPPED = 0;
- protected static final int RUNNING = 1;
- protected static final int DRAINING = 2;
- protected static final long MIN_OFFER_TIMEOUT_NANOS = 100 * NANOS_PER_MILLI;
-
- protected final AtomicLong length;
- protected final AtomicLong sizeInBytes;
- protected final AtomicInteger running;
- protected final boolean singleReaderMode;
- protected final LinkedBlockingQueue queue;
- protected final Lock editLock;
- protected final int maxMessagesInOutgoingQueue;
- protected final boolean discardWhenFull;
- protected final long offerLockNanos;
- protected final long offerTimeoutNanos;
- protected final Duration requestCleanupInterval;
-
- // SPECIAL MARKER MESSAGES
- // A simple == is used to resolve if any message is exactly the static pill object in question
- // ----------
- // 1. Poison pill is a graphic, but common term for an item that breaks loops or stop something.
- // In this class the poison pill is used to break out of timed waits on the blocking queue.
- protected static final NatsMessage POISON_PILL = new NatsMessage("_poison", null, EMPTY_BODY);
-
- MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval) {
- this(singleReaderMode, -1, false, requestCleanupInterval, null);
- }
-
- MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval, MessageQueue source) {
- this(singleReaderMode, -1, false, requestCleanupInterval, source);
- }
-
- /**
- * If publishHighwaterMark is set to 0 the underlying queue can grow forever (or until the max size of a linked blocking queue that is).
- * A value of 0 is used by readers to prevent the read thread from blocking.
- * If set to a number of messages, the publish command will block, which provides
- * backpressure on a publisher if the writer is slow to push things onto the network. Publishers use the value of Options.getMaxMessagesInOutgoingQueue().
- * @param singleReaderMode allows the use of "accumulate"
- * @param maxMessagesInOutgoingQueue sets a limit on the size of the underlying queue
- * @param discardWhenFull allows to discard messages when the underlying queue is full
- * @param requestCleanupInterval is used to figure the offer timeout
- */
- MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval) {
- this(singleReaderMode, maxMessagesInOutgoingQueue, discardWhenFull, requestCleanupInterval, null);
- }
-
- MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval, MessageQueue source) {
- this.maxMessagesInOutgoingQueue = maxMessagesInOutgoingQueue;
- this.queue = maxMessagesInOutgoingQueue > 0 ? new LinkedBlockingQueue<>(maxMessagesInOutgoingQueue) : new LinkedBlockingQueue<>();
- this.discardWhenFull = discardWhenFull;
- this.running = new AtomicInteger(RUNNING);
- this.sizeInBytes = new AtomicLong(0);
- this.length = new AtomicLong(0);
- this.offerLockNanos = requestCleanupInterval.toNanos();
- this.offerTimeoutNanos = Math.max(MIN_OFFER_TIMEOUT_NANOS, requestCleanupInterval.toMillis() * NANOS_PER_MILLI * 95 / 100) ;
-
- editLock = new ReentrantLock();
-
- this.singleReaderMode = singleReaderMode;
- this.requestCleanupInterval = requestCleanupInterval;
-
- if (source != null) {
- source.drainTo(this);
- }
- }
-
- void drainTo(MessageQueue target) {
- editLock.lock();
- try {
- queue.drainTo(target.queue);
- target.length.set(queue.size());
- } finally {
- editLock.unlock();
- }
- }
-
- boolean isSingleReaderMode() {
- return singleReaderMode;
- }
-
- boolean isRunning() {
- return this.running.get() != STOPPED;
- }
-
- boolean isDraining() {
- return this.running.get() == DRAINING;
- }
-
- void pause() {
- this.running.set(STOPPED);
- this.poisonTheQueue();
- }
-
- void resume() {
- this.running.set(RUNNING);
- }
-
- void drain() {
- this.running.set(DRAINING);
- this.poisonTheQueue();
- }
-
- boolean isDrained() {
- // poison pill is not included in the length count, or the size
- return this.running.get() == DRAINING && this.length.get() == 0;
- }
-
- boolean push(NatsMessage msg) {
- return push(msg, false);
- }
-
- boolean push(NatsMessage msg, boolean internal) {
- try {
- long startNanos = NatsSystemClock.nanoTime();
- /*
- This was essentially a Head-Of-Line blocking problem.
-
- So the crux of the problem was that many threads were waiting to push a message to the queue.
- They all waited for the lock and once they had the lock they waited 5 seconds (4750 millis actually)
- only to find out the queue was full. They released the lock, so then another thread acquired the lock,
- and waited 5 seconds. So instead of being parallel, all these threads had to wait in line
- 200 * 4750 = 15.8 minutes
-
- So what I did was try to acquire the lock but only wait 5 seconds.
- If I could not acquire the lock, then I assumed that this means that we are in this exact situation,
- another thread can't add b/c the queue is full, and so there is no point in even trying, so just throw the queue full exception.
-
- If I did acquire the lock, I deducted the time spent waiting for the lock from the time allowed to try to add.
- I took the max of that or 100 millis to try to add to the queue.
- This ensures that the max total time each thread can take is 5100 millis in parallel.
-
- Notes: The 5 seconds and the 4750 seconds is derived from the Options requestCleanupInterval, which defaults to 5 seconds and can be modified.
- The 4750 is 95% of that time. The MIN_OFFER_TIMEOUT_NANOS 100 ms minimum is arbitrary.
- */
- if (editLock.tryLock(offerLockNanos, TimeUnit.NANOSECONDS)) {
- try {
- if (!internal && this.discardWhenFull) {
- return this.queue.offer(msg);
- }
-
- long timeoutNanosLeft = Math.max(MIN_OFFER_TIMEOUT_NANOS, offerTimeoutNanos - (NatsSystemClock.nanoTime() - startNanos));
-
- if (!this.queue.offer(msg, timeoutNanosLeft, TimeUnit.NANOSECONDS)) {
- throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size());
- }
- this.sizeInBytes.getAndAdd(msg.getSizeInBytes());
- this.length.incrementAndGet();
- return true;
-
- }
- finally {
- editLock.unlock();
- }
- }
- else {
- throw new IllegalStateException(OUTPUT_QUEUE_BUSY + queue.size());
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- }
- }
-
- /**
- * poisoning the queue puts the known poison pill into the queue, forcing any waiting code to stop
- * waiting and return.
- */
- void poisonTheQueue() {
- try {
- this.queue.add(POISON_PILL);
- } catch (IllegalStateException ie) { // queue was full, so we don't really need poison pill
- // ok to ignore this
- }
- }
-
- NatsMessage poll(Duration timeout) throws InterruptedException {
- NatsMessage msg = null;
-
- if (timeout == null || this.isDraining()) { // try immediately
- msg = this.queue.poll();
- } else {
- long nanos = timeout.toNanos();
-
- if (nanos != 0) {
- msg = this.queue.poll(nanos, TimeUnit.NANOSECONDS);
- } else {
- // A value of 0 means wait forever
- // We will loop and wait for a LONG time
- // if told to suspend/drain the poison pill will break this loop
- while (this.isRunning()) {
- msg = this.queue.poll(100, TimeUnit.DAYS);
- if (msg != null) break;
- }
- }
- }
-
- return msg == null || msg == POISON_PILL ? null : msg;
- }
-
- NatsMessage pop(Duration timeout) throws InterruptedException {
- if (!this.isRunning()) {
- return null;
- }
-
- NatsMessage msg = this.poll(timeout);
-
- if (msg == null) {
- return null;
- }
-
- this.sizeInBytes.getAndAdd(-msg.getSizeInBytes());
- this.length.decrementAndGet();
-
- return msg;
- }
-
- // Waits up to the timeout to try to accumulate multiple messages
- // Use the next field to read the entire set accumulated.
- // maxSize and maxMessages are both checked and if either is exceeded
- // the method returns.
- //
- // A timeout of 0 will wait forever (or until the queue is stopped/drained)
- //
- // Only works in single reader mode, because we want to maintain order.
- // accumulate reads off the concurrent queue one at a time, so if multiple
- // readers are present, you could get out of order message delivery.
- NatsMessage accumulate(long maxBytesToAccumulate, long maxMessagesToAccumulate, Duration timeout)
- throws InterruptedException {
-
- if (!this.singleReaderMode) {
- throw new IllegalStateException("Accumulate is only supported in single reader mode.");
- }
-
- if (!this.isRunning()) {
- return null;
- }
-
- NatsMessage msg = this.poll(timeout);
-
- if (msg == null) {
- return null;
- }
-
- long size = msg.getSizeInBytes();
-
- if (maxMessagesToAccumulate <= 1 || size >= maxBytesToAccumulate) {
- this.sizeInBytes.addAndGet(-size);
- this.length.decrementAndGet();
- return msg;
- }
-
- long count = 1;
- NatsMessage cursor = msg;
-
- while (true) {
- NatsMessage next = this.queue.peek();
- if (next != null && next != POISON_PILL) {
- long s = next.getSizeInBytes();
- if (maxBytesToAccumulate < 0 || (size + s) < maxBytesToAccumulate) { // keep going
- size += s;
- count++;
-
- this.queue.poll(); // we need to get the message out of the queue b/c we only peeked
- cursor.next = next;
- if (next.flushImmediatelyAfterPublish) {
- // if we are going to flush, then don't accumulate more
- break;
- }
- if (count == maxMessagesToAccumulate) {
- break;
- }
- cursor = cursor.next;
- } else { // One more is too far
- break;
- }
- } else { // Didn't meet max condition
- break;
- }
- }
-
- this.sizeInBytes.addAndGet(-size);
- this.length.addAndGet(-count);
-
- return msg;
- }
-
- // Returns a message or null
- NatsMessage popNow() throws InterruptedException {
- return pop(null);
- }
-
- long length() {
- return this.length.get();
- }
-
- long sizeInBytes() {
- return this.sizeInBytes.get();
- }
-
- void filter(Predicate p) {
- editLock.lock();
- try {
- if (this.isRunning()) {
- throw new IllegalStateException("Filter is only supported when the queue is paused");
- }
- ArrayList newQueue = new ArrayList<>();
- NatsMessage cursor = this.queue.poll();
- while (cursor != null) {
- if (!p.test(cursor)) {
- newQueue.add(cursor);
- } else {
- this.sizeInBytes.addAndGet(-cursor.getSizeInBytes());
- this.length.decrementAndGet();
- }
- cursor = this.queue.poll();
- }
- this.queue.addAll(newQueue);
- } finally {
- editLock.unlock();
- }
- }
-
- void clear() {
- editLock.lock();
- try {
- this.queue.clear();
- this.length.set(0);
- this.sizeInBytes.set(0);
- } finally {
- editLock.unlock();
- }
- }
-}
+// Copyright 2015-2018 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package io.nats.client.impl;
+
+import io.nats.client.NatsSystemClock;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
+import static io.nats.client.support.NatsConstants.*;
+
+class MessageQueue {
+ protected static final int STOPPED = 0;
+ protected static final int RUNNING = 1;
+ protected static final int DRAINING = 2;
+ protected static final long MIN_OFFER_TIMEOUT_NANOS = 100 * NANOS_PER_MILLI;
+
+ protected final AtomicLong length;
+ protected final AtomicLong sizeInBytes;
+ protected final AtomicInteger running;
+ protected final boolean singleReaderMode;
+ protected final LinkedBlockingQueue queue;
+ protected final Lock editLock;
+ protected final int maxMessagesInOutgoingQueue;
+ protected final boolean discardWhenFull;
+ protected final long offerLockNanos;
+ protected final long offerTimeoutNanos;
+ protected final Duration requestCleanupInterval;
+
+ // SPECIAL MARKER MESSAGES
+ // A simple == is used to resolve if any message is exactly the static pill object in question
+ // ----------
+ // 1. Poison pill is a graphic, but common term for an item that breaks loops or stop something.
+ // In this class the poison pill is used to break out of timed waits on the blocking queue.
+ protected static final NatsMessage POISON_PILL = new NatsMessage("_poison", null, EMPTY_BODY);
+
+ MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval) {
+ this(singleReaderMode, -1, false, requestCleanupInterval, null);
+ }
+
+ MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval, MessageQueue source) {
+ this(singleReaderMode, -1, false, requestCleanupInterval, source);
+ }
+
+ /**
+ * If publishHighwaterMark is set to 0 the underlying queue can grow forever (or until the max size of a linked blocking queue that is).
+ * A value of 0 is used by readers to prevent the read thread from blocking.
+ * If set to a number of messages, the publish command will block, which provides
+ * backpressure on a publisher if the writer is slow to push things onto the network. Publishers use the value of Options.getMaxMessagesInOutgoingQueue().
+ * @param singleReaderMode allows the use of "accumulate"
+ * @param maxMessagesInOutgoingQueue sets a limit on the size of the underlying queue
+ * @param discardWhenFull allows to discard messages when the underlying queue is full
+ * @param requestCleanupInterval is used to figure the offer timeout
+ */
+ MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval) {
+ this(singleReaderMode, maxMessagesInOutgoingQueue, discardWhenFull, requestCleanupInterval, null);
+ }
+
+ MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval, MessageQueue source) {
+ this.maxMessagesInOutgoingQueue = maxMessagesInOutgoingQueue;
+ this.queue = maxMessagesInOutgoingQueue > 0 ? new LinkedBlockingQueue<>(maxMessagesInOutgoingQueue) : new LinkedBlockingQueue<>();
+ this.discardWhenFull = discardWhenFull;
+ this.running = new AtomicInteger(RUNNING);
+ this.sizeInBytes = new AtomicLong(0);
+ this.length = new AtomicLong(0);
+ this.offerLockNanos = requestCleanupInterval.toNanos();
+ this.offerTimeoutNanos = Math.max(MIN_OFFER_TIMEOUT_NANOS, requestCleanupInterval.toMillis() * NANOS_PER_MILLI * 95 / 100) ;
+
+ editLock = new ReentrantLock();
+
+ this.singleReaderMode = singleReaderMode;
+ this.requestCleanupInterval = requestCleanupInterval;
+
+ if (source != null) {
+ source.drainTo(this);
+ }
+ }
+
+ void drainTo(MessageQueue target) {
+ editLock.lock();
+ try {
+ queue.drainTo(target.queue);
+ target.length.set(queue.size());
+ } finally {
+ editLock.unlock();
+ }
+ }
+
+ boolean isSingleReaderMode() {
+ return singleReaderMode;
+ }
+
+ boolean isRunning() {
+ return this.running.get() != STOPPED;
+ }
+
+ boolean isDraining() {
+ return this.running.get() == DRAINING;
+ }
+
+ void pause() {
+ this.running.set(STOPPED);
+ this.poisonTheQueue();
+ }
+
+ void resume() {
+ this.running.set(RUNNING);
+ }
+
+ void drain() {
+ this.running.set(DRAINING);
+ this.poisonTheQueue();
+ }
+
+ boolean isDrained() {
+ // poison pill is not included in the length count, or the size
+ return this.running.get() == DRAINING && this.length.get() == 0;
+ }
+
+ boolean push(NatsMessage msg) {
+ return push(msg, false);
+ }
+
+ boolean push(NatsMessage msg, boolean internal) {
+ try {
+ long startNanos = NatsSystemClock.nanoTime();
+ /*
+ This was essentially a Head-Of-Line blocking problem.
+
+ So the crux of the problem was that many threads were waiting to push a message to the queue.
+ They all waited for the lock and once they had the lock they waited 5 seconds (4750 millis actually)
+ only to find out the queue was full. They released the lock, so then another thread acquired the lock,
+ and waited 5 seconds. So instead of being parallel, all these threads had to wait in line
+ 200 * 4750 = 15.8 minutes
+
+ So what I did was try to acquire the lock but only wait 5 seconds.
+ If I could not acquire the lock, then I assumed that this means that we are in this exact situation,
+ another thread can't add b/c the queue is full, and so there is no point in even trying, so just throw the queue full exception.
+
+ If I did acquire the lock, I deducted the time spent waiting for the lock from the time allowed to try to add.
+ I took the max of that or 100 millis to try to add to the queue.
+ This ensures that the max total time each thread can take is 5100 millis in parallel.
+
+ Notes: The 5 seconds and the 4750 seconds is derived from the Options requestCleanupInterval, which defaults to 5 seconds and can be modified.
+ The 4750 is 95% of that time. The MIN_OFFER_TIMEOUT_NANOS 100 ms minimum is arbitrary.
+ */
+ if (editLock.tryLock(offerLockNanos, TimeUnit.NANOSECONDS)) {
+ try {
+ if (!internal && this.discardWhenFull) {
+ return this.queue.offer(msg);
+ }
+
+ long timeoutNanosLeft = Math.max(MIN_OFFER_TIMEOUT_NANOS, offerTimeoutNanos - (NatsSystemClock.nanoTime() - startNanos));
+
+ if (!this.queue.offer(msg, timeoutNanosLeft, TimeUnit.NANOSECONDS)) {
+ throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size());
+ }
+ this.sizeInBytes.getAndAdd(msg.getSizeInBytes());
+ this.length.incrementAndGet();
+ return true;
+
+ }
+ finally {
+ editLock.unlock();
+ }
+ }
+ else {
+ throw new IllegalStateException(OUTPUT_QUEUE_BUSY + queue.size());
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ /**
+ * poisoning the queue puts the known poison pill into the queue, forcing any waiting code to stop
+ * waiting and return.
+ */
+ void poisonTheQueue() {
+ try {
+ this.queue.add(POISON_PILL);
+ } catch (IllegalStateException ie) { // queue was full, so we don't really need poison pill
+ // ok to ignore this
+ }
+ }
+
+ NatsMessage poll(Duration timeout) throws InterruptedException {
+ NatsMessage msg = null;
+
+ if (timeout == null || this.isDraining()) { // try immediately
+ msg = this.queue.poll();
+ } else {
+ long nanos = timeout.toNanos();
+
+ if (nanos != 0) {
+ msg = this.queue.poll(nanos, TimeUnit.NANOSECONDS);
+ } else {
+ // A value of 0 means wait forever
+ // We will loop and wait for a LONG time
+ // if told to suspend/drain the poison pill will break this loop
+ while (this.isRunning()) {
+ msg = this.queue.poll(100, TimeUnit.DAYS);
+ if (msg != null) break;
+ }
+ }
+ }
+
+ return msg == null || msg == POISON_PILL ? null : msg;
+ }
+
+ NatsMessage pop(Duration timeout) throws InterruptedException {
+ if (!this.isRunning()) {
+ return null;
+ }
+
+ NatsMessage msg = this.poll(timeout);
+
+ if (msg == null) {
+ return null;
+ }
+
+ this.sizeInBytes.getAndAdd(-msg.getSizeInBytes());
+ this.length.decrementAndGet();
+
+ return msg;
+ }
+
+ // Waits up to the timeout to try to accumulate multiple messages
+ // Use the next field to read the entire set accumulated.
+ // maxSize and maxMessages are both checked and if either is exceeded
+ // the method returns.
+ //
+ // A timeout of 0 will wait forever (or until the queue is stopped/drained)
+ //
+ // Only works in single reader mode, because we want to maintain order.
+ // accumulate reads off the concurrent queue one at a time, so if multiple
+ // readers are present, you could get out of order message delivery.
+ NatsMessage accumulate(long maxBytesToAccumulate, long maxMessagesToAccumulate, Duration timeout)
+ throws InterruptedException {
+
+ if (!this.singleReaderMode) {
+ throw new IllegalStateException("Accumulate is only supported in single reader mode.");
+ }
+
+ if (!this.isRunning()) {
+ return null;
+ }
+
+ NatsMessage msg = this.poll(timeout);
+
+ if (msg == null) {
+ return null;
+ }
+
+ long size = msg.getSizeInBytes();
+
+ if (maxMessagesToAccumulate <= 1 || size >= maxBytesToAccumulate) {
+ this.sizeInBytes.addAndGet(-size);
+ this.length.decrementAndGet();
+ return msg;
+ }
+
+ long count = 1;
+ NatsMessage cursor = msg;
+
+ while (true) {
+ NatsMessage next = this.queue.peek();
+ if (next != null && next != POISON_PILL) {
+ long s = next.getSizeInBytes();
+ if (maxBytesToAccumulate < 0 || (size + s) < maxBytesToAccumulate) { // keep going
+ size += s;
+ count++;
+
+ this.queue.poll(); // we need to get the message out of the queue b/c we only peeked
+ next.prev = cursor;
+ cursor.next = next;
+ if (next.flushImmediatelyAfterPublish) {
+ // if we are going to flush, then don't accumulate more
+ break;
+ }
+ if (count == maxMessagesToAccumulate) {
+ break;
+ }
+ cursor = cursor.next;
+ } else { // One more is too far
+ break;
+ }
+ } else { // Didn't meet max condition
+ break;
+ }
+ }
+
+ this.sizeInBytes.addAndGet(-size);
+ this.length.addAndGet(-count);
+
+ return msg;
+ }
+
+ // Returns a message or null
+ NatsMessage popNow() throws InterruptedException {
+ return pop(null);
+ }
+
+ long length() {
+ return this.length.get();
+ }
+
+ long sizeInBytes() {
+ return this.sizeInBytes.get();
+ }
+
+ void filter(Predicate p) {
+ editLock.lock();
+ try {
+ if (this.isRunning()) {
+ throw new IllegalStateException("Filter is only supported when the queue is paused");
+ }
+ ArrayList newQueue = new ArrayList<>();
+ NatsMessage cursor = this.queue.poll();
+ while (cursor != null) {
+ if (!p.test(cursor)) {
+ newQueue.add(cursor);
+ } else {
+ this.sizeInBytes.addAndGet(-cursor.getSizeInBytes());
+ this.length.decrementAndGet();
+ }
+ cursor = this.queue.poll();
+ }
+ this.queue.addAll(newQueue);
+ } finally {
+ editLock.unlock();
+ }
+ }
+
+ void clear() {
+ editLock.lock();
+ try {
+ this.queue.clear();
+ this.length.set(0);
+ this.sizeInBytes.set(0);
+ } finally {
+ editLock.unlock();
+ }
+ }
+}
diff --git a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java
index 610cc351d..52e8598b8 100644
--- a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java
+++ b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java
@@ -16,6 +16,7 @@
import io.nats.client.Options;
import io.nats.client.StatisticsCollector;
+import io.nats.client.WriteListener;
import io.nats.client.support.ByteArrayBuilder;
import java.io.IOException;
@@ -40,6 +41,8 @@ enum Mode {
private static final int BUFFER_BLOCK_SIZE = 256;
private final NatsConnection connection;
+ private final StatisticsCollector stats;
+ private final WriteListener writeListener;
private final ReentrantLock writerLock;
private Future stopped;
@@ -58,6 +61,8 @@ enum Mode {
NatsConnectionWriter(NatsConnection connection, NatsConnectionWriter sourceWriter) {
this.connection = connection;
+ stats = connection.getStatisticsCollector();
+ writeListener = connection.getOptions().getWriteListener();
writerLock = new ReentrantLock();
this.running = new AtomicBoolean(false);
@@ -129,7 +134,7 @@ boolean isRunning() {
private static final NatsMessage END_RECONNECT = new NatsMessage("_end", null, EMPTY_BODY);
- void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector stats) throws IOException {
+ void sendMessageBatch(NatsMessage msg, DataPort dataPort) throws IOException {
writerLock.lock();
try {
int sendPosition = 0;
@@ -176,8 +181,18 @@ void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector st
sendBuffer[sendPosition++] = LF;
}
- stats.incrementOutMsgs();
- stats.incrementOutBytes(size);
+ if (writeListener == null) {
+ stats.incrementOutMsgs();
+ stats.incrementOutBytes(size);
+ }
+ else {
+ NatsMessage finalMsg = msg;
+ writeListener.submit(() -> {
+ stats.incrementOutMsgs();
+ stats.incrementOutBytes(size);
+ writeListener.buffered(finalMsg);
+ });
+ }
if (msg.flushImmediatelyAfterPublish) {
dataPort.flush();
@@ -203,7 +218,10 @@ public void run() {
try {
dataPort = this.dataPortFuture.get(); // Will wait for the future to complete
- StatisticsCollector stats = this.connection.getStatisticsCollector();
+
+ if (writeListener != null) {
+ writeListener.submit(() -> writeListener.runStarted(this.hashCode()));
+ }
while (running.get() && !Thread.interrupted()) {
NatsMessage msg;
@@ -214,7 +232,7 @@ public void run() {
msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, reconnectTimeout);
}
if (msg != null) {
- sendMessageBatch(msg, dataPort, stats);
+ sendMessageBatch(msg, dataPort);
}
}
} catch (IOException | BufferOverflowException io) {
@@ -229,6 +247,9 @@ public void run() {
Thread.currentThread().interrupt();
} finally {
this.running.set(false);
+ if (writeListener != null) {
+ writeListener.submit(() -> writeListener.runEnded(this.hashCode()));
+ }
}
}
diff --git a/src/main/java/io/nats/client/impl/NatsMessage.java b/src/main/java/io/nats/client/impl/NatsMessage.java
index e6bd61310..f78783061 100644
--- a/src/main/java/io/nats/client/impl/NatsMessage.java
+++ b/src/main/java/io/nats/client/impl/NatsMessage.java
@@ -1,547 +1,548 @@
-// Copyright 2015-2018 The NATS Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at:
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package io.nats.client.impl;
-
-import io.nats.client.Connection;
-import io.nats.client.Message;
-import io.nats.client.Subscription;
-import io.nats.client.support.ByteArrayBuilder;
-import io.nats.client.support.Status;
-
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.concurrent.TimeoutException;
-
-import static io.nats.client.support.NatsConstants.*;
-import static io.nats.client.support.Validator.validateReplyTo;
-import static io.nats.client.support.Validator.validateSubject;
-import static java.nio.charset.StandardCharsets.ISO_8859_1;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-public class NatsMessage implements Message {
-
- protected static final String NOT_A_JET_STREAM_MESSAGE = "Message is not a JetStream message";
-
- protected String subject;
- protected String replyTo;
- protected byte[] data;
- protected Headers headers;
-
- // incoming specific : subject, replyTo, data and these fields
- protected String sid;
- protected int controlLineLength;
-
- // protocol specific : just this field
- ByteArrayBuilder protocolBab;
-
- // housekeeping
- protected int sizeInBytes;
- protected int headerLen;
- protected int dataLen;
-
- protected NatsSubscription subscription;
-
- // for accumulate
- protected NatsMessage next;
- protected boolean flushImmediatelyAfterPublish;
-
- // ack tracking
- protected AckType lastAck;
-
- // ----------------------------------------------------------------------------------------------------
- // Constructors - Prefer to use Builder
- // ----------------------------------------------------------------------------------------------------
- protected NatsMessage() {
- this((byte[])null);
- }
-
- protected NatsMessage(byte[] data) {
- this.data = data == null ? EMPTY_BODY : data;
- dataLen = this.data.length;
- }
-
- @Deprecated // utf8-mode is ignored
- public NatsMessage(String subject, String replyTo, byte[] data, boolean utf8mode) {
- this(subject, replyTo, null, data);
- }
-
- @Deprecated // utf8-mode is ignored
- public NatsMessage(String subject, String replyTo, Headers headers, byte[] data, boolean utf8mode) {
- this(subject, replyTo, headers, data);
- }
-
- public NatsMessage(String subject, String replyTo, byte[] data) {
- this(subject, replyTo, null, data);
- }
-
- public NatsMessage(String subject, String replyTo, Headers headers, byte[] data) {
- this(data);
- this.subject = validateSubject(subject, true);
- this.replyTo = validateReplyTo(replyTo, false);
- this.headers = headers;
- }
-
- public NatsMessage(Message message) {
- this(message.getData());
- this.subject = message.getSubject();
- this.replyTo = message.getReplyTo();
- this.headers = message.getHeaders();
- }
-
- // ----------------------------------------------------------------------------------------------------
- // Client and Message Internal Methods
- // ----------------------------------------------------------------------------------------------------
- boolean isProtocol() {
- return false; // overridden in NatsMessage.ProtocolMessage
- }
-
- boolean isProtocolFilterOnStop() {
- return false; // overridden in NatsMessage.ProtocolMessage
- }
-
- private static final Headers EMPTY_READ_ONLY = new Headers(null, true, null);
-
- protected void calculate() {
- int replyToLen = replyTo == null ? 0 : replyTo.length();
-
- // headers get frozen (read only) at this point
- if (headers == null) {
- headerLen = 0;
- }
- else if (headers.isEmpty()) {
- headers = EMPTY_READ_ONLY;
- headerLen = 0;
- }
- else {
- headers = headers.isReadOnly() ? headers : new Headers(headers, true, null);
- headerLen = headers.serializedLength();
- }
-
- int headerAndDataLen = headerLen + dataLen;
-
- // initialize the builder with a reasonable length, preventing resize in 99.9% of the cases
- // 32 for misc + subject length doubled in case of utf8 mode + replyToLen + totLen (headerLen + dataLen)
- ByteArrayBuilder bab = new ByteArrayBuilder(32 + (subject.length() * 2) + replyToLen + headerAndDataLen, UTF_8);
-
- // protocol come first
- if (headerLen > 0) {
- bab.append(HPUB_SP_BYTES, 0, HPUB_SP_BYTES_LEN);
- }
- else {
- bab.append(PUB_SP_BYTES, 0, PUB_SP_BYTES_LEN);
- }
-
- // next comes the subject
- bab.append(subject.getBytes(UTF_8)).append(SP);
-
- // reply to if it's there
- if (replyToLen > 0) {
- bab.append(replyTo.getBytes(UTF_8)).append(SP);
- }
-
- // header length if there are headers
- if (headerLen > 0) {
- bab.append(Integer.toString(headerLen).getBytes(ISO_8859_1)).append(SP);
- }
-
- // payload length
- bab.append(Integer.toString(headerAndDataLen).getBytes(ISO_8859_1));
-
- protocolBab = bab;
- controlLineLength = protocolBab.length() + 2; // One CRLF. This is just how controlLineLength is defined.
- sizeInBytes = controlLineLength + headerAndDataLen + 2; // The 2nd CRLFs
- }
-
- ByteArrayBuilder getProtocolBab() {
- calculate();
- return protocolBab;
- }
-
- long getSizeInBytes() {
- calculate();
- return sizeInBytes;
- }
-
- byte[] getProtocolBytes() {
- calculate();
- return protocolBab.toByteArray();
- }
-
- int getControlLineLength() {
- calculate();
- return controlLineLength;
- }
-
- /**
- * @param destPosition the position index in destination byte array to start
- * @param dest is the byte array to write to
- * @return the length of the header
- */
- int copyNotEmptyHeaders(int destPosition, byte[] dest) {
- calculate();
- if (headerLen > 0) {
- return headers.serializeToArray(destPosition, dest);
- }
- return 0;
- }
-
- void setSubscription(NatsSubscription sub) {
- subscription = sub;
- }
-
- NatsSubscription getNatsSubscription() {
- return subscription;
- }
-
- // ----------------------------------------------------------------------------------------------------
- // Public Interface Methods
- // ----------------------------------------------------------------------------------------------------
- /**
- * {@inheritDoc}
- */
- @Override
- public String getSID() {
- return sid;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Connection getConnection() {
- return subscription == null ? null : subscription.connection;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String getSubject() {
- return subject;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String getReplyTo() {
- return replyTo;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean hasHeaders() {
- return headers != null && !headers.isEmpty();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Headers getHeaders() {
- return headers;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isStatusMessage() {
- return false;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Status getStatus() {
- return null;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public byte[] getData() {
- return data;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isUtf8mode() {
- return false;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Subscription getSubscription() {
- return subscription;
- }
-
- @Override
- public AckType lastAck() {
- return lastAck;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void ack() {
- // do nothing. faster. saves checking whether a message is jetstream or not
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void ackSync(Duration d) throws InterruptedException, TimeoutException {
- // do nothing. faster. saves checking whether a message is jetstream or not
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void nak() {
- // do nothing. faster. saves checking whether a message is jetstream or not
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void nakWithDelay(Duration nakDelay) {
- // do nothing. faster. saves checking whether a message is jetstream or not
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void nakWithDelay(long nakDelayMillis) {
- // do nothing. faster. saves checking whether a message is jetstream or not
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void inProgress() {
- // do nothing. faster. saves checking whether a message is jetstream or not
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void term() {
- // do nothing. faster. saves checking whether a message is jetstream or not
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public NatsJetStreamMetaData metaData() {
- throw new IllegalStateException(NOT_A_JET_STREAM_MESSAGE);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isJetStream() {
- return false; // overridden in NatsJetStreamMessage
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public long consumeByteCount() {
- return subject == null ? 0 : subject.length()
- + headerLen
- + dataLen
- + (replyTo == null ? 0 : replyTo.length());
- }
-
- @Override
- public String toString() {
- if (subject == null) {
- return getClass().getSimpleName() + " | " + protocolBytesToString();
- }
- return getClass().getSimpleName() + " |" + subject + "|" + replyToString() + "|" + dataToString() + "|";
- }
-
- String toDetailString() {
- return "NatsMessage:" +
- "\n subject='" + subject + '\'' +
- "\n replyTo='" + replyToString() + '\'' +
- "\n data=" + dataToString() +
- "\n headers=" + headersToString() +
- "\n sid='" + sid + '\'' +
- "\n protocolBytes=" + protocolBytesToString() +
- "\n sizeInBytes=" + sizeInBytes +
- "\n headerLen=" + headerLen +
- "\n dataLen=" + dataLen +
- "\n subscription=" + subscription +
- "\n next=" + nextToString();
-
- }
-
- private String headersToString() {
- return hasHeaders() ? new String(headers.getSerialized(), ISO_8859_1).replace("\r", "+").replace("\n", "+") : "";
- }
-
- private String dataToString() {
- if (data.length == 0) {
- return "";
- }
- String s = new String(data, UTF_8);
- int at = s.indexOf("io.nats.jetstream.api");
- if (at == -1) {
- return s.length() > 27 ? s.substring(0, 27) + "..." : s;
- }
- int at2 = s.indexOf('"', at);
- return s.substring(at, at2);
- }
-
- private String replyToString() {
- return replyTo == null ? "" : replyTo;
- }
-
- private String protocolBytesToString() {
- return protocolBab == null ? null : protocolBab.toString();
- }
-
- private String nextToString() {
- return next == null ? "No" : "Yes";
- }
-
- // ----------------------------------------------------------------------------------------------------
- // Standard Builder
- // ----------------------------------------------------------------------------------------------------
- public static Builder builder() {
- return new Builder();
- }
-
- /**
- * The builder is for building normal publish/request messages,
- * as an option for client use developers instead of the normal constructor
- */
- public static class Builder {
- private String subject;
- private String replyTo;
- private Headers headers;
- private byte[] data;
-
- /**
- * Set the subject
- *
- * @param subject the subject
- * @return the builder
- */
- public Builder subject(final String subject) {
- this.subject = subject;
- return this;
- }
-
- /**
- * Set the reply to
- *
- * @param replyTo the reply to
- * @return the builder
- */
- public Builder replyTo(final String replyTo) {
- this.replyTo = replyTo;
- return this;
- }
-
- /**
- * Set the headers
- *
- * @param headers the headers
- * @return the builder
- */
- public Builder headers(final Headers headers) {
- this.headers = headers;
- return this;
- }
-
- /**
- * Set the data from a string converting using the
- * charset StandardCharsets.UTF_8
- *
- * @param data the data string
- * @return the builder
- */
- public Builder data(final String data) {
- if (data != null) {
- this.data = data.getBytes(StandardCharsets.UTF_8);
- }
- return this;
- }
-
- /**
- * Set the data from a string
- *
- * @param data the data string
- * @param charset the charset, for example {@code StandardCharsets.UTF_8}
- * @return the builder
- */
- public Builder data(final String data, final Charset charset) {
- this.data = data.getBytes(charset);
- return this;
- }
-
- /**
- * Set the data from a byte array. null data changed to empty byte array
- *
- * @param data the data
- * @return the builder
- */
- public Builder data(final byte[] data) {
- this.data = data;
- return this;
- }
-
- /**
- * Set if the subject should be treated as utf
- * @deprecated Code is just always treating as utf8
- * @param utf8mode true if utf8 mode for subject
- * @return the builder
- */
- @Deprecated
- public Builder utf8mode(final boolean utf8mode) {
- return this;
- }
-
- /**
- * Build the {@code NatsMessage} object
- *
- * @return the {@code NatsMessage}
- */
- public NatsMessage build() {
- return new NatsMessage(subject, replyTo, headers, data);
- }
- }
-}
+// Copyright 2015-2018 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package io.nats.client.impl;
+
+import io.nats.client.Connection;
+import io.nats.client.Message;
+import io.nats.client.Subscription;
+import io.nats.client.support.ByteArrayBuilder;
+import io.nats.client.support.Status;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.TimeoutException;
+
+import static io.nats.client.support.NatsConstants.*;
+import static io.nats.client.support.Validator.validateReplyTo;
+import static io.nats.client.support.Validator.validateSubject;
+import static java.nio.charset.StandardCharsets.ISO_8859_1;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class NatsMessage implements Message {
+
+ protected static final String NOT_A_JET_STREAM_MESSAGE = "Message is not a JetStream message";
+
+ protected String subject;
+ protected String replyTo;
+ protected byte[] data;
+ protected Headers headers;
+
+ // incoming specific : subject, replyTo, data and these fields
+ protected String sid;
+ protected int controlLineLength;
+
+ // protocol specific : just this field
+ ByteArrayBuilder protocolBab;
+
+ // housekeeping
+ protected int sizeInBytes;
+ protected int headerLen;
+ protected int dataLen;
+
+ protected NatsSubscription subscription;
+
+ // for accumulate
+ protected NatsMessage prev;
+ protected NatsMessage next;
+ protected boolean flushImmediatelyAfterPublish;
+
+ // ack tracking
+ protected AckType lastAck;
+
+ // ----------------------------------------------------------------------------------------------------
+ // Constructors - Prefer to use Builder
+ // ----------------------------------------------------------------------------------------------------
+ protected NatsMessage() {
+ this((byte[])null);
+ }
+
+ protected NatsMessage(byte[] data) {
+ this.data = data == null ? EMPTY_BODY : data;
+ dataLen = this.data.length;
+ }
+
+ @Deprecated // utf8-mode is ignored
+ public NatsMessage(String subject, String replyTo, byte[] data, boolean utf8mode) {
+ this(subject, replyTo, null, data);
+ }
+
+ @Deprecated // utf8-mode is ignored
+ public NatsMessage(String subject, String replyTo, Headers headers, byte[] data, boolean utf8mode) {
+ this(subject, replyTo, headers, data);
+ }
+
+ public NatsMessage(String subject, String replyTo, byte[] data) {
+ this(subject, replyTo, null, data);
+ }
+
+ public NatsMessage(String subject, String replyTo, Headers headers, byte[] data) {
+ this(data);
+ this.subject = validateSubject(subject, true);
+ this.replyTo = validateReplyTo(replyTo, false);
+ this.headers = headers;
+ }
+
+ public NatsMessage(Message message) {
+ this(message.getData());
+ this.subject = message.getSubject();
+ this.replyTo = message.getReplyTo();
+ this.headers = message.getHeaders();
+ }
+
+ // ----------------------------------------------------------------------------------------------------
+ // Client and Message Internal Methods
+ // ----------------------------------------------------------------------------------------------------
+ boolean isProtocol() {
+ return false; // overridden in NatsMessage.ProtocolMessage
+ }
+
+ boolean isProtocolFilterOnStop() {
+ return false; // overridden in NatsMessage.ProtocolMessage
+ }
+
+ private static final Headers EMPTY_READ_ONLY = new Headers(null, true, null);
+
+ protected void calculate() {
+ int replyToLen = replyTo == null ? 0 : replyTo.length();
+
+ // headers get frozen (read only) at this point
+ if (headers == null) {
+ headerLen = 0;
+ }
+ else if (headers.isEmpty()) {
+ headers = EMPTY_READ_ONLY;
+ headerLen = 0;
+ }
+ else {
+ headers = headers.isReadOnly() ? headers : new Headers(headers, true, null);
+ headerLen = headers.serializedLength();
+ }
+
+ int headerAndDataLen = headerLen + dataLen;
+
+ // initialize the builder with a reasonable length, preventing resize in 99.9% of the cases
+ // 32 for misc + subject length doubled in case of utf8 mode + replyToLen + totLen (headerLen + dataLen)
+ ByteArrayBuilder bab = new ByteArrayBuilder(32 + (subject.length() * 2) + replyToLen + headerAndDataLen, UTF_8);
+
+ // protocol come first
+ if (headerLen > 0) {
+ bab.append(HPUB_SP_BYTES, 0, HPUB_SP_BYTES_LEN);
+ }
+ else {
+ bab.append(PUB_SP_BYTES, 0, PUB_SP_BYTES_LEN);
+ }
+
+ // next comes the subject
+ bab.append(subject.getBytes(UTF_8)).append(SP);
+
+ // reply to if it's there
+ if (replyToLen > 0) {
+ bab.append(replyTo.getBytes(UTF_8)).append(SP);
+ }
+
+ // header length if there are headers
+ if (headerLen > 0) {
+ bab.append(Integer.toString(headerLen).getBytes(ISO_8859_1)).append(SP);
+ }
+
+ // payload length
+ bab.append(Integer.toString(headerAndDataLen).getBytes(ISO_8859_1));
+
+ protocolBab = bab;
+ controlLineLength = protocolBab.length() + 2; // One CRLF. This is just how controlLineLength is defined.
+ sizeInBytes = controlLineLength + headerAndDataLen + 2; // The 2nd CRLFs
+ }
+
+ ByteArrayBuilder getProtocolBab() {
+ calculate();
+ return protocolBab;
+ }
+
+ long getSizeInBytes() {
+ calculate();
+ return sizeInBytes;
+ }
+
+ byte[] getProtocolBytes() {
+ calculate();
+ return protocolBab.toByteArray();
+ }
+
+ int getControlLineLength() {
+ calculate();
+ return controlLineLength;
+ }
+
+ /**
+ * @param destPosition the position index in destination byte array to start
+ * @param dest is the byte array to write to
+ * @return the length of the header
+ */
+ int copyNotEmptyHeaders(int destPosition, byte[] dest) {
+ calculate();
+ if (headerLen > 0) {
+ return headers.serializeToArray(destPosition, dest);
+ }
+ return 0;
+ }
+
+ void setSubscription(NatsSubscription sub) {
+ subscription = sub;
+ }
+
+ NatsSubscription getNatsSubscription() {
+ return subscription;
+ }
+
+ // ----------------------------------------------------------------------------------------------------
+ // Public Interface Methods
+ // ----------------------------------------------------------------------------------------------------
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getSID() {
+ return sid;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Connection getConnection() {
+ return subscription == null ? null : subscription.connection;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getSubject() {
+ return subject;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getReplyTo() {
+ return replyTo;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean hasHeaders() {
+ return headers != null && !headers.isEmpty();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Headers getHeaders() {
+ return headers;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isStatusMessage() {
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Status getStatus() {
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getData() {
+ return data;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isUtf8mode() {
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Subscription getSubscription() {
+ return subscription;
+ }
+
+ @Override
+ public AckType lastAck() {
+ return lastAck;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void ack() {
+ // do nothing. faster. saves checking whether a message is jetstream or not
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void ackSync(Duration d) throws InterruptedException, TimeoutException {
+ // do nothing. faster. saves checking whether a message is jetstream or not
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void nak() {
+ // do nothing. faster. saves checking whether a message is jetstream or not
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void nakWithDelay(Duration nakDelay) {
+ // do nothing. faster. saves checking whether a message is jetstream or not
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void nakWithDelay(long nakDelayMillis) {
+ // do nothing. faster. saves checking whether a message is jetstream or not
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void inProgress() {
+ // do nothing. faster. saves checking whether a message is jetstream or not
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void term() {
+ // do nothing. faster. saves checking whether a message is jetstream or not
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public NatsJetStreamMetaData metaData() {
+ throw new IllegalStateException(NOT_A_JET_STREAM_MESSAGE);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isJetStream() {
+ return false; // overridden in NatsJetStreamMessage
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long consumeByteCount() {
+ return subject == null ? 0 : subject.length()
+ + headerLen
+ + dataLen
+ + (replyTo == null ? 0 : replyTo.length());
+ }
+
+ @Override
+ public String toString() {
+ if (subject == null) {
+ return getClass().getSimpleName() + " | " + protocolBytesToString();
+ }
+ return getClass().getSimpleName() + " |" + subject + "|" + replyToString() + "|" + dataToString() + "|";
+ }
+
+ String toDetailString() {
+ return "NatsMessage:" +
+ "\n subject='" + subject + '\'' +
+ "\n replyTo='" + replyToString() + '\'' +
+ "\n data=" + dataToString() +
+ "\n headers=" + headersToString() +
+ "\n sid='" + sid + '\'' +
+ "\n protocolBytes=" + protocolBytesToString() +
+ "\n sizeInBytes=" + sizeInBytes +
+ "\n headerLen=" + headerLen +
+ "\n dataLen=" + dataLen +
+ "\n subscription=" + subscription +
+ "\n next=" + nextToString();
+
+ }
+
+ private String headersToString() {
+ return hasHeaders() ? new String(headers.getSerialized(), ISO_8859_1).replace("\r", "+").replace("\n", "+") : "";
+ }
+
+ private String dataToString() {
+ if (data.length == 0) {
+ return "";
+ }
+ String s = new String(data, UTF_8);
+ int at = s.indexOf("io.nats.jetstream.api");
+ if (at == -1) {
+ return s.length() > 27 ? s.substring(0, 27) + "..." : s;
+ }
+ int at2 = s.indexOf('"', at);
+ return s.substring(at, at2);
+ }
+
+ private String replyToString() {
+ return replyTo == null ? "" : replyTo;
+ }
+
+ private String protocolBytesToString() {
+ return protocolBab == null ? null : protocolBab.toString();
+ }
+
+ private String nextToString() {
+ return next == null ? "No" : "Yes";
+ }
+
+ // ----------------------------------------------------------------------------------------------------
+ // Standard Builder
+ // ----------------------------------------------------------------------------------------------------
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * The builder is for building normal publish/request messages,
+ * as an option for client use developers instead of the normal constructor
+ */
+ public static class Builder {
+ private String subject;
+ private String replyTo;
+ private Headers headers;
+ private byte[] data;
+
+ /**
+ * Set the subject
+ *
+ * @param subject the subject
+ * @return the builder
+ */
+ public Builder subject(final String subject) {
+ this.subject = subject;
+ return this;
+ }
+
+ /**
+ * Set the reply to
+ *
+ * @param replyTo the reply to
+ * @return the builder
+ */
+ public Builder replyTo(final String replyTo) {
+ this.replyTo = replyTo;
+ return this;
+ }
+
+ /**
+ * Set the headers
+ *
+ * @param headers the headers
+ * @return the builder
+ */
+ public Builder headers(final Headers headers) {
+ this.headers = headers;
+ return this;
+ }
+
+ /**
+ * Set the data from a string converting using the
+ * charset StandardCharsets.UTF_8
+ *
+ * @param data the data string
+ * @return the builder
+ */
+ public Builder data(final String data) {
+ if (data != null) {
+ this.data = data.getBytes(StandardCharsets.UTF_8);
+ }
+ return this;
+ }
+
+ /**
+ * Set the data from a string
+ *
+ * @param data the data string
+ * @param charset the charset, for example {@code StandardCharsets.UTF_8}
+ * @return the builder
+ */
+ public Builder data(final String data, final Charset charset) {
+ this.data = data.getBytes(charset);
+ return this;
+ }
+
+ /**
+ * Set the data from a byte array. null data changed to empty byte array
+ *
+ * @param data the data
+ * @return the builder
+ */
+ public Builder data(final byte[] data) {
+ this.data = data;
+ return this;
+ }
+
+ /**
+ * Set if the subject should be treated as utf
+ * @deprecated Code is just always treating as utf8
+ * @param utf8mode true if utf8 mode for subject
+ * @return the builder
+ */
+ @Deprecated
+ public Builder utf8mode(final boolean utf8mode) {
+ return this;
+ }
+
+ /**
+ * Build the {@code NatsMessage} object
+ *
+ * @return the {@code NatsMessage}
+ */
+ public NatsMessage build() {
+ return new NatsMessage(subject, replyTo, headers, data);
+ }
+ }
+}