3838import org .apache .commons .collections4 .MapUtils ;
3939import org .apache .commons .lang3 .StringUtils ;
4040import org .apache .commons .lang3 .exception .ExceptionUtils ;
41+ import org .apache .logging .log4j .CloseableThreadContext ;
4142import org .apache .logging .log4j .LogManager ;
4243import org .apache .logging .log4j .Logger ;
4344import org .apache .logging .log4j .ThreadContext ;
@@ -1258,7 +1259,10 @@ protected DispatchResult dispatchRawMessage(RawMessage rawMessage, boolean batch
12581259 boolean lockAcquired = false ;
12591260 Long persistedMessageId = null ;
12601261
1261- try {
1262+ try (CloseableThreadContext .Instance ctc = CloseableThreadContext
1263+ .put ("channelId" , channelId )
1264+ .put ("channelName" , name )
1265+ ) {
12621266 synchronized (dispatchThreads ) {
12631267 if (!shuttingDown ) {
12641268 dispatchThreads .add (currentThread );
@@ -1272,9 +1276,6 @@ protected DispatchResult dispatchRawMessage(RawMessage rawMessage, boolean batch
12721276 } else {
12731277 currentThread .setName ("Channel Dispatch Thread on " + name + " (" + channelId + ") < " + originalThreadName );
12741278 }
1275-
1276- ThreadContext .put ("channelId" , channelId );
1277- ThreadContext .put ("channelName" , name );
12781279
12791280 DonkeyDao dao = null ;
12801281 boolean commitSuccess = false ;
@@ -1390,8 +1391,6 @@ protected DispatchResult dispatchRawMessage(RawMessage rawMessage, boolean batch
13901391 dispatchThreads .remove (currentThread );
13911392 }
13921393 currentThread .setName (originalThreadName );
1393- ThreadContext .remove ("channelId" );
1394- ThreadContext .remove ("channelName" );
13951394 }
13961395 }
13971396
@@ -1939,17 +1938,15 @@ public void processUnfinishedMessages() throws Exception {
19391938
19401939 @ Override
19411940 public void run () {
1942- try {
1943- ThreadContext .put ("channelId" , channelId );
1944- ThreadContext .put ("channelName" , name );
1941+ try (CloseableThreadContext .Instance ctc = CloseableThreadContext
1942+ .put ("channelId" , channelId )
1943+ .put ("channelName" , name )
1944+ ) {
19451945 do {
19461946 processSourceQueue (Constants .SOURCE_QUEUE_POLL_TIMEOUT_MILLIS );
19471947 } while (isActive () && !stopSourceQueue );
19481948 } catch (InterruptedException e ) {
19491949 Thread .currentThread ().interrupt ();
1950- } finally {
1951- ThreadContext .remove ("channelId" );
1952- ThreadContext .remove ("channelName" );
19531950 }
19541951 }
19551952
0 commit comments