Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,29 @@
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.ErrorTraceHelper;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.function.BooleanSupplier;

import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery;

public class SearchErrorTraceIT extends HttpSmokeTestCase {
private BooleanSupplier hasStackTrace;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class);
}

@Before
public void setupMessageListener() {
hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -1052,8 +1051,7 @@ public void testAbortWaitsOnDataNode() throws Exception {

final AtomicBoolean blocked = new AtomicBoolean(true);

final TransportService transportService = internalCluster().getInstance(TransportService.class, otherDataNode);
transportService.addMessageListener(new TransportMessageListener() {
MockTransportService.getInstance(otherDataNode).addMessageListener(new TransportMessageListener() {
@Override
public void onRequestSent(
DiscoveryNode node,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -103,8 +101,7 @@ public class TransportService extends AbstractLifecycleComponent
Setting.Property.Deprecated
);

private final AtomicBoolean handleIncomingRequests = new AtomicBoolean();
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
private volatile boolean handleIncomingRequests;
protected final Transport transport;
protected final ConnectionManager connectionManager;
protected final ThreadPool threadPool;
Expand Down Expand Up @@ -134,7 +131,7 @@ protected boolean removeEldestEntry(Map.Entry<Long, TimeoutInfoHolder> eldest) {

// tracer log

private final Logger tracerLog;
private static final Logger tracerLog = Loggers.getLogger(logger, ".tracer");
private final Tracer tracer;

volatile String[] tracerLogInclude;
Expand Down Expand Up @@ -291,7 +288,6 @@ public TransportService(
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
tracerLog = Loggers.getLogger(logger, ".tracer");
this.taskManager = taskManger;
this.interceptor = transportInterceptor;
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
Expand Down Expand Up @@ -432,8 +428,8 @@ protected void doClose() throws IOException {
* reject any incoming requests, including handshakes, by closing the connection.
*/
public final void acceptIncomingRequests() {
final boolean startedWithThisCall = handleIncomingRequests.compareAndSet(false, true);
assert startedWithThisCall : "transport service was already accepting incoming requests";
assert handleIncomingRequests == false : "transport service was already accepting incoming requests";
handleIncomingRequests = true;
logger.debug("now accepting incoming requests");
}

Expand Down Expand Up @@ -750,14 +746,6 @@ public void disconnectFromNode(DiscoveryNode node) {
connectionManager.disconnectFromNode(node);
}

public void addMessageListener(TransportMessageListener listener) {
messageListener.listeners.add(listener);
}

public void removeMessageListener(TransportMessageListener listener) {
messageListener.listeners.remove(listener);
}

public void addConnectionListener(TransportConnectionListener listener) {
connectionManager.addListener(listener);
}
Expand Down Expand Up @@ -1265,13 +1253,12 @@ public <Request extends TransportRequest> void registerRequestHandler(
*/
@Override
public void onRequestReceived(long requestId, String action) {
if (handleIncomingRequests.get() == false) {
if (handleIncomingRequests == false) {
throw new TransportNotReadyException();
}
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace("[{}][{}] received request", requestId, action);
}
messageListener.onRequestReceived(requestId, action);
}

/** called by the {@link Transport} implementation once a request has been sent */
Expand All @@ -1286,7 +1273,6 @@ public void onRequestSent(
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout());
}
messageListener.onRequestSent(node, requestId, action, request, options);
}

@Override
Expand All @@ -1297,7 +1283,6 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder)
} else if (tracerLog.isTraceEnabled() && shouldTraceAction(holder.action())) {
tracerLog.trace("[{}][{}] received response from [{}]", requestId, holder.action(), holder.connection().getNode());
}
messageListener.onResponseReceived(requestId, holder);
}

/** called by the {@link Transport} implementation once a response was sent to calling node */
Expand All @@ -1306,7 +1291,6 @@ public void onResponseSent(long requestId, String action, TransportResponse resp
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace("[{}][{}] sent response", requestId, action);
}
messageListener.onResponseSent(requestId, action, response);
}

/** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */
Expand All @@ -1315,7 +1299,6 @@ public void onResponseSent(long requestId, String action, Exception e) {
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace(() -> format("[%s][%s] sent error response", requestId, action), e);
}
messageListener.onResponseSent(requestId, action, e);
}

public RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action) {
Expand Down Expand Up @@ -1453,6 +1436,7 @@ public void run() {
public void cancel() {
assert responseHandlers.contains(requestId) == false
: "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers";
var cancellable = this.cancellable;
if (cancellable != null) {
cancellable.cancel();
}
Expand Down Expand Up @@ -1492,6 +1476,7 @@ public T read(StreamInput in) throws IOException {

@Override
public void handleResponse(T response) {
var handler = this.handler;
if (handler != null) {
handler.cancel();
}
Expand All @@ -1502,6 +1487,7 @@ public void handleResponse(T response) {

@Override
public void handleException(TransportException exp) {
var handler = this.handler;
if (handler != null) {
handler.cancel();
}
Expand Down Expand Up @@ -1666,53 +1652,6 @@ private boolean isLocalNode(DiscoveryNode discoveryNode) {
return discoveryNode.equals(localNode);
}

private static final class DelegatingTransportMessageListener implements TransportMessageListener {

private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();

@Override
public void onRequestReceived(long requestId, String action) {
for (TransportMessageListener listener : listeners) {
listener.onRequestReceived(requestId, action);
}
}

@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, response);
}
}

@Override
public void onResponseSent(long requestId, String action, Exception error) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, error);
}
}

@Override
public void onRequestSent(
DiscoveryNode node,
long requestId,
String action,
TransportRequest request,
TransportRequestOptions finalOptions
) {
for (TransportMessageListener listener : listeners) {
listener.onRequestSent(node, requestId, action, request, finalOptions);
}
}

@Override
@SuppressWarnings("rawtypes")
public void onResponseReceived(long requestId, Transport.ResponseContext holder) {
for (TransportMessageListener listener : listeners) {
listener.onResponseReceived(requestId, holder);
}
}
}

private static class PendingDirectHandlers extends AbstractRefCounted {

// To handle a response we (i) remove the handler from responseHandlers and then (ii) enqueue an action to complete the handler on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportService;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

import static org.elasticsearch.test.ESTestCase.asInstanceOf;

/**
* Utilities around testing the `error_trace` message header in search.
*/
Expand All @@ -26,16 +29,20 @@ public enum ErrorTraceHelper {

public static BooleanSupplier setupErrorTraceListener(InternalTestCluster internalCluster) {
final AtomicBoolean transportMessageHasStackTrace = new AtomicBoolean(false);
internalCluster.getDataNodeInstances(TransportService.class).forEach(ts -> ts.addMessageListener(new TransportMessageListener() {
@Override
public void onResponseSent(long requestId, String action, Exception error) {
TransportMessageListener.super.onResponseSent(requestId, action, error);
if (action.startsWith("indices:data/read/search")) {
Optional<Throwable> throwable = ExceptionsHelper.unwrapCausesAndSuppressed(error, t -> t.getStackTrace().length > 0);
transportMessageHasStackTrace.set(throwable.isPresent());
internalCluster.getDataNodeInstances(TransportService.class)
.forEach(ts -> asInstanceOf(MockTransportService.class, ts).addMessageListener(new TransportMessageListener() {
@Override
public void onResponseSent(long requestId, String action, Exception error) {
TransportMessageListener.super.onResponseSent(requestId, action, error);
if (action.startsWith("indices:data/read/search")) {
Optional<Throwable> throwable = ExceptionsHelper.unwrapCausesAndSuppressed(
error,
t -> t.getStackTrace().length > 0
);
transportMessageHasStackTrace.set(throwable.isPresent());
}
}
}
}));
}));
return transportMessageHasStackTrace::get;
}
}
Loading