Skip to content

Commit bd9c9ab

Browse files
committed
Integration test
1 parent bd2e10e commit bd9c9ab

File tree

3 files changed

+249
-2
lines changed

3 files changed

+249
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.qa.single_node;
9+
10+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
11+
12+
import org.elasticsearch.client.Request;
13+
import org.elasticsearch.client.Response;
14+
import org.elasticsearch.client.ResponseException;
15+
import org.elasticsearch.common.Strings;
16+
import org.elasticsearch.core.CheckedConsumer;
17+
import org.elasticsearch.test.MapMatcher;
18+
import org.elasticsearch.test.TestClustersThreadFilter;
19+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
20+
import org.elasticsearch.test.rest.ESRestTestCase;
21+
import org.elasticsearch.xcontent.XContentBuilder;
22+
import org.elasticsearch.xcontent.XContentType;
23+
import org.elasticsearch.xcontent.json.JsonXContent;
24+
import org.elasticsearch.xpack.esql.AssertWarnings;
25+
import org.elasticsearch.xpack.esql.qa.rest.ProfileLogger;
26+
import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase;
27+
import org.hamcrest.Matcher;
28+
import org.junit.ClassRule;
29+
import org.junit.Rule;
30+
31+
import java.io.IOException;
32+
import java.util.ArrayList;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
import static org.elasticsearch.test.ListMatcher.matchesList;
37+
import static org.elasticsearch.test.MapMatcher.assertMap;
38+
import static org.elasticsearch.test.MapMatcher.matchesMap;
39+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.entityToMap;
40+
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.requestObjectBuilder;
41+
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.runEsql;
42+
import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.commonProfile;
43+
import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.fixTypesOnProfile;
44+
import static org.hamcrest.Matchers.any;
45+
import static org.hamcrest.Matchers.instanceOf;
46+
import static org.hamcrest.Matchers.startsWith;
47+
48+
/**
49+
* Tests for pushing expressions into field loading.
50+
*/
51+
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
52+
public class PushExpressionToLoadIT extends ESRestTestCase {
53+
@ClassRule
54+
public static ElasticsearchCluster cluster = Clusters.testCluster(spec -> spec.plugin("inference-service-test"));
55+
56+
@Rule(order = Integer.MIN_VALUE)
57+
public ProfileLogger profileLogger = new ProfileLogger();
58+
59+
public void testLength() throws IOException {
60+
String value = "v".repeat(between(0, 256));
61+
test(
62+
justType("keyword"),
63+
b -> b.value(value),
64+
"LENGTH(test)",
65+
matchesList().item(value.length()),
66+
"Utf8CodePointsFromOrds.SingletonOrdinals"
67+
);
68+
}
69+
70+
public void testVCosine() throws IOException {
71+
test(
72+
justType("dense_vector"),
73+
b -> b.startArray().value(128).value(128).value(0).endArray(),
74+
"V_COSINE(test, [0, 255, 255])",
75+
matchesList().item(0.5),
76+
"BlockDocValuesReader.FloatDenseVectorNormalizedValuesBlockReader"
77+
);
78+
}
79+
80+
private void test(
81+
CheckedConsumer<XContentBuilder, IOException> mapping,
82+
CheckedConsumer<XContentBuilder, IOException> value,
83+
String functionInvocation,
84+
Matcher<?> expectedValue,
85+
String expectedLoader
86+
) throws IOException {
87+
indexValue(mapping, value);
88+
RestEsqlTestCase.RequestObjectBuilder builder = requestObjectBuilder().query("""
89+
FROM test
90+
| EVAL test =""" + " " + functionInvocation + """
91+
| STATS test = VALUES(test)""");
92+
/*
93+
* TODO if you just do KEEP test then the load is in the data node reduce driver and not merged:
94+
* \_ProjectExec[[test{f}#7]]
95+
* \_FieldExtractExec[test{f}#7]<[],[]>
96+
* \_EsQueryExec[test], indexMode[standard]]
97+
* \_ExchangeSourceExec[[test{f}#7],false]}, {cluster_name=test-cluster, node_name=test-cluster-0, descrip
98+
* \_ProjectExec[[test{r}#3]]
99+
* \_EvalExec[[LENGTH(test{f}#7) AS test#3]]
100+
* \_LimitExec[1000[INTEGER],50]
101+
* \_ExchangeSourceExec[[test{f}#7],false]}], query={to
102+
*/
103+
builder.profile(true);
104+
Map<String, Object> result = runEsql(builder, new AssertWarnings.NoWarnings(), profileLogger, RestEsqlTestCase.Mode.SYNC);
105+
assertResultMap(
106+
result,
107+
getResultMatcher(result).entry(
108+
"profile",
109+
matchesMap() //
110+
.entry("drivers", instanceOf(List.class))
111+
.entry("plans", instanceOf(List.class))
112+
.entry("planning", matchesMap().extraOk())
113+
.entry("query", matchesMap().extraOk())
114+
),
115+
matchesList().item(matchesMap().entry("name", "test").entry("type", any(String.class))),
116+
matchesList().item(expectedValue)
117+
);
118+
@SuppressWarnings("unchecked")
119+
List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");
120+
for (Map<String, Object> p : profiles) {
121+
fixTypesOnProfile(p);
122+
assertThat(p, commonProfile());
123+
List<String> sig = new ArrayList<>();
124+
@SuppressWarnings("unchecked")
125+
List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators");
126+
for (Map<String, Object> o : operators) {
127+
sig.add(checkOperatorProfile(o, expectedLoader));
128+
}
129+
String description = p.get("description").toString();
130+
switch (description) {
131+
case "data" -> assertMap(
132+
sig,
133+
matchesList().item("LuceneSourceOperator")
134+
.item("ValuesSourceReaderOperator") // the real work is here, checkOperatorProfile checks the status
135+
.item("EvalOperator") // this one just renames the field
136+
.item("AggregationOperator")
137+
.item("ExchangeSinkOperator")
138+
);
139+
case "node_reduce" -> logger.info("node_reduce {}", sig);
140+
case "final" -> logger.info("final {}", sig);
141+
default -> throw new IllegalArgumentException("can't match " + description);
142+
}
143+
}
144+
}
145+
146+
private void indexValue(CheckedConsumer<XContentBuilder, IOException> mapping, CheckedConsumer<XContentBuilder, IOException> value)
147+
throws IOException {
148+
try {
149+
// Delete the index if it has already been created.
150+
client().performRequest(new Request("DELETE", "test"));
151+
} catch (ResponseException e) {
152+
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
153+
throw e;
154+
}
155+
}
156+
157+
Request createIndex = new Request("PUT", "test");
158+
try (XContentBuilder config = JsonXContent.contentBuilder()) {
159+
config.startObject();
160+
config.startObject("settings");
161+
{
162+
config.startObject("index");
163+
config.field("number_of_shards", 1);
164+
config.endObject();
165+
}
166+
config.endObject();
167+
config.startObject("mappings");
168+
{
169+
config.startObject("properties").startObject("test");
170+
mapping.accept(config);
171+
config.endObject().endObject();
172+
}
173+
config.endObject();
174+
175+
createIndex.setJsonEntity(Strings.toString(config.endObject()));
176+
}
177+
Response createResponse = client().performRequest(createIndex);
178+
assertThat(
179+
entityToMap(createResponse.getEntity(), XContentType.JSON),
180+
matchesMap().entry("shards_acknowledged", true).entry("index", "test").entry("acknowledged", true)
181+
);
182+
183+
Request bulk = new Request("POST", "/_bulk");
184+
bulk.addParameter("refresh", "");
185+
try (XContentBuilder doc = JsonXContent.contentBuilder()) {
186+
doc.startObject();
187+
doc.field("test");
188+
value.accept(doc);
189+
doc.endObject();
190+
bulk.setJsonEntity("""
191+
{"create":{"_index":"test"}}
192+
""" + Strings.toString(doc) + "\n");
193+
}
194+
Response bulkResponse = client().performRequest(bulk);
195+
assertThat(entityToMap(bulkResponse.getEntity(), XContentType.JSON), matchesMap().entry("errors", false).extraOk());
196+
}
197+
198+
private CheckedConsumer<XContentBuilder, IOException> justType(String type) {
199+
return b -> b.field("type", type);
200+
}
201+
202+
private static String checkOperatorProfile(Map<String, Object> o, String expectedLoader) {
203+
String name = (String) o.get("operator");
204+
name = PushQueriesIT.TO_NAME.matcher(name).replaceAll("");
205+
if (name.equals("ValuesSourceReaderOperator")) {
206+
MapMatcher expectedOp = matchesMap().entry("operator", startsWith(name))
207+
.entry(
208+
"status",
209+
matchesMap().entry("readers_built", matchesMap().entry("test:column_at_a_time:" + expectedLoader, 1)).extraOk()
210+
);
211+
assertMap(o, expectedOp);
212+
}
213+
return name;
214+
}
215+
216+
@Override
217+
protected String getTestRestCluster() {
218+
return cluster.getHttpAddresses();
219+
}
220+
221+
@Override
222+
protected boolean preserveClusterUponCompletion() {
223+
// Preserve the cluser to speed up the semantic_text tests
224+
return true;
225+
}
226+
227+
private static boolean setupEmbeddings = false;
228+
229+
private void setUpTextEmbeddingInferenceEndpoint() throws IOException {
230+
setupEmbeddings = true;
231+
Request request = new Request("PUT", "_inference/text_embedding/test");
232+
request.setJsonEntity("""
233+
{
234+
"service": "text_embedding_test_service",
235+
"service_settings": {
236+
"model": "my_model",
237+
"api_key": "abc64",
238+
"dimensions": 128
239+
},
240+
"task_settings": {
241+
}
242+
}
243+
""");
244+
adminClient().performRequest(request);
245+
}
246+
}

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ private String semanticTextWithKeyword() {
515515
}""";
516516
}
517517

518-
private static final Pattern TO_NAME = Pattern.compile("\\[.+", Pattern.DOTALL);
518+
static final Pattern TO_NAME = Pattern.compile("\\[.+", Pattern.DOTALL);
519519

520520
private static String checkOperatorProfile(Map<String, Object> o, Matcher<String> query) {
521521
String name = (String) o.get("operator");

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
* <ul>
2323
* <li>
2424
* {@code V_COSINE(vector, [constant_vector])} - vector is ~512 floats
25-
* and V_COSINE is one double.
25+
* and V_COSINE is one double. We can find the similarity without any
26+
* copies if we combine.
2627
* </li>
2728
* <li>
2829
* {@code ST_CENTROID(shape)} - shapes can be quite large. Centroids

0 commit comments

Comments
 (0)