Closed
@@ -16,6 +16,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.search.SearchPhaseResult;
+ import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.internal.AliasFilter;
@@ -40,6 +41,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
private final int trackTotalHitsUpTo;
private volatile BottomSortValuesCollector bottomSortCollector;
private final Client client;
+ private final SearchService searchService;

SearchQueryThenFetchAsyncAction(
Logger logger,
@@ -57,7 +59,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters,
- Client client
+ Client client,
+ SearchService searchService
) {
super(
"query",
@@ -82,7 +85,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
this.progressListener = task.getProgressListener();
this.client = client;

+ this.searchService = searchService;
// don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it
if (progressListener != SearchProgressListener.NOOP) {
notifyListShards(progressListener, clusters, request, shardsIts);
@@ -95,7 +98,13 @@ protected void executePhaseOnShard(
final SearchActionListener<SearchPhaseResult> listener
) {
ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt, listener.requestIndex));
- getSearchTransport().sendExecuteQuery(connection, request, getTask(), listener);
+ var searchTransport = getSearchTransport();
+ var task = getTask();
+ if (searchTransport.transportService().getLocalNodeConnection() == connection) {
+ searchService.executeQueryPhase(request, task, listener);
Member

Out of curiosity: if we do this, why do it only for the query phase? Also, couldn't this conditional be added to the sendExecuteQuery method instead? What kind of overhead does this save? I can imagine that executing a request on the local node is a pretty common pattern, or is search the edge case?

Contributor Author

Hmm, good question. I only noticed this in benchmarks for search, and for the query phase specifically. For fetch it's not very visible, since you mostly don't hit that many shards, and for bulk indexing you still have per-shard bulks, so the cost isn't there.
I first noticed this with batched execution, where the overhead becomes very visible, but it's equally visible without it on large data nodes that already do coordination work (or if queries are heavy, like a large terms query or some geo queries).
The overhead saved is all the lookups in the transport layer, lots of listener wrapping, child-task registration and, most importantly, security.
But :) that's why I think I need a review from security here. Functionally, I think security still works the same way, if not more efficiently. All tests pass because we auth the top-level search request. DLS/FLS are applied as well, but somehow those cache assertions needed adjustment; seemingly we use the cache more now, and I can't explain why.
The security overhead is considerable here; it seems to be well in excess of the can_match cost for most Rally runs :O

Contributor Author

It's this (couldn't zoom out further :P):
[image: out2]

vs. this:
[image: out]

and on a transport thread.

+ return;
+ }
+ searchTransport.sendExecuteQuery(connection, request, task, listener);
}
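
The local short-circuit discussed in this thread can be illustrated in isolation. The following is a minimal sketch, not the Elasticsearch implementation: `Connection`, `Listener`, `sendOverTransport` and `executeLocally` are hypothetical stand-ins for `Transport.Connection`, the search listener, `sendExecuteQuery` and `SearchService.executeQueryPhase`. It only shows the identity check against the local connection and the early return.

```java
import java.util.ArrayList;
import java.util.List;

public class LocalShortCircuitSketch {
    /** Hypothetical stand-in for Transport.Connection. */
    interface Connection {}

    /** Hypothetical stand-in for the listener that receives a shard result. */
    interface Listener {
        void onResponse(String result);
    }

    /** Records which path handled each request, for demonstration only. */
    static final List<String> trace = new ArrayList<>();

    /** The single connection object that represents the local node (assumption). */
    static final Connection LOCAL = new Connection() {};

    /** Stand-in for the transport path (lookups, wrapping, task registration, security). */
    static void sendOverTransport(Connection connection, String request, Listener listener) {
        trace.add("transport");
        listener.onResponse("result:" + request);
    }

    /** Stand-in for calling the local service directly. */
    static void executeLocally(String request, Listener listener) {
        trace.add("direct");
        listener.onResponse("result:" + request);
    }

    /** Uses the direct path only when the target connection is the local one. */
    static void executeOnShard(Connection connection, String request, Listener listener) {
        if (connection == LOCAL) { // identity comparison, as in the diff
            executeLocally(request, listener);
            return;
        }
        sendOverTransport(connection, request, listener);
    }

    public static void main(String[] args) {
        Connection remote = new Connection() {};
        executeOnShard(LOCAL, "q1", r -> {});
        executeOnShard(remote, "q2", r -> {});
        System.out.println(trace);
    }
}
```

In this sketch both paths deliver the same result to the listener; only the amount of work done on the way differs, which is the overhead the comment above refers to.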

@Override
@@ -122,6 +122,10 @@ public SearchTransportService(
this.responseWrapper = responseWrapper;
}

+ public TransportService transportService() {
+ return transportService;
+ }

public void sendFreeContext(
Transport.Connection connection,
ShardSearchContextId contextId,
@@ -1572,7 +1572,8 @@ public void runNewSearchPhase(
clusterState,
task,
clusters,
- client
+ client,
+ searchService
);
}
success = true;
@@ -603,7 +603,13 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea
public void executeQueryPhase(ShardSearchRequest request, CancellableTask task, ActionListener<SearchPhaseResult> listener) {
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
: "empty responses require more than one shard";
- final IndexShard shard = getShard(request);
+ final IndexShard shard;
+ try {
+ shard = getShard(request);
+ } catch (RuntimeException e) {
+ listener.onFailure(e);
+ return;
Member

What is this additional catch fixing? Is it a bug that you observed?

Contributor Author

Yup, concurrent shard deletion can cause a shard-not-found exception :)

Member

Should that be a separate change then? It is not a problem introduced by your change, is it?

Contributor Author

It is introduced here: without this change, the transport layer catches the exception and passes it on to the listener.
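
The point made in this thread can be sketched as follows: once a listener-based method is called directly rather than through a layer that catches exceptions, a synchronous failure has to be routed to the listener explicitly, or the caller never hears back. This is a hedged illustration only; `Listener`, `getShard` and the negative-id failure condition are hypothetical stand-ins, not the Elasticsearch types.

```java
public class ListenerFailureSketch {
    /** Hypothetical stand-in for an ActionListener-style callback. */
    interface Listener<T> {
        void onResponse(T result);

        void onFailure(Exception e);
    }

    /** Hypothetical shard lookup; a negative id simulates a concurrently deleted shard. */
    static String getShard(int shardId) {
        if (shardId < 0) {
            throw new IllegalStateException("shard not found: " + shardId);
        }
        return "shard-" + shardId;
    }

    /** Reports lookup failures through the listener instead of throwing to the caller. */
    static void executeQueryPhase(int shardId, Listener<String> listener) {
        final String shard;
        try {
            shard = getShard(shardId);
        } catch (RuntimeException e) {
            listener.onFailure(e);
            return;
        }
        listener.onResponse("queried " + shard);
    }

    /** Runs one request and returns a description of which callback fired. */
    static String run(int shardId) {
        final String[] outcome = new String[1];
        executeQueryPhase(shardId, new Listener<String>() {
            @Override
            public void onResponse(String result) {
                outcome[0] = "ok: " + result;
            }

            @Override
            public void onFailure(Exception e) {
                outcome[0] = "failed: " + e.getMessage();
            }
        });
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(run(3));
        System.out.println(run(-1));
    }
}
```

Without the `try`/`catch` in `executeQueryPhase`, the second call would throw into the caller and neither callback would fire.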

+ }
rewriteAndFetchShardRequest(
shard,
request,
@@ -39,6 +39,7 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.transport.Transport;
+ import org.elasticsearch.transport.TransportService;

import java.util.Collections;
import java.util.List;
@@ -51,6 +52,8 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
+ import static org.mockito.Mockito.mock;
+ import static org.mockito.Mockito.when;

public class SearchQueryThenFetchAsyncActionTests extends ESTestCase {
public void testBottomFieldSort() throws Exception {
@@ -83,7 +86,11 @@ private void testCase(boolean withScroll, boolean withCollapse) throws Exception
AtomicInteger numWithTopDocs = new AtomicInteger();
AtomicInteger successfulOps = new AtomicInteger();
AtomicBoolean canReturnNullResponse = new AtomicBoolean(false);
- SearchTransportService searchTransportService = new SearchTransportService(null, null, null) {
+ TransportService transportService = mock(TransportService.class);
+ when(transportService.getLocalNodeConnection()).thenReturn(
+ new SearchAsyncActionTests.MockConnection(DiscoveryNodeUtils.create("local"))
+ );
+ SearchTransportService searchTransportService = new SearchTransportService(transportService, null, null) {
@Override
public void sendExecuteQuery(
Transport.Connection connection,
@@ -201,6 +208,7 @@ public void sendExecuteQuery(
new ClusterState.Builder(new ClusterName("test")).build(),
task,
SearchResponse.Clusters.EMPTY,
+ null,
null
) {
@Override
@@ -106,8 +106,7 @@ public void testRequestCacheDisabledForDlsTemplateRoleWithPainless() throws IOEx
.addHeader("Authorization", UsernamePasswordToken.basicAuthHeaderValue(DLS_USER, DLS_USER_PASSWORD))
);
assertSearchResponse(client.performRequest(searchRequest), Set.of("1"));
- // Cache should not be used since DLS query uses stored script
- assertCacheState(0, 0);
+ assertCacheState(0, 1);
}

@SuppressWarnings("unchecked")
@@ -342,7 +342,7 @@ public void testRequestCacheWithTemplateRoleQuery() {
// Since the DLS for the alias uses a stored script, this should cause the request cached to be disabled
assertSearchResponse(client1.prepareSearch(DLS_TEMPLATE_ROLE_QUERY_ALIAS).setRequestCache(true), Set.of("1"), Set.of("username"));
// No cache should be used
- assertCacheState(DLS_TEMPLATE_ROLE_QUERY_INDEX, 2, 2);
+ assertCacheState(DLS_TEMPLATE_ROLE_QUERY_INDEX, 3, 2);
Member

Could you expand on why these assertions needed to be adapted?

Contributor Author

Not quite, to be honest. It seems that going through the transport layer somehow updates the thread-local context variable, and then authz works differently. But I think the security folks need to look at this.
I used the same approach in the batched execution logic (fortunately, it's easy to switch to going through the transport layer if need be). I think getting a review of this from security is a soft dependency for using this path for batched execution; I'm just not sure how strong the tests are end-to-end :)

}

private void prepareIndices() {