Skip to content

Commit 9f05ded

Browse files
committed
Test for CCS with filters
1 parent 83ce15a commit 9f05ded

File tree

1 file changed

+203
-0
lines changed

1 file changed

+203
-0
lines changed
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.client.internal.Client;
11+
import org.elasticsearch.common.Strings;
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.index.query.QueryBuilder;
14+
import org.elasticsearch.index.query.RangeQueryBuilder;
15+
16+
import java.io.IOException;
17+
import java.util.HashSet;
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.Set;
21+
22+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
23+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
24+
import static org.hamcrest.Matchers.equalTo;
25+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
26+
import static org.hamcrest.Matchers.hasItems;
27+
import static org.hamcrest.Matchers.hasSize;
28+
import static org.hamcrest.Matchers.is;
29+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
30+
31+
public class CrossClusterQueryWithFiltersIT extends AbstractCrossClusterTestCase {
32+
@Override
33+
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
34+
return Map.of(REMOTE_CLUSTER_1, false, REMOTE_CLUSTER_2, false);
35+
}
36+
37+
protected void assertClusterMetadataSuccess(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) {
38+
assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression));
39+
assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
40+
assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L));
41+
assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took));
42+
assertThat(clusterMetatata.getTotalShards(), equalTo(shards));
43+
assertThat(clusterMetatata.getSuccessfulShards(), equalTo(shards));
44+
assertThat(clusterMetatata.getSkippedShards(), equalTo(0));
45+
assertThat(clusterMetatata.getFailedShards(), equalTo(0));
46+
}
47+
48+
protected void assertClusterMetadataNoShards(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) {
49+
assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression));
50+
assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
51+
assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L));
52+
assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took));
53+
assertThat(clusterMetatata.getTotalShards(), equalTo(0));
54+
assertThat(clusterMetatata.getSuccessfulShards(), equalTo(0));
55+
assertThat(clusterMetatata.getSkippedShards(), equalTo(0));
56+
assertThat(clusterMetatata.getFailedShards(), equalTo(0));
57+
}
58+
59+
protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse, QueryBuilder filter) {
60+
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
61+
request.query(query);
62+
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
63+
request.profile(randomInt(5) == 2);
64+
request.columnar(randomBoolean());
65+
if (ccsMetadataInResponse != null) {
66+
request.includeCCSMetadata(ccsMetadataInResponse);
67+
}
68+
if (filter != null) {
69+
request.filter(filter);
70+
}
71+
return runQuery(request);
72+
}
73+
74+
public void testTimestampFilterFromQuery() throws IOException {
75+
int docsTest1 = 50;
76+
int docsTest2 = 30;
77+
int localShards = randomIntBetween(1, 5);
78+
int remoteShards = randomIntBetween(1, 5);
79+
populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26");
80+
populateDateIndex(REMOTE_CLUSTER_1, REMOTE_INDEX, remoteShards, docsTest2, "2023-11-26");
81+
82+
// Both indices are included
83+
var filter = new RangeQueryBuilder("@timestamp").from("2023-01-01").to("now");
84+
try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) {
85+
List<List<Object>> values = getValuesList(resp);
86+
assertThat(values, hasSize(docsTest1 + docsTest2));
87+
assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a"));
88+
89+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
90+
assertNotNull(executionInfo);
91+
assertThat(executionInfo.isCrossClusterSearch(), is(true));
92+
long overallTookMillis = executionInfo.overallTook().millis();
93+
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
94+
95+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));
96+
97+
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
98+
assertClusterMetadataSuccess(remoteCluster, remoteShards, overallTookMillis, "logs-2");
99+
100+
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
101+
assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1");
102+
}
103+
104+
// Only local is included
105+
filter = new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now");
106+
try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) {
107+
List<List<Object>> values = getValuesList(resp);
108+
assertThat(values, hasSize(docsTest1));
109+
// assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a"));
110+
111+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
112+
assertNotNull(executionInfo);
113+
assertThat(executionInfo.isCrossClusterSearch(), is(true));
114+
long overallTookMillis = executionInfo.overallTook().millis();
115+
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
116+
117+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));
118+
119+
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
120+
assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1");
121+
122+
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
123+
assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-2");
124+
}
125+
126+
//
127+
// // Only local is included
128+
// filter = new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now");
129+
// try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) {
130+
// List<List<Object>> values = getValuesList(resp);
131+
// assertThat(values, hasSize(docsTest1));
132+
//
133+
// EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
134+
// assertNotNull(executionInfo);
135+
// assertThat(executionInfo.isCrossClusterSearch(), is(true));
136+
// long overallTookMillis = executionInfo.overallTook().millis();
137+
// assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
138+
//
139+
// assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));
140+
//
141+
// EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
142+
// // Remote has no shards due to filter
143+
// assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-*");
144+
//
145+
// EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
146+
// assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-*");
147+
// }
148+
149+
// Both indices are filtered out
150+
// var filter = new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now");
151+
// try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) {
152+
// List<List<Object>> values = getValuesList(resp);
153+
// assertThat(values, hasSize(0));
154+
// assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a"));
155+
//
156+
// EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
157+
// assertNotNull(executionInfo);
158+
// assertThat(executionInfo.isCrossClusterSearch(), is(true));
159+
// long overallTookMillis = executionInfo.overallTook().millis();
160+
// assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
161+
//
162+
// assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));
163+
//
164+
// EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
165+
// // Remote has no shards due to filter
166+
// assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-*");
167+
//
168+
// EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
169+
// assertClusterMetadataNoShards(localCluster, localShards, overallTookMillis, "logs-*");
170+
// }
171+
172+
}
173+
174+
protected void populateDateIndex(String clusterAlias, String indexName, int numShards, int numDocs, String date) {
175+
Client client = client(clusterAlias);
176+
String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias;
177+
assertAcked(
178+
client.admin()
179+
.indices()
180+
.prepareCreate(indexName)
181+
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
182+
.setMapping(
183+
"id",
184+
"type=keyword",
185+
"tag-" + tag,
186+
"type=keyword",
187+
"v",
188+
"type=long",
189+
"const",
190+
"type=long",
191+
"@timestamp",
192+
"type=date"
193+
)
194+
);
195+
Set<String> ids = new HashSet<>();
196+
for (int i = 0; i < numDocs; i++) {
197+
String id = Long.toString(i);
198+
client.prepareIndex(indexName).setSource("id", id, "tag-" + tag, tag, "v", i, "@timestamp", date).get();
199+
}
200+
client.admin().indices().prepareRefresh(indexName).get();
201+
}
202+
203+
}

0 commit comments

Comments
 (0)