Skip to content

Commit 42e925b

Browse files
craigtavernerfzowl
authored and committed
Multi-relation support, a pre-requisite for views and sub-queries (elastic#136780)
This is extracted from the views prototype at elastic#134995. It provides the underlying support for multiple `FROM ...` commands within the same ES|QL query, something that both non-materialized views and subqueries require. A key feature of this work is that all `UnresolvedRelation` instances are treated equivalently, which means they all support CCS and CPS just as before. This implies that sub-queries using this will support CCS/CPS directly, and for views it means: * The underlying indexes within the view definitions will support CCS/CPS as if they were in the original FROM command * Views are strictly resolved in the coordinator of the local cluster, so that the indexes defined within the views can then be resolved using CCS/CPS rules as normal.
1 parent aae0ad0 commit 42e925b

35 files changed

+758
-474
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,13 @@
1414
import org.elasticsearch.common.settings.Settings;
1515
import org.elasticsearch.index.IndexMode;
1616
import org.elasticsearch.license.XPackLicenseState;
17-
import org.elasticsearch.xpack.esql.analysis.*;
17+
import org.elasticsearch.xpack.esql.analysis.Analyzer;
18+
import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
19+
import org.elasticsearch.xpack.esql.analysis.AnalyzerSettings;
20+
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
21+
import org.elasticsearch.xpack.esql.analysis.Verifier;
1822
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
23+
import org.elasticsearch.xpack.esql.core.tree.Source;
1924
import org.elasticsearch.xpack.esql.core.type.EsField;
2025
import org.elasticsearch.xpack.esql.core.util.DateUtils;
2126
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
@@ -26,6 +31,7 @@
2631
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
2732
import org.elasticsearch.xpack.esql.parser.EsqlParser;
2833
import org.elasticsearch.xpack.esql.parser.QueryParams;
34+
import org.elasticsearch.xpack.esql.plan.IndexPattern;
2935
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
3036
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
3137
import org.elasticsearch.xpack.esql.session.Configuration;
@@ -108,7 +114,7 @@ public void setup() {
108114
new AnalyzerContext(
109115
config,
110116
functionRegistry,
111-
IndexResolution.valid(esIndex),
117+
Map.of(new IndexPattern(Source.EMPTY, esIndex.name()), IndexResolution.valid(esIndex)),
112118
Map.of(),
113119
new EnrichResolution(),
114120
InferenceResolution.EMPTY,

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.Locale;
5252
import java.util.Map;
5353
import java.util.TreeMap;
54+
import java.util.concurrent.Callable;
5455
import java.util.stream.Collectors;
5556
import java.util.stream.IntStream;
5657
import java.util.stream.LongStream;
@@ -122,23 +123,59 @@ protected EsqlSpecTestCase(
122123
this.mode = randomFrom(Mode.values());
123124
}
124125

125-
private static boolean dataLoaded = false;
126+
private static class Protected {
127+
private volatile boolean completed = false;
128+
private volatile boolean started = false;
129+
private volatile Throwable failure = null;
130+
131+
private void protectedBlock(Callable<Void> callable) {
132+
if (completed) {
133+
return;
134+
}
135+
// In case tests get run in parallel, we ensure only one setup is run, and other tests wait for this
136+
synchronized (this) {
137+
if (completed) {
138+
return;
139+
}
140+
if (started) {
141+
// Should only happen if a previous test setup failed, possibly with partial setup, let's fail fast the current test
142+
if (failure != null) {
143+
fail(failure, "Previous test setup failed: " + failure.getMessage());
144+
}
145+
fail("Previous test setup failed with unknown error");
146+
}
147+
started = true;
148+
try {
149+
callable.call();
150+
completed = true;
151+
} catch (Throwable t) {
152+
failure = t;
153+
fail(failure, "Current test setup failed: " + failure.getMessage());
154+
}
155+
}
156+
}
157+
158+
private synchronized void reset() {
159+
completed = false;
160+
started = false;
161+
failure = null;
162+
}
163+
}
164+
165+
private static final Protected INGEST = new Protected();
126166
protected static boolean testClustersOk = true;
127167

128168
@Before
129-
public void setup() throws IOException {
169+
public void setup() {
130170
assumeTrue("test clusters were broken", testClustersOk);
131-
boolean supportsLookup = supportsIndexModeLookup();
132-
boolean supportsSourceMapping = supportsSourceFieldMapping();
133-
boolean supportsInferenceTestService = supportsInferenceTestService();
134-
if (dataLoaded == false) {
135-
if (supportsInferenceTestService) {
171+
INGEST.protectedBlock(() -> {
172+
// Inference endpoints must be created before ingesting any datasets that rely on them (mapping of inference_id)
173+
if (supportsInferenceTestService()) {
136174
createInferenceEndpoints(adminClient());
137175
}
138-
139-
loadDataSetIntoEs(client(), supportsLookup, supportsSourceMapping, supportsInferenceTestService);
140-
dataLoaded = true;
141-
}
176+
loadDataSetIntoEs(client(), supportsIndexModeLookup(), supportsSourceFieldMapping(), supportsInferenceTestService());
177+
return null;
178+
});
142179
}
143180

144181
@AfterClass
@@ -147,15 +184,14 @@ public static void wipeTestData() throws IOException {
147184
return;
148185
}
149186
try {
150-
dataLoaded = false;
151187
adminClient().performRequest(new Request("DELETE", "/*"));
152188
} catch (ResponseException e) {
153189
// 404 here just means we had no indexes
154190
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
155191
throw e;
156192
}
157193
}
158-
194+
INGEST.reset();
159195
deleteInferenceEndpoints(adminClient());
160196
}
161197

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,9 @@ public class CsvTestsDataLoader {
7878
private static final TestDataset LANGUAGES = new TestDataset("languages");
7979
private static final TestDataset LANGUAGES_LOOKUP = LANGUAGES.withIndex("languages_lookup").withSetting("lookup-settings.json");
8080
private static final TestDataset LANGUAGES_NON_UNIQUE_KEY = new TestDataset("languages_non_unique_key");
81-
private static final TestDataset LANGUAGES_LOOKUP_NON_UNIQUE_KEY = LANGUAGES_NON_UNIQUE_KEY.withIndex("languages_lookup_non_unique_key")
82-
.withSetting("lookup-settings.json");
81+
private static final TestDataset LANGUAGES_LOOKUP_NON_UNIQUE_KEY = LANGUAGES_LOOKUP.withIndex("languages_lookup_non_unique_key")
82+
.withData("languages_non_unique_key.csv")
83+
.withDynamicTypeMapping(Map.of("country", "text"));
8384
private static final TestDataset LANGUAGES_NESTED_FIELDS = new TestDataset(
8485
"languages_nested_fields",
8586
"mapping-languages_nested_fields.json",
@@ -423,11 +424,13 @@ private static void loadDataSetIntoEs(
423424
Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
424425

425426
Set<String> loadedDatasets = new HashSet<>();
427+
logger.info("Loading test datasets");
426428
for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled, timeSeriesOnly)) {
427429
load(client, dataset, logger, indexCreator);
428430
loadedDatasets.add(dataset.indexName);
429431
}
430432
forceMerge(client, loadedDatasets, logger);
433+
logger.info("Loading enrich policies");
431434
for (var policy : ENRICH_POLICIES) {
432435
loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger);
433436
}
@@ -569,6 +572,7 @@ private static void deleteInferenceEndpoint(RestClient client, String inferenceI
569572
}
570573

571574
private static void loadEnrichPolicy(RestClient client, String policyName, String policyFileName, Logger logger) throws IOException {
575+
logger.info("Loading enrich policy [{}] from file [{}]", policyName, policyFileName);
572576
URL policyMapping = getResource("/" + policyFileName);
573577
String entity = readTextFile(policyMapping);
574578
Request request = new Request("PUT", "/_enrich/policy/" + policyName);
@@ -588,6 +592,7 @@ private static URL getResource(String name) {
588592
}
589593

590594
private static void load(RestClient client, TestDataset dataset, Logger logger, IndexCreator indexCreator) throws IOException {
595+
logger.info("Loading dataset [{}] into ES index [{}]", dataset.dataFileName, dataset.indexName);
591596
URL mapping = getResource("/" + dataset.mappingFileName);
592597
Settings indexSettings = dataset.readSettingsFile();
593598
indexCreator.createIndex(client, dataset.indexName, readMappingFile(mapping, dataset.typeMapping), indexSettings);
@@ -854,15 +859,16 @@ public record TestDataset(
854859
String dataFileName,
855860
String settingFileName,
856861
boolean allowSubFields,
857-
@Nullable Map<String, String> typeMapping,
862+
@Nullable Map<String, String> typeMapping, // Override mappings read from mappings file
863+
@Nullable Map<String, String> dynamicTypeMapping, // Define mappings not in the mapping files, but available from field-caps
858864
boolean requiresInferenceEndpoint
859865
) {
860866
public TestDataset(String indexName, String mappingFileName, String dataFileName) {
861-
this(indexName, mappingFileName, dataFileName, null, true, null, false);
867+
this(indexName, mappingFileName, dataFileName, null, true, null, null, false);
862868
}
863869

864870
public TestDataset(String indexName) {
865-
this(indexName, "mapping-" + indexName + ".json", indexName + ".csv", null, true, null, false);
871+
this(indexName, "mapping-" + indexName + ".json", indexName + ".csv", null, true, null, null, false);
866872
}
867873

868874
public TestDataset withIndex(String indexName) {
@@ -873,6 +879,7 @@ public TestDataset withIndex(String indexName) {
873879
settingFileName,
874880
allowSubFields,
875881
typeMapping,
882+
dynamicTypeMapping,
876883
requiresInferenceEndpoint
877884
);
878885
}
@@ -885,6 +892,7 @@ public TestDataset withData(String dataFileName) {
885892
settingFileName,
886893
allowSubFields,
887894
typeMapping,
895+
dynamicTypeMapping,
888896
requiresInferenceEndpoint
889897
);
890898
}
@@ -897,6 +905,7 @@ public TestDataset noData() {
897905
settingFileName,
898906
allowSubFields,
899907
typeMapping,
908+
dynamicTypeMapping,
900909
requiresInferenceEndpoint
901910
);
902911
}
@@ -909,6 +918,7 @@ public TestDataset withSetting(String settingFileName) {
909918
settingFileName,
910919
allowSubFields,
911920
typeMapping,
921+
dynamicTypeMapping,
912922
requiresInferenceEndpoint
913923
);
914924
}
@@ -921,6 +931,7 @@ public TestDataset noSubfields() {
921931
settingFileName,
922932
false,
923933
typeMapping,
934+
dynamicTypeMapping,
924935
requiresInferenceEndpoint
925936
);
926937
}
@@ -933,12 +944,35 @@ public TestDataset withTypeMapping(Map<String, String> typeMapping) {
933944
settingFileName,
934945
allowSubFields,
935946
typeMapping,
947+
dynamicTypeMapping,
948+
requiresInferenceEndpoint
949+
);
950+
}
951+
952+
public TestDataset withDynamicTypeMapping(Map<String, String> dynamicTypeMapping) {
953+
return new TestDataset(
954+
indexName,
955+
mappingFileName,
956+
dataFileName,
957+
settingFileName,
958+
allowSubFields,
959+
typeMapping,
960+
dynamicTypeMapping,
936961
requiresInferenceEndpoint
937962
);
938963
}
939964

940965
public TestDataset withInferenceEndpoint(boolean needsInference) {
941-
return new TestDataset(indexName, mappingFileName, dataFileName, settingFileName, allowSubFields, typeMapping, needsInference);
966+
return new TestDataset(
967+
indexName,
968+
mappingFileName,
969+
dataFileName,
970+
settingFileName,
971+
allowSubFields,
972+
typeMapping,
973+
dynamicTypeMapping,
974+
needsInference
975+
);
942976
}
943977

944978
private Settings readSettingsFile() throws IOException {

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@
102102
import org.elasticsearch.xpack.esql.inference.InferenceService;
103103
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
104104
import org.elasticsearch.xpack.esql.parser.QueryParam;
105+
import org.elasticsearch.xpack.esql.plan.IndexPattern;
105106
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
106107
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
107108
import org.elasticsearch.xpack.esql.plan.logical.Explain;
@@ -451,11 +452,11 @@ public static TransportVersion randomMinimumVersion() {
451452
public static AnalyzerContext testAnalyzerContext(
452453
Configuration configuration,
453454
EsqlFunctionRegistry functionRegistry,
454-
IndexResolution indexResolution,
455+
Map<IndexPattern, IndexResolution> indexResolutions,
455456
EnrichResolution enrichResolution,
456457
InferenceResolution inferenceResolution
457458
) {
458-
return testAnalyzerContext(configuration, functionRegistry, indexResolution, Map.of(), enrichResolution, inferenceResolution);
459+
return testAnalyzerContext(configuration, functionRegistry, indexResolutions, Map.of(), enrichResolution, inferenceResolution);
459460
}
460461

461462
/**
@@ -464,15 +465,15 @@ public static AnalyzerContext testAnalyzerContext(
464465
public static AnalyzerContext testAnalyzerContext(
465466
Configuration configuration,
466467
EsqlFunctionRegistry functionRegistry,
467-
IndexResolution indexResolution,
468+
Map<IndexPattern, IndexResolution> indexResolutions,
468469
Map<String, IndexResolution> lookupResolution,
469470
EnrichResolution enrichResolution,
470471
InferenceResolution inferenceResolution
471472
) {
472473
return new AnalyzerContext(
473474
configuration,
474475
functionRegistry,
475-
indexResolution,
476+
indexResolutions,
476477
lookupResolution,
477478
enrichResolution,
478479
inferenceResolution,

x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4150,14 +4150,15 @@ required_capability: inline_stats
41504150
required_capability: fix_join_output_merging
41514151

41524152
FROM languages_lookup_non_unique_key
4153+
| EVAL country = MV_SORT(country)
41534154
| KEEP country, language_name
41544155
| EVAL language_code = null::integer
41554156
| INLINE STATS MAX(language_code) BY language_code
41564157
| SORT country
41574158
| LIMIT 5
41584159
;
41594160

4160-
country:text |language_name:keyword |MAX(language_code):integer |language_code:integer
4161+
country:keyword |language_name:keyword |MAX(language_code):integer |language_code:integer
41614162
Atlantis |null |null |null
41624163
[Austria, Germany]|German |null |null
41634164
Canada |English |null |null

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,11 +247,12 @@ FROM employees
247247
| EVAL language_code = emp_no % 10
248248
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
249249
| WHERE emp_no > 10090 AND emp_no < 10096
250+
| EVAL country = MV_SORT(country)
250251
| SORT emp_no, country
251252
| KEEP emp_no, language_code, language_name, country
252253
;
253254

254-
emp_no:integer | language_code:integer | language_name:keyword | country:text
255+
emp_no:integer | language_code:integer | language_name:keyword | country:keyword
255256
10091 | 1 | English | Canada
256257
10091 | 1 | null | United Kingdom
257258
10091 | 1 | English | United States of America
@@ -272,11 +273,12 @@ FROM employees
272273
| LIMIT 5
273274
| EVAL language_code = emp_no % 10
274275
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
276+
| EVAL country = MV_SORT(country)
275277
| KEEP emp_no, language_code, language_name, country
276278
;
277279

278280
ignoreOrder:true
279-
emp_no:integer | language_code:integer | language_name:keyword | country:text
281+
emp_no:integer | language_code:integer | language_name:keyword | country:keyword
280282
10001 | 1 | English | Canada
281283
10001 | 1 | English | null
282284
10001 | 1 | null | United Kingdom
@@ -324,7 +326,7 @@ ROW language_code = 2
324326

325327
ignoreOrder:true
326328
language_code:integer | country:text | language_name:keyword
327-
2 | [Austria, Germany] | German
329+
2 | [Germany, Austria] | German
328330
2 | Switzerland | German
329331
2 | null | German
330332
;

0 commit comments

Comments
 (0)