Merged (changes from 2 commits)
======================================================================
docs/changelog/121885.yaml: 5 additions & 0 deletions
======================================================================
@@ -0,0 +1,5 @@
+pr: 121885
+summary: Introduce batched query execution and data-node side reduce
+area: Search
+type: enhancement
+issues: []
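
Several of the test changes below temporarily opt out of the new execution path
through the SearchService.BATCHED_QUERY_PHASE cluster setting. A minimal sketch
of that recurring pattern, assuming an ESIntegTestCase subclass (the class name
is hypothetical):

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.search.SearchService;
    import org.elasticsearch.test.ESIntegTestCase;
    import org.junit.After;
    import org.junit.Before;

    public class BatchedQueryOptOutIT extends ESIntegTestCase {
        @Before
        public void disableBatchedExecution() {
            // force the pre-batching query path for assertions that rely on it
            updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
        }

        @After
        public void resetSettings() {
            // restore the default (batched) behaviour for subsequent tests
            updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
        }
    }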
======================================================================
@@ -16,6 +16,7 @@
 import org.elasticsearch.action.search.MultiSearchRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.Request;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.ErrorTraceHelper;
@@ -24,6 +25,7 @@
 import org.elasticsearch.test.MockLog;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.xcontent.XContentType;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
@@ -50,6 +52,13 @@ public static void setDebugLogLevel() {
     @Before
     public void setupMessageListener() {
         hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
+        // TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener
+        updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
     }
+
+    @After
+    public void resetSettings() {
+        updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
+    }
 
     private void setupIndexWithDocs() {
======================================================================
@@ -1,4 +1,7 @@
 setup:
+  - skip:
+      awaits_fix: "TODO fix this test, the response with batched execution is not deterministic enough for the available matchers"
+
   - do:
       indices.create:
         index: test_1
@@ -48,7 +51,6 @@ setup:
         batched_reduce_size: 2
         body: { "size" : 0, "aggs" : { "str_terms" : { "terms" : { "field" : "str" } } } }
 
-  - match: { num_reduce_phases: 4 }
   - match: { hits.total: 3 }
   - length: { aggregations.str_terms.buckets: 2 }
   - match: { aggregations.str_terms.buckets.0.key: "abc" }
======================================================================
@@ -574,11 +574,8 @@ public void testSearchQueryThenFetch() throws Exception {
         );
 
         clearInterceptedActions();
-        assertIndicesSubset(
-            Arrays.asList(searchRequest.indices()),
-            SearchTransportService.QUERY_ACTION_NAME,
-            SearchTransportService.FETCH_ID_ACTION_NAME
-        );
+        assertIndicesSubset(Arrays.asList(searchRequest.indices()), true, SearchTransportService.QUERY_ACTION_NAME);
+        assertIndicesSubset(Arrays.asList(searchRequest.indices()), SearchTransportService.FETCH_ID_ACTION_NAME);
     }
 
     public void testSearchDfsQueryThenFetch() throws Exception {
@@ -631,10 +628,6 @@ private static void assertIndicesSubset(List<String> indices, String... actions)
         assertIndicesSubset(indices, false, actions);
     }
 
-    private static void assertIndicesSubsetOptionalRequests(List<String> indices, String... actions) {
-        assertIndicesSubset(indices, true, actions);
-    }
-
     private static void assertIndicesSubset(List<String> indices, boolean optional, String... actions) {
         // indices returned by each bulk shard request need to be a subset of the original indices
         for (String action : actions) {
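
With batched execution the query phase can be fanned out per node rather than per
shard, so intercepted per-shard QUERY_ACTION_NAME requests may be absent entirely;
the new boolean makes that assertion tolerant. A sketch of what the relaxed helper
plausibly does (hypothetical body, assuming a consumeTransportRequests(String)
helper that returns the intercepted requests for an action):

    private static void assertIndicesSubset(List<String> indices, boolean optional, String... actions) {
        for (String action : actions) {
            List<TransportRequest> requests = consumeTransportRequests(action); // assumed helper
            if (optional == false) {
                // non-optional actions must have been sent at least once
                assertFalse("no requests intercepted for [" + action + "]", requests.isEmpty());
            }
            // any requests that were sent may only target the original indices
        }
    }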
======================================================================
@@ -41,6 +41,7 @@
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.tasks.RemovedTaskListener;
 import org.elasticsearch.tasks.Task;
@@ -352,6 +353,8 @@ public void testTransportBulkTasks() {
     }
 
     public void testSearchTaskDescriptions() {
+        // TODO: enhance this test to also check the tasks created by batched query execution
+        updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
         registerTaskManagerListeners(TransportSearchAction.TYPE.name()); // main task
         registerTaskManagerListeners(TransportSearchAction.TYPE.name() + "[*]"); // shard task
         createIndex("test");
@@ -398,7 +401,7 @@ public void testSearchTaskDescriptions() {
             // assert that all task descriptions have non-zero length
             assertThat(taskInfo.description().length(), greaterThan(0));
         }
-
+        updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
     }
 
     public void testSearchTaskHeaderLimit() {
======================================================================
@@ -40,6 +40,7 @@
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@@ -445,6 +446,7 @@ public void testSearchIdle() throws Exception {
     }
 
     public void testCircuitBreakerReduceFail() throws Exception {
+        updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
         int numShards = randomIntBetween(1, 10);
         indexSomeDocs("test", numShards, numShards * 3);
 
@@ -518,7 +520,9 @@ public void onFailure(Exception exc) {
             }
             assertBusy(() -> assertThat(requestBreakerUsed(), equalTo(0L)));
         } finally {
-            updateClusterSettings(Settings.builder().putNull("indices.breaker.request.limit"));
+            updateClusterSettings(
+                Settings.builder().putNull("indices.breaker.request.limit").putNull(SearchService.BATCHED_QUERY_PHASE.getKey())
+            );
         }
     }
 
======================================================================
@@ -23,6 +23,7 @@
 import org.elasticsearch.action.search.TransportSearchAction;
 import org.elasticsearch.action.search.TransportSearchScrollAction;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptType;
@@ -239,6 +240,8 @@ public void testCancelMultiSearch() throws Exception {
     }
 
     public void testCancelFailedSearchWhenPartialResultDisallowed() throws Exception {
+        // TODO: make this test compatible with batched execution, currently the exceptions are slightly different with batched
+        updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
         // Have at least two nodes so that we have parallel execution of two request guaranteed even if max concurrent requests per node
         // are limited to 1
         internalCluster().ensureAtLeastNumDataNodes(2);
======================================================================
@@ -13,12 +13,15 @@
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
 import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.junit.After;
+import org.junit.Before;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -50,6 +53,18 @@ public static String randomExecutionHint() {
 
     private static int numRoutingValues;
 
+    @Before
+    public void disableBatchedExecution() {
+        // TODO: it's practically impossible to get a 100% deterministic test with batched execution unfortunately, adjust this test to
+        // still do something useful with batched execution (i.e. use somewhat relaxed assertions)
+        updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
+    }
+
+    @After
+    public void resetSettings() {
+        updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
+    }
+
     @Override
     public void setupSuiteScopeCluster() throws Exception {
         assertAcked(indicesAdmin().prepareCreate("idx").setMapping(STRING_FIELD_NAME, "type=keyword").get());
======================================================================
@@ -202,6 +202,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion RERANK_COMMON_OPTIONS_ADDED_8_19 = def(8_841_0_15);
     public static final TransportVersion REMOTE_EXCEPTION_8_19 = def(8_841_0_16);
     public static final TransportVersion AMAZON_BEDROCK_TASK_SETTINGS_8_19 = def(8_841_0_17);
+    public static final TransportVersion BATCHED_QUERY_PHASE_VERSION_BACKPORT_8_X = def(8_841_0_18);
 
     /*
      * STOP! READ THIS FIRST! No, really,
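
Transport version constants like this one gate wire-format changes at
serialization time; a minimal sketch of the usual pattern (the batchedResults
field and the surrounding writeTo method are illustrative, not from this PR):

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        if (out.getTransportVersion().onOrAfter(TransportVersions.BATCHED_QUERY_PHASE_VERSION_BACKPORT_8_X)) {
            // only nodes that know about batched query execution receive the new field
            out.writeBoolean(batchedResults); // hypothetical field
        }
    }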
======================================================================
@@ -65,33 +65,33 @@
  * distributed frequencies
  */
 abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase {
-    private static final float DEFAULT_INDEX_BOOST = 1.0f;
+    protected static final float DEFAULT_INDEX_BOOST = 1.0f;
     private final Logger logger;
     private final NamedWriteableRegistry namedWriteableRegistry;
-    private final SearchTransportService searchTransportService;
+    protected final SearchTransportService searchTransportService;
     private final Executor executor;
     private final ActionListener<SearchResponse> listener;
-    private final SearchRequest request;
+    protected final SearchRequest request;
 
     /**
      * Used by subclasses to resolve node ids to DiscoveryNodes.
      **/
     private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
-    private final SearchTask task;
+    protected final SearchTask task;
     protected final SearchPhaseResults<Result> results;
     private final long clusterStateVersion;
     private final TransportVersion minTransportVersion;
-    private final Map<String, AliasFilter> aliasFilter;
-    private final Map<String, Float> concreteIndexBoosts;
+    protected final Map<String, AliasFilter> aliasFilter;
+    protected final Map<String, Float> concreteIndexBoosts;
     private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
     private final Object shardFailuresMutex = new Object();
     private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
     private final AtomicInteger successfulOps;
-    private final SearchTimeProvider timeProvider;
+    protected final SearchTimeProvider timeProvider;
     private final SearchResponse.Clusters clusters;
 
     protected final List<SearchShardIterator> shardsIts;
-    private final SearchShardIterator[] shardIterators;
+    protected final SearchShardIterator[] shardIterators;
     private final AtomicInteger outstandingShards;
     private final int maxConcurrentRequestsPerNode;
     private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
@@ -231,10 +231,17 @@ protected final void run() {
             onPhaseDone();
             return;
         }
+        if (shardsIts.isEmpty()) {
+            return;
+        }
         final Map<SearchShardIterator, Integer> shardIndexMap = Maps.newHashMapWithExpectedSize(shardIterators.length);
         for (int i = 0; i < shardIterators.length; i++) {
             shardIndexMap.put(shardIterators[i], i);
         }
+        doRun(shardIndexMap);
+    }
+
+    protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
         doCheckNoMissingShards(getName(), request, shardsIts);
         Version version = request.minCompatibleShardNode();
         if (version != null && Version.CURRENT.minimumCompatibilityVersion().equals(version) == false) {
@@ -275,7 +282,7 @@ private boolean checkMinimumVersion(List<SearchShardIterator> shardsIts) {
         return true;
     }
 
-    private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
+    protected final void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
         if (throttleConcurrentRequests) {
             var pendingExecutions = pendingExecutionsPerNode.computeIfAbsent(
                 shard.getNodeId(),
@@ -315,7 +322,7 @@ public void onFailure(Exception e) {
         executePhaseOnShard(shardIt, connection, shardListener);
     }
 
-    private void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) {
+    protected final void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) {
         SearchShardTarget unassignedShard = new SearchShardTarget(null, shardIt.shardId(), shardIt.getClusterAlias());
         onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
     }
@@ -422,7 +429,7 @@ private ShardSearchFailure[] buildShardFailures() {
         return failures;
     }
 
-    private void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
+    protected final void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
         // we always add the shard failure for a specific shard instance
         // we do make sure to clean it on a successful response from a shard
         onShardFailure(shardIndex, shard, e);
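
The new doRun extension point, together with the widened field and method
visibility above, is what lets a batched implementation intercept the shard
fan-out: the base class builds the shard-to-index map and hands it over, and a
subclass can group shards per node instead of dispatching one transport message
per shard. A rough sketch of that idea (hypothetical subclass method;
sendBatchedQueryRequest is an assumed helper, not the PR's actual code):

    @Override
    protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
        // group shards by the node hosting their first preferred copy
        Map<String, List<SearchShardIterator>> byNode = new HashMap<>();
        for (SearchShardIterator shardIt : shardsIts) {
            SearchShardTarget target = shardIt.nextOrNull();
            if (target == null) {
                failOnUnavailable(shardIndexMap.get(shardIt), shardIt);
            } else {
                byNode.computeIfAbsent(target.getNodeId(), n -> new ArrayList<>()).add(shardIt);
            }
        }
        // one node-level request per group; each data node can then run the query
        // phase for all of its shards and reduce partial results locally before
        // responding (the data-node side reduce from the changelog)
        byNode.forEach((nodeId, group) -> sendBatchedQueryRequest(nodeId, group, shardIndexMap)); // assumed helper
    }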
======================================================================
@@ -344,7 +344,7 @@ public void onFailure(Exception e) {
         }
     }
 
-    private record SendingTarget(@Nullable String clusterAlias, @Nullable String nodeId) {}
+    public record SendingTarget(@Nullable String clusterAlias, @Nullable String nodeId) {}
 
     private CanMatchNodeRequest createCanMatchRequest(Map.Entry<SendingTarget, List<SearchShardIterator>> entry) {
        final SearchShardIterator first = entry.getValue().get(0);
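
Making SendingTarget public supports the same grouping on the query side: the
batched path can key its per-node batches by (clusterAlias, nodeId) exactly as
the can-match phase already does, e.g. (illustrative; nodeIdFor is an assumed
helper):

    Map<SendingTarget, List<SearchShardIterator>> perNode = new HashMap<>();
    for (SearchShardIterator it : shardIts) {
        perNode.computeIfAbsent(new SendingTarget(it.getClusterAlias(), nodeIdFor(it)), k -> new ArrayList<>()).add(it);
    }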