Skip to content

Commit dbfc675

Browse files
committed
Add option to enable jdk support for UDS
1 parent 2047344 commit dbfc675

File tree

4 files changed

+26
-12
lines changed

4 files changed

+26
-12
lines changed

src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ String tag() {
9595
public static final int SOCKET_BUFFER_BYTES = -1;
9696
public static final boolean DEFAULT_BLOCKING = false;
9797
public static final boolean DEFAULT_ENABLE_TELEMETRY = true;
98+
public static final boolean DEFAULT_ENABLE_JDK_SOCKET = true;
9899

99100
public static final boolean DEFAULT_ENABLE_AGGREGATION = true;
100101
public static final boolean DEFAULT_ENABLE_ORIGIN_DETECTION = true;
@@ -244,6 +245,9 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
244245
* @param connectionTimeout
245246
* the timeout in milliseconds for connecting to the StatsD server. Applies to unix sockets only.
246247
* It is also used to detect if a connection is still alive and re-establish a new one if needed.
248+
* @param enableJdkSocket
249+
* Boolean to enable native JDK UDS support for the UnixStreamClientChannel.
250+
* Only compatible with Java 16 and up.
247251
* @throws StatsDClientException
248252
* if the client could not be started
249253
*/
@@ -253,7 +257,8 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
253257
final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers,
254258
final int senderWorkers, boolean blocking, final boolean enableTelemetry, final int telemetryFlushInterval,
255259
final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory customThreadFactory,
256-
String containerID, final boolean originDetectionEnabled, final int connectionTimeout)
260+
String containerID, final boolean originDetectionEnabled, final int connectionTimeout,
261+
final boolean enableJdkSocket)
257262
throws StatsDClientException {
258263

259264
if ((prefix != null) && (!prefix.isEmpty())) {
@@ -298,7 +303,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
298303
}
299304

300305
try {
301-
clientChannel = createByteChannel(addressLookup, timeout, connectionTimeout, bufferSize);
306+
clientChannel = createByteChannel(addressLookup, timeout, connectionTimeout, bufferSize, enableJdkSocket);
302307

303308
ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory();
304309

@@ -317,7 +322,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
317322
telemetryClientChannel = clientChannel;
318323
telemetryStatsDProcessor = statsDProcessor;
319324
} else {
320-
telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, bufferSize);
325+
telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, bufferSize, enableJdkSocket);
321326

322327
// similar settings, but a single worker and non-blocking.
323328
telemetryStatsDProcessor = createProcessor(queueSize, handler, getPacketSize(telemetryClientChannel),
@@ -378,7 +383,8 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) thr
378383
builder.blocking, builder.enableTelemetry, builder.telemetryFlushInterval,
379384
(builder.enableAggregation ? builder.aggregationFlushInterval : 0),
380385
builder.aggregationShards, builder.threadFactory, builder.containerID,
381-
builder.originDetectionEnabled, builder.connectionTimeout);
386+
builder.originDetectionEnabled, builder.connectionTimeout,
387+
builder.enableJdkSocket);
382388
}
383389

384390
protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler,
@@ -480,7 +486,7 @@ StringBuilder tagString(final String[] tags, StringBuilder builder) {
480486
}
481487

482488
ClientChannel createByteChannel(
483-
Callable<SocketAddress> addressLookup, int timeout, int connectionTimeout, int bufferSize)
489+
Callable<SocketAddress> addressLookup, int timeout, int connectionTimeout, int bufferSize, boolean enableJdkSocket)
484490
throws Exception {
485491
final SocketAddress address = addressLookup.call();
486492
if (address instanceof NamedPipeSocketAddress) {
@@ -493,7 +499,7 @@ ClientChannel createByteChannel(
493499
// Allow us to support `unix://` for both kind of sockets like in go.
494500
switch (unixAddr.getTransportType()) {
495501
case UDS_STREAM:
496-
return new UnixStreamClientChannel(unixAddr.getAddress(), timeout, connectionTimeout, bufferSize);
502+
return new UnixStreamClientChannel(unixAddr.getAddress(), timeout, connectionTimeout, bufferSize, enableJdkSocket);
497503
case UDS_DATAGRAM:
498504
case UDS:
499505
return new UnixDatagramClientChannel(unixAddr.getAddress(), timeout, bufferSize);

src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class NonBlockingStatsDClientBuilder implements Cloneable {
3333
public boolean blocking = NonBlockingStatsDClient.DEFAULT_BLOCKING;
3434
public boolean enableTelemetry = NonBlockingStatsDClient.DEFAULT_ENABLE_TELEMETRY;
3535
public boolean enableAggregation = NonBlockingStatsDClient.DEFAULT_ENABLE_AGGREGATION;
36+
public boolean enableJdkSocket = NonBlockingStatsDClient.DEFAULT_ENABLE_JDK_SOCKET;
3637
public int telemetryFlushInterval = Telemetry.DEFAULT_FLUSH_INTERVAL;
3738
public int aggregationFlushInterval = StatsDAggregator.DEFAULT_FLUSH_INTERVAL;
3839
public int aggregationShards = StatsDAggregator.DEFAULT_SHARDS;
@@ -205,6 +206,11 @@ public NonBlockingStatsDClientBuilder originDetectionEnabled(boolean val) {
205206
return this;
206207
}
207208

209+
public NonBlockingStatsDClientBuilder enableJdkSocket(boolean val) {
210+
enableJdkSocket = val;
211+
return this;
212+
}
213+
208214
/**
209215
* NonBlockingStatsDClient factory method.
210216
* @return the built NonBlockingStatsDClient.
@@ -380,7 +386,7 @@ protected static Callable<SocketAddress> staticUnixResolution(
380386
@Override public SocketAddress call() {
381387
SocketAddress socketAddress;
382388
// Use native UDS support for compatible Java versions and jnr-unixsocket support otherwise.
383-
if (VersionUtils.isJavaVersionAtLeast(16)) {
389+
if (VersionUtils.isJavaVersionAtLeast(16) && NonBlockingStatsDClient.DEFAULT_ENABLE_JDK_SOCKET) {
384390
try {
385391
// Use reflection to avoid compiling Java 16+ classes in incompatible versions
386392
Class<?> unixDomainSocketAddressClass = Class.forName("java.net.UnixDomainSocketAddress");

src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public class UnixStreamClientChannel implements ClientChannel {
2020
private final int timeout;
2121
private final int connectionTimeout;
2222
private final int bufferSize;
23+
private final boolean enableJdkSocket;
2324

2425
private SocketChannel delegate;
2526
private final ByteBuffer delimiterBuffer = ByteBuffer.allocateDirect(Integer.SIZE / Byte.SIZE).order(ByteOrder.LITTLE_ENDIAN);
@@ -29,14 +30,15 @@ public class UnixStreamClientChannel implements ClientChannel {
2930
*
3031
* @param address Location of named pipe
3132
*/
32-
UnixStreamClientChannel(SocketAddress address, int timeout, int connectionTimeout, int bufferSize) throws IOException {
33+
UnixStreamClientChannel(SocketAddress address, int timeout, int connectionTimeout, int bufferSize, boolean enableJdkSocket) throws IOException {
3334
this.delegate = null;
3435
this.address = address;
3536
System.out.println("========== Constructor address: " + address);
3637
System.out.println("========== Constructor address type: " + address.getClass().getName());
3738
this.timeout = timeout;
3839
this.connectionTimeout = connectionTimeout;
3940
this.bufferSize = bufferSize;
41+
this.enableJdkSocket = enableJdkSocket;
4042
}
4143

4244
@Override
@@ -130,7 +132,7 @@ private void connect() throws IOException {
130132

131133
long deadline = System.nanoTime() + connectionTimeout * 1_000_000L;
132134
// Use native UDS support for compatible Java versions and jnr-unixsocket support otherwise.
133-
if (VersionUtils.isJavaVersionAtLeast(16)) {
135+
if (VersionUtils.isJavaVersionAtLeast(16) && enableJdkSocket) {
134136
try {
135137
// Use reflection to avoid compiling Java 16+ classes in incompatible versions
136138
Class<?> protocolFamilyClass = Class.forName("java.net.StandardProtocolFamily");
@@ -177,7 +179,7 @@ private void connect() throws IOException {
177179
throw new IOException("Failed to create UnixStreamClientChannel for native UDS implementation", e);
178180
}
179181
}
180-
// Default to jnr-unixsocket if Java version is less than 16
182+
// Default to jnr-unixsocket if Java version is less than 16 or native UDS support is disabled
181183
UnixSocketChannel channel = UnixSocketChannel.create();
182184

183185
if (connectionTimeout > 0) {

src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1831,7 +1831,7 @@ public NonBlockingStatsDClient build() {
18311831
this.originDetectionEnabled(false);
18321832
return new NonBlockingStatsDClient(resolve()) {
18331833
@Override
1834-
ClientChannel createByteChannel(Callable<SocketAddress> addressLookup, int timeout, int connectionTimeout, int bufferSize) throws Exception {
1834+
ClientChannel createByteChannel(Callable<SocketAddress> addressLookup, int timeout, int connectionTimeout, int bufferSize, boolean enableJdkSocket) throws Exception {
18351835
return new DatagramClientChannel(addressLookup.call()) {
18361836
@Override
18371837
public int write(ByteBuffer data) throws IOException {
@@ -1870,7 +1870,7 @@ public NonBlockingStatsDClient build() {
18701870
this.bufferPoolSize(1);
18711871
return new NonBlockingStatsDClient(resolve()) {
18721872
@Override
1873-
ClientChannel createByteChannel(Callable<SocketAddress> addressLookup, int timeout, int connectionTimeout, int bufferSize) throws Exception {
1873+
ClientChannel createByteChannel(Callable<SocketAddress> addressLookup, int timeout, int connectionTimeout, int bufferSize, boolean enableJdkSocket) throws Exception {
18741874
return new DatagramClientChannel(addressLookup.call()) {
18751875
@Override
18761876
public int write(ByteBuffer data) throws IOException {

0 commit comments

Comments
 (0)