diff --git a/docs/changelog/137382.yaml b/docs/changelog/137382.yaml new file mode 100644 index 0000000000000..1e044843e9242 --- /dev/null +++ b/docs/changelog/137382.yaml @@ -0,0 +1,5 @@ +pr: 137382 +summary: Make field fusion generic +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 6a4d447c8067d..92e60b309fd3b 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -501,4 +501,5 @@ exports org.elasticsearch.index.codec.vectors.es93 to org.elasticsearch.test.knn; exports org.elasticsearch.search.crossproject; exports org.elasticsearch.index.mapper.blockloader.docvalues; + exports org.elasticsearch.index.mapper.blockloader; } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java index 56bb2e479124f..041c9feefea44 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java @@ -59,7 +59,9 @@ import org.elasticsearch.index.fielddata.SourceValueFetcherSortedBinaryIndexFieldData; import org.elasticsearch.index.fielddata.StoredFieldSortedBinaryIndexFieldData; import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromOrdsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.Utf8CodePointsFromOrdsBlockLoader; import org.elasticsearch.index.query.AutomatonQueryWithDescription; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.index.similarity.SimilarityProvider; @@ -822,7 +824,19 @@ NamedAnalyzer normalizer() { @Override public BlockLoader blockLoader(BlockLoaderContext blContext) { if (hasDocValues() && (blContext.fieldExtractPreference() != FieldExtractPreference.STORED || isSyntheticSourceEnabled())) { - return new BytesRefsFromOrdsBlockLoader(name()); + return switch (blContext.blockLoaderFunctionConfig()) { + case null -> new BytesRefsFromOrdsBlockLoader(name()); + case BlockLoaderFunctionConfig.Named named -> switch (named.name()) { + case "LENGTH" -> new Utf8CodePointsFromOrdsBlockLoader(named.warnings(), name()); + default -> throw new UnsupportedOperationException("unknown fusion config [" + named.name() + "]"); + }; + default -> throw new UnsupportedOperationException( + "unknown fusion config [" + blContext.blockLoaderFunctionConfig() + "]" + ); + }; + } + if (blContext.blockLoaderFunctionConfig() != null) { + throw new UnsupportedOperationException("function fusing only supported for doc values"); } if (isStored()) { return new BlockStoredFieldsReader.BytesFromBytesRefsBlockLoader(name()); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java b/server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java index 1c5d0cc2dfa15..14281a72e88bf 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java @@ -35,6 +35,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.fielddata.FieldDataContext; import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.query.DistanceFeatureQueryBuilder; import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.QueryShardException; @@ -710,11 +711,4 @@ default BlockLoaderFunctionConfig blockLoaderFunctionConfig() { } } - /** - * Marker interface that contains the configuration needed to transform loaded values into blocks. - * Is retrievable from the {@link BlockLoaderContext}. The {@link MappedFieldType} can use this configuration to choose the appropriate - * implementation for transforming loaded values into blocks. - */ - public interface BlockLoaderFunctionConfig {} - } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/BlockLoaderFunctionConfig.java b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/BlockLoaderFunctionConfig.java new file mode 100644 index 0000000000000..1d908b9db5b28 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/BlockLoaderFunctionConfig.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.mapper.blockloader; + +import org.elasticsearch.index.mapper.MappedFieldType; + +/** + * Configuration needed to transform loaded values into blocks. + * {@link MappedFieldType}s will find me in + * {@link MappedFieldType.BlockLoaderContext#blockLoaderFunctionConfig()} and + * use this configuration to choose the appropriate implementation for + * transforming loaded values into blocks. + */ +public interface BlockLoaderFunctionConfig { + /** + * Name used in descriptions. + */ + String name(); + + record Named(String name, Warnings warnings) implements BlockLoaderFunctionConfig { + @Override + public int hashCode() { + return name.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + Named named = (Named) o; + return name.equals(named.name); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/Warnings.java b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/Warnings.java similarity index 95% rename from server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/Warnings.java rename to server/src/main/java/org/elasticsearch/index/mapper/blockloader/Warnings.java index 67fcb543d48c7..78ca012e3654f 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/Warnings.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/Warnings.java @@ -7,7 +7,7 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ -package org.elasticsearch.index.mapper.blockloader.docvalues; +package org.elasticsearch.index.mapper.blockloader; /** * Warnings returned when loading values for ESQL. These are returned as HTTP 299 headers like so: @@ -17,7 +17,7 @@ * < Warning: 299 Elasticsearch-${ver} "Line 1:27: java.lang.IllegalArgumentException: single-value function encountered multi-value" * } */ -interface Warnings { +public interface Warnings { /** * Register a warning. ESQL deduplicates and limits the number of warnings returned so it should * be fine to blast as many warnings into this as you encounter. diff --git a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/Utf8CodePointsFromOrdsBlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/Utf8CodePointsFromOrdsBlockLoader.java index b5f044e813e31..a32f05ac0f416 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/Utf8CodePointsFromOrdsBlockLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/Utf8CodePointsFromOrdsBlockLoader.java @@ -16,11 +16,12 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; import org.apache.lucene.util.UnicodeUtil; +import org.elasticsearch.index.mapper.blockloader.Warnings; import java.io.IOException; import java.util.Arrays; -import static org.elasticsearch.index.mapper.blockloader.docvalues.Warnings.registerSingleValueWarning; +import static org.elasticsearch.index.mapper.blockloader.Warnings.registerSingleValueWarning; /** * A count of utf-8 code points for {@code keyword} style fields that are stored as a lookup table. diff --git a/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java index 27d578d62befe..5903d50ec3671 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java @@ -72,6 +72,7 @@ import org.elasticsearch.index.mapper.SourceLoader; import org.elasticsearch.index.mapper.SourceValueFetcher; import org.elasticsearch.index.mapper.ValueFetcher; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.mapper.blockloader.docvalues.DenseVectorBlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.DenseVectorBlockLoaderProcessor; import org.elasticsearch.index.mapper.blockloader.docvalues.DenseVectorFromBinaryBlockLoader; @@ -3376,10 +3377,10 @@ public interface SimilarityFunction { } /** - * Configuration for a {@link MappedFieldType.BlockLoaderFunctionConfig} that calculates vector similarity. + * Configuration for a {@link BlockLoaderFunctionConfig} that calculates vector similarity. * Functions that use this config should use SIMILARITY_FUNCTION_NAME as their name. */ - public static class VectorSimilarityFunctionConfig implements MappedFieldType.BlockLoaderFunctionConfig { + public static class VectorSimilarityFunctionConfig implements BlockLoaderFunctionConfig { private final SimilarityFunction similarityFunction; private final float[] vector; @@ -3388,7 +3389,6 @@ public static class VectorSimilarityFunctionConfig implements MappedFieldType.Bl public VectorSimilarityFunctionConfig(SimilarityFunction similarityFunction, float[] vector) { this.similarityFunction = similarityFunction; this.vector = vector; - } /** @@ -3402,6 +3402,11 @@ public VectorSimilarityFunctionConfig forByteVector() { return this; } + @Override + public String name() { + return similarityFunction.toString(); + } + public byte[] vectorAsBytes() { assert vectorAsBytes != null : "vectorAsBytes is null, call forByteVector() first"; return vectorAsBytes; diff --git a/server/src/test/java/org/elasticsearch/index/mapper/blockloader/docvalues/MockWarnings.java b/server/src/test/java/org/elasticsearch/index/mapper/blockloader/MockWarnings.java similarity index 77% rename from server/src/test/java/org/elasticsearch/index/mapper/blockloader/docvalues/MockWarnings.java rename to server/src/test/java/org/elasticsearch/index/mapper/blockloader/MockWarnings.java index cfb9ffc93fc76..37331d1fdfb86 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/blockloader/docvalues/MockWarnings.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/blockloader/MockWarnings.java @@ -7,14 +7,13 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ -package org.elasticsearch.index.mapper.blockloader.docvalues; +package org.elasticsearch.index.mapper.blockloader; import java.util.ArrayList; import java.util.List; -// TODO move me once we have more of these loaders -class MockWarnings implements Warnings { - record MockWarning(Class exceptionClass, String message) {} +public class MockWarnings implements Warnings { + public record MockWarning(Class exceptionClass, String message) {} private final List warnings = new ArrayList<>(); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/blockloader/docvalues/Utf8CodePointsFromOrdsBlockLoaderTests.java b/server/src/test/java/org/elasticsearch/index/mapper/blockloader/docvalues/Utf8CodePointsFromOrdsBlockLoaderTests.java index dac645d4c7557..705dd71479e9e 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/blockloader/docvalues/Utf8CodePointsFromOrdsBlockLoaderTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/blockloader/docvalues/Utf8CodePointsFromOrdsBlockLoaderTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.TestBlock; +import org.elasticsearch.index.mapper.blockloader.MockWarnings; import org.hamcrest.Matcher; import java.io.IOException; diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/FunctionEsField.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/FunctionEsField.java index ec4e2831524c8..d8585fd4bfeac 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/FunctionEsField.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/FunctionEsField.java @@ -8,7 +8,7 @@ package org.elasticsearch.xpack.esql.core.type; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import java.io.IOException; import java.util.Map; @@ -16,15 +16,15 @@ /** * EsField that represents a function being applied to a field on extraction. It receives a - * {@link org.elasticsearch.index.mapper.MappedFieldType.BlockLoaderFunctionConfig} that will be passed down to the block loading process + * {@link BlockLoaderFunctionConfig} that will be passed down to the block loading process * to apply the function at data load time. */ public class FunctionEsField extends EsField { // Not serialized as it will be created on the data node - private final transient MappedFieldType.BlockLoaderFunctionConfig functionConfig; + private final transient BlockLoaderFunctionConfig functionConfig; - public FunctionEsField(EsField esField, DataType dataType, MappedFieldType.BlockLoaderFunctionConfig functionConfig) { + public FunctionEsField(EsField esField, DataType dataType, BlockLoaderFunctionConfig functionConfig) { this( esField.getName(), dataType, @@ -43,7 +43,7 @@ private FunctionEsField( boolean aggregatable, boolean isAlias, TimeSeriesFieldType timeSeriesFieldType, - MappedFieldType.BlockLoaderFunctionConfig functionConfig + BlockLoaderFunctionConfig functionConfig ) { super(name, esDataType, properties, aggregatable, isAlias, timeSeriesFieldType); this.functionConfig = functionConfig; @@ -54,7 +54,7 @@ public void writeTo(StreamOutput out) throws IOException { throw new UnsupportedOperationException("FunctionEsField is not serializable, should be created on data nodes"); } - public MappedFieldType.BlockLoaderFunctionConfig functionConfig() { + public BlockLoaderFunctionConfig functionConfig() { return functionConfig; } @@ -70,4 +70,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(super.hashCode(), functionConfig); } + + @Override + public Exact getExactInfo() { + return new Exact(false, "merged with " + functionConfig.name()); + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java index 2d7ed5e0c6cee..89440a440f23b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java @@ -13,6 +13,7 @@ import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.sort.SortBuilder; @@ -58,7 +59,7 @@ BlockLoader blockLoader( String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference, - MappedFieldType.BlockLoaderFunctionConfig blockLoaderFunctionConfig + BlockLoaderFunctionConfig blockLoaderFunctionConfig ); /** diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java index aaa2ceb97000a..ba753c96c5222 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.indices.CrankyCircuitBreakerService; import org.elasticsearch.search.internal.ContextIndexSearcher; import org.elasticsearch.search.sort.SortAndFormats; @@ -467,7 +468,7 @@ public BlockLoader blockLoader( String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference, - MappedFieldType.BlockLoaderFunctionConfig blockLoaderFunctionConfig + BlockLoaderFunctionConfig blockLoaderFunctionConfig ) { throw new UnsupportedOperationException(); } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushExpressionToLoadIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushExpressionToLoadIT.java new file mode 100644 index 0000000000000..cc343ee37f27b --- /dev/null +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushExpressionToLoadIT.java @@ -0,0 +1,246 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.qa.single_node; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.test.MapMatcher; +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.AssertWarnings; +import org.elasticsearch.xpack.esql.qa.rest.ProfileLogger; +import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; +import org.hamcrest.Matcher; +import org.junit.ClassRule; +import org.junit.Rule; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.test.ListMatcher.matchesList; +import static org.elasticsearch.test.MapMatcher.assertMap; +import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.entityToMap; +import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.requestObjectBuilder; +import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.runEsql; +import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.commonProfile; +import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.fixTypesOnProfile; +import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; + +/** + * Tests for pushing expressions into field loading. + */ +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class PushExpressionToLoadIT extends ESRestTestCase { + @ClassRule + public static ElasticsearchCluster cluster = Clusters.testCluster(spec -> spec.plugin("inference-service-test")); + + @Rule(order = Integer.MIN_VALUE) + public ProfileLogger profileLogger = new ProfileLogger(); + + public void testLength() throws IOException { + String value = "v".repeat(between(0, 256)); + test( + justType("keyword"), + b -> b.value(value), + "LENGTH(test)", + matchesList().item(value.length()), + "Utf8CodePointsFromOrds.SingletonOrdinals" + ); + } + + public void testVCosine() throws IOException { + test( + justType("dense_vector"), + b -> b.startArray().value(128).value(128).value(0).endArray(), + "V_COSINE(test, [0, 255, 255])", + matchesList().item(0.5), + "BlockDocValuesReader.FloatDenseVectorNormalizedValuesBlockReader" + ); + } + + private void test( + CheckedConsumer mapping, + CheckedConsumer value, + String functionInvocation, + Matcher expectedValue, + String expectedLoader + ) throws IOException { + indexValue(mapping, value); + RestEsqlTestCase.RequestObjectBuilder builder = requestObjectBuilder().query(""" + FROM test + | EVAL test =""" + " " + functionInvocation + """ + | STATS test = VALUES(test)"""); + /* + * TODO if you just do KEEP test then the load is in the data node reduce driver and not merged: + * \_ProjectExec[[test{f}#7]] + * \_FieldExtractExec[test{f}#7]<[],[]> + * \_EsQueryExec[test], indexMode[standard]] + * \_ExchangeSourceExec[[test{f}#7],false]}, {cluster_name=test-cluster, node_name=test-cluster-0, descrip + * \_ProjectExec[[test{r}#3]] + * \_EvalExec[[LENGTH(test{f}#7) AS test#3]] + * \_LimitExec[1000[INTEGER],50] + * \_ExchangeSourceExec[[test{f}#7],false]}], query={to + */ + builder.profile(true); + Map result = runEsql(builder, new AssertWarnings.NoWarnings(), profileLogger, RestEsqlTestCase.Mode.SYNC); + assertResultMap( + result, + getResultMatcher(result).entry( + "profile", + matchesMap() // + .entry("drivers", instanceOf(List.class)) + .entry("plans", instanceOf(List.class)) + .entry("planning", matchesMap().extraOk()) + .entry("query", matchesMap().extraOk()) + ), + matchesList().item(matchesMap().entry("name", "test").entry("type", any(String.class))), + matchesList().item(expectedValue) + ); + @SuppressWarnings("unchecked") + List> profiles = (List>) ((Map) result.get("profile")).get("drivers"); + for (Map p : profiles) { + fixTypesOnProfile(p); + assertThat(p, commonProfile()); + List sig = new ArrayList<>(); + @SuppressWarnings("unchecked") + List> operators = (List>) p.get("operators"); + for (Map o : operators) { + sig.add(checkOperatorProfile(o, expectedLoader)); + } + String description = p.get("description").toString(); + switch (description) { + case "data" -> assertMap( + sig, + matchesList().item("LuceneSourceOperator") + .item("ValuesSourceReaderOperator") // the real work is here, checkOperatorProfile checks the status + .item("EvalOperator") // this one just renames the field + .item("AggregationOperator") + .item("ExchangeSinkOperator") + ); + case "node_reduce" -> logger.info("node_reduce {}", sig); + case "final" -> logger.info("final {}", sig); + default -> throw new IllegalArgumentException("can't match " + description); + } + } + } + + private void indexValue(CheckedConsumer mapping, CheckedConsumer value) + throws IOException { + try { + // Delete the index if it has already been created. + client().performRequest(new Request("DELETE", "test")); + } catch (ResponseException e) { + if (e.getResponse().getStatusLine().getStatusCode() != 404) { + throw e; + } + } + + Request createIndex = new Request("PUT", "test"); + try (XContentBuilder config = JsonXContent.contentBuilder()) { + config.startObject(); + config.startObject("settings"); + { + config.startObject("index"); + config.field("number_of_shards", 1); + config.endObject(); + } + config.endObject(); + config.startObject("mappings"); + { + config.startObject("properties").startObject("test"); + mapping.accept(config); + config.endObject().endObject(); + } + config.endObject(); + + createIndex.setJsonEntity(Strings.toString(config.endObject())); + } + Response createResponse = client().performRequest(createIndex); + assertThat( + entityToMap(createResponse.getEntity(), XContentType.JSON), + matchesMap().entry("shards_acknowledged", true).entry("index", "test").entry("acknowledged", true) + ); + + Request bulk = new Request("POST", "/_bulk"); + bulk.addParameter("refresh", ""); + try (XContentBuilder doc = JsonXContent.contentBuilder()) { + doc.startObject(); + doc.field("test"); + value.accept(doc); + doc.endObject(); + bulk.setJsonEntity(""" + {"create":{"_index":"test"}} + """ + Strings.toString(doc) + "\n"); + } + Response bulkResponse = client().performRequest(bulk); + assertThat(entityToMap(bulkResponse.getEntity(), XContentType.JSON), matchesMap().entry("errors", false).extraOk()); + } + + private CheckedConsumer justType(String type) { + return b -> b.field("type", type); + } + + private static String checkOperatorProfile(Map o, String expectedLoader) { + String name = (String) o.get("operator"); + name = PushQueriesIT.TO_NAME.matcher(name).replaceAll(""); + if (name.equals("ValuesSourceReaderOperator")) { + MapMatcher expectedOp = matchesMap().entry("operator", startsWith(name)) + .entry( + "status", + matchesMap().entry("readers_built", matchesMap().entry("test:column_at_a_time:" + expectedLoader, 1)).extraOk() + ); + assertMap(o, expectedOp); + } + return name; + } + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @Override + protected boolean preserveClusterUponCompletion() { + // Preserve the cluser to speed up the semantic_text tests + return true; + } + + private static boolean setupEmbeddings = false; + + private void setUpTextEmbeddingInferenceEndpoint() throws IOException { + setupEmbeddings = true; + Request request = new Request("PUT", "_inference/text_embedding/test"); + request.setJsonEntity(""" + { + "service": "text_embedding_test_service", + "service_settings": { + "model": "my_model", + "api_key": "abc64", + "dimensions": 128 + }, + "task_settings": { + } + } + """); + adminClient().performRequest(request); + } +} diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java index 6663880380ad3..0076ee4d9fb43 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java @@ -515,7 +515,7 @@ private String semanticTextWithKeyword() { }"""; } - private static final Pattern TO_NAME = Pattern.compile("\\[.+", Pattern.DOTALL); + static final Pattern TO_NAME = Pattern.compile("\\[.+", Pattern.DOTALL); private static String checkOperatorProfile(Map o, Matcher query) { String name = (String) o.get("operator"); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/BlockLoaderWarnings.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/BlockLoaderWarnings.java new file mode 100644 index 0000000000000..278035dade015 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/BlockLoaderWarnings.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function; + +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.Warnings; +import org.elasticsearch.xpack.esql.core.tree.Source; + +/** + * Shim between the {@link org.elasticsearch.index.mapper.blockloader.Warnings} in server and + * our {@link Warnings}. Also adds laziness because our {@link Warnings} are a little expensive + * on creation and {@link org.elasticsearch.index.mapper.blockloader.Warnings} wants to be + * cheap to create. + */ +public class BlockLoaderWarnings implements org.elasticsearch.index.mapper.blockloader.Warnings { + private final DriverContext.WarningsMode warningsMode; + private final Source source; + private Warnings delegate; + + public BlockLoaderWarnings(DriverContext.WarningsMode warningsMode, Source source) { + this.warningsMode = warningsMode; + this.source = source; + } + + @Override + public void registerException(Class exceptionClass, String message) { + if (delegate == null) { + delegate = Warnings.createOnlyWarnings( + warningsMode, + source.source().getLineNumber(), + source.source().getColumnNumber(), + source.text() + ); + } + delegate.registerException(exceptionClass, message); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java index 61ed67dae1222..8888977e31211 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java @@ -7,17 +7,55 @@ package org.elasticsearch.xpack.esql.expression.function.blockloader; -import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.stats.SearchStats; /** - * {@link org.elasticsearch.xpack.esql.core.expression.Expression}s that can be implemented as part of value loading implement this - * interface to provide the {@link MappedFieldType.BlockLoaderFunctionConfig} that will be used to load and - * transform the value of the field. + * {@link Expression} that can be "pushed" into value loading. Most of the time + * we load values into {@link Block}s and then run the expressions on them, but + * sometimes it's worth short-circuiting this process and running the expression + * in the tight loop we use for loading: + *
    + *
  • + * {@code V_COSINE(vector, [constant_vector])} - vector is ~512 floats + * and V_COSINE is one double. We can find the similarity without any + * copies if we combine. + *
  • + *
  • + * {@code ST_CENTROID(shape)} - shapes can be quite large. Centroids + * are just one point. + *
  • + *
  • + * {@code LENGTH(string)} - strings can be quite long, but string length + * is always an int. For more fun, {@code keyword}s are usually stored + * using a dictionary, and it's fairly easy to optimize + * running {@code LENGTH} once per dictionary entry. + *
  • + *
  • + * {@code MV_COUNT(anything)} - counts are always integers. + *
  • + *
  • + * {@code MV_MIN} and {@code MV_MAX} - loads much fewer data for + * multivalued fields. + *
  • + *
*/ public interface BlockLoaderExpression { + /** + * The field and loading configuration that replaces this expression, effectively + * "fusing" the expression into the load. Or null if the fusion isn't possible. + */ + @Nullable + PushedBlockLoaderExpression tryPushToFieldLoading(SearchStats stats); /** - * Returns the configuration that will be used to load the value of the field and transform it + * Expression "fused" to the block loader. + * @param field the field whose load we're fusing into + * @param config the expression's configuration */ - MappedFieldType.BlockLoaderFunctionConfig getBlockLoaderFunctionConfig(); + record PushedBlockLoaderExpression(FieldAttribute field, BlockLoaderFunctionConfig config) {} } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/Length.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/Length.java index 3b442a8583a0a..225a0006ad731 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/Length.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/Length.java @@ -12,15 +12,21 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.compute.ann.Evaluator; +import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.BlockLoaderWarnings; import org.elasticsearch.xpack.esql.expression.function.Example; import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; import org.elasticsearch.xpack.esql.expression.function.Param; +import org.elasticsearch.xpack.esql.expression.function.blockloader.BlockLoaderExpression; import org.elasticsearch.xpack.esql.expression.function.scalar.UnaryScalarFunction; +import org.elasticsearch.xpack.esql.stats.SearchStats; import java.io.IOException; import java.util.List; @@ -28,7 +34,7 @@ import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString; -public class Length extends UnaryScalarFunction { +public class Length extends UnaryScalarFunction implements BlockLoaderExpression { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Length", Length::new); @FunctionInfo( @@ -90,4 +96,16 @@ protected NodeInfo info() { public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) { return new LengthEvaluator.Factory(source(), toEvaluator.apply(field())); } + + @Override + public PushedBlockLoaderExpression tryPushToFieldLoading(SearchStats stats) { + if (field instanceof FieldAttribute f) { + if (stats.hasDocValues(f.fieldName()) == false) { + return null; + } + BlockLoaderWarnings warnings = new BlockLoaderWarnings(DriverContext.WarningsMode.COLLECT, source()); + return new PushedBlockLoaderExpression(f, new BlockLoaderFunctionConfig.Named("LENGTH", warnings)); + } + return null; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/CosineSimilarity.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/CosineSimilarity.java index d74aefdd0d360..011b4854c0d33 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/CosineSimilarity.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/CosineSimilarity.java @@ -40,6 +40,11 @@ public float calculateSimilarity(byte[] leftVector, byte[] rightVector) { public float calculateSimilarity(float[] leftVector, float[] rightVector) { return VectorUtil.cosine(leftVector, rightVector); } + + @Override + public String toString() { + return "Cosine"; + } }; @FunctionInfo( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/DotProduct.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/DotProduct.java index e319b235e5eaa..f4697b14cbe80 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/DotProduct.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/DotProduct.java @@ -41,6 +41,11 @@ public float calculateSimilarity(byte[] leftVector, byte[] rightVector) { public float calculateSimilarity(float[] leftVector, float[] rightVector) { return VectorUtil.dotProduct(leftVector, rightVector); } + + @Override + public String toString() { + return "DotProduct"; + } }; @FunctionInfo( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/Hamming.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/Hamming.java index 8ebd771221c48..7e49eea2554ca 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/Hamming.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/Hamming.java @@ -36,6 +36,11 @@ public float calculateSimilarity(byte[] leftVector, byte[] rightVector) { public float calculateSimilarity(float[] leftVector, float[] rightVector) { throw new UnsupportedOperationException("Hamming distance is not supported for float vectors"); } + + @Override + public String toString() { + return "Hamming"; + } }; public static final DenseVectorFieldMapper.SimilarityFunction EVALUATOR_SIMILARITY_FUNCTION = new DenseVectorFieldMapper.SimilarityFunction() { @@ -56,6 +61,11 @@ public float calculateSimilarity(float[] leftVector, float[] rightVector) { } return Hamming.calculateSimilarity(a, b); } + + @Override + public String toString() { + return "Hamming"; + } }; @FunctionInfo( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/L1Norm.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/L1Norm.java index 09bf2681f8559..6c066e7fe0bf5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/L1Norm.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/L1Norm.java @@ -49,6 +49,11 @@ public float calculateSimilarity(float[] leftVector, float[] rightVector) { } return result; } + + @Override + public String toString() { + return "L1Norm"; + } }; @FunctionInfo( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/L2Norm.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/L2Norm.java index a6350fc98535c..46c26ea7d8d11 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/L2Norm.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/L2Norm.java @@ -37,6 +37,11 @@ public float calculateSimilarity(byte[] leftVector, byte[] rightVector) { public float calculateSimilarity(float[] leftVector, float[] rightVector) { return (float) Math.sqrt(VectorUtil.squareDistance(leftVector, rightVector)); } + + @Override + public String toString() { + return "L2Norm"; + } }; @FunctionInfo( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/VectorSimilarityFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/VectorSimilarityFunction.java index 94c3c54ec29fb..0d3a7025eee26 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/VectorSimilarityFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/vector/VectorSimilarityFunction.java @@ -18,9 +18,9 @@ import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; -import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.vectors.DenseVectorFieldMapper; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.TypeResolutions; @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper; import org.elasticsearch.xpack.esql.expression.function.blockloader.BlockLoaderExpression; +import org.elasticsearch.xpack.esql.stats.SearchStats; import java.io.IOException; import java.util.ArrayList; @@ -207,17 +208,35 @@ public void close() { } @Override - public MappedFieldType.BlockLoaderFunctionConfig getBlockLoaderFunctionConfig() { - // PushDownVectorSimilarityFunctions checks that one of the sides is a literal - Literal literal = (Literal) (left() instanceof Literal ? left() : right()); - @SuppressWarnings("unchecked") - List numberList = (List) literal.value(); - float[] vector = new float[numberList.size()]; - for (int i = 0; i < numberList.size(); i++) { - vector[i] = numberList.get(i).floatValue(); + public final PushedBlockLoaderExpression tryPushToFieldLoading(SearchStats stats) { + // Bail if we're not directly comparing a field with a literal. + Literal literal; + FieldAttribute field; + if (left() instanceof Literal lit && right() instanceof FieldAttribute f) { + literal = lit; + field = f; + } else if (left() instanceof FieldAttribute f && right() instanceof Literal lit) { + literal = lit; + field = f; + } else { + return null; + } + + // Bail if the field isn't indexed. + if (stats.isIndexed(field.fieldName()) == false) { + return null; } - return new DenseVectorFieldMapper.VectorSimilarityFunctionConfig(getSimilarityFunction(), vector); + List vectorList = (List) literal.value(); + float[] vectorArray = new float[vectorList.size()]; + for (int i = 0; i < vectorList.size(); i++) { + vectorArray[i] = ((Number) vectorList.get(i)).floatValue(); + } + + return new PushedBlockLoaderExpression( + field, + new DenseVectorFieldMapper.VectorSimilarityFunctionConfig(getSimilarityFunction(), vectorArray) + ); } interface VectorValueProvider extends Releasable { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java index 9e852a9c0d595..8af212ccdb6e6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java @@ -15,7 +15,7 @@ import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.IgnoreNullMetrics; import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferIsNotNull; import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferNonNullAggConstraint; -import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.PushDownVectorSimilarityFunctions; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.PushExpressionsToFieldLoad; import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceDateTruncBucketWithRoundTo; import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceFieldWithConstantOrNull; import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceTopNWithLimitAndSort; @@ -71,7 +71,7 @@ private static Batch localRewrites() { ) ); if (EsqlCapabilities.Cap.VECTOR_SIMILARITY_FUNCTIONS_PUSHDOWN.isEnabled()) { - rules.add(new PushDownVectorSimilarityFunctions()); + rules.add(new PushExpressionsToFieldLoad()); } return new Batch<>("Local rewrite", Limiter.ONCE, rules.toArray(Rule[]::new)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushDownVectorSimilarityFunctions.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushExpressionsToFieldLoad.java similarity index 62% rename from x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushDownVectorSimilarityFunctions.java rename to x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushExpressionsToFieldLoad.java index a5dc214b442c9..7830c97331b2a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushDownVectorSimilarityFunctions.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushExpressionsToFieldLoad.java @@ -11,12 +11,11 @@ import org.elasticsearch.xpack.esql.core.expression.AttributeSet; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; -import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.NameId; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.FunctionEsField; -import org.elasticsearch.xpack.esql.expression.function.vector.VectorSimilarityFunction; +import org.elasticsearch.xpack.esql.expression.function.blockloader.BlockLoaderExpression; import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; @@ -38,11 +37,9 @@ * the similarity function during value loading, when one side of the function is a literal. * It also adds the new field function attribute to the EsRelation output, and adds a projection after it to remove it from the output. */ -public class PushDownVectorSimilarityFunctions extends OptimizerRules.ParameterizedOptimizerRule< - LogicalPlan, - LocalLogicalOptimizerContext> { +public class PushExpressionsToFieldLoad extends OptimizerRules.ParameterizedOptimizerRule { - public PushDownVectorSimilarityFunctions() { + public PushExpressionsToFieldLoad() { super(OptimizerRules.TransformDirection.DOWN); } @@ -50,10 +47,15 @@ public PushDownVectorSimilarityFunctions() { protected LogicalPlan rule(LogicalPlan plan, LocalLogicalOptimizerContext context) { if (plan instanceof Eval || plan instanceof Filter || plan instanceof Aggregate) { Map addedAttrs = new HashMap<>(); - LogicalPlan transformedPlan = plan.transformExpressionsOnly( - VectorSimilarityFunction.class, - similarityFunction -> replaceFieldsForFieldTransformations(similarityFunction, addedAttrs, context) - ); + LogicalPlan transformedPlan = plan.transformExpressionsOnly(Expression.class, e -> { + if (e instanceof BlockLoaderExpression ble) { + BlockLoaderExpression.PushedBlockLoaderExpression fuse = ble.tryPushToFieldLoading(context.searchStats()); + if (fuse != null) { + return replaceFieldsForFieldTransformations(e, addedAttrs, fuse); + } + } + return e; + }); if (addedAttrs.isEmpty()) { return plan; @@ -81,57 +83,26 @@ protected LogicalPlan rule(LogicalPlan plan, LocalLogicalOptimizerContext contex } private static Expression replaceFieldsForFieldTransformations( - VectorSimilarityFunction similarityFunction, + Expression e, Map addedAttrs, - LocalLogicalOptimizerContext context + BlockLoaderExpression.PushedBlockLoaderExpression fuse ) { - // Only replace if exactly one side is a literal and the other a field attribute - if ((similarityFunction.left() instanceof Literal ^ similarityFunction.right() instanceof Literal) == false) { - return similarityFunction; - } - - Literal literal = (Literal) (similarityFunction.left() instanceof Literal ? similarityFunction.left() : similarityFunction.right()); - FieldAttribute fieldAttr = null; - if (similarityFunction.left() instanceof FieldAttribute fa) { - fieldAttr = fa; - } else if (similarityFunction.right() instanceof FieldAttribute fa) { - fieldAttr = fa; - } - // We can push down also for doc values, requires handling that case on the field mapper - if (fieldAttr == null || context.searchStats().isIndexed(fieldAttr.fieldName()) == false) { - return similarityFunction; - } - - @SuppressWarnings("unchecked") - List vectorList = (List) literal.value(); - float[] vectorArray = new float[vectorList.size()]; - int arrayHashCode = 0; - for (int i = 0; i < vectorList.size(); i++) { - vectorArray[i] = vectorList.get(i).floatValue(); - arrayHashCode = 31 * arrayHashCode + Float.floatToIntBits(vectorArray[i]); - } - // Change the similarity function to a reference of a transformation on the field - FunctionEsField functionEsField = new FunctionEsField( - fieldAttr.field(), - similarityFunction.dataType(), - similarityFunction.getBlockLoaderFunctionConfig() - ); - var name = rawTemporaryName(fieldAttr.name(), similarityFunction.nodeName(), String.valueOf(arrayHashCode)); + FunctionEsField functionEsField = new FunctionEsField(fuse.field().field(), e.dataType(), fuse.config()); + var name = rawTemporaryName(fuse.field().name(), fuse.config().name(), String.valueOf(fuse.config().hashCode())); // TODO: Check if exists before adding, retrieve the previous one var newFunctionAttr = new FieldAttribute( - fieldAttr.source(), - fieldAttr.parentName(), - fieldAttr.qualifier(), + fuse.field().source(), + fuse.field().parentName(), + fuse.field().qualifier(), name, functionEsField, - fieldAttr.nullable(), + fuse.field().nullable(), new NameId(), true ); Attribute.IdIgnoringWrapper key = newFunctionAttr.ignoreId(); if (addedAttrs.containsKey(key)) { - ; return addedAttrs.get(key); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 281788d29f759..55750adffd207 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -45,6 +45,7 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NestedLookup; import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.ExistsQueryBuilder; @@ -194,7 +195,7 @@ private BlockLoader getBlockLoaderFor(int shardId, Attribute attr, MappedFieldTy } // Apply any block loader function if present - MappedFieldType.BlockLoaderFunctionConfig functionConfig = null; + BlockLoaderFunctionConfig functionConfig = null; if (attr instanceof FieldAttribute fieldAttr && fieldAttr.field() instanceof FunctionEsField functionEsField) { functionConfig = functionEsField.functionConfig(); } @@ -488,7 +489,7 @@ public BlockLoader blockLoader( String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference, - MappedFieldType.BlockLoaderFunctionConfig blockLoaderFunctionConfig + BlockLoaderFunctionConfig blockLoaderFunctionConfig ) { if (asUnsupportedSource) { return BlockLoader.CONSTANT_NULLS; @@ -535,7 +536,7 @@ public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() { } @Override - public MappedFieldType.BlockLoaderFunctionConfig blockLoaderFunctionConfig() { + public BlockLoaderFunctionConfig blockLoaderFunctionConfig() { return blockLoaderFunctionConfig; } }); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java index 7925fe5ef9242..2c7d00bcb283b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java @@ -36,7 +36,9 @@ import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.expression.Order; import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; import org.elasticsearch.xpack.esql.expression.function.aggregate.Min; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum; import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Case; import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce; import org.elasticsearch.xpack.esql.expression.function.scalar.string.StartsWith; @@ -1392,7 +1394,7 @@ public void testVectorFunctionsUpdateIntermediateProjections() { // Check replaced field attribute var fieldAttr = as(alias.child(), FieldAttribute.class); assertThat(fieldAttr.fieldName().string(), equalTo("dense_vector")); - assertThat(fieldAttr.name(), startsWith("$$dense_vector$CosineSimilarity")); + assertThat(fieldAttr.name(), startsWith("$$dense_vector$Cosine")); var field = as(fieldAttr.field(), FunctionEsField.class); var blockLoaderFunctionConfig = as(field.functionConfig(), DenseVectorFieldMapper.VectorSimilarityFunctionConfig.class); assertThat(blockLoaderFunctionConfig.similarityFunction(), is(CosineSimilarity.SIMILARITY_FUNCTION)); @@ -1408,7 +1410,7 @@ public void testVectorFunctionsUpdateIntermediateProjections() { assertTrue( innerProject.projections() .stream() - .anyMatch(p -> (p instanceof FieldAttribute fa) && fa.name().startsWith("$$dense_vector$CosineSimilarity")) + .anyMatch(p -> (p instanceof FieldAttribute fa) && fa.name().startsWith("$$dense_vector$Cosine")) ); // EsRelation[test_all][$$dense_vector$CosineSimilarity$33{f}#33, !alias_in..] @@ -1478,7 +1480,7 @@ public void testVectorFunctionsWithDuplicateFunctions() { // Right side: CosineSimilarity field var r2CosineFieldAttr = as(r2Add.right(), FieldAttribute.class); assertThat(r2CosineFieldAttr.fieldName().string(), equalTo("dense_vector")); - assertThat(r2CosineFieldAttr.name(), startsWith("$$dense_vector$CosineSimilarity")); + assertThat(r2CosineFieldAttr.name(), startsWith("$$dense_vector$Cosine")); var r2CosineField = as(r2CosineFieldAttr.field(), FunctionEsField.class); var r2CosineConfig = as(r2CosineField.functionConfig(), DenseVectorFieldMapper.VectorSimilarityFunctionConfig.class); assertThat(r2CosineConfig.similarityFunction(), is(CosineSimilarity.SIMILARITY_FUNCTION)); @@ -1494,6 +1496,103 @@ public void testVectorFunctionsWithDuplicateFunctions() { assertTrue(esRelation.output().contains(r2CosineFieldAttr)); } + public void testLengthInEval() { + assumeTrue("requires similarity functions", EsqlCapabilities.Cap.VECTOR_SIMILARITY_FUNCTIONS_PUSHDOWN.isEnabled()); + String query = """ + FROM test + | EVAL l = LENGTH(last_name) + | KEEP l + """; + LogicalPlan plan = localPlan(plan(query, analyzer), TEST_SEARCH_STATS); + + var project = as(plan, EsqlProject.class); + assertThat(Expressions.names(project.projections()), contains("l")); + var eval = as(project.child(), Eval.class); + FieldAttribute lAttr = as(as(eval.fields().getFirst(), Alias.class).child(), FieldAttribute.class); + var limit = as(eval.child(), Limit.class); + var relation = as(limit.child(), EsRelation.class); + assertTrue(relation.output().contains(lAttr)); + } + + public void testLengthInWhere() { + assumeTrue("requires similarity functions", EsqlCapabilities.Cap.VECTOR_SIMILARITY_FUNCTIONS_PUSHDOWN.isEnabled()); + String query = """ + FROM test + | WHERE LENGTH(last_name) > 1 + """; + LogicalPlan plan = localPlan(plan(query, analyzer), TEST_SEARCH_STATS); + + var project = as(plan, EsqlProject.class); + var limit = as(project.child(), Limit.class); + var filter = as(limit.child(), Filter.class); + FieldAttribute lAttr = as(as(filter.condition(), GreaterThan.class).left(), FieldAttribute.class); + var relation = as(filter.child(), EsRelation.class); + assertTrue(relation.output().contains(lAttr)); + } + + public void testLengthInStats() { + assumeTrue("requires similarity functions", EsqlCapabilities.Cap.VECTOR_SIMILARITY_FUNCTIONS_PUSHDOWN.isEnabled()); + String query = """ + FROM test + | STATS l = SUM(LENGTH(last_name)) + """; + LogicalPlan plan = localPlan(plan(query, analyzer), TEST_SEARCH_STATS); + + var limit = as(plan, Limit.class); + var agg = as(limit.child(), Aggregate.class); + assertThat(agg.aggregates(), hasSize(1)); + as(as(agg.aggregates().getFirst(), Alias.class).child(), Sum.class); + var eval = as(agg.child(), Eval.class); + FieldAttribute lAttr = as(as(eval.fields().getFirst(), Alias.class).child(), FieldAttribute.class); + var relation = as(eval.child(), EsRelation.class); + assertTrue(relation.output().contains(lAttr)); + } + + public void testLengthInWhereAndEval() { + assumeFalse("fix me", true); + assumeTrue("requires similarity functions", EsqlCapabilities.Cap.VECTOR_SIMILARITY_FUNCTIONS_PUSHDOWN.isEnabled()); + String query = """ + FROM test + | WHERE LENGTH(last_name) > 1 + | EVAL l = LENGTH(last_name) + """; + LogicalPlan plan = localPlan(plan(query, analyzer), TEST_SEARCH_STATS); + + var project = as(plan, EsqlProject.class); + var limit = as(project.child(), Limit.class); + var eval = as(limit.child(), Eval.class); + var filter = as(eval.child(), Filter.class); + FieldAttribute lAttr = as(as(filter.condition(), GreaterThan.class).left(), FieldAttribute.class); + var relation = as(filter.child(), EsRelation.class); + assertTrue(relation.output().contains(lAttr)); + } + + public void testLengthInStatsTwice() { + assumeTrue("requires similarity functions", EsqlCapabilities.Cap.VECTOR_SIMILARITY_FUNCTIONS_PUSHDOWN.isEnabled()); + String query = """ + FROM test + | STATS l = SUM(LENGTH(last_name)) + AVG(LENGTH(last_name)) + """; + LogicalPlan plan = localPlan(plan(query, analyzer), TEST_SEARCH_STATS); + + var project = as(plan, Project.class); + var eval1 = as(project.child(), Eval.class); + var limit = as(eval1.child(), Limit.class); + var agg = as(limit.child(), Aggregate.class); + assertThat(agg.aggregates(), hasSize(2)); + var sum = as(as(agg.aggregates().getFirst(), Alias.class).child(), Sum.class); + var count = as(as(agg.aggregates().get(1), Alias.class).child(), Count.class); + var eval2 = as(agg.child(), Eval.class); + assertThat(eval2.fields(), hasSize(1)); + Alias lAlias = as(eval2.fields().getFirst(), Alias.class); + FieldAttribute lAttr = as(lAlias.child(), FieldAttribute.class); + var relation = as(eval2.child(), EsRelation.class); + assertTrue(relation.output().contains(lAttr)); + + assertThat(as(sum.field(), ReferenceAttribute.class).id(), equalTo(lAlias.id())); + assertThat(as(count.field(), ReferenceAttribute.class).id(), equalTo(lAlias.id())); + } + private IsNotNull isNotNull(Expression field) { return new IsNotNull(EMPTY, field); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/ConstantShardContextIndexedByShardId.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/ConstantShardContextIndexedByShardId.java index 4d39782c23e58..8af35d05843ee 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/ConstantShardContextIndexedByShardId.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/ConstantShardContextIndexedByShardId.java @@ -13,6 +13,7 @@ import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.sort.SortBuilder; @@ -70,7 +71,7 @@ public BlockLoader blockLoader( String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference, - MappedFieldType.BlockLoaderFunctionConfig blockLoaderFunctionConfig + BlockLoaderFunctionConfig blockLoaderFunctionConfig ) { throw new UnsupportedOperationException(); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/DisabledSearchStats.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/DisabledSearchStats.java index c63c7aff03909..f726b941b0bb7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/DisabledSearchStats.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/DisabledSearchStats.java @@ -25,7 +25,8 @@ public boolean isIndexed(FieldName field) { @Override public boolean hasDocValues(FieldName field) { - return true; + // Some geo tests assume doc values and the loader emulates it. Nothing else does. + return field.string().endsWith("location") || field.string().endsWith("centroid") || field.string().equals("subset"); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/type/FunctionEsFieldTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/type/FunctionEsFieldTests.java index 4a31bb4298769..b860c6975c1e3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/type/FunctionEsFieldTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/type/FunctionEsFieldTests.java @@ -8,7 +8,7 @@ package org.elasticsearch.xpack.esql.type; import org.elasticsearch.TransportVersion; -import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.mapper.vectors.DenseVectorFieldMapper; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.EsField; @@ -34,7 +34,7 @@ protected FunctionEsField createTestInstance() { protected FunctionEsField mutateInstance(FunctionEsField instance) throws IOException { EsField esField = instance.getExactField(); DataType dataType = instance.getDataType(); - MappedFieldType.BlockLoaderFunctionConfig functionConfig = instance.functionConfig(); + BlockLoaderFunctionConfig functionConfig = instance.functionConfig(); switch (between(0, 2)) { case 0 -> esField = randomValueOtherThan(esField, () -> randomAnyEsField(4)); case 1 -> dataType = randomValueOtherThan(dataType, () -> randomFrom(DataType.types()));