Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ public class Options {
* {@link Builder#statisticsCollector(StatisticsCollector) statisticsCollector}.
*/
public static final String PROP_STATISTICS_COLLECTOR = PFX + "statisticscollector";

/**
* Property used to configure a builder from a Properties object. {@value}, see
* {@link Builder#writeListener(WriteListener) writeListener}.
*/
public static final String PROP_WRITE_LISTENER = PFX + "write.listener";
/**
* Property used to configure a builder from a Properties object. {@value}, see {@link Builder#maxPingsOut(int) maxPingsOut}.
*/
Expand Down Expand Up @@ -699,6 +705,7 @@ public class Options {
private final ConnectionListener connectionListener;
private final ReadListener readListener;
private final StatisticsCollector statisticsCollector;
private final WriteListener writeListener;
private final String dataPortType;

private final boolean trackAdvancedStats;
Expand Down Expand Up @@ -848,6 +855,7 @@ public static class Builder {
private ConnectionListener connectionListener = null;
private ReadListener readListener = null;
private StatisticsCollector statisticsCollector = null;
private WriteListener writeListener = null;
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
private ExecutorService executor;
private ScheduledExecutorService scheduledExecutor;
Expand Down Expand Up @@ -972,6 +980,7 @@ public Builder properties(Properties props) {
classnameProperty(props, PROP_CONNECTION_CB, o -> this.connectionListener = (ConnectionListener) o);
classnameProperty(props, PROP_READ_LISTENER_CLASS, o -> this.readListener = (ReadListener) o);
classnameProperty(props, PROP_STATISTICS_COLLECTOR, o -> this.statisticsCollector = (StatisticsCollector) o);
classnameProperty(props, PROP_WRITE_LISTENER, o -> this.writeListener = (WriteListener) o);

stringProperty(props, PROP_DATA_PORT_TYPE, s -> this.dataPortType = s);
stringProperty(props, PROP_INBOX_PREFIX, this::inboxPrefix);
Expand Down Expand Up @@ -1687,6 +1696,19 @@ public Builder statisticsCollector(StatisticsCollector collector) {
return this;
}

/**
* Set the {@link WriteListener WriteListener} to track messages buffered from the queue to the socket buffer.
* <p>
* If not set, no implementation will be used
*
* @param listener the new WriteListener for this connection.
* @return the Builder for chaining
*/
public Builder writeListener(WriteListener listener) {
this.writeListener = listener;
return this;
}

/**
* Set the {@link ExecutorService ExecutorService} used to run threaded tasks. The default is a
* cached thread pool that names threads after the connection name (or a default). This executor
Expand Down Expand Up @@ -2096,6 +2118,7 @@ public Builder(Options o) {
this.connectionListener = o.connectionListener;
this.readListener = o.readListener;
this.statisticsCollector = o.statisticsCollector;
this.writeListener = o.writeListener;
this.dataPortType = o.dataPortType;
this.trackAdvancedStats = o.trackAdvancedStats;
this.executor = o.executor;
Expand Down Expand Up @@ -2168,6 +2191,7 @@ private Options(Builder b) {
this.connectionListener = b.connectionListener;
this.readListener = b.readListener;
this.statisticsCollector = b.statisticsCollector;
this.writeListener = b.writeListener;
this.dataPortType = b.dataPortType;
this.trackAdvancedStats = b.trackAdvancedStats;
this.executor = b.executor;
Expand Down Expand Up @@ -2302,6 +2326,13 @@ public StatisticsCollector getStatisticsCollector() {
return this.statisticsCollector;
}

/**
* @return the WriteListener, or null, see {@link Builder#writeListener(WriteListener) writeListener()} in the builder doc
*/
public WriteListener getWriteListener() {
return this.writeListener;
}

/**
* @return the auth handler, or null, see {@link Builder#authHandler(AuthHandler) authHandler()} in the builder doc
*/
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/io/nats/client/WriteListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client;

import io.nats.client.impl.NatsMessage;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public abstract class WriteListener {
public final ExecutorService executorService;

public WriteListener() {
this.executorService = Executors.newSingleThreadExecutor();
}

public WriteListener(ExecutorService executorService) {
this.executorService = executorService;
}

public final void submit(Runnable runnable) {
executorService.submit(runnable);
}

public void runStarted(int instanceHashCode) {}
public void runEnded(int instanceHashCode) {}

public abstract void buffered(NatsMessage msg);
}
Loading
Loading