Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.IndicesOptions;
Expand Down Expand Up @@ -126,6 +127,42 @@ public void testBasic() {
}
}

public void testIndexWithFilteredAlias() {
String indexName = "index_1";
String alias = "alias_1";
assertAcked(
indicesAdmin().prepareCreate(indexName)
.setSettings(indexSettings(10, 0))
.addAlias(new Alias(alias).filter("{\"term\":{\"tag\":\"a\"}}"))
);

int numDocs = randomIntBetween(50, 150);
int countTagA = 0;
for (int i = 0; i < numDocs; i++) {
boolean isA = randomBoolean();
if (isA) countTagA++;
prepareIndex(indexName).setId(Integer.toString(i)).setSource("tag", isA ? "a" : "b").get();
}

refresh(indexName);
BytesReference pitId = openPointInTime(new String[] { alias }, TimeValue.timeValueMinutes(1)).getPointInTimeId();

try {
int finalCountTagA = countTagA;
assertResponse(
prepareSearch().setPointInTime(new PointInTimeBuilder(pitId).setKeepAlive(TimeValue.timeValueMinutes(1)))
.setSize(0)
.setQuery(new MatchAllQueryBuilder()),
resp1 -> {
assertThat(resp1.pointInTimeId(), equalTo(pitId));
assertHitCount(resp1, finalCountTagA);
}
);
} finally {
closePointInTime(pitId);
}
}

public void testMultipleIndices() {
int numIndices = randomIntBetween(1, 5);
for (int i = 1; i <= numIndices; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,14 @@ private static ShardSearchRequest tryRewriteWithUpdatedSortValue(
return request;
}

private static boolean isPartOfPIT(SearchRequest request, ShardSearchContextId contextId) {
private static boolean isPartOfPIT(
SearchRequest request,
ShardSearchContextId contextId,
NamedWriteableRegistry namedWriteableRegistry
) {
final PointInTimeBuilder pointInTimeBuilder = request.pointInTimeBuilder();
if (pointInTimeBuilder != null) {
return request.pointInTimeBuilder().getSearchContextId(null).contains(contextId);
return request.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry).contains(contextId);
} else {
return false;
}
Expand Down Expand Up @@ -551,7 +555,8 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP
static void registerNodeSearchAction(
SearchTransportService searchTransportService,
SearchService searchService,
SearchPhaseController searchPhaseController
SearchPhaseController searchPhaseController,
NamedWriteableRegistry namedWriteableRegistry
) {
var transportService = searchTransportService.transportService();
var threadPool = transportService.getThreadPool();
Expand Down Expand Up @@ -581,7 +586,8 @@ static void registerNodeSearchAction(
request,
cancellableTask,
channel,
dependencies
dependencies,
namedWriteableRegistry
);
// TODO: log activating or otherwise limiting parallelism might be helpful here
for (int i = 0; i < workers; i++) {
Expand All @@ -592,12 +598,17 @@ static void registerNodeSearchAction(
TransportActionProxy.registerProxyAction(transportService, NODE_SEARCH_ACTION_NAME, true, NodeQueryResponse::new);
}

private static void releaseLocalContext(SearchService searchService, NodeQueryRequest request, SearchPhaseResult result) {
private static void releaseLocalContext(
SearchService searchService,
NodeQueryRequest request,
SearchPhaseResult result,
NamedWriteableRegistry namedWriteableRegistry
) {
var phaseResult = result.queryResult() != null ? result.queryResult() : result.rankFeatureResult();
if (phaseResult != null
&& phaseResult.hasSearchContext()
&& request.searchRequest.scroll() == null
&& isPartOfPIT(request.searchRequest, phaseResult.getContextId()) == false) {
&& isPartOfPIT(request.searchRequest, phaseResult.getContextId(), namedWriteableRegistry) == false) {
searchService.freeReaderContext(phaseResult.getContextId());
}
}
Expand Down Expand Up @@ -741,13 +752,15 @@ private static final class QueryPerNodeState {
private final CountDown countDown;
private final TransportChannel channel;
private volatile BottomSortValuesCollector bottomSortCollector;
private final NamedWriteableRegistry namedWriteableRegistry;

private QueryPerNodeState(
QueryPhaseResultConsumer queryPhaseResultConsumer,
NodeQueryRequest searchRequest,
CancellableTask task,
TransportChannel channel,
Dependencies dependencies
Dependencies dependencies,
NamedWriteableRegistry namedWriteableRegistry
) {
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchRequest = searchRequest;
Expand All @@ -757,6 +770,7 @@ private QueryPerNodeState(
this.countDown = new CountDown(queryPhaseResultConsumer.getNumShards());
this.channel = channel;
this.dependencies = dependencies;
this.namedWriteableRegistry = namedWriteableRegistry;
}

void onShardDone() {
Expand All @@ -769,7 +783,7 @@ void onShardDone() {
try (queryPhaseResultConsumer) {
var failure = queryPhaseResultConsumer.failure.get();
if (failure != null) {
handleMergeFailure(failure, channelListener);
handleMergeFailure(failure, channelListener, namedWriteableRegistry);
return;
}
final QueryPhaseResultConsumer.MergeResult mergeResult;
Expand All @@ -779,7 +793,7 @@ void onShardDone() {
EMPTY_PARTIAL_MERGE_RESULT
);
} catch (Exception e) {
handleMergeFailure(e, channelListener);
handleMergeFailure(e, channelListener, namedWriteableRegistry);
return;
}
// translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments,
Expand All @@ -804,14 +818,14 @@ void onShardDone() {
NodeQueryResponse.writePerShardException(out, failures.remove(i));
} else {
// free context id and remove it from the result right away in case we don't need it anymore
maybeFreeContext(result, relevantShardIndices);
maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry);
NodeQueryResponse.writePerShardResult(out, result);
}
}
NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats);
success = true;
} catch (IOException e) {
handleMergeFailure(e, channelListener);
handleMergeFailure(e, channelListener, namedWriteableRegistry);
return;
}
} finally {
Expand All @@ -822,23 +836,38 @@ void onShardDone() {
ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(out.moveToBytesReference()));
}

private void maybeFreeContext(SearchPhaseResult result, BitSet relevantShardIndices) {
private void maybeFreeContext(
SearchPhaseResult result,
BitSet relevantShardIndices,
NamedWriteableRegistry namedWriteableRegistry
) {
if (result instanceof QuerySearchResult q
&& q.getContextId() != null
&& relevantShardIndices.get(q.getShardIndex()) == false
&& q.hasSuggestHits() == false
&& q.getRankShardResult() == null
&& searchRequest.searchRequest.scroll() == null
&& isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
&& isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegistry) == false) {
if (dependencies.searchService.freeReaderContext(q.getContextId())) {
q.clearContextId();
}
}
}

private void handleMergeFailure(Exception e, ChannelActionListener<TransportResponse> channelListener) {
private void handleMergeFailure(
Exception e,
ChannelActionListener<TransportResponse> channelListener,
NamedWriteableRegistry namedWriteableRegistry
) {
queryPhaseResultConsumer.getSuccessfulResults()
.forEach(searchPhaseResult -> releaseLocalContext(dependencies.searchService, searchRequest, searchPhaseResult));
.forEach(
searchPhaseResult -> releaseLocalContext(
dependencies.searchService,
searchRequest,
searchPhaseResult,
namedWriteableRegistry
)
);
channelListener.onFailure(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,12 @@ public TransportSearchAction(
this.searchTransportService = searchTransportService;
this.remoteClusterService = searchTransportService.getRemoteClusterService();
SearchTransportService.registerRequestHandler(transportService, searchService);
SearchQueryThenFetchAsyncAction.registerNodeSearchAction(searchTransportService, searchService, searchPhaseController);
SearchQueryThenFetchAsyncAction.registerNodeSearchAction(
searchTransportService,
searchService,
searchPhaseController,
namedWriteableRegistry
);
this.clusterService = clusterService;
this.transportService = transportService;
this.searchService = searchService;
Expand Down