Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/128552.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128552
summary: Fix NPE on batched query execution when the request is part of a PIT with alias filters
area: Search
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.IndicesOptions;
Expand Down Expand Up @@ -126,6 +127,28 @@ public void testBasic() {
}
}

public void testIndexWithAlias() {
String indexName = "index_1";
String alias = "alias_1";
assertAcked(indicesAdmin().prepareCreate(indexName).setSettings(indexSettings(10, 0)).addAlias(new Alias(alias)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this test fail without the fix? I was unclear on whether we need to add a filter to the alias, or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the filter is required. I inadvertently committed a version that did not include the filtering logic. Updating with some enhancements

int numDocs = randomIntBetween(50, 150);
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(i);
prepareIndex(indexName).setId(id).setSource("value", i).get();
}
refresh(indexName);
BytesReference pitId = openPointInTime(new String[] { alias }, TimeValue.timeValueMinutes(1)).getPointInTimeId();
;
try {
assertResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pitId)), resp1 -> {
assertThat(resp1.pointInTimeId(), equalTo(pitId));
assertHitCount(resp1, numDocs);
});
} finally {
closePointInTime(pitId);
}
}

public void testMultipleIndices() {
int numIndices = randomIntBetween(1, 5);
for (int i = 1; i <= numIndices; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,14 @@ private static ShardSearchRequest tryRewriteWithUpdatedSortValue(
return request;
}

private static boolean isPartOfPIT(SearchRequest request, ShardSearchContextId contextId) {
private static boolean isPartOfPIT(
SearchRequest request,
ShardSearchContextId contextId,
NamedWriteableRegistry namedWriteableRegistry
) {
final PointInTimeBuilder pointInTimeBuilder = request.pointInTimeBuilder();
if (pointInTimeBuilder != null) {
return request.pointInTimeBuilder().getSearchContextId(null).contains(contextId);
return request.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry).contains(contextId);
} else {
return false;
}
Expand Down Expand Up @@ -546,7 +550,8 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP
static void registerNodeSearchAction(
SearchTransportService searchTransportService,
SearchService searchService,
SearchPhaseController searchPhaseController
SearchPhaseController searchPhaseController,
NamedWriteableRegistry namedWriteableRegistry
) {
var transportService = searchTransportService.transportService();
var threadPool = transportService.getThreadPool();
Expand Down Expand Up @@ -576,7 +581,8 @@ static void registerNodeSearchAction(
request,
cancellableTask,
channel,
dependencies
dependencies,
namedWriteableRegistry
);
// TODO: log activating or otherwise limiting parallelism might be helpful here
for (int i = 0; i < workers; i++) {
Expand All @@ -587,12 +593,17 @@ static void registerNodeSearchAction(
TransportActionProxy.registerProxyAction(transportService, NODE_SEARCH_ACTION_NAME, true, NodeQueryResponse::new);
}

private static void releaseLocalContext(SearchService searchService, NodeQueryRequest request, SearchPhaseResult result) {
private static void releaseLocalContext(
SearchService searchService,
NodeQueryRequest request,
SearchPhaseResult result,
NamedWriteableRegistry namedWriteableRegistry
) {
var phaseResult = result.queryResult() != null ? result.queryResult() : result.rankFeatureResult();
if (phaseResult != null
&& phaseResult.hasSearchContext()
&& request.searchRequest.scroll() == null
&& isPartOfPIT(request.searchRequest, phaseResult.getContextId()) == false) {
&& isPartOfPIT(request.searchRequest, phaseResult.getContextId(), namedWriteableRegistry) == false) {
searchService.freeReaderContext(phaseResult.getContextId());
}
}
Expand Down Expand Up @@ -736,13 +747,15 @@ private static final class QueryPerNodeState {
private final CountDown countDown;
private final TransportChannel channel;
private volatile BottomSortValuesCollector bottomSortCollector;
private final NamedWriteableRegistry namedWriteableRegistry;

private QueryPerNodeState(
QueryPhaseResultConsumer queryPhaseResultConsumer,
NodeQueryRequest searchRequest,
CancellableTask task,
TransportChannel channel,
Dependencies dependencies
Dependencies dependencies,
NamedWriteableRegistry namedWriteableRegistry
) {
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchRequest = searchRequest;
Expand All @@ -752,6 +765,7 @@ private QueryPerNodeState(
this.countDown = new CountDown(queryPhaseResultConsumer.getNumShards());
this.channel = channel;
this.dependencies = dependencies;
this.namedWriteableRegistry = namedWriteableRegistry;
}

void onShardDone() {
Expand All @@ -762,7 +776,7 @@ void onShardDone() {
try (queryPhaseResultConsumer) {
var failure = queryPhaseResultConsumer.failure.get();
if (failure != null) {
handleMergeFailure(failure, channelListener);
handleMergeFailure(failure, channelListener, namedWriteableRegistry);
return;
}
final QueryPhaseResultConsumer.MergeResult mergeResult;
Expand All @@ -772,7 +786,7 @@ void onShardDone() {
EMPTY_PARTIAL_MERGE_RESULT
);
} catch (Exception e) {
handleMergeFailure(e, channelListener);
handleMergeFailure(e, channelListener, namedWriteableRegistry);
return;
}
// translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments,
Expand All @@ -799,7 +813,7 @@ void onShardDone() {
&& q.hasSuggestHits() == false
&& q.getRankShardResult() == null
&& searchRequest.searchRequest.scroll() == null
&& isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
&& isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegistry) == false) {
if (dependencies.searchService.freeReaderContext(q.getContextId())) {
q.clearContextId();
}
Expand All @@ -816,9 +830,20 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
}
}

private void handleMergeFailure(Exception e, ChannelActionListener<TransportResponse> channelListener) {
private void handleMergeFailure(
Exception e,
ChannelActionListener<TransportResponse> channelListener,
NamedWriteableRegistry namedWriteableRegistry
) {
queryPhaseResultConsumer.getSuccessfulResults()
.forEach(searchPhaseResult -> releaseLocalContext(dependencies.searchService, searchRequest, searchPhaseResult));
.forEach(
searchPhaseResult -> releaseLocalContext(
dependencies.searchService,
searchRequest,
searchPhaseResult,
namedWriteableRegistry
)
);
channelListener.onFailure(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,12 @@ public TransportSearchAction(
this.searchTransportService = searchTransportService;
this.remoteClusterService = searchTransportService.getRemoteClusterService();
SearchTransportService.registerRequestHandler(transportService, searchService);
SearchQueryThenFetchAsyncAction.registerNodeSearchAction(searchTransportService, searchService, searchPhaseController);
SearchQueryThenFetchAsyncAction.registerNodeSearchAction(
searchTransportService,
searchService,
searchPhaseController,
namedWriteableRegistry
);
this.clusterService = clusterService;
this.transportService = transportService;
this.searchService = searchService;
Expand Down