diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index 3e727216b8693..88686e7cf63e1 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -148,7 +148,7 @@ void sendResponse( ); assert response.hasReferences(); try { - sendMessage(channel, message, responseStatsConsumer, () -> messageListener.onResponseSent(requestId, action, response)); + sendMessage(channel, message, responseStatsConsumer, () -> messageListener.onResponseSent(requestId, action)); } catch (Exception ex) { if (isHandshake) { logger.error( diff --git a/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java b/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java index 8da32a98bcc1d..e751dd7e0aab4 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java @@ -27,9 +27,8 @@ default void onRequestReceived(long requestId, String action) {} * Called for every action response sent after the response has been passed to the underlying network implementation. * @param requestId the request ID (unique per client) * @param action the request action - * @param response the response send */ - default void onResponseSent(long requestId, String action, TransportResponse response) {} + default void onResponseSent(long requestId, String action) {} /*** * Called for every failed action response after the response has been passed to the underlying network implementation. diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 247ab22020ef2..01ea7b6a03127 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1288,7 +1288,7 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder) /** called by the {@link Transport} implementation once a response was sent to calling node */ @Override - public void onResponseSent(long requestId, String action, TransportResponse response) { + public void onResponseSent(long requestId, String action) { if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] sent response", requestId, action); } @@ -1541,7 +1541,7 @@ public String getProfileName() { @Override public void sendResponse(TransportResponse response) { - service.onResponseSent(requestId, action, response); + service.onResponseSent(requestId, action); try (var shutdownBlock = service.pendingDirectHandlers.withRef()) { if (shutdownBlock == null) { // already shutting down, the handler will be completed by sendRequestInternal or doStop diff --git a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java index 8a63097d6441c..8d18841f8e18e 100644 --- a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java @@ -220,13 +220,11 @@ public void testSendResponse() throws IOException { AtomicLong requestIdRef = new AtomicLong(); AtomicReference actionRef = new AtomicReference<>(); - AtomicReference responseRef = new AtomicReference<>(); handler.setMessageListener(new TransportMessageListener() { @Override - public void onResponseSent(long requestId, String action, TransportResponse response) { + public void onResponseSent(long requestId, String action) { requestIdRef.set(requestId); actionRef.set(action); - responseRef.set(response); } }); if (compress) { @@ -244,7 +242,6 @@ public void onResponseSent(long requestId, String action, TransportResponse resp } assertEquals(requestId, requestIdRef.get()); assertEquals(action, actionRef.get()); - assertEquals(response, responseRef.get()); pipeline.handleBytes(channel, new ReleasableBytesReference(reference, () -> {})); final Tuple tuple = message.get(); @@ -338,18 +335,15 @@ public void writeTo(StreamOutput out) { AtomicLong requestIdRef = new AtomicLong(); AtomicReference actionRef = new AtomicReference<>(); - AtomicReference responseRef = new AtomicReference<>(); AtomicReference exceptionRef = new AtomicReference<>(); handler.setMessageListener(new TransportMessageListener() { @Override - public void onResponseSent(long requestId, String action, TransportResponse response) { + public void onResponseSent(long requestId, String action) { assertNull(channel.getMessageCaptor().get()); assertThat(requestIdRef.get(), equalTo(0L)); requestIdRef.set(requestId); assertNull(actionRef.get()); actionRef.set(action); - assertNull(responseRef.get()); - responseRef.set(response); } @Override @@ -374,7 +368,6 @@ public void onResponseSent(long requestId, String action, Exception error) { } assertEquals(requestId, requestIdRef.get()); assertEquals(action, actionRef.get()); - assertEquals(response, responseRef.get()); assertThat(exceptionRef.get().getMessage(), equalTo("simulated cbe")); assertTrue(response.released.get()); BytesReference reference = channel.getMessageCaptor().get(); @@ -400,18 +393,15 @@ public void sendMessage(BytesReference reference, ActionListener listener) long requestId = randomLongBetween(0, 300); AtomicLong requestIdRef = new AtomicLong(); AtomicReference actionRef = new AtomicReference<>(); - AtomicReference responseRef = new AtomicReference<>(); AtomicReference exceptionRef = new AtomicReference<>(); handler.setMessageListener(new TransportMessageListener() { @Override - public void onResponseSent(long requestId, String action, TransportResponse response) { + public void onResponseSent(long requestId, String action) { assertNull(channel.getMessageCaptor().get()); assertThat(requestIdRef.get(), equalTo(0L)); requestIdRef.set(requestId); assertNull(actionRef.get()); actionRef.set(action); - assertNull(responseRef.get()); - responseRef.set(response); } @Override @@ -436,7 +426,6 @@ public void writeTo(StreamOutput out) { } assertEquals(requestId, requestIdRef.get()); assertEquals(action, actionRef.get()); - assertEquals(response, responseRef.get()); assertThat(exceptionRef.get().getMessage(), equalTo("simulated cbe")); assertTrue(response.released.get()); assertNull(channel.getMessageCaptor().get()); @@ -457,17 +446,14 @@ public void writeTo(StreamOutput out) { AtomicLong requestIdRef = new AtomicLong(); AtomicReference actionRef = new AtomicReference<>(); - AtomicReference responseRef = new AtomicReference<>(); handler.setMessageListener(new TransportMessageListener() { @Override - public void onResponseSent(long requestId, String action, TransportResponse response) { + public void onResponseSent(long requestId, String action) { assertNull(channel.getMessageCaptor().get()); assertThat(requestIdRef.get(), equalTo(0L)); requestIdRef.set(requestId); assertNull(actionRef.get()); actionRef.set(action); - assertNull(responseRef.get()); - responseRef.set(response); } @Override @@ -485,7 +471,6 @@ public void onResponseSent(long requestId, String action, Exception error) { assertNull(channel.getListenerCaptor().get()); assertEquals(requestId, requestIdRef.get()); assertEquals(action, actionRef.get()); - assertEquals(response, responseRef.get()); assertTrue(response.released.get()); assertFalse(channel.isOpen()); } @@ -507,7 +492,7 @@ public void sendMessage(BytesReference reference, ActionListener listener) AtomicReference exceptionRef = new AtomicReference<>(); handler.setMessageListener(new TransportMessageListener() { @Override - public void onResponseSent(long requestId, String action, TransportResponse response) { + public void onResponseSent(long requestId, String action) { throw new AssertionError("must not be called"); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index df77d2b939dfe..3f6cf453fd0d1 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -60,7 +60,6 @@ import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.netty4.Netty4Transport; @@ -845,9 +844,9 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder) } @Override - public void onResponseSent(long requestId, String action, TransportResponse response) { - super.onResponseSent(requestId, action, response); - messageListener.onResponseSent(requestId, action, response); + public void onResponseSent(long requestId, String action) { + super.onResponseSent(requestId, action); + messageListener.onResponseSent(requestId, action); } @Override @@ -876,9 +875,9 @@ public void onRequestReceived(long requestId, String action) { } @Override - public void onResponseSent(long requestId, String action, TransportResponse response) { + public void onResponseSent(long requestId, String action) { for (TransportMessageListener listener : listeners) { - listener.onResponseSent(requestId, action, response); + listener.onResponseSent(requestId, action); } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index a0cce9079391d..11fe07e005b9c 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -545,7 +545,7 @@ public void onRequestReceived(long requestId, String action) { } @Override - public void onResponseSent(long requestId, String action, TransportResponse response) { + public void onResponseSent(long requestId, String action) { if (action.equals(ACTION)) { responseSent.incrementAndGet(); }