44import java .nio .Buffer ;
55import java .nio .ByteBuffer ;
66import java .nio .ByteOrder ;
7- import java .nio .channels .ServerSocketChannel ;
87import java .nio .channels .SocketChannel ;
9- import java .net .SocketAddress ;
108import java .util .concurrent .ConcurrentLinkedQueue ;
119import java .util .logging .Logger ;
1210import jnr .unixsocket .UnixServerSocketChannel ;
1614import static com .timgroup .statsd .NonBlockingStatsDClient .DEFAULT_UDS_MAX_PACKET_SIZE_BYTES ;
1715
1816public class UnixStreamSocketDummyStatsDServer extends DummyStatsDServer {
19- private final Object server ; // Object is either ServerSocketChannel or UnixServerSocketChannel
20- private final ConcurrentLinkedQueue <SocketChannel > channels = new ConcurrentLinkedQueue <>();
21- private final boolean useNativeUds ;
17+ private final UnixServerSocketChannel server ;
18+ private final ConcurrentLinkedQueue <UnixSocketChannel > channels = new ConcurrentLinkedQueue <>();
2219
2320 private final Logger logger = Logger .getLogger (UnixStreamSocketDummyStatsDServer .class .getName ());
2421
2522 public UnixStreamSocketDummyStatsDServer (String socketPath ) throws IOException {
26- this .useNativeUds = ClientChannelUtils .hasNativeUdsSupport ();
27- if (useNativeUds ) {
28- try {
29- Class <?> udsAddressClass = Class .forName ("java.net.UnixDomainSocketAddress" );
30- Object udsAddress = udsAddressClass .getMethod ("of" , String .class ).invoke (null , socketPath );
31-
32- ServerSocketChannel nativeServer = ServerSocketChannel .open ();
33- nativeServer .bind ((SocketAddress ) udsAddress );
34- this .server = nativeServer ;
35- } catch (ReflectiveOperationException e ) {
36- throw new IOException (e );
37- }
38- } else {
39- UnixServerSocketChannel jnrServer = UnixServerSocketChannel .open ();
40- jnrServer .configureBlocking (true );
41- jnrServer .socket ().bind (new UnixSocketAddress (socketPath ));
42- this .server = jnrServer ;
43- }
23+ server = UnixServerSocketChannel .open ();
24+ server .configureBlocking (true );
25+ server .socket ().bind (new UnixSocketAddress (socketPath ));
4426 this .listen ();
4527 }
4628
4729 @ Override
4830 protected boolean isOpen () {
49- return useNativeUds ? (( ServerSocketChannel ) server ). isOpen () : (( UnixServerSocketChannel ) server ) .isOpen ();
31+ return server .isOpen ();
5032 }
5133
5234 @ Override
@@ -56,43 +38,39 @@ protected void receive(ByteBuffer packet) throws IOException {
5638
5739 @ Override
5840 protected void listen () {
59- try {
60- String localAddressMessage = useNativeUds ? "Listening on " + ((ServerSocketChannel )server ).getLocalAddress () : "Listening on " + ((UnixServerSocketChannel )server ).getLocalSocketAddress ();
61- logger .info (localAddressMessage );
62- } catch (Exception e ) {
63- logger .warning ("Failed to get local address: " + e );
64- }
41+ logger .info ("Listening on " + server .getLocalSocketAddress ());
6542 Thread thread = new Thread (new Runnable () {
6643 @ Override
6744 public void run () {
6845 while (isOpen ()) {
6946 if (sleepIfFrozen ()) {
7047 continue ;
7148 }
72- try {
73- logger .info ("Waiting for connection" );
74- SocketChannel clientChannel = null ;
75- clientChannel = useNativeUds ? ((ServerSocketChannel )server ).accept () : ((UnixServerSocketChannel )server ).accept ();
76- if (clientChannel != null ) {
77- clientChannel .configureBlocking (true );
78- String connectionMessage = useNativeUds ? "Accepted connection from " + clientChannel .getRemoteAddress () : "Accepted connection from " + ((UnixSocketChannel )clientChannel ).getRemoteSocketAddress ();
79- logger .info (connectionMessage );
80- channels .add (clientChannel );
81- readChannel (clientChannel );
49+ try {
50+ logger .info ("Waiting for connection" );
51+ UnixSocketChannel clientChannel = server .accept ();
52+ if (clientChannel != null ) {
53+ clientChannel .configureBlocking (true );
54+ try {
55+ logger .info ("Accepted connection from " + clientChannel .getRemoteSocketAddress ());
56+ } catch (Exception e ) {
57+ logger .warning ("Failed to get remote socket address" );
58+ }
59+ channels .add (clientChannel );
60+ readChannel (clientChannel );
61+ }
62+ } catch (IOException e ) {
8263 }
83- } catch (Exception e ) {
84- // ignore
85- }
8664 }
8765 }
8866 });
8967 thread .setDaemon (true );
9068 thread .start ();
9169 }
9270
93- public void readChannel (final SocketChannel clientChannel ) {
94- logger .info ("Reading from " + clientChannel );
95- Thread thread = new Thread (new Runnable () {
71+ public void readChannel (final UnixSocketChannel clientChannel ) {
72+ logger .info ("Reading from " + clientChannel );
73+ Thread thread = new Thread (new Runnable () {
9674 @ Override
9775 public void run () {
9876 final ByteBuffer packet = ByteBuffer .allocate (DEFAULT_UDS_MAX_PACKET_SIZE_BYTES );
@@ -112,6 +90,7 @@ public void run() {
11290 logger .warning ("Failed to close channel: " + e );
11391 }
11492 }
93+
11594 }
11695 logger .info ("Disconnected from " + clientChannel );
11796 }
@@ -149,16 +128,13 @@ private boolean readPacket(SocketChannel channel, ByteBuffer packet) {
149128
150129 public void close () throws IOException {
151130 try {
152- if (useNativeUds ) {
153- ((ServerSocketChannel )server ).close ();
154- } else {
155- ((UnixServerSocketChannel )server ).close ();
156- }
157- for (SocketChannel channel : channels ) {
131+ server .close ();
132+ for (UnixSocketChannel channel : channels ) {
158133 channel .close ();
159134 }
160135 } catch (Exception e ) {
161136 //ignore
162137 }
163138 }
139+
164140}
0 commit comments