Skip to content

Commit 4e326c5

Browse files
authored
MockTransportService did not call handle exception (#102766) (#102805)
A failure to send the delayed message would previously not call the handler logic for exceptions. This resulted in some tests complaining about un-subtracted bytes in IndexingPressure. Also adding some logging that helped figuring out the situation. Fixes #92344
1 parent eacb70f commit 4e326c5

File tree

5 files changed

+64
-37
lines changed

5 files changed

+64
-37
lines changed

server/src/main/java/org/elasticsearch/index/IndexingPressure.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.common.Strings;
1314
import org.elasticsearch.common.settings.Setting;
1415
import org.elasticsearch.common.settings.Settings;
1516
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -101,12 +102,14 @@ public Releasable markCoordinatingOperationStarted(int operations, long bytes, b
101102
false
102103
);
103104
}
105+
logger.trace(() -> Strings.format("adding [%d] coordinating operations and [%d] bytes", operations, bytes));
104106
currentCoordinatingBytes.getAndAdd(bytes);
105107
currentCoordinatingOps.getAndAdd(operations);
106108
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
107109
totalCoordinatingBytes.getAndAdd(bytes);
108110
totalCoordinatingOps.getAndAdd(operations);
109111
return wrapReleasable(() -> {
112+
logger.trace(() -> Strings.format("removing [%d] coordinating operations and [%d] bytes", operations, bytes));
110113
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
111114
this.currentCoordinatingBytes.getAndAdd(-bytes);
112115
this.currentCoordinatingOps.getAndAdd(-operations);
@@ -153,12 +156,14 @@ public Releasable markPrimaryOperationStarted(int operations, long bytes, boolea
153156
false
154157
);
155158
}
159+
logger.trace(() -> Strings.format("adding [%d] primary operations and [%d] bytes", operations, bytes));
156160
currentPrimaryBytes.getAndAdd(bytes);
157161
currentPrimaryOps.getAndAdd(operations);
158162
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
159163
totalPrimaryBytes.getAndAdd(bytes);
160164
totalPrimaryOps.getAndAdd(operations);
161165
return wrapReleasable(() -> {
166+
logger.trace(() -> Strings.format("removing [%d] primary operations and [%d] bytes", operations, bytes));
162167
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
163168
this.currentPrimaryBytes.getAndAdd(-bytes);
164169
this.currentPrimaryOps.getAndAdd(-operations);

server/src/main/java/org/elasticsearch/transport/Transport.java

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ interface Connection extends Closeable, RefCounted {
107107

108108
/**
109109
* Sends the request to the node this connection is associated with
110-
* @param requestId see {@link ResponseHandlers#add(ResponseContext)} for details
110+
* @param requestId see {@link ResponseHandlers#add(TransportResponseHandler, Connection, String)} for details
111111
* @param action the action to execute
112112
* @param request the request to send
113113
* @param options request options to apply
@@ -163,35 +163,15 @@ default Object getCacheKey() {
163163
}
164164

165165
/**
166-
* This class represents a response context that encapsulates the actual response handler, the action and the connection it was
167-
* executed on.
166+
* This class represents a response context that encapsulates the actual response handler, the action. the connection it was
167+
* executed on, and the request ID.
168168
*/
169-
final class ResponseContext<T extends TransportResponse> {
170-
171-
private final TransportResponseHandler<T> handler;
172-
173-
private final Connection connection;
174-
175-
private final String action;
176-
177-
ResponseContext(TransportResponseHandler<T> handler, Connection connection, String action) {
178-
this.handler = handler;
179-
this.connection = connection;
180-
this.action = action;
181-
}
182-
183-
public TransportResponseHandler<T> handler() {
184-
return handler;
185-
}
186-
187-
public Connection connection() {
188-
return this.connection;
189-
}
190-
191-
public String action() {
192-
return this.action;
193-
}
194-
}
169+
record ResponseContext<T extends TransportResponse>(
170+
TransportResponseHandler<T> handler,
171+
Connection connection,
172+
String action,
173+
long requestId
174+
) {};
195175

196176
/**
197177
* This class is a registry that allows
@@ -218,14 +198,19 @@ public ResponseContext<? extends TransportResponse> remove(long requestId) {
218198

219199
/**
220200
* Adds a new response context and associates it with a new request ID.
221-
* @return the new request ID
201+
* @return the new response context
222202
* @see Connection#sendRequest(long, String, TransportRequest, TransportRequestOptions)
223203
*/
224-
public long add(ResponseContext<? extends TransportResponse> holder) {
204+
public ResponseContext<? extends TransportResponse> add(
205+
TransportResponseHandler<? extends TransportResponse> handler,
206+
Connection connection,
207+
String action
208+
) {
225209
long requestId = newRequestId();
210+
ResponseContext<? extends TransportResponse> holder = new ResponseContext<>(handler, connection, action, requestId);
226211
ResponseContext<? extends TransportResponse> existing = handlers.put(requestId, holder);
227212
assert existing == null : "request ID already in use: " + requestId;
228-
return requestId;
213+
return holder;
229214
}
230215

231216
/**

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,10 @@ protected void doStop() {
358358
try {
359359
final TransportResponseHandler<?> handler = holderToNotify.handler();
360360
final var targetNode = holderToNotify.connection().getNode();
361+
final long requestId = holderToNotify.requestId();
362+
if (tracerLog.isTraceEnabled() && shouldTraceAction(holderToNotify.action())) {
363+
tracerLog.trace("[{}][{}] pruning request for node [{}]", requestId, holderToNotify.action(), targetNode);
364+
}
361365

362366
assert transport instanceof TcpTransport == false
363367
/* other transports (used in tests) may not implement the proper close-connection behaviour. TODO fix this. */
@@ -921,7 +925,7 @@ private <T extends TransportResponse> void sendRequestInternal(
921925
Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
922926
ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
923927
// TODO we can probably fold this entire request ID dance into connection.sendRequest but it will be a bigger refactoring
924-
final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
928+
final long requestId = responseHandlers.add(responseHandler, connection, action).requestId();
925929
request.setRequestId(requestId);
926930
final TimeoutHandler timeoutHandler;
927931
if (options.timeout() != null) {
@@ -950,7 +954,7 @@ private <T extends TransportResponse> void sendRequestInternal(
950954
}
951955
}
952956

953-
private void handleInternalSendException(
957+
protected void handleInternalSendException(
954958
String action,
955959
DiscoveryNode node,
956960
long requestId,
@@ -985,6 +989,9 @@ public void onFailure(Exception e) {
985989

986990
@Override
987991
protected void doRun() {
992+
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
993+
tracerLog.trace("[{}][{}] failed to send request to node [{}]", requestId, action, node);
994+
}
988995
contextToNotify.handler().handleException(sendRequestException);
989996
}
990997
});
@@ -1302,6 +1309,14 @@ public void onConnectionClosed(Transport.Connection connection) {
13021309
@Override
13031310
public void doRun() {
13041311
for (Transport.ResponseContext<?> holderToNotify : pruned) {
1312+
if (tracerLog.isTraceEnabled() && shouldTraceAction(holderToNotify.action())) {
1313+
tracerLog.trace(
1314+
"[{}][{}] pruning request because connection to node [{}] closed",
1315+
holderToNotify.requestId(),
1316+
holderToNotify.action(),
1317+
connection.getNode()
1318+
);
1319+
}
13051320
holderToNotify.handler().handleException(new NodeDisconnectedException(connection.getNode(), holderToNotify.action()));
13061321
}
13071322
}

server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public void testRequestAndResponse() throws Exception {
137137
AtomicReference<Exception> exceptionCaptor = new AtomicReference<>();
138138
AtomicReference<TransportChannel> channelCaptor = new AtomicReference<>();
139139

140-
long requestId = responseHandlers.add(new Transport.ResponseContext<>(new TransportResponseHandler<TestResponse>() {
140+
long requestId = responseHandlers.add(new TransportResponseHandler<TestResponse>() {
141141
@Override
142142
public Executor executor(ThreadPool threadPool) {
143143
return TransportResponseHandler.TRANSPORT_WORKER;
@@ -157,7 +157,7 @@ public void handleException(TransportException exp) {
157157
public TestResponse read(StreamInput in) throws IOException {
158158
return new TestResponse(in);
159159
}
160-
}, null, action));
160+
}, null, action).requestId();
161161
RequestHandlerRegistry<TestRequest> registry = new RequestHandlerRegistry<>(
162162
action,
163163
TestRequest::new,

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.util.concurrent.RunOnce;
3232
import org.elasticsearch.core.IOUtils;
3333
import org.elasticsearch.core.Nullable;
34+
import org.elasticsearch.core.Strings;
3435
import org.elasticsearch.core.TimeValue;
3536
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
3637
import org.elasticsearch.node.Node;
@@ -546,11 +547,23 @@ public void sendRequest(
546547
final RunOnce runnable = new RunOnce(new AbstractRunnable() {
547548
@Override
548549
public void onFailure(Exception e) {
549-
logger.debug("failed to send delayed request", e);
550+
logger.debug(
551+
() -> Strings.format(
552+
"[%d][%s] failed to send delayed request to node [%s]",
553+
requestId,
554+
action,
555+
connection.getNode()
556+
),
557+
e
558+
);
559+
handleInternalSendException(action, connection.getNode(), requestId, null, e);
550560
}
551561

552562
@Override
553563
protected void doRun() throws IOException {
564+
logger.debug(
565+
() -> Strings.format("[%d][%s] sending delayed request to node [%s]", requestId, action, connection.getNode())
566+
);
554567
connection.sendRequest(requestId, action, clonedRequest, options);
555568
}
556569
});
@@ -561,6 +574,15 @@ protected void doRun() throws IOException {
561574
runnable.run();
562575
} else {
563576
requestsToSendWhenCleared.add(runnable);
577+
logger.debug(
578+
() -> Strings.format(
579+
"[%d][%s] delaying sending request to node [%s] by [%s]",
580+
requestId,
581+
action,
582+
connection.getNode(),
583+
delay
584+
)
585+
);
564586
threadPool.schedule(runnable, delay, threadPool.generic());
565587
}
566588
}

0 commit comments

Comments
 (0)