1515package com .rabbitmq .stream .impl ;
1616
1717import static com .rabbitmq .stream .Constants .*;
18+ import static com .rabbitmq .stream .impl .ThreadUtils .threadFactory ;
1819import static com .rabbitmq .stream .impl .Utils .DEFAULT_USERNAME ;
1920import static com .rabbitmq .stream .impl .Utils .encodeRequestCode ;
2021import static com .rabbitmq .stream .impl .Utils .encodeResponseCode ;
4546import com .rabbitmq .stream .impl .Client .ShutdownContext .ShutdownReason ;
4647import com .rabbitmq .stream .impl .ServerFrameHandler .FrameHandler ;
4748import com .rabbitmq .stream .impl .ServerFrameHandler .FrameHandlerInfo ;
48- import com .rabbitmq .stream .impl .Utils .NamedThreadFactory ;
4949import com .rabbitmq .stream .metrics .MetricsCollector ;
5050import com .rabbitmq .stream .metrics .NoOpMetricsCollector ;
5151import com .rabbitmq .stream .sasl .CredentialsProvider ;
@@ -164,7 +164,7 @@ public class Client implements AutoCloseable {
164164 final CompressionCodecFactory compressionCodecFactory ;
165165 private final Consumer <ShutdownContext .ShutdownReason > shutdownListenerCallback ;
166166 private final ToLongFunction <Object > publishSequenceFunction =
167- new ToLongFunction <Object >() {
167+ new ToLongFunction <>() {
168168 private final AtomicLong publishSequence = new AtomicLong (0 );
169169
170170 @ Override
@@ -302,6 +302,7 @@ public void initChannel(SocketChannel ch) {
302302 }
303303 });
304304
305+ this .nettyClosing = Utils .makeIdempotent (this ::closeNetty );
305306 ChannelFuture f ;
306307 String clientConnectionName = parameters .clientProperties .getOrDefault ("connection_name" , "" );
307308 try {
@@ -326,13 +327,11 @@ public void initChannel(SocketChannel ch) {
326327 throw new StreamException (message , e );
327328 }
328329 }
329-
330330 this .channel = f .channel ();
331- this .nettyClosing = Utils .makeIdempotent (this ::closeNetty );
332331 ExecutorServiceFactory executorServiceFactory = parameters .executorServiceFactory ;
333332 if (executorServiceFactory == null ) {
334333 this .executorService =
335- Executors .newSingleThreadExecutor (new NamedThreadFactory (clientConnectionName + "-" ));
334+ Executors .newSingleThreadExecutor (threadFactory (clientConnectionName + "-" ));
336335 } else {
337336 this .executorService = executorServiceFactory .get ();
338337 }
@@ -341,7 +340,7 @@ public void initChannel(SocketChannel ch) {
341340 if (dispatchingExecutorServiceFactory == null ) {
342341 this .dispatchingExecutorService =
343342 Executors .newSingleThreadExecutor (
344- new NamedThreadFactory ("dispatching-" + clientConnectionName + "-" ));
343+ threadFactory ("dispatching-" + clientConnectionName + "-" ));
345344 } else {
346345 this .dispatchingExecutorService = dispatchingExecutorServiceFactory .get ();
347346 }
@@ -1443,7 +1442,7 @@ void closingSequence(ShutdownContext.ShutdownReason reason) {
14431442
14441443 private void closeNetty () {
14451444 try {
1446- if (this .channel .isOpen ()) {
1445+ if (this .channel != null && this . channel .isOpen ()) {
14471446 LOGGER .debug ("Closing Netty channel" );
14481447 this .channel .close ().get (10 , TimeUnit .SECONDS );
14491448 }
0 commit comments