Skip to content

Commit c118254

Browse files
authored
Fix handling telemetry on compound retrievers branch (elastic#112309)
1 parent e2a1605 commit c118254

File tree

3 files changed

+229
-5
lines changed

3 files changed

+229
-5
lines changed

server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CCSUsageTelemetryIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.elasticsearch.search.builder.SearchSourceBuilder;
3333
import org.elasticsearch.search.query.SlowRunningQueryBuilder;
3434
import org.elasticsearch.search.query.ThrowingQueryBuilder;
35+
import org.elasticsearch.search.retriever.MinimalCompoundRetrieverIT;
36+
import org.elasticsearch.search.retriever.RetrieverBuilder;
3537
import org.elasticsearch.tasks.Task;
3638
import org.elasticsearch.test.AbstractMultiClustersTestCase;
3739
import org.elasticsearch.test.InternalTestCluster;
@@ -49,6 +51,7 @@
4951
import java.lang.annotation.Target;
5052
import java.util.Arrays;
5153
import java.util.Collection;
54+
import java.util.Collections;
5255
import java.util.HashMap;
5356
import java.util.List;
5457
import java.util.Map;
@@ -620,6 +623,20 @@ public void testPITSearch() throws ExecutionException, InterruptedException {
620623
assertThat(telemetry.getSuccessCount(), equalTo(2L));
621624
}
622625

626+
public void testCompoundRetrieverSearch() throws ExecutionException, InterruptedException {
627+
RetrieverBuilder compoundRetriever = new MinimalCompoundRetrieverIT.CompoundRetriever(Collections.emptyList());
628+
Map<String, Object> testClusterInfo = setupClusters();
629+
String localIndex = (String) testClusterInfo.get("local.index");
630+
String remoteIndex = (String) testClusterInfo.get("remote.index");
631+
632+
SearchRequest searchRequest = makeSearchRequest(localIndex, "*:" + remoteIndex);
633+
searchRequest.source(new SearchSourceBuilder().retriever(compoundRetriever));
634+
635+
CCSTelemetrySnapshot telemetry = getTelemetryFromSearch(searchRequest);
636+
assertThat(telemetry.getTotalCount(), equalTo(1L));
637+
assertThat(telemetry.getSuccessCount(), equalTo(1L));
638+
}
639+
623640
private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) {
624641
var usage = cluster(LOCAL_CLUSTER).getInstance(UsageService.class, nodeName);
625642
return usage.getCcsUsageHolder().getCCSTelemetrySnapshot();
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.search.retriever;
10+
11+
import org.elasticsearch.action.search.SearchRequest;
12+
import org.elasticsearch.action.search.SearchResponse;
13+
import org.elasticsearch.client.internal.Client;
14+
import org.elasticsearch.common.Strings;
15+
import org.elasticsearch.common.settings.Setting;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.core.TimeValue;
18+
import org.elasticsearch.index.query.MatchAllQueryBuilder;
19+
import org.elasticsearch.index.query.QueryBuilder;
20+
import org.elasticsearch.index.query.QueryRewriteContext;
21+
import org.elasticsearch.search.builder.SearchSourceBuilder;
22+
import org.elasticsearch.test.AbstractMultiClustersTestCase;
23+
import org.elasticsearch.test.InternalTestCluster;
24+
import org.elasticsearch.transport.RemoteClusterAware;
25+
import org.elasticsearch.xcontent.XContentBuilder;
26+
27+
import java.io.IOException;
28+
import java.util.Collection;
29+
import java.util.Collections;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.concurrent.ExecutionException;
34+
35+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
36+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
37+
import static org.hamcrest.Matchers.equalTo;
38+
39+
public class MinimalCompoundRetrieverIT extends AbstractMultiClustersTestCase {
40+
41+
// CrossClusterSearchIT
42+
private static final String REMOTE_CLUSTER = "cluster_a";
43+
44+
@Override
45+
protected Collection<String> remoteClusterAlias() {
46+
return List.of(REMOTE_CLUSTER);
47+
}
48+
49+
@Override
50+
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
51+
return Map.of(REMOTE_CLUSTER, randomBoolean());
52+
}
53+
54+
@Override
55+
protected boolean reuseClusters() {
56+
return false;
57+
}
58+
59+
public void testSimpleSearch() throws ExecutionException, InterruptedException {
60+
RetrieverBuilder compoundRetriever = new CompoundRetriever(Collections.emptyList());
61+
Map<String, Object> testClusterInfo = setupTwoClusters();
62+
String localIndex = (String) testClusterInfo.get("local.index");
63+
String remoteIndex = (String) testClusterInfo.get("remote.index");
64+
SearchRequest searchRequest = new SearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
65+
searchRequest.source(new SearchSourceBuilder().retriever(compoundRetriever));
66+
assertResponse(client(LOCAL_CLUSTER).search(searchRequest), response -> {
67+
assertNotNull(response);
68+
69+
SearchResponse.Clusters clusters = response.getClusters();
70+
assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
71+
assertThat(clusters.getTotal(), equalTo(2));
72+
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(2));
73+
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
74+
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING), equalTo(0));
75+
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL), equalTo(0));
76+
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED), equalTo(0));
77+
assertThat(response.getHits().getTotalHits().value, equalTo(testClusterInfo.get("total_docs")));
78+
});
79+
}
80+
81+
private Map<String, Object> setupTwoClusters() {
82+
int totalDocs = 0;
83+
String localIndex = "demo";
84+
int numShardsLocal = randomIntBetween(2, 10);
85+
Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build();
86+
assertAcked(
87+
client(LOCAL_CLUSTER).admin()
88+
.indices()
89+
.prepareCreate(localIndex)
90+
.setSettings(localSettings)
91+
.setMapping("some_field", "type=keyword")
92+
);
93+
totalDocs += indexDocs(client(LOCAL_CLUSTER), localIndex);
94+
95+
String remoteIndex = "prod";
96+
int numShardsRemote = randomIntBetween(2, 10);
97+
final InternalTestCluster remoteCluster = cluster(REMOTE_CLUSTER);
98+
remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(1, 3));
99+
assertAcked(
100+
client(REMOTE_CLUSTER).admin()
101+
.indices()
102+
.prepareCreate(remoteIndex)
103+
.setSettings(indexSettings(numShardsRemote, randomIntBetween(0, 1)))
104+
.setMapping("some_field", "type=keyword")
105+
);
106+
assertFalse(
107+
client(REMOTE_CLUSTER).admin()
108+
.cluster()
109+
.prepareHealth(remoteIndex)
110+
.setWaitForYellowStatus()
111+
.setTimeout(TimeValue.timeValueSeconds(10))
112+
.get()
113+
.isTimedOut()
114+
);
115+
totalDocs += indexDocs(client(REMOTE_CLUSTER), remoteIndex);
116+
117+
String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER);
118+
Setting<?> skipUnavailableSetting = cluster(REMOTE_CLUSTER).clusterService().getClusterSettings().get(skipUnavailableKey);
119+
boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
120+
.getClusterSettings()
121+
.get(skipUnavailableSetting);
122+
123+
Map<String, Object> clusterInfo = new HashMap<>();
124+
clusterInfo.put("local.num_shards", numShardsLocal);
125+
clusterInfo.put("local.index", localIndex);
126+
clusterInfo.put("remote.num_shards", numShardsRemote);
127+
clusterInfo.put("remote.index", remoteIndex);
128+
clusterInfo.put("remote.skip_unavailable", skipUnavailable);
129+
clusterInfo.put("total_docs", (long) totalDocs);
130+
return clusterInfo;
131+
}
132+
133+
private int indexDocs(Client client, String index) {
134+
int numDocs = between(500, 1200);
135+
for (int i = 0; i < numDocs; i++) {
136+
client.prepareIndex(index).setSource("some_field", i).get();
137+
}
138+
client.admin().indices().prepareRefresh(index).get();
139+
return numDocs;
140+
}
141+
142+
public static class CompoundRetriever extends RetrieverBuilder {
143+
144+
private final List<RetrieverBuilder> sources;
145+
146+
public CompoundRetriever(List<RetrieverBuilder> sources) {
147+
this.sources = sources;
148+
}
149+
150+
@Override
151+
public boolean isCompound() {
152+
return true;
153+
}
154+
155+
@Override
156+
public QueryBuilder topDocsQuery() {
157+
throw new UnsupportedOperationException("should not be called");
158+
}
159+
160+
@Override
161+
public RetrieverBuilder rewrite(QueryRewriteContext ctx) throws IOException {
162+
if (ctx.getPointInTimeBuilder() == null) {
163+
throw new IllegalStateException("PIT is required");
164+
}
165+
if (sources.isEmpty()) {
166+
StandardRetrieverBuilder standardRetrieverBuilder = new StandardRetrieverBuilder();
167+
standardRetrieverBuilder.queryBuilder = new MatchAllQueryBuilder();
168+
return standardRetrieverBuilder;
169+
}
170+
return sources.get(0);
171+
}
172+
173+
@Override
174+
public void extractToSearchSourceBuilder(SearchSourceBuilder searchSourceBuilder, boolean compoundUsed) {
175+
throw new UnsupportedOperationException("should not be called");
176+
}
177+
178+
@Override
179+
public String getName() {
180+
return "compound_retriever";
181+
}
182+
183+
@Override
184+
protected void doToXContent(XContentBuilder builder, Params params) throws IOException {
185+
// no-op
186+
}
187+
188+
@Override
189+
protected boolean doEquals(Object o) {
190+
return false;
191+
}
192+
193+
@Override
194+
protected int doHashCode() {
195+
return 0;
196+
}
197+
}
198+
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,12 @@ public long buildTookInMillis() {
312312

313313
@Override
314314
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
315-
ActionListener<SearchResponse> loggingAndMetrics = new SearchResponseActionListener((SearchTask) task, listener);
316-
executeRequest((SearchTask) task, searchRequest, loggingAndMetrics, AsyncSearchActionProvider::new);
315+
executeRequest(
316+
(SearchTask) task,
317+
searchRequest,
318+
new SearchResponseActionListener((SearchTask) task, listener),
319+
AsyncSearchActionProvider::new
320+
);
317321
}
318322

319323
void executeRequest(
@@ -498,7 +502,7 @@ void executeRequest(
498502
// We set the keep alive to -1 to indicate that we don't need the pit id in the response.
499503
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
500504
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE));
501-
executeRequest(task, original, new ActionListener<>() {
505+
var pitListener = new SearchResponseActionListener(task, listener) {
502506
@Override
503507
public void onResponse(SearchResponse response) {
504508
// we need to close the PIT first so we delay the release of the response to after the closing
@@ -514,7 +518,8 @@ public void onResponse(SearchResponse response) {
514518
public void onFailure(Exception e) {
515519
closePIT(client, original.source().pointInTimeBuilder(), () -> listener.onFailure(e));
516520
}
517-
}, searchPhaseProvider);
521+
};
522+
executeRequest(task, original, pitListener, searchPhaseProvider);
518523
}));
519524
} else {
520525
Rewriteable.rewriteAndFetch(
@@ -1846,7 +1851,11 @@ private class SearchResponseActionListener implements ActionListener<SearchRespo
18461851
SearchResponseActionListener(SearchTask task, ActionListener<SearchResponse> listener) {
18471852
this.task = task;
18481853
this.listener = listener;
1849-
usageBuilder = new CCSUsage.Builder();
1854+
if (listener instanceof SearchResponseActionListener srListener) {
1855+
usageBuilder = srListener.usageBuilder;
1856+
} else {
1857+
usageBuilder = new CCSUsage.Builder();
1858+
}
18501859
}
18511860

18521861
/**

0 commit comments

Comments
 (0)