diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 1e0fa28889c97..097ad8ea73a72 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -21,6 +21,7 @@ import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -52,6 +53,7 @@ import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; +import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; @@ -487,6 +489,7 @@ public static void registerRequestHandler(TransportService transportService, Sea (request, channel, task) -> searchService.executeFetchPhase( request, (SearchShardTask) task, + maybeGetNetworkBuffer(transportService, channel), new ChannelActionListener<>(channel) ) ); @@ -503,7 +506,12 @@ public static void registerRequestHandler(TransportService transportService, Sea TransportActionProxy.registerProxyAction(transportService, RANK_FEATURE_SHARD_ACTION_NAME, true, RankFeatureResult::new); final TransportRequestHandler shardFetchRequestHandler = (request, channel, task) -> searchService - .executeFetchPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel)); + .executeFetchPhase( + request, + (SearchShardTask) task, + maybeGetNetworkBuffer(transportService, channel), + new ChannelActionListener<>(channel) + ); transportService.registerRequestHandler( FETCH_ID_SCROLL_ACTION_NAME, EsExecutors.DIRECT_EXECUTOR_SERVICE, @@ -531,6 +539,12 @@ public static void registerRequestHandler(TransportService transportService, Sea TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NODE_NAME, true, CanMatchNodeResponse::new); } + private static RecyclerBytesStreamOutput maybeGetNetworkBuffer(TransportService transportService, TransportChannel channel) { + return TransportService.DIRECT_RESPONSE_PROFILE.equals(channel.getProfileName()) || channel.compressionScheme() != null + ? null + : transportService.newNetworkBytesStream(); + } + private static Executor buildFreeContextExecutor(TransportService transportService) { final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner( "free_context", diff --git a/server/src/main/java/org/elasticsearch/search/SearchHits.java b/server/src/main/java/org/elasticsearch/search/SearchHits.java index ed6fe5098d2be..454af58043412 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchHits.java +++ b/server/src/main/java/org/elasticsearch/search/SearchHits.java @@ -157,16 +157,9 @@ public boolean isPooled() { @Override public void writeTo(StreamOutput out) throws IOException { assert hasReferences(); - final boolean hasTotalHits = totalHits != null; - out.writeBoolean(hasTotalHits); - if (hasTotalHits) { - Lucene.writeTotalHits(out, totalHits); - } - out.writeFloat(maxScore); + writeHeader(out); out.writeArray(hits); - out.writeOptional(Lucene::writeSortFieldArray, sortFields); - out.writeOptionalString(collapseField); - out.writeOptionalArray(Lucene::writeSortValue, collapseValues); + writeFooter(out); } /** @@ -260,6 +253,37 @@ private void deallocate() { } } + public void writeAndRelease(StreamOutput out) throws IOException { + boolean released = refCounted.decRef(); + assert released; + writeHeader(out); + var hits = this.hits; + out.writeVInt(hits.length); + for (int i = 0; i < hits.length; i++) { + var h = hits[i]; + hits[i] = null; + assert h != null; + h.writeTo(out); + h.decRef(); + } + writeFooter(out); + } + + private void writeFooter(StreamOutput out) throws IOException { + out.writeOptional(Lucene::writeSortFieldArray, sortFields); + out.writeOptionalString(collapseField); + out.writeOptionalArray(Lucene::writeSortValue, collapseValues); + } + + private void writeHeader(StreamOutput out) throws IOException { + final boolean hasTotalHits = totalHits != null; + out.writeBoolean(hasTotalHits); + if (hasTotalHits) { + Lucene.writeTotalHits(out, totalHits); + } + out.writeFloat(maxScore); + } + @Override public boolean hasReferences() { return refCounted.hasReferences(); diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 2a814a1a36489..01950f1f9c747 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -36,7 +36,9 @@ import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Setting; @@ -95,7 +97,6 @@ import org.elasticsearch.search.dfs.DfsPhase; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.fetch.FetchPhase; -import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchRequest; @@ -136,7 +137,9 @@ import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.BytesTransportResponse; import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.Transports; import java.io.IOException; @@ -1107,7 +1110,8 @@ private Executor getExecutor(IndexShard indexShard) { public void executeFetchPhase( InternalScrollSearchRequest request, SearchShardTask task, - ActionListener listener + RecyclerBytesStreamOutput networkBuffer, + ActionListener listener ) { final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request); final Releasable markAsUsed; @@ -1139,8 +1143,14 @@ public void executeFetchPhase( opsListener.onFailedQueryPhase(searchContext); } } - QueryFetchSearchResult fetchSearchResult = executeFetchPhase(readerContext, searchContext, afterQueryTime); - return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget()); + var resp = executeFetchPhase(readerContext, searchContext, afterQueryTime); + if (networkBuffer == null) { + return new ScrollQueryFetchSearchResult(resp, searchContext.shardTarget()); + } + searchContext.shardTarget().writeTo(networkBuffer); + resp.writeTo(networkBuffer); + resp.decRef(); + return new BytesTransportResponse(new ReleasableBytesReference(networkBuffer.bytes(), networkBuffer)); } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); logger.trace("Fetch phase failed", e); @@ -1150,7 +1160,12 @@ public void executeFetchPhase( }, wrapFailureListener(listener, readerContext, markAsUsed)); } - public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, ActionListener listener) { + public void executeFetchPhase( + ShardFetchRequest request, + CancellableTask task, + RecyclerBytesStreamOutput networkBuffer, + ActionListener listener + ) { final ReaderContext readerContext = findReaderContext(request.contextId(), request); final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); @@ -1179,8 +1194,18 @@ public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, A } var fetchResult = searchContext.fetchResult(); // inc-ref fetch result because we close the SearchContext that references it in this try-with-resources block - fetchResult.incRef(); - return fetchResult; + if (networkBuffer == null) { + fetchResult.incRef(); + return fetchResult; + } + try (networkBuffer) { + // no need to worry about releasing this instance safely before we write the first byte to it + // => the try-with-resources here is all we need to not leak any buffers + fetchResult.contextId.writeTo(networkBuffer); + fetchResult.consumeHits(networkBuffer); + networkBuffer.writeOptionalWriteable(fetchResult.profileResult()); + return new BytesTransportResponse(networkBuffer.moveToBytesReference()); + } } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); // we handle the failure in the failure listener below diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java index 694526dc5b14c..656a0df270316 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java @@ -83,6 +83,12 @@ public SearchHits hits() { return hits; } + public void consumeHits(StreamOutput out) throws IOException { + var hits = this.hits; + this.hits = null; + hits.writeAndRelease(out); + } + public FetchSearchResult initCounter() { counter = 0; return this; diff --git a/server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java b/server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java index 809fe1c6ead5c..c93a53a4eff0f 100644 --- a/server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java @@ -47,6 +47,11 @@ public void sendResponse(Exception exception) { } } + @Override + public Compression.Scheme compressionScheme() { + return channel.compressionScheme(); + } + @Override public TransportVersion getVersion() { return channel.getVersion(); diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java index 3776e98368408..0eaa764cc28eb 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java @@ -78,6 +78,11 @@ public void sendResponse(Exception exception) { } } + @Override + public Compression.Scheme compressionScheme() { + return compressionScheme; + } + @Override public TransportVersion getVersion() { return version; diff --git a/server/src/main/java/org/elasticsearch/transport/TransportChannel.java b/server/src/main/java/org/elasticsearch/transport/TransportChannel.java index 0902a869e195a..926d60d7d9137 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportChannel.java @@ -22,6 +22,14 @@ public interface TransportChannel { void sendResponse(Exception exception); + /** + * Returns a suggestion about the desired compression scheme to use for sending the response when using {@link BytesTransportResponse} + * to bypass transport layer serialization and compression. + * + * @return the suggested compression scheme to use for responses or {@code null} when not using compression + */ + Compression.Scheme compressionScheme(); + /** * Returns the version of the data to communicate in this channel. */ diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index a51184cb08714..47911e7125891 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1630,6 +1630,11 @@ public String toString() { } } + @Override + public Compression.Scheme compressionScheme() { + return null; + } + protected RemoteTransportException wrapInRemote(Exception e) { return e instanceof RemoteTransportException remoteTransportException ? remoteTransportException diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java index 66b9e69aea906..b928788eb6db6 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java @@ -129,6 +129,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; import org.junit.Before; @@ -412,8 +413,8 @@ public void testSearchWhileIndexDeleted() throws InterruptedException { intCursors, null/* not a scroll */ ); - PlainActionFuture listener = new PlainActionFuture<>(); - service.executeFetchPhase(req, new SearchShardTask(123L, "", "", "", null, emptyMap()), listener); + PlainActionFuture listener = new PlainActionFuture<>(); + service.executeFetchPhase(req, new SearchShardTask(123L, "", "", "", null, emptyMap()), null, listener); listener.get(); if (useScroll) { // have to free context since this test does not remove the index from IndicesService. @@ -601,9 +602,10 @@ public RankShardResult buildRankFeatureShardResult(SearchHits hits, int shardId) // execute fetch phase and perform any validations once we retrieve the response // the difference in how we do assertions here is needed because once the transport service sends back the response // it decrements the reference to the FetchSearchResult (through the ActionListener#respondAndRelease) and sets hits to null - PlainActionFuture fetchListener = new PlainActionFuture<>() { + PlainActionFuture fetchListener = new PlainActionFuture<>() { @Override - public void onResponse(FetchSearchResult fetchSearchResult) { + public void onResponse(TransportResponse response) { + FetchSearchResult fetchSearchResult = (FetchSearchResult) response; assertNotNull(fetchSearchResult); assertNotNull(fetchSearchResult.hits()); @@ -624,7 +626,7 @@ public void onFailure(Exception e) { throw new AssertionError("No failure should have been raised", e); } }; - service.executeFetchPhase(fetchRequest, searchTask, fetchListener); + service.executeFetchPhase(fetchRequest, searchTask, null, fetchListener); fetchListener.get(); } catch (Exception ex) { if (queryResult != null) { diff --git a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index f64b012165478..24388f9466c18 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -529,6 +529,11 @@ public void sendResponse(Exception exception) { in.sendResponse(exception); } + @Override + public Compression.Scheme compressionScheme() { + return in.compressionScheme(); + } + @Override public TransportVersion getVersion() { return in.getVersion(); diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index bd2d1c44e1e17..ef23fb0fef02d 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -468,6 +468,11 @@ public void sendResponse(Exception exception) { channel.sendResponse(exception); } + + @Override + public Compression.Scheme compressionScheme() { + return channel.compressionScheme(); + } }, task); } else { return actualHandler; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/DisruptableMockTransport.java index a7dac8f727b0c..336eddcb92878 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/DisruptableMockTransport.java @@ -324,6 +324,11 @@ public String toString() { } }); } + + @Override + public Compression.Scheme compressionScheme() { + return null; + } }; final TransportRequest copiedRequest; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannel.java index ea73bfa4b1a90..60020b2414b51 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannel.java @@ -33,4 +33,9 @@ public void sendResponse(TransportResponse response) { public void sendResponse(Exception exception) { listener.onFailure(exception); } + + @Override + public Compression.Scheme compressionScheme() { + return null; + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java index 9da7ae8d4bbd5..2ad129cd30489 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java @@ -47,6 +47,7 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.transport.AbstractSimpleTransportTestCase; +import org.elasticsearch.transport.Compression; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; @@ -721,6 +722,11 @@ public void sendResponse(TransportResponse response) { public void sendResponse(Exception exception) { in.sendResponse(exception); } + + @Override + public Compression.Scheme compressionScheme() { + return in.compressionScheme(); + } } private final List breakers = Collections.synchronizedList(new ArrayList<>()); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index 91f1cc12851dc..8cbdf7f285985 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.Compression; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportResponse; @@ -467,6 +468,11 @@ public void sendResponse(TransportResponse response) { public void sendResponse(Exception exception) { channel.sendResponse(exception); } + + @Override + public Compression.Scheme compressionScheme() { + return channel.compressionScheme(); + } }, task) ); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java index c6d45486549e2..ec9e7c876af39 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.test.FailingFieldPlugin; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.Compression; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -212,6 +213,11 @@ public void sendResponse(TransportResponse response) { public void sendResponse(Exception exception) { channel.sendResponse(exception); } + + @Override + public Compression.Scheme compressionScheme() { + return channel.compressionScheme(); + } }, task) ); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java index f17eb18e0c371..428af68d0b0c1 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.search.MockSearchService; import org.elasticsearch.search.SearchService; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.Compression; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportResponse; @@ -182,6 +183,11 @@ public void sendResponse(TransportResponse response) { public void sendResponse(Exception exception) { channel.sendResponse(exception); } + + @Override + public Compression.Scheme compressionScheme() { + return channel.compressionScheme(); + } }, task); } }); diff --git a/x-pack/plugin/snapshot-based-recoveries/src/internalClusterTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/SnapshotBasedIndexRecoveryIT.java b/x-pack/plugin/snapshot-based-recoveries/src/internalClusterTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/SnapshotBasedIndexRecoveryIT.java index 7c596bff281e7..9385c095afb62 100644 --- a/x-pack/plugin/snapshot-based-recoveries/src/internalClusterTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/SnapshotBasedIndexRecoveryIT.java +++ b/x-pack/plugin/snapshot-based-recoveries/src/internalClusterTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/SnapshotBasedIndexRecoveryIT.java @@ -70,6 +70,7 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Compression; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportResponse; @@ -694,6 +695,11 @@ public void sendResponse(Exception exception) { } } + @Override + public Compression.Scheme compressionScheme() { + return channel.compressionScheme(); + } + private boolean assertShardClosedException(Exception e) { if (e instanceof CancellableThreads.ExecutionCancelledException cancelledException) { assert cancelledException.getMessage().contains("shard is closed") : e; diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java index 298dcd9ed94c5..3e959c0f0816f 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java @@ -45,6 +45,7 @@ import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Compression; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; @@ -421,6 +422,11 @@ public void sendResponse(Exception exception) { channel.sendResponse(exception); } } + + @Override + public Compression.Scheme compressionScheme() { + return channel.compressionScheme(); + } }, task); } }