@@ -14,8 +14,13 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.esql.analysis.*;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
import org.elasticsearch.xpack.esql.analysis.AnalyzerSettings;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
import org.elasticsearch.xpack.esql.analysis.Verifier;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.EsField;
import org.elasticsearch.xpack.esql.core.util.DateUtils;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
@@ -26,6 +31,7 @@
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
import org.elasticsearch.xpack.esql.parser.EsqlParser;
import org.elasticsearch.xpack.esql.parser.QueryParams;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.session.Configuration;
@@ -108,7 +114,7 @@ public void setup() {
new AnalyzerContext(
config,
functionRegistry,
IndexResolution.valid(esIndex),
Map.of(new IndexPattern(Source.EMPTY, esIndex.name()), IndexResolution.valid(esIndex)),
Map.of(),
new EnrichResolution(),
InferenceResolution.EMPTY,
@@ -51,6 +51,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
@@ -122,23 +123,59 @@ protected EsqlSpecTestCase(
this.mode = randomFrom(Mode.values());
}

private static boolean dataLoaded = false;
private static class Protected {
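// completed is volatile so the unsynchronized fast path below can read it safely;
// failure records the first setup error so later tests can fail fast with the original cause.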
private volatile boolean completed = false;
private volatile boolean started = false;
private volatile Throwable failure = null;

private void protectedBlock(Callable<Void> callable) {
if (completed) {
return;
}
// In case tests get run in parallel, we ensure only one setup is run, and other tests wait for this
synchronized (this) {
if (completed) {
return;
}
if (started) {
// Should only happen if a previous test setup failed, possibly leaving partial setup behind; fail the current test fast
if (failure != null) {
fail(failure, "Previous test setup failed: " + failure.getMessage());
}
fail("Previous test setup failed with unknown error");
}
started = true;
try {
callable.call();
completed = true;
} catch (Throwable t) {
failure = t;
fail(failure, "Current test setup failed: " + failure.getMessage());
}
}
}

private synchronized void reset() {
completed = false;
started = false;
failure = null;
}
}

private static final Protected INGEST = new Protected();
protected static boolean testClustersOk = true;

@Before
public void setup() throws IOException {
public void setup() {
assumeTrue("test clusters were broken", testClustersOk);
boolean supportsLookup = supportsIndexModeLookup();
boolean supportsSourceMapping = supportsSourceFieldMapping();
boolean supportsInferenceTestService = supportsInferenceTestService();
if (dataLoaded == false) {
if (supportsInferenceTestService) {
INGEST.protectedBlock(() -> {
// Inference endpoints must be created before ingesting any datasets that rely on them (mapping of inference_id)
if (supportsInferenceTestService()) {
createInferenceEndpoints(adminClient());
}

loadDataSetIntoEs(client(), supportsLookup, supportsSourceMapping, supportsInferenceTestService);
dataLoaded = true;
}
loadDataSetIntoEs(client(), supportsIndexModeLookup(), supportsSourceFieldMapping(), supportsInferenceTestService());
return null;
});
}
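
The Protected helper is a small double-checked lock: the volatile completed flag gives an uncontended fast path, the synchronized block serializes the first run when tests start in parallel, and a recorded failure makes every later test fail fast instead of retrying a half-finished setup. A minimal sketch of reusing it for another one-time task (createPoliciesOnce() is hypothetical, not part of this change):

private static final Protected POLICIES = new Protected();

@Before
public void setupPolicies() {
    POLICIES.protectedBlock(() -> {
        createPoliciesOnce(); // hypothetical one-time work; runs once per suite, later callers skip it
        return null;
    });
}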

@AfterClass
@@ -147,15 +184,14 @@ public static void wipeTestData() throws IOException {
return;
}
try {
dataLoaded = false;
adminClient().performRequest(new Request("DELETE", "/*"));
} catch (ResponseException e) {
// 404 here just means we had no indexes
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
throw e;
}
}

INGEST.reset();
deleteInferenceEndpoints(adminClient());
}

@@ -78,8 +78,9 @@ public class CsvTestsDataLoader {
private static final TestDataset LANGUAGES = new TestDataset("languages");
private static final TestDataset LANGUAGES_LOOKUP = LANGUAGES.withIndex("languages_lookup").withSetting("lookup-settings.json");
private static final TestDataset LANGUAGES_NON_UNIQUE_KEY = new TestDataset("languages_non_unique_key");
private static final TestDataset LANGUAGES_LOOKUP_NON_UNIQUE_KEY = LANGUAGES_NON_UNIQUE_KEY.withIndex("languages_lookup_non_unique_key")
.withSetting("lookup-settings.json");
private static final TestDataset LANGUAGES_LOOKUP_NON_UNIQUE_KEY = LANGUAGES_LOOKUP.withIndex("languages_lookup_non_unique_key")
.withData("languages_non_unique_key.csv")
.withDynamicTypeMapping(Map.of("country", "text"));
private static final TestDataset LANGUAGES_NESTED_FIELDS = new TestDataset(
"languages_nested_fields",
"mapping-languages_nested_fields.json",
@@ -423,11 +424,13 @@ private static void loadDataSetIntoEs(
Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);

Set<String> loadedDatasets = new HashSet<>();
logger.info("Loading test datasets");
for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled, timeSeriesOnly)) {
load(client, dataset, logger, indexCreator);
loadedDatasets.add(dataset.indexName);
}
forceMerge(client, loadedDatasets, logger);
logger.info("Loading enrich policies");
Contributor:
NIT: I think those two logs are redundant, since we already have a per-dataset/per-enrich-policy log entry.
Also, I hope they are not logged per test case, as that would flood the CI logs.

Contributor Author:

They are logged only on the first test that needs data; after that, EsqlSpecTestCase records that the data is loaded and does not load it again. I found these logs very useful when debugging some odd cases where assertions killed cluster members. As mentioned in my earlier responses to Stas, these changes to the CSV tests could be moved into a separate PR, but since I will already have 3 or 4 PRs for the Views MVP, I was hoping not to split things up even further.

for (var policy : ENRICH_POLICIES) {
loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger);
}
@@ -569,6 +572,7 @@ private static void deleteInferenceEndpoint(RestClient client, String inferenceI
}

private static void loadEnrichPolicy(RestClient client, String policyName, String policyFileName, Logger logger) throws IOException {
logger.info("Loading enrich policy [{}] from file [{}]", policyName, policyFileName);
URL policyMapping = getResource("/" + policyFileName);
String entity = readTextFile(policyMapping);
Request request = new Request("PUT", "/_enrich/policy/" + policyName);
@@ -588,6 +592,7 @@ private static URL getResource(String name) {
}

private static void load(RestClient client, TestDataset dataset, Logger logger, IndexCreator indexCreator) throws IOException {
logger.info("Loading dataset [{}] into ES index [{}]", dataset.dataFileName, dataset.indexName);
URL mapping = getResource("/" + dataset.mappingFileName);
Settings indexSettings = dataset.readSettingsFile();
indexCreator.createIndex(client, dataset.indexName, readMappingFile(mapping, dataset.typeMapping), indexSettings);
@@ -854,15 +859,16 @@ public record TestDataset(
String dataFileName,
String settingFileName,
boolean allowSubFields,
@Nullable Map<String, String> typeMapping,
@Nullable Map<String, String> typeMapping, // Overrides mappings read from the mapping file
@Nullable Map<String, String> dynamicTypeMapping, // Defines mappings that are not in the mapping file but are still reported by field-caps
boolean requiresInferenceEndpoint
) {
public TestDataset(String indexName, String mappingFileName, String dataFileName) {
this(indexName, mappingFileName, dataFileName, null, true, null, false);
this(indexName, mappingFileName, dataFileName, null, true, null, null, false);
}

public TestDataset(String indexName) {
this(indexName, "mapping-" + indexName + ".json", indexName + ".csv", null, true, null, false);
this(indexName, "mapping-" + indexName + ".json", indexName + ".csv", null, true, null, null, false);
}

public TestDataset withIndex(String indexName) {
@@ -873,6 +879,7 @@ public TestDataset withIndex(String indexName) {
settingFileName,
allowSubFields,
typeMapping,
dynamicTypeMapping,
requiresInferenceEndpoint
);
}
@@ -885,6 +892,7 @@ public TestDataset withData(String dataFileName) {
settingFileName,
allowSubFields,
typeMapping,
dynamicTypeMapping,
requiresInferenceEndpoint
);
}
@@ -897,6 +905,7 @@ public TestDataset noData() {
settingFileName,
allowSubFields,
typeMapping,
dynamicTypeMapping,
requiresInferenceEndpoint
);
}
@@ -909,6 +918,7 @@ public TestDataset withSetting(String settingFileName) {
settingFileName,
allowSubFields,
typeMapping,
dynamicTypeMapping,
requiresInferenceEndpoint
);
}
@@ -921,6 +931,7 @@ public TestDataset noSubfields() {
settingFileName,
false,
typeMapping,
dynamicTypeMapping,
requiresInferenceEndpoint
);
}
@@ -933,12 +944,35 @@ public TestDataset withTypeMapping(Map<String, String> typeMapping) {
settingFileName,
allowSubFields,
typeMapping,
dynamicTypeMapping,
requiresInferenceEndpoint
);
}

public TestDataset withDynamicTypeMapping(Map<String, String> dynamicTypeMapping) {
return new TestDataset(
indexName,
mappingFileName,
dataFileName,
settingFileName,
allowSubFields,
typeMapping,
dynamicTypeMapping,
requiresInferenceEndpoint
);
}

public TestDataset withInferenceEndpoint(boolean needsInference) {
return new TestDataset(indexName, mappingFileName, dataFileName, settingFileName, allowSubFields, typeMapping, needsInference);
return new TestDataset(
indexName,
mappingFileName,
dataFileName,
settingFileName,
allowSubFields,
typeMapping,
dynamicTypeMapping,
needsInference
);
}

private Settings readSettingsFile() throws IOException {
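
The record's wither methods compose, so a derived dataset can reuse an existing mapping while swapping the data file and overlaying a dynamic mapping that only surfaces through field-caps. A minimal sketch (index and file names are illustrative), mirroring the languages_lookup_non_unique_key definition above:

TestDataset base = new TestDataset("languages");                 // reads mapping-languages.json and languages.csv by convention
TestDataset lookup = base.withIndex("languages_lookup_example")  // same mapping file, new index name
    .withData("languages_non_unique_key.csv")                    // different data file
    .withDynamicTypeMapping(Map.of("country", "text"));          // not in the mapping file, but reported via field-caps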
@@ -99,6 +99,7 @@
import org.elasticsearch.xpack.esql.inference.InferenceService;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.parser.QueryParam;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
@@ -441,11 +442,11 @@ public static TransportVersion randomMinimumVersion() {
public static AnalyzerContext testAnalyzerContext(
Configuration configuration,
EsqlFunctionRegistry functionRegistry,
IndexResolution indexResolution,
Map<IndexPattern, IndexResolution> indexResolutions,
EnrichResolution enrichResolution,
InferenceResolution inferenceResolution
) {
return testAnalyzerContext(configuration, functionRegistry, indexResolution, Map.of(), enrichResolution, inferenceResolution);
return testAnalyzerContext(configuration, functionRegistry, indexResolutions, Map.of(), enrichResolution, inferenceResolution);
}

/**
@@ -454,15 +455,15 @@ public static AnalyzerContext testAnalyzerContext(
public static AnalyzerContext testAnalyzerContext(
Configuration configuration,
EsqlFunctionRegistry functionRegistry,
IndexResolution indexResolution,
Map<IndexPattern, IndexResolution> indexResolutions,
Map<String, IndexResolution> lookupResolution,
EnrichResolution enrichResolution,
InferenceResolution inferenceResolution
) {
return new AnalyzerContext(
configuration,
functionRegistry,
indexResolution,
indexResolutions,
lookupResolution,
enrichResolution,
inferenceResolution,
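Call sites that previously passed a single IndexResolution now wrap it in a map keyed by the IndexPattern that produced it. A sketch of a migrated call, assuming an esIndex like the one in the setup() change in the first file:

AnalyzerContext ctx = testAnalyzerContext(
    config,
    functionRegistry,
    Map.of(new IndexPattern(Source.EMPTY, esIndex.name()), IndexResolution.valid(esIndex)),
    new EnrichResolution(),
    InferenceResolution.EMPTY
);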
@@ -3976,14 +3976,15 @@ required_capability: inline_stats
required_capability: fix_join_output_merging

FROM languages_lookup_non_unique_key
| EVAL country = MV_SORT(country)
| KEEP country, language_name
| EVAL language_code = null::integer
| INLINE STATS MAX(language_code) BY language_code
| SORT country
| LIMIT 5
;

country:text |language_name:keyword |MAX(language_code):integer |language_code:integer
country:keyword |language_name:keyword |MAX(language_code):integer |language_code:integer
Contributor:

I am curious: why is this change happening in this PR?

Contributor Author:

This could indeed be part of a separate PR, but the basic changes to CsvTestsDataLoader were made to facilitate views testing, and they were small enough that I thought it convenient to include them in this PR. I already expect 3 or 4 PRs to complete the first functional Views, and hoped not to have even more. But if the changes to CsvTestsDataLoader and these two csv-spec files are cluttering this PR too much, I can move them out to another PR.

Atlantis |null |null |null
[Austria, Germany]|German |null |null
Canada |English |null |null
@@ -247,11 +247,12 @@ FROM employees
| EVAL language_code = emp_no % 10
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| WHERE emp_no > 10090 AND emp_no < 10096
| EVAL country = MV_SORT(country)
| SORT emp_no, country
| KEEP emp_no, language_code, language_name, country
;

emp_no:integer | language_code:integer | language_name:keyword | country:text
emp_no:integer | language_code:integer | language_name:keyword | country:keyword
10091 | 1 | English | Canada
10091 | 1 | null | United Kingdom
10091 | 1 | English | United States of America
@@ -272,11 +273,12 @@ FROM employees
| LIMIT 5
| EVAL language_code = emp_no % 10
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| EVAL country = MV_SORT(country)
| KEEP emp_no, language_code, language_name, country
;

ignoreOrder:true
emp_no:integer | language_code:integer | language_name:keyword | country:text
emp_no:integer | language_code:integer | language_name:keyword | country:keyword
10001 | 1 | English | Canada
10001 | 1 | English | null
10001 | 1 | null | United Kingdom
@@ -324,7 +326,7 @@ ROW language_code = 2

ignoreOrder:true
language_code:integer | country:text | language_name:keyword
2 | [Austria, Germany] | German
2 | [Germany, Austria] | German
Contributor:

Why the order change?

Contributor Author:

I think it is related to the mapping changes in CsvTestsDataLoader for the lookup index called languages_lookup_non_unique_key.

Contributor Author:

But as mentioned in my other comment, these changes could also be moved to a separate PR. I only included them because I will already have 3 or 4 PRs for Views, and hoped not to have even more than that.

2 | Switzerland | German
2 | null | German
;