Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void sendResponse(
);
assert response.hasReferences();
try {
sendMessage(channel, message, responseStatsConsumer, () -> messageListener.onResponseSent(requestId, action, response));
sendMessage(channel, message, responseStatsConsumer, () -> messageListener.onResponseSent(requestId, action));
} catch (Exception ex) {
if (isHandshake) {
logger.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ default void onRequestReceived(long requestId, String action) {}
* Called for every action response sent after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param response the response send
*/
default void onResponseSent(long requestId, String action, TransportResponse response) {}
default void onResponseSent(long requestId, String action) {}

/***
* Called for every failed action response after the response has been passed to the underlying network implementation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1288,7 +1288,7 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder)

/** called by the {@link Transport} implementation once a response was sent to calling node */
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
public void onResponseSent(long requestId, String action) {
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace("[{}][{}] sent response", requestId, action);
}
Expand Down Expand Up @@ -1541,7 +1541,7 @@ public String getProfileName() {

@Override
public void sendResponse(TransportResponse response) {
service.onResponseSent(requestId, action, response);
service.onResponseSent(requestId, action);
try (var shutdownBlock = service.pendingDirectHandlers.withRef()) {
if (shutdownBlock == null) {
// already shutting down, the handler will be completed by sendRequestInternal or doStop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,11 @@ public void testSendResponse() throws IOException {

AtomicLong requestIdRef = new AtomicLong();
AtomicReference<String> actionRef = new AtomicReference<>();
AtomicReference<TransportResponse> responseRef = new AtomicReference<>();
handler.setMessageListener(new TransportMessageListener() {
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
public void onResponseSent(long requestId, String action) {
requestIdRef.set(requestId);
actionRef.set(action);
responseRef.set(response);
}
});
if (compress) {
Expand All @@ -244,7 +242,6 @@ public void onResponseSent(long requestId, String action, TransportResponse resp
}
assertEquals(requestId, requestIdRef.get());
assertEquals(action, actionRef.get());
assertEquals(response, responseRef.get());

pipeline.handleBytes(channel, new ReleasableBytesReference(reference, () -> {}));
final Tuple<Header, BytesReference> tuple = message.get();
Expand Down Expand Up @@ -338,18 +335,15 @@ public void writeTo(StreamOutput out) {

AtomicLong requestIdRef = new AtomicLong();
AtomicReference<String> actionRef = new AtomicReference<>();
AtomicReference<TransportResponse> responseRef = new AtomicReference<>();
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
handler.setMessageListener(new TransportMessageListener() {
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
public void onResponseSent(long requestId, String action) {
assertNull(channel.getMessageCaptor().get());
assertThat(requestIdRef.get(), equalTo(0L));
requestIdRef.set(requestId);
assertNull(actionRef.get());
actionRef.set(action);
assertNull(responseRef.get());
responseRef.set(response);
}

@Override
Expand All @@ -374,7 +368,6 @@ public void onResponseSent(long requestId, String action, Exception error) {
}
assertEquals(requestId, requestIdRef.get());
assertEquals(action, actionRef.get());
assertEquals(response, responseRef.get());
assertThat(exceptionRef.get().getMessage(), equalTo("simulated cbe"));
assertTrue(response.released.get());
BytesReference reference = channel.getMessageCaptor().get();
Expand All @@ -400,18 +393,15 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
long requestId = randomLongBetween(0, 300);
AtomicLong requestIdRef = new AtomicLong();
AtomicReference<String> actionRef = new AtomicReference<>();
AtomicReference<TransportResponse> responseRef = new AtomicReference<>();
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
handler.setMessageListener(new TransportMessageListener() {
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
public void onResponseSent(long requestId, String action) {
assertNull(channel.getMessageCaptor().get());
assertThat(requestIdRef.get(), equalTo(0L));
requestIdRef.set(requestId);
assertNull(actionRef.get());
actionRef.set(action);
assertNull(responseRef.get());
responseRef.set(response);
}

@Override
Expand All @@ -436,7 +426,6 @@ public void writeTo(StreamOutput out) {
}
assertEquals(requestId, requestIdRef.get());
assertEquals(action, actionRef.get());
assertEquals(response, responseRef.get());
assertThat(exceptionRef.get().getMessage(), equalTo("simulated cbe"));
assertTrue(response.released.get());
assertNull(channel.getMessageCaptor().get());
Expand All @@ -457,17 +446,14 @@ public void writeTo(StreamOutput out) {

AtomicLong requestIdRef = new AtomicLong();
AtomicReference<String> actionRef = new AtomicReference<>();
AtomicReference<TransportResponse> responseRef = new AtomicReference<>();
handler.setMessageListener(new TransportMessageListener() {
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
public void onResponseSent(long requestId, String action) {
assertNull(channel.getMessageCaptor().get());
assertThat(requestIdRef.get(), equalTo(0L));
requestIdRef.set(requestId);
assertNull(actionRef.get());
actionRef.set(action);
assertNull(responseRef.get());
responseRef.set(response);
}

@Override
Expand All @@ -485,7 +471,6 @@ public void onResponseSent(long requestId, String action, Exception error) {
assertNull(channel.getListenerCaptor().get());
assertEquals(requestId, requestIdRef.get());
assertEquals(action, actionRef.get());
assertEquals(response, responseRef.get());
assertTrue(response.released.get());
assertFalse(channel.isOpen());
}
Expand All @@ -507,7 +492,7 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
AtomicReference<Exception> exceptionRef = new AtomicReference<>();
handler.setMessageListener(new TransportMessageListener() {
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
public void onResponseSent(long requestId, String action) {
throw new AssertionError("must not be called");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.netty4.Netty4Transport;
Expand Down Expand Up @@ -845,9 +844,9 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder)
}

@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
super.onResponseSent(requestId, action, response);
messageListener.onResponseSent(requestId, action, response);
public void onResponseSent(long requestId, String action) {
super.onResponseSent(requestId, action);
messageListener.onResponseSent(requestId, action);
}

@Override
Expand Down Expand Up @@ -876,9 +875,9 @@ public void onRequestReceived(long requestId, String action) {
}

@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
public void onResponseSent(long requestId, String action) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, response);
listener.onResponseSent(requestId, action);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ public void onRequestReceived(long requestId, String action) {
}

@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
public void onResponseSent(long requestId, String action) {
if (action.equals(ACTION)) {
responseSent.incrementAndGet();
}
Expand Down