Skip to content

Commit 374f484

Browse files
original-brownbeargeorgewallace
authored andcommitted
Remove some overhead from TransportService message handling (elastic#124428)
Avoiding some indirection, volatile-reads and moving the listener functionality that needlessly kept iterating an empty CoW list (creating iterator instances, volatile reads, more code) in an effort to improve the low IPC on transport threads.
1 parent 795dc15 commit 374f484

File tree

6 files changed

+134
-83
lines changed

6 files changed

+134
-83
lines changed

qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/SearchErrorTraceIT.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,29 @@
1414
import org.elasticsearch.action.search.MultiSearchRequest;
1515
import org.elasticsearch.action.search.SearchRequest;
1616
import org.elasticsearch.client.Request;
17+
import org.elasticsearch.common.util.CollectionUtils;
18+
import org.elasticsearch.plugins.Plugin;
1719
import org.elasticsearch.search.ErrorTraceHelper;
1820
import org.elasticsearch.search.builder.SearchSourceBuilder;
21+
import org.elasticsearch.test.transport.MockTransportService;
1922
import org.elasticsearch.xcontent.XContentType;
2023
import org.junit.Before;
2124

2225
import java.io.IOException;
2326
import java.nio.charset.Charset;
27+
import java.util.Collection;
2428
import java.util.function.BooleanSupplier;
2529

2630
import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery;
2731

2832
public class SearchErrorTraceIT extends HttpSmokeTestCase {
2933
private BooleanSupplier hasStackTrace;
3034

35+
@Override
36+
protected Collection<Class<? extends Plugin>> nodePlugins() {
37+
return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class);
38+
}
39+
3140
@Before
3241
public void setupMessageListener() {
3342
hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());

server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
import org.elasticsearch.transport.TransportMessageListener;
6767
import org.elasticsearch.transport.TransportRequest;
6868
import org.elasticsearch.transport.TransportRequestOptions;
69-
import org.elasticsearch.transport.TransportService;
7069

7170
import java.io.IOException;
7271
import java.io.UncheckedIOException;
@@ -1052,8 +1051,7 @@ public void testAbortWaitsOnDataNode() throws Exception {
10521051

10531052
final AtomicBoolean blocked = new AtomicBoolean(true);
10541053

1055-
final TransportService transportService = internalCluster().getInstance(TransportService.class, otherDataNode);
1056-
transportService.addMessageListener(new TransportMessageListener() {
1054+
MockTransportService.getInstance(otherDataNode).addMessageListener(new TransportMessageListener() {
10571055
@Override
10581056
public void onRequestSent(
10591057
DiscoveryNode node,

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 8 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,9 @@
6060
import java.util.Map;
6161
import java.util.Objects;
6262
import java.util.Set;
63-
import java.util.concurrent.CopyOnWriteArrayList;
6463
import java.util.concurrent.CountDownLatch;
6564
import java.util.concurrent.Executor;
6665
import java.util.concurrent.TimeUnit;
67-
import java.util.concurrent.atomic.AtomicBoolean;
6866
import java.util.function.Function;
6967
import java.util.function.Predicate;
7068
import java.util.function.Supplier;
@@ -103,8 +101,7 @@ public class TransportService extends AbstractLifecycleComponent
103101
Setting.Property.Deprecated
104102
);
105103

106-
private final AtomicBoolean handleIncomingRequests = new AtomicBoolean();
107-
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
104+
private volatile boolean handleIncomingRequests;
108105
protected final Transport transport;
109106
protected final ConnectionManager connectionManager;
110107
protected final ThreadPool threadPool;
@@ -134,7 +131,7 @@ protected boolean removeEldestEntry(Map.Entry<Long, TimeoutInfoHolder> eldest) {
134131

135132
// tracer log
136133

137-
private final Logger tracerLog;
134+
private static final Logger tracerLog = Loggers.getLogger(logger, ".tracer");
138135
private final Tracer tracer;
139136

140137
volatile String[] tracerLogInclude;
@@ -291,7 +288,6 @@ public TransportService(
291288
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
292289
setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
293290
setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
294-
tracerLog = Loggers.getLogger(logger, ".tracer");
295291
this.taskManager = taskManger;
296292
this.interceptor = transportInterceptor;
297293
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
@@ -432,8 +428,8 @@ protected void doClose() throws IOException {
432428
* reject any incoming requests, including handshakes, by closing the connection.
433429
*/
434430
public final void acceptIncomingRequests() {
435-
final boolean startedWithThisCall = handleIncomingRequests.compareAndSet(false, true);
436-
assert startedWithThisCall : "transport service was already accepting incoming requests";
431+
assert handleIncomingRequests == false : "transport service was already accepting incoming requests";
432+
handleIncomingRequests = true;
437433
logger.debug("now accepting incoming requests");
438434
}
439435

@@ -750,14 +746,6 @@ public void disconnectFromNode(DiscoveryNode node) {
750746
connectionManager.disconnectFromNode(node);
751747
}
752748

753-
public void addMessageListener(TransportMessageListener listener) {
754-
messageListener.listeners.add(listener);
755-
}
756-
757-
public void removeMessageListener(TransportMessageListener listener) {
758-
messageListener.listeners.remove(listener);
759-
}
760-
761749
public void addConnectionListener(TransportConnectionListener listener) {
762750
connectionManager.addListener(listener);
763751
}
@@ -1265,13 +1253,12 @@ public <Request extends TransportRequest> void registerRequestHandler(
12651253
*/
12661254
@Override
12671255
public void onRequestReceived(long requestId, String action) {
1268-
if (handleIncomingRequests.get() == false) {
1256+
if (handleIncomingRequests == false) {
12691257
throw new TransportNotReadyException();
12701258
}
12711259
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
12721260
tracerLog.trace("[{}][{}] received request", requestId, action);
12731261
}
1274-
messageListener.onRequestReceived(requestId, action);
12751262
}
12761263

12771264
/** called by the {@link Transport} implementation once a request has been sent */
@@ -1286,7 +1273,6 @@ public void onRequestSent(
12861273
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
12871274
tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout());
12881275
}
1289-
messageListener.onRequestSent(node, requestId, action, request, options);
12901276
}
12911277

12921278
@Override
@@ -1297,7 +1283,6 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder)
12971283
} else if (tracerLog.isTraceEnabled() && shouldTraceAction(holder.action())) {
12981284
tracerLog.trace("[{}][{}] received response from [{}]", requestId, holder.action(), holder.connection().getNode());
12991285
}
1300-
messageListener.onResponseReceived(requestId, holder);
13011286
}
13021287

13031288
/** called by the {@link Transport} implementation once a response was sent to calling node */
@@ -1306,7 +1291,6 @@ public void onResponseSent(long requestId, String action, TransportResponse resp
13061291
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
13071292
tracerLog.trace("[{}][{}] sent response", requestId, action);
13081293
}
1309-
messageListener.onResponseSent(requestId, action, response);
13101294
}
13111295

13121296
/** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */
@@ -1315,7 +1299,6 @@ public void onResponseSent(long requestId, String action, Exception e) {
13151299
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
13161300
tracerLog.trace(() -> format("[%s][%s] sent error response", requestId, action), e);
13171301
}
1318-
messageListener.onResponseSent(requestId, action, e);
13191302
}
13201303

13211304
public RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action) {
@@ -1453,6 +1436,7 @@ public void run() {
14531436
public void cancel() {
14541437
assert responseHandlers.contains(requestId) == false
14551438
: "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers";
1439+
var cancellable = this.cancellable;
14561440
if (cancellable != null) {
14571441
cancellable.cancel();
14581442
}
@@ -1492,6 +1476,7 @@ public T read(StreamInput in) throws IOException {
14921476

14931477
@Override
14941478
public void handleResponse(T response) {
1479+
var handler = this.handler;
14951480
if (handler != null) {
14961481
handler.cancel();
14971482
}
@@ -1502,6 +1487,7 @@ public void handleResponse(T response) {
15021487

15031488
@Override
15041489
public void handleException(TransportException exp) {
1490+
var handler = this.handler;
15051491
if (handler != null) {
15061492
handler.cancel();
15071493
}
@@ -1666,53 +1652,6 @@ private boolean isLocalNode(DiscoveryNode discoveryNode) {
16661652
return discoveryNode.equals(localNode);
16671653
}
16681654

1669-
private static final class DelegatingTransportMessageListener implements TransportMessageListener {
1670-
1671-
private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();
1672-
1673-
@Override
1674-
public void onRequestReceived(long requestId, String action) {
1675-
for (TransportMessageListener listener : listeners) {
1676-
listener.onRequestReceived(requestId, action);
1677-
}
1678-
}
1679-
1680-
@Override
1681-
public void onResponseSent(long requestId, String action, TransportResponse response) {
1682-
for (TransportMessageListener listener : listeners) {
1683-
listener.onResponseSent(requestId, action, response);
1684-
}
1685-
}
1686-
1687-
@Override
1688-
public void onResponseSent(long requestId, String action, Exception error) {
1689-
for (TransportMessageListener listener : listeners) {
1690-
listener.onResponseSent(requestId, action, error);
1691-
}
1692-
}
1693-
1694-
@Override
1695-
public void onRequestSent(
1696-
DiscoveryNode node,
1697-
long requestId,
1698-
String action,
1699-
TransportRequest request,
1700-
TransportRequestOptions finalOptions
1701-
) {
1702-
for (TransportMessageListener listener : listeners) {
1703-
listener.onRequestSent(node, requestId, action, request, finalOptions);
1704-
}
1705-
}
1706-
1707-
@Override
1708-
@SuppressWarnings("rawtypes")
1709-
public void onResponseReceived(long requestId, Transport.ResponseContext holder) {
1710-
for (TransportMessageListener listener : listeners) {
1711-
listener.onResponseReceived(requestId, holder);
1712-
}
1713-
}
1714-
}
1715-
17161655
private static class PendingDirectHandlers extends AbstractRefCounted {
17171656

17181657
// To handle a response we (i) remove the handler from responseHandlers and then (ii) enqueue an action to complete the handler on

test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,16 @@
1111

1212
import org.elasticsearch.ExceptionsHelper;
1313
import org.elasticsearch.test.InternalTestCluster;
14+
import org.elasticsearch.test.transport.MockTransportService;
1415
import org.elasticsearch.transport.TransportMessageListener;
1516
import org.elasticsearch.transport.TransportService;
1617

1718
import java.util.Optional;
1819
import java.util.concurrent.atomic.AtomicBoolean;
1920
import java.util.function.BooleanSupplier;
2021

22+
import static org.elasticsearch.test.ESTestCase.asInstanceOf;
23+
2124
/**
2225
* Utilities around testing the `error_trace` message header in search.
2326
*/
@@ -26,16 +29,20 @@ public enum ErrorTraceHelper {
2629

2730
public static BooleanSupplier setupErrorTraceListener(InternalTestCluster internalCluster) {
2831
final AtomicBoolean transportMessageHasStackTrace = new AtomicBoolean(false);
29-
internalCluster.getDataNodeInstances(TransportService.class).forEach(ts -> ts.addMessageListener(new TransportMessageListener() {
30-
@Override
31-
public void onResponseSent(long requestId, String action, Exception error) {
32-
TransportMessageListener.super.onResponseSent(requestId, action, error);
33-
if (action.startsWith("indices:data/read/search")) {
34-
Optional<Throwable> throwable = ExceptionsHelper.unwrapCausesAndSuppressed(error, t -> t.getStackTrace().length > 0);
35-
transportMessageHasStackTrace.set(throwable.isPresent());
32+
internalCluster.getDataNodeInstances(TransportService.class)
33+
.forEach(ts -> asInstanceOf(MockTransportService.class, ts).addMessageListener(new TransportMessageListener() {
34+
@Override
35+
public void onResponseSent(long requestId, String action, Exception error) {
36+
TransportMessageListener.super.onResponseSent(requestId, action, error);
37+
if (action.startsWith("indices:data/read/search")) {
38+
Optional<Throwable> throwable = ExceptionsHelper.unwrapCausesAndSuppressed(
39+
error,
40+
t -> t.getStackTrace().length > 0
41+
);
42+
transportMessageHasStackTrace.set(throwable.isPresent());
43+
}
3644
}
37-
}
38-
}));
45+
}));
3946
return transportMessageHasStackTrace::get;
4047
}
4148
}

0 commit comments

Comments
 (0)