6363import java .util .function .Supplier ;
6464import java .util .stream .Collectors ;
6565
66- import static com .hivemq .common .executors .ioc .ExecutorsModule .shutdownExecutor ;
6766import static com .hivemq .persistence .domain .DomainTagAddResult .DomainTagPutStatus .ADAPTER_FAILED_TO_START ;
6867import static com .hivemq .persistence .domain .DomainTagAddResult .DomainTagPutStatus .ADAPTER_MISSING ;
6968import static com .hivemq .persistence .domain .DomainTagAddResult .DomainTagPutStatus .ALREADY_EXISTS ;
@@ -88,7 +87,7 @@ public class ProtocolAdapterManager {
8887 private final @ NotNull NorthboundConsumerFactory northboundConsumerFactory ;
8988 private final @ NotNull TagManager tagManager ;
9089 private final @ NotNull ProtocolAdapterExtractor config ;
91- private final @ NotNull ExecutorService singleThreadRefreshExecutor ;
90+ private final @ NotNull ExecutorService refreshExecutor ;
9291 private final @ NotNull ExecutorService sharedAdapterExecutor ;
9392 private final @ NotNull AtomicBoolean shutdownInitiated ;
9493
@@ -124,7 +123,7 @@ public ProtocolAdapterManager(
124123 this .config = config ;
125124 this .sharedAdapterExecutor = sharedAdapterExecutor ;
126125 this .protocolAdapters = new ConcurrentHashMap <>();
127- this .singleThreadRefreshExecutor = Executors .newSingleThreadExecutor ();
126+ this .refreshExecutor = Executors .newSingleThreadExecutor ();
128127 this .shutdownInitiated = new AtomicBoolean (false );
129128 shutdownHooks .add (new HiveMQShutdownHook () {
130129 @ Override
@@ -161,7 +160,7 @@ public void start() {
161160 }
162161
163162 public void refresh (final @ NotNull List <ProtocolAdapterEntity > configs ) {
164- singleThreadRefreshExecutor .submit (() -> {
163+ refreshExecutor .submit (() -> {
165164 log .info ("Refreshing adapters" );
166165
167166 final Map <String , ProtocolAdapterConfig > protocolAdapterConfigs = configs .stream ()
@@ -365,7 +364,7 @@ public boolean isWritingEnabled() {
365364
366365 private void shutdown () {
367366 if (shutdownInitiated .compareAndSet (false , true )) {
368- shutdownExecutor ( singleThreadRefreshExecutor , "protocol-adapter-manager-refresh" , 5 );
367+ shutdownRefreshExecutor ( );
369368
370369 log .info ("Initiating shutdown of Protocol Adapter Manager" );
371370 final List <ProtocolAdapterWrapper > adaptersToStop = new ArrayList <>(protocolAdapters .values ());
@@ -387,7 +386,7 @@ private void shutdown() {
387386 }
388387 // wait for all adapters to stop, with timeout
389388 try {
390- CompletableFuture .allOf (stopFutures .toArray (new CompletableFuture [0 ])).get (20 , TimeUnit .SECONDS );
389+ CompletableFuture .allOf (stopFutures .toArray (new CompletableFuture [0 ])).get (15 , TimeUnit .SECONDS );
391390 log .info ("All adapters stopped successfully during shutdown" );
392391 } catch (final TimeoutException e ) {
393392 log .warn ("Timeout waiting for adapters to stop during shutdown" );
@@ -406,6 +405,28 @@ private void shutdown() {
406405 }
407406 }
408407
408+ private void shutdownRefreshExecutor () {
409+ final String name = "protocol-adapter-manager-refresh" ;
410+ final int timeoutSeconds = 5 ;
411+ log .debug ("Shutting {} executor service" , name );
412+ refreshExecutor .shutdown ();
413+ try {
414+ if (!refreshExecutor .awaitTermination (timeoutSeconds , TimeUnit .SECONDS )) {
415+ log .warn ("Executor service {} did not terminate in {}s, forcing shutdown" , name , timeoutSeconds );
416+ refreshExecutor .shutdownNow ();
417+ if (!refreshExecutor .awaitTermination (2 , TimeUnit .SECONDS )) {
418+ log .error ("Executor service {} still has running tasks after forced shutdown" , name );
419+ }
420+ } else {
421+ log .debug ("Executor service {} shut down successfully" , name );
422+ }
423+ } catch (final InterruptedException e ) {
424+ Thread .currentThread ().interrupt ();
425+ log .warn ("Interrupted while waiting for executor service {} to terminate" , name );
426+ refreshExecutor .shutdownNow ();
427+ }
428+ }
429+
409430 private @ NotNull ProtocolAdapterWrapper createAdapterInternal (
410431 final @ NotNull ProtocolAdapterConfig config ,
411432 final @ NotNull String version ) {
0 commit comments