4949import java .time .ZoneId ;
5050import java .util .Date ;
5151import java .util .concurrent .ExecutorService ;
52+ import java .util .concurrent .Executors ;
53+ import java .util .concurrent .ScheduledExecutorService ;
5254import java .util .concurrent .TimeUnit ;
5355
5456/**
@@ -75,6 +77,7 @@ public class ChannelAdaptor
7577 private final Object disconnectLock = Boolean .TRUE ;
7678
7779 private ExecutorService executor ;
80+ private ScheduledExecutorService scheduledExcecutor ;
7881
7982 private Gauge connectionsGauge ;
8083
@@ -92,6 +95,11 @@ public void initService() throws ConfigurationException {
9295 initSpaceAndQueues ();
9396 NameRegistrar .register (getName (), this );
9497 executor = QFactory .executorService (cfg .getBoolean ("virtual-threads" , false ));
98+ scheduledExcecutor = Executors .newSingleThreadScheduledExecutor (r -> {
99+ Thread t = new Thread (r );
100+ t .setDaemon (true );
101+ return t ;
102+ });
95103 }
96104 public void startService () {
97105 try {
@@ -108,14 +116,14 @@ public void startService () {
108116 public void stopService () {
109117 try {
110118 sp .out (in , Boolean .TRUE );
111- if (channel != null )
112- disconnect ();
113- if (waitForWorkersOnStop ) {
114- executor .awaitTermination (5L , TimeUnit .SECONDS );
115- if (!writeOnly ) {
116- sp .put (ready , new Date ());
117- }
119+ if (channel != null ) {
120+ if (softStop > 0L )
121+ disconnectLater (softStop );
122+ else
123+ disconnect ();
118124 }
125+ if (waitForWorkersOnStop )
126+ executor .awaitTermination (Math .max (5000L , softStop ), TimeUnit .MILLISECONDS );
119127 sender = null ;
120128 receiver = null ;
121129 removeMeters ();
@@ -397,6 +405,7 @@ public void run () {
397405 }
398406 }
399407 }
408+ disconnect ();
400409 }
401410 }
402411 protected void checkConnection () {
@@ -431,6 +440,10 @@ protected void disconnect () {
431440 }
432441 }
433442 }
443+ private void disconnectLater (long delayInMillis ) {
444+ SpaceUtil .wipe (sp , ready );
445+ scheduledExcecutor .schedule (this ::disconnect , delayInMillis , TimeUnit .MILLISECONDS );
446+ }
434447 public synchronized void setHost (String host ) {
435448 setProperty (getProperties ("channel" ), "host" , host );
436449 setModified (true );
0 commit comments