diff --git a/docs/changelog/128552.yaml b/docs/changelog/128552.yaml new file mode 100644 index 0000000000000..20aacb01abbbd --- /dev/null +++ b/docs/changelog/128552.yaml @@ -0,0 +1,5 @@ +pr: 128552 +summary: Fix - NPE on batched query execution when the request is part of PIT with alias filters +area: Search +type: bug +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java index 66323e687eefb..a6069bf1545eb 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java @@ -13,6 +13,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils; +import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.IndicesOptions; @@ -126,6 +127,42 @@ public void testBasic() { } } + public void testIndexWithFilteredAlias() { + String indexName = "index_1"; + String alias = "alias_1"; + assertAcked( + indicesAdmin().prepareCreate(indexName) + .setSettings(indexSettings(10, 0)) + .addAlias(new Alias(alias).filter("{\"term\":{\"tag\":\"a\"}}")) + ); + + int numDocs = randomIntBetween(50, 150); + int countTagA = 0; + for (int i = 0; i < numDocs; i++) { + boolean isA = randomBoolean(); + if (isA) countTagA++; + prepareIndex(indexName).setId(Integer.toString(i)).setSource("tag", isA ? "a" : "b").get(); + } + + refresh(indexName); + BytesReference pitId = openPointInTime(new String[] { alias }, TimeValue.timeValueMinutes(1)).getPointInTimeId(); + + try { + int finalCountTagA = countTagA; + assertResponse( + prepareSearch().setPointInTime(new PointInTimeBuilder(pitId).setKeepAlive(TimeValue.timeValueMinutes(1))) + .setSize(0) + .setQuery(new MatchAllQueryBuilder()), + resp1 -> { + assertThat(resp1.pointInTimeId(), equalTo(pitId)); + assertHitCount(resp1, finalCountTagA); + } + ); + } finally { + closePointInTime(pitId); + } + } + public void testMultipleIndices() { int numIndices = randomIntBetween(1, 5); for (int i = 1; i <= numIndices; i++) { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index af15d932377b4..8b6e7a4a132da 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -391,10 +391,14 @@ private static ShardSearchRequest tryRewriteWithUpdatedSortValue( return request; } - private static boolean isPartOfPIT(SearchRequest request, ShardSearchContextId contextId) { + private static boolean isPartOfPIT( + SearchRequest request, + ShardSearchContextId contextId, + NamedWriteableRegistry namedWriteableRegistry + ) { final PointInTimeBuilder pointInTimeBuilder = request.pointInTimeBuilder(); if (pointInTimeBuilder != null) { - return request.pointInTimeBuilder().getSearchContextId(null).contains(contextId); + return request.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry).contains(contextId); } else { return false; } @@ -546,7 +550,8 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP static void registerNodeSearchAction( SearchTransportService searchTransportService, SearchService searchService, - SearchPhaseController searchPhaseController + SearchPhaseController searchPhaseController, + NamedWriteableRegistry namedWriteableRegistry ) { var transportService = searchTransportService.transportService(); var threadPool = transportService.getThreadPool(); @@ -576,7 +581,8 @@ static void registerNodeSearchAction( request, cancellableTask, channel, - dependencies + dependencies, + namedWriteableRegistry ); // TODO: log activating or otherwise limiting parallelism might be helpful here for (int i = 0; i < workers; i++) { @@ -587,12 +593,17 @@ static void registerNodeSearchAction( TransportActionProxy.registerProxyAction(transportService, NODE_SEARCH_ACTION_NAME, true, NodeQueryResponse::new); } - private static void releaseLocalContext(SearchService searchService, NodeQueryRequest request, SearchPhaseResult result) { + private static void releaseLocalContext( + SearchService searchService, + NodeQueryRequest request, + SearchPhaseResult result, + NamedWriteableRegistry namedWriteableRegistry + ) { var phaseResult = result.queryResult() != null ? result.queryResult() : result.rankFeatureResult(); if (phaseResult != null && phaseResult.hasSearchContext() && request.searchRequest.scroll() == null - && isPartOfPIT(request.searchRequest, phaseResult.getContextId()) == false) { + && isPartOfPIT(request.searchRequest, phaseResult.getContextId(), namedWriteableRegistry) == false) { searchService.freeReaderContext(phaseResult.getContextId()); } } @@ -736,13 +747,15 @@ private static final class QueryPerNodeState { private final CountDown countDown; private final TransportChannel channel; private volatile BottomSortValuesCollector bottomSortCollector; + private final NamedWriteableRegistry namedWriteableRegistry; private QueryPerNodeState( QueryPhaseResultConsumer queryPhaseResultConsumer, NodeQueryRequest searchRequest, CancellableTask task, TransportChannel channel, - Dependencies dependencies + Dependencies dependencies, + NamedWriteableRegistry namedWriteableRegistry ) { this.queryPhaseResultConsumer = queryPhaseResultConsumer; this.searchRequest = searchRequest; @@ -752,6 +765,7 @@ private QueryPerNodeState( this.countDown = new CountDown(queryPhaseResultConsumer.getNumShards()); this.channel = channel; this.dependencies = dependencies; + this.namedWriteableRegistry = namedWriteableRegistry; } void onShardDone() { @@ -762,7 +776,7 @@ void onShardDone() { try (queryPhaseResultConsumer) { var failure = queryPhaseResultConsumer.failure.get(); if (failure != null) { - handleMergeFailure(failure, channelListener); + handleMergeFailure(failure, channelListener, namedWriteableRegistry); return; } final QueryPhaseResultConsumer.MergeResult mergeResult; @@ -772,7 +786,7 @@ void onShardDone() { EMPTY_PARTIAL_MERGE_RESULT ); } catch (Exception e) { - handleMergeFailure(e, channelListener); + handleMergeFailure(e, channelListener, namedWriteableRegistry); return; } // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, @@ -799,7 +813,7 @@ void onShardDone() { && q.hasSuggestHits() == false && q.getRankShardResult() == null && searchRequest.searchRequest.scroll() == null - && isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) { + && isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegistry) == false) { if (dependencies.searchService.freeReaderContext(q.getContextId())) { q.clearContextId(); } @@ -816,9 +830,20 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) { } } - private void handleMergeFailure(Exception e, ChannelActionListener channelListener) { + private void handleMergeFailure( + Exception e, + ChannelActionListener channelListener, + NamedWriteableRegistry namedWriteableRegistry + ) { queryPhaseResultConsumer.getSuccessfulResults() - .forEach(searchPhaseResult -> releaseLocalContext(dependencies.searchService, searchRequest, searchPhaseResult)); + .forEach( + searchPhaseResult -> releaseLocalContext( + dependencies.searchService, + searchRequest, + searchPhaseResult, + namedWriteableRegistry + ) + ); channelListener.onFailure(e); } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index b943582787d66..71fab563b7af2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -187,7 +187,12 @@ public TransportSearchAction( this.searchTransportService = searchTransportService; this.remoteClusterService = searchTransportService.getRemoteClusterService(); SearchTransportService.registerRequestHandler(transportService, searchService); - SearchQueryThenFetchAsyncAction.registerNodeSearchAction(searchTransportService, searchService, searchPhaseController); + SearchQueryThenFetchAsyncAction.registerNodeSearchAction( + searchTransportService, + searchService, + searchPhaseController, + namedWriteableRegistry + ); this.clusterService = clusterService; this.transportService = transportService; this.searchService = searchService;