Skip to content

Commit 318188d

Browse files
authored
Add PIT information to collapse requests (#114133)
1 parent 8eec8d9 commit 318188d

File tree

2 files changed

+79
-14
lines changed

2 files changed

+79
-14
lines changed

server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.action.search;
1111

1212
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.common.Strings;
1314
import org.elasticsearch.common.util.Maps;
1415
import org.elasticsearch.index.query.BoolQueryBuilder;
1516
import org.elasticsearch.index.query.InnerHitBuilder;
@@ -82,8 +83,15 @@ private void doRun() {
8283
CollapseBuilder innerCollapseBuilder = innerHitBuilder.getInnerCollapseBuilder();
8384
SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(innerHitBuilder, innerCollapseBuilder).query(groupQuery)
8485
.postFilter(searchRequest.source().postFilter())
85-
.runtimeMappings(searchRequest.source().runtimeMappings());
86+
.runtimeMappings(searchRequest.source().runtimeMappings())
87+
.pointInTimeBuilder(searchRequest.source().pointInTimeBuilder());
8688
SearchRequest groupRequest = new SearchRequest(searchRequest);
89+
if (searchRequest.pointInTimeBuilder() != null) {
90+
// if the original request has a point in time, we propagate it to the inner search request
91+
// and clear the indices and preference from the inner search request
92+
groupRequest.indices(Strings.EMPTY_ARRAY);
93+
groupRequest.preference(null);
94+
}
8795
groupRequest.source(sourceBuilder);
8896
multiRequest.add(groupRequest);
8997
}

server/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java

Lines changed: 70 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,18 @@
1111

1212
import org.apache.lucene.search.TotalHits;
1313
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.common.Strings;
15+
import org.elasticsearch.common.bytes.BytesArray;
1416
import org.elasticsearch.common.document.DocumentField;
17+
import org.elasticsearch.common.util.concurrent.AtomicArray;
1518
import org.elasticsearch.index.query.BoolQueryBuilder;
1619
import org.elasticsearch.index.query.InnerHitBuilder;
1720
import org.elasticsearch.index.query.QueryBuilder;
1821
import org.elasticsearch.index.query.QueryBuilders;
1922
import org.elasticsearch.search.AbstractSearchTestCase;
2023
import org.elasticsearch.search.SearchHit;
2124
import org.elasticsearch.search.SearchHits;
25+
import org.elasticsearch.search.builder.PointInTimeBuilder;
2226
import org.elasticsearch.search.builder.SearchSourceBuilder;
2327
import org.elasticsearch.search.collapse.CollapseBuilder;
2428
import org.elasticsearch.test.ESTestCase;
@@ -317,14 +321,12 @@ public void testExpandRequestOptions() throws IOException {
317321
@Override
318322
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
319323
final QueryBuilder postFilter = QueryBuilders.existsQuery("foo");
320-
assertTrue(request.requests().stream().allMatch((r) -> "foo".equals(r.preference())));
324+
assertTrue(request.requests().stream().allMatch((r) -> "foobar".equals(r.preference())));
321325
assertTrue(request.requests().stream().allMatch((r) -> "baz".equals(r.routing())));
322326
assertTrue(request.requests().stream().allMatch((r) -> version == r.source().version()));
323327
assertTrue(request.requests().stream().allMatch((r) -> seqNoAndTerm == r.source().seqNoAndPrimaryTerm()));
324328
assertTrue(request.requests().stream().allMatch((r) -> postFilter.equals(r.source().postFilter())));
325-
assertTrue(request.requests().stream().allMatch((r) -> r.source().fetchSource().fetchSource() == false));
326-
assertTrue(request.requests().stream().allMatch((r) -> r.source().fetchSource().includes().length == 0));
327-
assertTrue(request.requests().stream().allMatch((r) -> r.source().fetchSource().excludes().length == 0));
329+
assertTrue(request.requests().stream().allMatch((r) -> r.source().fetchSource() == null));
328330
}
329331
};
330332
mockSearchPhaseContext.getRequest()
@@ -338,17 +340,72 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
338340
.preference("foobar")
339341
.routing("baz");
340342

341-
SearchHits hits = SearchHits.empty(new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0f);
342-
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, hits, () -> new SearchPhase("test") {
343+
SearchHit hit = new SearchHit(1, "ID");
344+
hit.setDocumentField("someField", new DocumentField("someField", Collections.singletonList("foo")));
345+
SearchHits hits = new SearchHits(new SearchHit[] { hit }, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F);
346+
try {
347+
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, hits, () -> new SearchPhase("test") {
348+
@Override
349+
public void run() {
350+
mockSearchPhaseContext.sendSearchResponse(new SearchResponseSections(hits, null, null, false, null, null, 1), null);
351+
}
352+
});
353+
phase.run();
354+
mockSearchPhaseContext.assertNoFailure();
355+
} finally {
356+
hits.decRef();
357+
}
358+
} finally {
359+
var resp = mockSearchPhaseContext.searchResponse.get();
360+
if (resp != null) {
361+
resp.decRef();
362+
}
363+
}
364+
}
365+
366+
public void testExpandSearchRespectsOriginalPIT() {
367+
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
368+
final PointInTimeBuilder pit = new PointInTimeBuilder(new BytesArray("foo"));
369+
try {
370+
boolean version = randomBoolean();
371+
final boolean seqNoAndTerm = randomBoolean();
372+
373+
mockSearchPhaseContext.searchTransport = new SearchTransportService(null, null, null) {
343374
@Override
344-
public void run() {
345-
mockSearchPhaseContext.sendSearchResponse(new SearchResponseSections(hits, null, null, false, null, null, 1), null);
375+
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
376+
assertTrue(request.requests().stream().allMatch((r) -> r.preference() == null));
377+
assertTrue(request.requests().stream().allMatch((r) -> r.indices() == Strings.EMPTY_ARRAY));
378+
assertTrue(request.requests().stream().allMatch((r) -> r.source().pointInTimeBuilder().equals(pit)));
346379
}
347-
});
348-
phase.run();
349-
mockSearchPhaseContext.assertNoFailure();
350-
assertNotNull(mockSearchPhaseContext.searchResponse.get());
351-
mockSearchPhaseContext.execute(() -> {});
380+
};
381+
mockSearchPhaseContext.getRequest()
382+
.source(
383+
new SearchSourceBuilder().collapse(
384+
new CollapseBuilder("someField").setInnerHits(
385+
new InnerHitBuilder().setName("foobarbaz").setVersion(version).setSeqNoAndPrimaryTerm(seqNoAndTerm)
386+
)
387+
).fetchSource(false).postFilter(QueryBuilders.existsQuery("foo")).pointInTimeBuilder(pit)
388+
)
389+
.routing("baz");
390+
391+
SearchHit hit = new SearchHit(1, "ID");
392+
hit.setDocumentField("someField", new DocumentField("someField", Collections.singletonList("foo")));
393+
SearchHits hits = new SearchHits(new SearchHit[] { hit }, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F);
394+
try {
395+
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, hits, () -> new SearchPhase("test") {
396+
@Override
397+
public void run() {
398+
mockSearchPhaseContext.sendSearchResponse(
399+
new SearchResponseSections(hits, null, null, false, null, null, 1),
400+
new AtomicArray<>(0)
401+
);
402+
}
403+
});
404+
phase.run();
405+
mockSearchPhaseContext.assertNoFailure();
406+
} finally {
407+
hits.decRef();
408+
}
352409
} finally {
353410
var resp = mockSearchPhaseContext.searchResponse.get();
354411
if (resp != null) {

0 commit comments

Comments
 (0)