Skip to content

Commit 22b7aa5

Browse files
committed
WIP: LOOKUP JOIN tests
1 parent 27d56f8 commit 22b7aa5

File tree

2 files changed

+74
-27
lines changed

2 files changed

+74
-27
lines changed

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/AllSupportedFieldsIT.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
/**
3232
* Fetch all field types via cross cluster search, possible on a different version.
3333
*/
34+
// TODO: ROW + remote ENRICH
35+
// TODO: remote lookups, requires `FROM remote:*`
3436
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
3537
public class AllSupportedFieldsIT extends AllSupportedFieldsTestCase {
3638
static ElasticsearchCluster remoteCluster = Clusters.remoteCluster();
@@ -50,10 +52,12 @@ public AllSupportedFieldsIT(MappedFieldType.FieldExtractPreference extractPrefer
5052
public void createRemoteIndices() throws IOException {
5153
if (supportsNodeAssignment()) {
5254
for (Map.Entry<String, NodeInfo> e : remoteNodeToInfo().entrySet()) {
53-
createIndexForNode(remoteClient(), e.getKey(), e.getValue().id());
55+
createIndexForNode(remoteClient(), e.getKey(), e.getValue().id(), indexMode());
56+
createIndexForNode(remoteClient(), e.getKey(), e.getValue().id(), IndexMode.LOOKUP);
5457
}
5558
} else {
56-
createIndexForNode(remoteClient(), null, null);
59+
createIndexForNode(remoteClient(), null, null, indexMode());
60+
createIndexForNode(remoteClient(), null, null, IndexMode.LOOKUP);
5761
}
5862
}
5963

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java

Lines changed: 68 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import org.elasticsearch.core.Tuple;
2222
import org.elasticsearch.index.IndexMode;
2323
import org.elasticsearch.index.mapper.MappedFieldType;
24-
import org.elasticsearch.logging.LogManager;
25-
import org.elasticsearch.logging.Logger;
2624
import org.elasticsearch.test.MapMatcher;
2725
import org.elasticsearch.test.rest.ESRestTestCase;
2826
import org.elasticsearch.xcontent.XContentBuilder;
@@ -51,7 +49,6 @@
5149
import static org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse.RESOLVE_FIELDS_RESPONSE_USED_TV;
5250
import static org.elasticsearch.xpack.esql.core.type.DataType.DataTypesTransportVersions.ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION;
5351
import static org.elasticsearch.xpack.esql.core.type.DataType.DataTypesTransportVersions.ESQL_DENSE_VECTOR_CREATED_VERSION;
54-
import static org.elasticsearch.xpack.esql.core.type.DataType.DataTypesTransportVersions.INDEX_SOURCE;
5552
import static org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver.ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION;
5653
import static org.hamcrest.Matchers.any;
5754
import static org.hamcrest.Matchers.anyOf;
@@ -104,6 +101,10 @@ protected AllSupportedFieldsTestCase(MappedFieldType.FieldExtractPreference extr
104101
this.indexMode = indexMode;
105102
}
106103

104+
protected IndexMode indexMode() {
105+
return indexMode;
106+
}
107+
107108
protected record NodeInfo(
108109
String cluster,
109110
String id,
@@ -211,10 +212,13 @@ protected static Map<String, NodeInfo> fetchNodeToInfo(RestClient client, String
211212
public void createIndices() throws IOException {
212213
if (supportsNodeAssignment()) {
213214
for (Map.Entry<String, NodeInfo> e : nodeToInfo().entrySet()) {
214-
createIndexForNode(client(), e.getKey(), e.getValue().id());
215+
createIndexForNode(client(), e.getKey(), e.getValue().id(), indexMode);
216+
// Always create the lookup index so we can test LOOKUP JOIN queries with it
217+
createIndexForNode(client(), e.getKey(), e.getValue().id(), IndexMode.LOOKUP);
215218
}
216219
} else {
217-
createIndexForNode(client(), null, null);
220+
createIndexForNode(client(), null, null, indexMode);
221+
createIndexForNode(client(), null, null, IndexMode.LOOKUP);
218222
}
219223
}
220224

@@ -261,13 +265,15 @@ public final void testFetchAll() throws IOException {
261265
.entry("_index_mode", "keyword")
262266
.entry("_score", "double")
263267
.entry("_source", "_source")
264-
.entry("_version", "long");
268+
.entry("_version", "long")
269+
.entry(LOOKUP_ID_FIELD, "integer");
265270
assertMap(nameToType(columns), expectedColumns);
266271

267272
MapMatcher expectedAllValues = matchesMap();
268-
for (Map.Entry<String, NodeInfo> e : expectedIndices().entrySet()) {
273+
for (Map.Entry<String, NodeInfo> e : expectedIndices(indexMode).entrySet()) {
269274
String indexName = e.getKey();
270275
MapMatcher expectedValues = matchesMap();
276+
expectedValues = expectedValues.entry(LOOKUP_ID_FIELD, equalTo(123));
271277
for (DataType type : DataType.values()) {
272278
if (supportedInIndex(type) == false) {
273279
continue;
@@ -342,7 +348,7 @@ public final void testFetchDenseVector() throws IOException {
342348
assertMap(nameToType(columns), expectedColumns);
343349

344350
MapMatcher expectedAllValues = matchesMap();
345-
for (Map.Entry<String, NodeInfo> e : expectedIndices().entrySet()) {
351+
for (Map.Entry<String, NodeInfo> e : expectedIndices(indexMode).entrySet()) {
346352
String indexName = e.getKey();
347353
NodeInfo nodeInfo = e.getValue();
348354
MapMatcher expectedValues = matchesMap();
@@ -409,7 +415,7 @@ public final void testFetchAggregateMetricDouble() throws IOException {
409415
assertMap(nameToType(columns), expectedColumns);
410416

411417
MapMatcher expectedAllValues = matchesMap();
412-
for (Map.Entry<String, NodeInfo> e : expectedIndices().entrySet()) {
418+
for (Map.Entry<String, NodeInfo> e : expectedIndices(indexMode).entrySet()) {
413419
String indexName = e.getKey();
414420
MapMatcher expectedValues = matchesMap();
415421
expectedValues = expectedValues.entry(
@@ -450,6 +456,28 @@ public void testRow() throws IOException {
450456
}
451457
}
452458

459+
// TODO: ROW + local ENRICH
460+
public void testRowLookupJoin() throws IOException {
461+
assumeTrue(
462+
"Test has to run only once, skip on other configurations",
463+
extractPreference == MappedFieldType.FieldExtractPreference.NONE && indexMode == IndexMode.STANDARD
464+
);
465+
// TODO: For ccq, we most likely need to strip the remote lookup indices. And also use the local minimum transport version, only.
466+
Map<String, NodeInfo> expectedIndices = expectedIndices(IndexMode.LOOKUP);
467+
for (Map.Entry<String, NodeInfo> e : expectedIndices.entrySet()) {
468+
String indexName = e.getKey();
469+
String query = "ROW " + LOOKUP_ID_FIELD + " = 1234 | LOOKUP JOIN " + indexName + " ON " + LOOKUP_ID_FIELD + " | LIMIT 1";
470+
var responseAndCoordinatorVersion = runQuery(query);
471+
var responseMap = responseAndCoordinatorVersion.v1();
472+
var coordinatorVersion = responseAndCoordinatorVersion.v2();
473+
474+
// HERE next, this actually needs fixing in production code.
475+
assertMinimumVersionFromAllQueries(responseAndCoordinatorVersion);
476+
477+
// TODO: same assert as in testfetchall, I think.
478+
}
479+
}
480+
453481
/**
454482
* Run the query and return the response and the version of the coordinator.
455483
* <p>
@@ -508,24 +536,31 @@ protected void assertMinimumVersionFromAllQueries(Tuple<Map<String, Object>, Tra
508536
}
509537
}
510538

511-
protected void createIndexForNode(RestClient client, String nodeName, String nodeId) throws IOException {
512-
String indexName = indexMode.toString();
513-
if (nodeName != null) {
514-
indexName += "_" + nodeName.toLowerCase(Locale.ROOT);
515-
}
539+
protected static void createIndexForNode(RestClient client, String nodeName, String nodeId, IndexMode mode) throws IOException {
540+
String indexName = indexName(mode, nodeName);
516541
if (false == indexExists(client, indexName)) {
517-
createAllTypesIndex(client, indexName, nodeId);
542+
createAllTypesIndex(client, indexName, nodeId, mode);
518543
createAllTypesDoc(client, indexName);
519544
}
520545
}
521546

522-
private void createAllTypesIndex(RestClient client, String indexName, String nodeId) throws IOException {
547+
protected static String indexName(IndexMode mode, String nodeName) {
548+
String indexName = mode.toString();
549+
if (nodeName != null) {
550+
indexName += "_" + nodeName.toLowerCase(Locale.ROOT);
551+
}
552+
return indexName;
553+
}
554+
555+
private static final String LOOKUP_ID_FIELD = "lookup_id";
556+
557+
private static void createAllTypesIndex(RestClient client, String indexName, String nodeId, IndexMode mode) throws IOException {
523558
XContentBuilder config = JsonXContent.contentBuilder().startObject();
524559
{
525560
config.startObject("settings");
526561
config.startObject("index");
527-
config.field("mode", indexMode);
528-
if (indexMode == IndexMode.TIME_SERIES) {
562+
config.field("mode", mode);
563+
if (mode == IndexMode.TIME_SERIES) {
529564
config.field("routing_path", "f_keyword");
530565
}
531566
if (nodeId != null) {
@@ -536,26 +571,32 @@ private void createAllTypesIndex(RestClient client, String indexName, String nod
536571
}
537572
{
538573
config.startObject("mappings").startObject("properties");
574+
575+
config.startObject(LOOKUP_ID_FIELD);
576+
config.field("type", "integer");
577+
config.endObject();
578+
539579
for (DataType type : DataType.values()) {
540580
if (supportedInIndex(type) == false) {
541581
continue;
542582
}
543583
config.startObject(fieldName(type));
544-
typeMapping(indexMode, config, type);
584+
typeMapping(mode, config, type);
545585
config.endObject();
546586
}
587+
547588
config.endObject().endObject().endObject();
548589
}
549590
Request request = new Request("PUT", indexName);
550591
request.setJsonEntity(Strings.toString(config));
551592
client.performRequest(request);
552593
}
553594

554-
private String fieldName(DataType type) {
595+
private static String fieldName(DataType type) {
555596
return type == DataType.DATETIME ? "@timestamp" : "f_" + type.esType();
556597
}
557598

558-
private void typeMapping(IndexMode indexMode, XContentBuilder config, DataType type) throws IOException {
599+
private static void typeMapping(IndexMode indexMode, XContentBuilder config, DataType type) throws IOException {
559600
switch (type) {
560601
case COUNTER_DOUBLE, COUNTER_INTEGER, COUNTER_LONG -> config.field("type", type.esType().replace("counter_", ""))
561602
.field("time_series_metric", "counter");
@@ -574,8 +615,10 @@ private void typeMapping(IndexMode indexMode, XContentBuilder config, DataType t
574615
}
575616
}
576617

577-
private void createAllTypesDoc(RestClient client, String indexName) throws IOException {
618+
private static void createAllTypesDoc(RestClient client, String indexName) throws IOException {
578619
XContentBuilder doc = JsonXContent.contentBuilder().startObject();
620+
doc.field(LOOKUP_ID_FIELD);
621+
doc.value(123);
579622
for (DataType type : DataType.values()) {
580623
if (supportedInIndex(type) == false) {
581624
continue;
@@ -760,19 +803,19 @@ private boolean syntheticSourceByDefault() {
760803
};
761804
}
762805

763-
private Map<String, NodeInfo> expectedIndices() throws IOException {
806+
private Map<String, NodeInfo> expectedIndices(IndexMode indexMode) throws IOException {
764807
Map<String, NodeInfo> result = new TreeMap<>();
765808
if (supportsNodeAssignment()) {
766809
for (Map.Entry<String, NodeInfo> e : allNodeToInfo().entrySet()) {
767-
String name = indexMode + "_" + e.getKey();
810+
String name = indexName(indexMode, e.getKey());
768811
if (e.getValue().cluster != null) {
769812
name = e.getValue().cluster + ":" + name;
770813
}
771814
result.put(name, e.getValue());
772815
}
773816
} else {
774817
for (Map.Entry<String, NodeInfo> e : allNodeToInfo().entrySet()) {
775-
String name = indexMode.toString();
818+
String name = indexName(indexMode, null);
776819
if (e.getValue().cluster != null) {
777820
name = e.getValue().cluster + ":" + name;
778821
}

0 commit comments

Comments
 (0)