6060import java .util .Map ;
6161import java .util .Objects ;
6262import java .util .Set ;
63- import java .util .concurrent .CopyOnWriteArrayList ;
6463import java .util .concurrent .CountDownLatch ;
6564import java .util .concurrent .Executor ;
6665import java .util .concurrent .TimeUnit ;
67- import java .util .concurrent .atomic .AtomicBoolean ;
6866import java .util .function .Function ;
6967import java .util .function .Predicate ;
7068import java .util .function .Supplier ;
@@ -103,8 +101,7 @@ public class TransportService extends AbstractLifecycleComponent
103101 Setting .Property .Deprecated
104102 );
105103
106- private final AtomicBoolean handleIncomingRequests = new AtomicBoolean ();
107- private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener ();
104+ private volatile boolean handleIncomingRequests ;
108105 protected final Transport transport ;
109106 protected final ConnectionManager connectionManager ;
110107 protected final ThreadPool threadPool ;
@@ -134,7 +131,7 @@ protected boolean removeEldestEntry(Map.Entry<Long, TimeoutInfoHolder> eldest) {
134131
135132 // tracer log
136133
137- private final Logger tracerLog ;
134+ private static final Logger tracerLog = Loggers . getLogger ( logger , ".tracer" ) ;
138135 private final Tracer tracer ;
139136
140137 volatile String [] tracerLogInclude ;
@@ -291,7 +288,6 @@ public TransportService(
291288 this .clusterName = ClusterName .CLUSTER_NAME_SETTING .get (settings );
292289 setTracerLogInclude (TransportSettings .TRACE_LOG_INCLUDE_SETTING .get (settings ));
293290 setTracerLogExclude (TransportSettings .TRACE_LOG_EXCLUDE_SETTING .get (settings ));
294- tracerLog = Loggers .getLogger (logger , ".tracer" );
295291 this .taskManager = taskManger ;
296292 this .interceptor = transportInterceptor ;
297293 this .asyncSender = interceptor .interceptSender (this ::sendRequestInternal );
@@ -432,8 +428,8 @@ protected void doClose() throws IOException {
432428 * reject any incoming requests, including handshakes, by closing the connection.
433429 */
434430 public final void acceptIncomingRequests () {
435- final boolean startedWithThisCall = handleIncomingRequests . compareAndSet ( false , true ) ;
436- assert startedWithThisCall : "transport service was already accepting incoming requests" ;
431+ assert handleIncomingRequests == false : "transport service was already accepting incoming requests" ;
432+ handleIncomingRequests = true ;
437433 logger .debug ("now accepting incoming requests" );
438434 }
439435
@@ -750,14 +746,6 @@ public void disconnectFromNode(DiscoveryNode node) {
750746 connectionManager .disconnectFromNode (node );
751747 }
752748
753- public void addMessageListener (TransportMessageListener listener ) {
754- messageListener .listeners .add (listener );
755- }
756-
757- public void removeMessageListener (TransportMessageListener listener ) {
758- messageListener .listeners .remove (listener );
759- }
760-
761749 public void addConnectionListener (TransportConnectionListener listener ) {
762750 connectionManager .addListener (listener );
763751 }
@@ -1265,13 +1253,12 @@ public <Request extends TransportRequest> void registerRequestHandler(
12651253 */
12661254 @ Override
12671255 public void onRequestReceived (long requestId , String action ) {
1268- if (handleIncomingRequests . get () == false ) {
1256+ if (handleIncomingRequests == false ) {
12691257 throw new TransportNotReadyException ();
12701258 }
12711259 if (tracerLog .isTraceEnabled () && shouldTraceAction (action )) {
12721260 tracerLog .trace ("[{}][{}] received request" , requestId , action );
12731261 }
1274- messageListener .onRequestReceived (requestId , action );
12751262 }
12761263
12771264 /** called by the {@link Transport} implementation once a request has been sent */
@@ -1286,7 +1273,6 @@ public void onRequestSent(
12861273 if (tracerLog .isTraceEnabled () && shouldTraceAction (action )) {
12871274 tracerLog .trace ("[{}][{}] sent to [{}] (timeout: [{}])" , requestId , action , node , options .timeout ());
12881275 }
1289- messageListener .onRequestSent (node , requestId , action , request , options );
12901276 }
12911277
12921278 @ Override
@@ -1297,7 +1283,6 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder)
12971283 } else if (tracerLog .isTraceEnabled () && shouldTraceAction (holder .action ())) {
12981284 tracerLog .trace ("[{}][{}] received response from [{}]" , requestId , holder .action (), holder .connection ().getNode ());
12991285 }
1300- messageListener .onResponseReceived (requestId , holder );
13011286 }
13021287
13031288 /** called by the {@link Transport} implementation once a response was sent to calling node */
@@ -1306,7 +1291,6 @@ public void onResponseSent(long requestId, String action, TransportResponse resp
13061291 if (tracerLog .isTraceEnabled () && shouldTraceAction (action )) {
13071292 tracerLog .trace ("[{}][{}] sent response" , requestId , action );
13081293 }
1309- messageListener .onResponseSent (requestId , action , response );
13101294 }
13111295
13121296 /** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */
@@ -1315,7 +1299,6 @@ public void onResponseSent(long requestId, String action, Exception e) {
13151299 if (tracerLog .isTraceEnabled () && shouldTraceAction (action )) {
13161300 tracerLog .trace (() -> format ("[%s][%s] sent error response" , requestId , action ), e );
13171301 }
1318- messageListener .onResponseSent (requestId , action , e );
13191302 }
13201303
13211304 public RequestHandlerRegistry <? extends TransportRequest > getRequestHandler (String action ) {
@@ -1453,6 +1436,7 @@ public void run() {
14531436 public void cancel () {
14541437 assert responseHandlers .contains (requestId ) == false
14551438 : "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers" ;
1439+ var cancellable = this .cancellable ;
14561440 if (cancellable != null ) {
14571441 cancellable .cancel ();
14581442 }
@@ -1492,6 +1476,7 @@ public T read(StreamInput in) throws IOException {
14921476
14931477 @ Override
14941478 public void handleResponse (T response ) {
1479+ var handler = this .handler ;
14951480 if (handler != null ) {
14961481 handler .cancel ();
14971482 }
@@ -1502,6 +1487,7 @@ public void handleResponse(T response) {
15021487
15031488 @ Override
15041489 public void handleException (TransportException exp ) {
1490+ var handler = this .handler ;
15051491 if (handler != null ) {
15061492 handler .cancel ();
15071493 }
@@ -1666,53 +1652,6 @@ private boolean isLocalNode(DiscoveryNode discoveryNode) {
16661652 return discoveryNode .equals (localNode );
16671653 }
16681654
1669- private static final class DelegatingTransportMessageListener implements TransportMessageListener {
1670-
1671- private final List <TransportMessageListener > listeners = new CopyOnWriteArrayList <>();
1672-
1673- @ Override
1674- public void onRequestReceived (long requestId , String action ) {
1675- for (TransportMessageListener listener : listeners ) {
1676- listener .onRequestReceived (requestId , action );
1677- }
1678- }
1679-
1680- @ Override
1681- public void onResponseSent (long requestId , String action , TransportResponse response ) {
1682- for (TransportMessageListener listener : listeners ) {
1683- listener .onResponseSent (requestId , action , response );
1684- }
1685- }
1686-
1687- @ Override
1688- public void onResponseSent (long requestId , String action , Exception error ) {
1689- for (TransportMessageListener listener : listeners ) {
1690- listener .onResponseSent (requestId , action , error );
1691- }
1692- }
1693-
1694- @ Override
1695- public void onRequestSent (
1696- DiscoveryNode node ,
1697- long requestId ,
1698- String action ,
1699- TransportRequest request ,
1700- TransportRequestOptions finalOptions
1701- ) {
1702- for (TransportMessageListener listener : listeners ) {
1703- listener .onRequestSent (node , requestId , action , request , finalOptions );
1704- }
1705- }
1706-
1707- @ Override
1708- @ SuppressWarnings ("rawtypes" )
1709- public void onResponseReceived (long requestId , Transport .ResponseContext holder ) {
1710- for (TransportMessageListener listener : listeners ) {
1711- listener .onResponseReceived (requestId , holder );
1712- }
1713- }
1714- }
1715-
17161655 private static class PendingDirectHandlers extends AbstractRefCounted {
17171656
17181657 // To handle a response we (i) remove the handler from responseHandlers and then (ii) enqueue an action to complete the handler on
0 commit comments