Skip to content

Commit ef16bf4

Browse files
committed
Test for CCS with filters
1 parent 83ce15a commit ef16bf4

File tree

1 file changed

+200
-0
lines changed

1 file changed

+200
-0
lines changed
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.client.internal.Client;
11+
import org.elasticsearch.common.Strings;
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.index.query.QueryBuilder;
14+
import org.elasticsearch.index.query.RangeQueryBuilder;
15+
16+
import java.io.IOException;
17+
import java.util.HashSet;
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.Set;
21+
22+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
23+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
24+
import static org.hamcrest.Matchers.equalTo;
25+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
26+
import static org.hamcrest.Matchers.hasSize;
27+
import static org.hamcrest.Matchers.is;
28+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
29+
30+
public class CrossClusterQueryWithFiltersIT extends AbstractCrossClusterTestCase {
31+
@Override
32+
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
33+
return Map.of(REMOTE_CLUSTER_1, false, REMOTE_CLUSTER_2, false);
34+
}
35+
36+
protected void assertClusterMetadataSuccess(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) {
37+
assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression));
38+
assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
39+
assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L));
40+
assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took));
41+
assertThat(clusterMetatata.getTotalShards(), equalTo(shards));
42+
assertThat(clusterMetatata.getSuccessfulShards(), equalTo(shards));
43+
assertThat(clusterMetatata.getSkippedShards(), equalTo(0));
44+
assertThat(clusterMetatata.getFailedShards(), equalTo(0));
45+
}
46+
47+
protected void assertClusterMetadataNoShards(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) {
48+
assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression));
49+
assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
50+
assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L));
51+
assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took));
52+
assertThat(clusterMetatata.getTotalShards(), equalTo(0));
53+
assertThat(clusterMetatata.getSuccessfulShards(), equalTo(0));
54+
assertThat(clusterMetatata.getSkippedShards(), equalTo(0));
55+
assertThat(clusterMetatata.getFailedShards(), equalTo(0));
56+
}
57+
58+
protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse, QueryBuilder filter) {
59+
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
60+
request.query(query);
61+
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
62+
request.profile(randomInt(5) == 2);
63+
request.columnar(randomBoolean());
64+
if (ccsMetadataInResponse != null) {
65+
request.includeCCSMetadata(ccsMetadataInResponse);
66+
}
67+
if (filter != null) {
68+
request.filter(filter);
69+
}
70+
return runQuery(request);
71+
}
72+
73+
public void testTimestampFilterFromQuery() throws IOException {
74+
int docsTest1 = 50;
75+
int docsTest2 = 30;
76+
int localShards = randomIntBetween(1, 5);
77+
int remoteShards = randomIntBetween(1, 5);
78+
populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26");
79+
populateDateIndex(REMOTE_CLUSTER_1, REMOTE_INDEX, remoteShards, docsTest2, "2023-11-26");
80+
81+
// Both indices are included
82+
var filter = new RangeQueryBuilder("@timestamp").from("2023-01-01").to("now");
83+
try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) {
84+
List<List<Object>> values = getValuesList(resp);
85+
assertThat(values, hasSize(docsTest1 + docsTest2));
86+
87+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
88+
assertNotNull(executionInfo);
89+
assertThat(executionInfo.isCrossClusterSearch(), is(true));
90+
long overallTookMillis = executionInfo.overallTook().millis();
91+
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
92+
93+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));
94+
95+
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
96+
assertClusterMetadataSuccess(remoteCluster, remoteShards, overallTookMillis, "logs-2");
97+
98+
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
99+
assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1");
100+
}
101+
102+
// Only local is included
103+
filter = new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now");
104+
try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) {
105+
List<List<Object>> values = getValuesList(resp);
106+
assertThat(values, hasSize(docsTest1 + docsTest2));
107+
108+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
109+
assertNotNull(executionInfo);
110+
assertThat(executionInfo.isCrossClusterSearch(), is(true));
111+
long overallTookMillis = executionInfo.overallTook().millis();
112+
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
113+
114+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));
115+
116+
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
117+
assertClusterMetadataSuccess(remoteCluster, remoteShards, overallTookMillis, "logs-");
118+
119+
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
120+
assertClusterMetadataNoShards(localCluster, localShards, overallTookMillis, "logs-1");
121+
}
122+
123+
//
124+
// // Only local is included
125+
// filter = new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now");
126+
// try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) {
127+
// List<List<Object>> values = getValuesList(resp);
128+
// assertThat(values, hasSize(docsTest1));
129+
//
130+
// EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
131+
// assertNotNull(executionInfo);
132+
// assertThat(executionInfo.isCrossClusterSearch(), is(true));
133+
// long overallTookMillis = executionInfo.overallTook().millis();
134+
// assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
135+
//
136+
// assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));
137+
//
138+
// EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
139+
// // Remote has no shards due to filter
140+
// assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-*");
141+
//
142+
// EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
143+
// assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-*");
144+
// }
145+
146+
// Both indices are filtered out
147+
// var filter = new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now");
148+
// try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) {
149+
// List<List<Object>> values = getValuesList(resp);
150+
// assertThat(values, hasSize(0));
151+
// assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a"));
152+
//
153+
// EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
154+
// assertNotNull(executionInfo);
155+
// assertThat(executionInfo.isCrossClusterSearch(), is(true));
156+
// long overallTookMillis = executionInfo.overallTook().millis();
157+
// assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
158+
//
159+
// assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));
160+
//
161+
// EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
162+
// // Remote has no shards due to filter
163+
// assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-*");
164+
//
165+
// EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
166+
// assertClusterMetadataNoShards(localCluster, localShards, overallTookMillis, "logs-*");
167+
// }
168+
169+
}
170+
171+
protected void populateDateIndex(String clusterAlias, String indexName, int numShards, int numDocs, String date) {
172+
Client client = client(clusterAlias);
173+
String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias;
174+
assertAcked(
175+
client.admin()
176+
.indices()
177+
.prepareCreate(indexName)
178+
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
179+
.setMapping(
180+
"id",
181+
"type=keyword",
182+
"tag-" + tag,
183+
"type=keyword",
184+
"v",
185+
"type=long",
186+
"const",
187+
"type=long",
188+
"@timestamp",
189+
"type=date"
190+
)
191+
);
192+
Set<String> ids = new HashSet<>();
193+
for (int i = 0; i < numDocs; i++) {
194+
String id = Long.toString(i);
195+
client.prepareIndex(indexName).setSource("id", id, "tag-" + tag, tag, "v", i, "@timestamp", date).get();
196+
}
197+
client.admin().indices().prepareRefresh(indexName).get();
198+
}
199+
200+
}

0 commit comments

Comments
 (0)