diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java index 6a255a5144353..9cc304828a8a7 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java @@ -142,11 +142,26 @@ static void selfTest() { private static List fields(String name) { return switch (name) { case "3_stored_keywords" -> List.of( - new ValuesSourceReaderOperator.FieldInfo("keyword_1", ElementType.BYTES_REF, shardIdx -> blockLoader("stored_keyword_1")), - new ValuesSourceReaderOperator.FieldInfo("keyword_2", ElementType.BYTES_REF, shardIdx -> blockLoader("stored_keyword_2")), - new ValuesSourceReaderOperator.FieldInfo("keyword_3", ElementType.BYTES_REF, shardIdx -> blockLoader("stored_keyword_3")) + new ValuesSourceReaderOperator.FieldInfo( + "keyword_1", + ElementType.BYTES_REF, + false, + shardIdx -> blockLoader("stored_keyword_1") + ), + new ValuesSourceReaderOperator.FieldInfo( + "keyword_2", + ElementType.BYTES_REF, + false, + shardIdx -> blockLoader("stored_keyword_2") + ), + new ValuesSourceReaderOperator.FieldInfo( + "keyword_3", + ElementType.BYTES_REF, + false, + shardIdx -> blockLoader("stored_keyword_3") + ) ); - default -> List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType(name), shardIdx -> blockLoader(name))); + default -> List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType(name), false, shardIdx -> blockLoader(name))); }; } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapper.java index f419d87d008fe..03b48e526fd8c 100644 --- 
a/server/src/main/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapper.java @@ -98,7 +98,12 @@ protected void writeExtent(BlockLoader.IntBuilder builder, Extent extent) { public BlockLoader.AllReader reader(LeafReaderContext context) throws IOException { return new BlockLoader.AllReader() { @Override - public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException { + public BlockLoader.Block read( + BlockLoader.BlockFactory factory, + BlockLoader.Docs docs, + int offset, + boolean nullsFiltered + ) throws IOException { var binaryDocValues = context.reader().getBinaryDocValues(fieldName); var reader = new GeometryDocValueReader(); try (var builder = factory.ints(docs.count() - offset)) { diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java index 3b1ab81b9fb58..281087f703236 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java @@ -130,7 +130,7 @@ static class SingletonLongs extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException { if (numericDocValues instanceof BlockLoader.OptionalColumnAtATimeReader direct) { BlockLoader.Block result = direct.tryRead(factory, docs, offset); if (result != null) { @@ -179,7 +179,7 @@ static class Longs extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset, boolean 
nullsFiltered) throws IOException { try (BlockLoader.LongBuilder builder = factory.longsFromDocValues(docs.count() - offset)) { for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); @@ -260,7 +260,7 @@ private static class SingletonInts extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException { try (BlockLoader.IntBuilder builder = factory.intsFromDocValues(docs.count() - offset)) { for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); @@ -303,7 +303,7 @@ private static class Ints extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException { try (BlockLoader.IntBuilder builder = factory.intsFromDocValues(docs.count() - offset)) { for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); @@ -397,7 +397,7 @@ private static class SingletonDoubles extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException { try (BlockLoader.DoubleBuilder builder = factory.doublesFromDocValues(docs.count() - offset)) { for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); @@ -442,7 +442,7 @@ private static class Doubles extends BlockDocValuesReader { } @Override - public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException { try (BlockLoader.DoubleBuilder builder = 
factory.doublesFromDocValues(docs.count() - offset)) { for (int i = offset; i < docs.count(); i++) { int doc = docs.get(i); @@ -540,7 +540,7 @@ private abstract static class DenseVectorValuesBlockReader blockLoaderReadValuesFromColumnAtATimeReader(Direct BlockLoader loader = fieldType.blockLoader(blContext()); List all = new ArrayList<>(); for (LeafReaderContext ctx : reader.leaves()) { - TestBlock block = (TestBlock) loader.columnAtATimeReader(ctx).read(TestBlock.factory(), TestBlock.docs(ctx), offset); + TestBlock block = (TestBlock) loader.columnAtATimeReader(ctx).read(TestBlock.factory(), TestBlock.docs(ctx), offset, false); for (int i = 0; i < block.size(); i++) { all.add(block.get(i)); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java index eeb1a349d8bbc..3f4b863532233 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java @@ -113,7 +113,7 @@ private Object load(BlockLoader blockLoader, LeafReaderContext context, MapperSe } } BlockLoader.Docs docs = TestBlock.docs(docArray); - var block = (TestBlock) columnAtATimeReader.read(TestBlock.factory(), docs, offset); + var block = (TestBlock) columnAtATimeReader.read(TestBlock.factory(), docs, offset, false); assertThat(block.size(), equalTo(docArray.length - offset)); return block.get(0); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java index e078e33d51b18..80727795f8bb0 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java @@ -1542,7 +1542,7 @@ public void testSingletonLongBulkBlockReading() 
throws IOException { var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context); assertThat(columnReader.numericDocValues, instanceOf(BlockLoader.OptionalColumnAtATimeReader.class)); var docBlock = TestBlock.docs(IntStream.range(0, 3).toArray()); - var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0); + var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0, false); for (int i = 0; i < block.size(); i++) { assertThat(block.get(i), equalTo(expectedSampleValues[i])); } @@ -1567,7 +1567,7 @@ public void testSingletonLongBulkBlockReading() throws IOException { var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context); assertThat(columnReader.numericDocValues, not(instanceOf(BlockLoader.OptionalColumnAtATimeReader.class))); var docBlock = TestBlock.docs(IntStream.range(0, 3).toArray()); - var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0); + var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0, false); assertThat(block.get(0), equalTo(expectedSampleValues[0])); assertThat(block.get(1), nullValue()); assertThat(block.get(2), equalTo(expectedSampleValues[2])); @@ -1597,7 +1597,7 @@ public void testSingletonLongBulkBlockReading() throws IOException { var columnReader = blockLoader.columnAtATimeReader(context); assertThat(columnReader, instanceOf(BlockDocValuesReader.Longs.class)); var docBlock = TestBlock.docs(IntStream.range(0, 3).toArray()); - var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0); + var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0, false); assertThat(block.get(0), equalTo(expectedSampleValues[0])); assertThat(block.get(1), equalTo(List.of(expectedSampleValues[0], expectedSampleValues[1]))); assertThat(block.get(2), equalTo(expectedSampleValues[2])); diff --git 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java index de7136c2fade3..59e54103eb310 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java @@ -112,7 +112,7 @@ private void loadFromSingleLeaf(long jumboBytes, Block[] target, ValuesReaderDoc loadFromRowStrideReaders(jumboBytes, target, storedFieldsSpec, rowStrideReaders, ctx, docs, offset); } for (ColumnAtATimeWork r : columnAtATimeReaders) { - target[r.idx] = (Block) r.reader.read(loaderBlockFactory, docs, offset); + target[r.idx] = (Block) r.reader.read(loaderBlockFactory, docs, offset, operator.fields[r.idx].info.nullsFiltered()); operator.sanityCheckBlock(r.reader, docs.count() - offset, target[r.idx], r.idx); } if (log.isDebugEnabled()) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java index 6d0ebb9c312d0..3f3d73eec88ae 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java @@ -81,10 +81,15 @@ public String describe() { /** * Configuration for a field to load. * - * {@code blockLoader} maps shard index to the {@link BlockLoader}s - * which load the actual blocks. + * @param nullsFiltered if {@code true}, then target docs passed from the source operator are guaranteed to have a value + * for the field; otherwise, the guarantee is unknown. 
This enables optimizations for block loaders, + * treating the field as dense (every document has a value) even if it is sparse in the index. + * For example, "FROM index | WHERE x != null | STATS sum(x)", after filtering out documents + * without a value for field x, all target documents returned from the source operator + * will have a value for field x whether x is dense or sparse in the index. + * @param blockLoader maps shard index to the {@link BlockLoader}s which load the actual blocks. */ - public record FieldInfo(String name, ElementType type, IntFunction blockLoader) {} + public record FieldInfo(String name, ElementType type, boolean nullsFiltered, IntFunction blockLoader) {} public record ShardContext(IndexReader reader, Supplier newSourceLoader, double storedFieldsSequentialProportion) {} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index c247b4765548a..d8f9787c2864c 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -174,7 +174,12 @@ public void testPushRoundToToQuery() throws IOException { ValuesSourceReaderOperator.Factory load = new ValuesSourceReaderOperator.Factory( ByteSizeValue.ofGb(1), List.of( - new ValuesSourceReaderOperator.FieldInfo("v", ElementType.LONG, f -> new BlockDocValuesReader.LongsBlockLoader("v")) + new ValuesSourceReaderOperator.FieldInfo( + "v", + ElementType.LONG, + false, + f -> new BlockDocValuesReader.LongsBlockLoader("v") + ) ), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> { throw new UnsupportedOperationException(); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java 
b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java index 3879ab117d734..b4996465f9496 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java @@ -207,6 +207,7 @@ private List runQuery(Set values, Query query, boolean shuffleDocs new ValuesSourceReaderOperator.FieldInfo( FIELD, ElementType.BYTES_REF, + false, unused -> new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader(FIELD) ) ), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java index ea624288a8c82..b58f68ab53beb 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java @@ -490,6 +490,7 @@ public MappedFieldType fieldType(String name) { f -> new ValuesSourceReaderOperator.FieldInfo( f.ft.name(), f.elementType, + false, n -> f.ft.blockLoader(ValuesSourceReaderOperatorTests.blContext()) ) ) diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilderTests.java index a1155b4291ec7..68dedbf1c79d6 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilderTests.java @@ -91,7 +91,7 @@ public int get(int i) { } }; var columnAtATimeReader = blockLoader.columnAtATimeReader(ctx); - try 
(BlockLoader.Block block = columnAtATimeReader.read(blockFactory, docs, start)) { + try (BlockLoader.Block block = columnAtATimeReader.read(blockFactory, docs, start, false)) { BytesRefBlock result = (BytesRefBlock) block; BytesRef scratch = new BytesRef(); for (int i = 0; i < result.getPositionCount(); i++) { @@ -241,7 +241,7 @@ public int get(int i) { } }; var columnAtATimeReader = blockLoader.columnAtATimeReader(ctx); - try (BlockLoader.Block block = columnAtATimeReader.read(blockFactory, docs, start)) { + try (BlockLoader.Block block = columnAtATimeReader.read(blockFactory, docs, start, false)) { BytesRefBlock result = (BytesRefBlock) block; assertNotNull(result.asVector()); boolean enclosedInSingleRange = false; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java index 6de494fe26b6a..d560a8e054152 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java @@ -244,7 +244,7 @@ private static Operator.OperatorFactory factory( ) { return new ValuesSourceReaderOperator.Factory( ByteSizeValue.ofGb(1), - List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType, shardIdx -> { + List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType, false, shardIdx -> { if (shardIdx < 0 || shardIdx >= INDICES.size()) { fail("unexpected shardIdx [" + shardIdx + "]"); } @@ -544,15 +544,20 @@ public void testLoadAllInOnePageShuffled() { } private static ValuesSourceReaderOperator.FieldInfo fieldInfo(MappedFieldType ft, ElementType elementType) { - return new ValuesSourceReaderOperator.FieldInfo(ft.name(), elementType, shardIdx -> getBlockLoaderFor(shardIdx, ft, 
null)); + return new ValuesSourceReaderOperator.FieldInfo(ft.name(), elementType, false, shardIdx -> getBlockLoaderFor(shardIdx, ft, null)); } private static ValuesSourceReaderOperator.FieldInfo fieldInfo(MappedFieldType ft, MappedFieldType ftX, ElementType elementType) { - return new ValuesSourceReaderOperator.FieldInfo(ft.name(), elementType, shardIdx -> getBlockLoaderFor(shardIdx, ft, ftX)); + return new ValuesSourceReaderOperator.FieldInfo(ft.name(), elementType, false, shardIdx -> getBlockLoaderFor(shardIdx, ft, ftX)); } private ValuesSourceReaderOperator.FieldInfo fieldInfo(String fieldName, ElementType elementType, DataType toType) { - return new ValuesSourceReaderOperator.FieldInfo(fieldName, elementType, shardIdx -> getBlockLoaderFor(shardIdx, fieldName, toType)); + return new ValuesSourceReaderOperator.FieldInfo( + fieldName, + elementType, + false, + shardIdx -> getBlockLoaderFor(shardIdx, fieldName, toType) + ); } private static MappedFieldType.BlockLoaderContext blContext() { @@ -878,6 +883,7 @@ private List infoAndChecksForEachType( new ValuesSourceReaderOperator.FieldInfo( "constant_bytes", ElementType.BYTES_REF, + false, shardIdx -> BlockLoader.constantBytes(new BytesRef("foo")) ), checks::constantBytes, @@ -886,7 +892,7 @@ private List infoAndChecksForEachType( ); r.add( new FieldCase( - new ValuesSourceReaderOperator.FieldInfo("null", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS), + new ValuesSourceReaderOperator.FieldInfo("null", ElementType.NULL, false, shardIdx -> BlockLoader.CONSTANT_NULLS), checks::constantNulls, StatusChecks::constantNulls ) @@ -1404,8 +1410,18 @@ public void testNullsShared() { new ValuesSourceReaderOperator.Factory( ByteSizeValue.ofGb(1), List.of( - new ValuesSourceReaderOperator.FieldInfo("null1", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS), - new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS) + new ValuesSourceReaderOperator.FieldInfo( + 
"null1", + ElementType.NULL, + false, + shardIdx -> BlockLoader.CONSTANT_NULLS + ), + new ValuesSourceReaderOperator.FieldInfo( + "null2", + ElementType.NULL, + false, + shardIdx -> BlockLoader.CONSTANT_NULLS + ) ), shardContexts, 0 @@ -1485,7 +1501,7 @@ public void testManyShards() throws IOException { MappedFieldType ft = mapperService(indexKey).fieldType("key"); var readerFactory = new ValuesSourceReaderOperator.Factory( ByteSizeValue.ofGb(1), - List.of(new ValuesSourceReaderOperator.FieldInfo("key", ElementType.INT, shardIdx -> { + List.of(new ValuesSourceReaderOperator.FieldInfo("key", ElementType.INT, false, shardIdx -> { seenShards.add(shardIdx); return ft.blockLoader(blContext()); })), @@ -1692,8 +1708,8 @@ public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws } return new ColumnAtATimeReader() { @Override - public Block read(BlockFactory factory, Docs docs, int offset) throws IOException { - Block block = reader.read(factory, docs, offset); + public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException { + Block block = reader.read(factory, docs, offset, nullsFiltered); Page page = new Page((org.elasticsearch.compute.data.Block) block); return convertEvaluator.eval(page); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java index 2dd0e7c4de41b..3291ef4b5a2e2 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java @@ -155,7 +155,7 @@ public static Operator.OperatorFactory factory(IndexReader reader, MappedFieldTy static Operator.OperatorFactory factory(IndexReader reader, String name, ElementType 
elementType, BlockLoader loader) { return new ValuesSourceReaderOperator.Factory( ByteSizeValue.ofGb(1), - List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType, shardIdx -> { + List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType, false, shardIdx -> { if (shardIdx != 0) { fail("unexpected shardIdx [" + shardIdx + "]"); } @@ -554,7 +554,7 @@ private Page shuffle(Page source) { } private static ValuesSourceReaderOperator.FieldInfo fieldInfo(MappedFieldType ft, ElementType elementType) { - return new ValuesSourceReaderOperator.FieldInfo(ft.name(), elementType, shardIdx -> { + return new ValuesSourceReaderOperator.FieldInfo(ft.name(), elementType, false, shardIdx -> { if (shardIdx != 0) { fail("unexpected shardIdx [" + shardIdx + "]"); } @@ -902,6 +902,7 @@ private List infoAndChecksForEachType( new ValuesSourceReaderOperator.FieldInfo( "constant_bytes", ElementType.BYTES_REF, + false, shardIdx -> BlockLoader.constantBytes(new BytesRef("foo")) ), checks::constantBytes, @@ -910,7 +911,7 @@ private List infoAndChecksForEachType( ); r.add( new FieldCase( - new ValuesSourceReaderOperator.FieldInfo("null", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS), + new ValuesSourceReaderOperator.FieldInfo("null", ElementType.NULL, false, shardIdx -> BlockLoader.CONSTANT_NULLS), checks::constantNulls, StatusChecks::constantNulls ) @@ -1622,8 +1623,18 @@ public void testNullsShared() { new ValuesSourceReaderOperator.Factory( ByteSizeValue.ofGb(1), List.of( - new ValuesSourceReaderOperator.FieldInfo("null1", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS), - new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS) + new ValuesSourceReaderOperator.FieldInfo( + "null1", + ElementType.NULL, + false, + shardIdx -> BlockLoader.CONSTANT_NULLS + ), + new ValuesSourceReaderOperator.FieldInfo( + "null2", + ElementType.NULL, + false, + shardIdx -> BlockLoader.CONSTANT_NULLS + ) ), List.of( new 
ValuesSourceReaderOperator.ShardContext( @@ -1766,7 +1777,7 @@ public void testManyShards() throws IOException { MappedFieldType ft = mapperService.fieldType("key"); var readerFactory = new ValuesSourceReaderOperator.Factory( ByteSizeValue.ofGb(1), - List.of(new ValuesSourceReaderOperator.FieldInfo("key", ElementType.INT, shardIdx -> { + List.of(new ValuesSourceReaderOperator.FieldInfo("key", ElementType.INT, false, shardIdx -> { seenShards.add(shardIdx); return ft.blockLoader(blContext()); })), diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index 72427a5afc9d6..dc8a14abf1e12 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -293,6 +293,7 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices) new ValuesSourceReaderOperator.FieldInfo( "key" + idx, PlannerUtils.toElementType(keyTypes.get(idx)), + false, shard -> searchContext.getSearchExecutionContext().getFieldType("key" + idx).blockLoader(blContext()) ) ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index e33f0ad4b2904..b52c6472f962a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -435,6 +435,7 @@ private static Operator extractFieldsOperator( new ValuesSourceReaderOperator.FieldInfo( extractField.name(), PlannerUtils.toElementType(extractField.dataType()), + false, shardIdx -> { if (shardIdx != 0) { throw new 
IllegalStateException("only one shard"); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index d5101e6b8be7c..5b32cdbbacdc9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -45,9 +45,15 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NestedLookup; import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.ConstantScoreQueryBuilder; +import org.elasticsearch.index.query.ExistsQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.search.NestedHelper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; @@ -78,6 +84,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -169,7 +176,7 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi for (Attribute attr : fieldExtractExec.attributesToExtract()) { layout.append(attr); } - var fields = extractFields(fieldExtractExec.attributesToExtract(), fieldExtractExec::fieldExtractPreference); + var fields = extractFields(fieldExtractExec); if (fieldExtractExec instanceof TimeSeriesFieldExtractExec) { // TODO: consolidate with ValuesSourceReaderOperator return 
source.with(new TimeSeriesExtractFieldOperator.Factory(fields, shardContexts), layout.build()); @@ -193,7 +200,8 @@ private BlockLoader getBlockLoaderFor(int shardId, Attribute attr, MappedFieldTy } boolean isUnsupported = attr.dataType() == DataType.UNSUPPORTED; - BlockLoader blockLoader = shardContext.blockLoader(getFieldName(attr), isUnsupported, fieldExtractPreference); + String fieldName = getFieldName(attr); + BlockLoader blockLoader = shardContext.blockLoader(fieldName, isUnsupported, fieldExtractPreference); MultiTypeEsField unionTypes = findUnionTypes(attr); if (unionTypes != null) { // Use the fully qualified name `cluster:index-name` because multiple types are resolved on coordinator with the cluster prefix @@ -322,21 +330,53 @@ public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, Loca return PhysicalOperation.fromSource(luceneFactory, layout.build()); } - private List extractFields( - List attributes, - Function preference - ) { + List extractFields(FieldExtractExec fieldExtractExec) { + List attributes = fieldExtractExec.attributesToExtract(); List fieldInfos = new ArrayList<>(attributes.size()); + Set nullsFilteredFields = new HashSet<>(); + fieldExtractExec.forEachDown(EsQueryExec.class, queryExec -> { + QueryBuilder q = queryExec.query(); + if (q != null) { + nullsFilteredFields.addAll(nullsFilteredFieldsAfterSourceQuery(q)); + } + }); for (Attribute attr : attributes) { DataType dataType = attr.dataType(); - var fieldExtractPreference = preference.apply(attr); + var fieldExtractPreference = fieldExtractExec.fieldExtractPreference(attr); ElementType elementType = PlannerUtils.toElementType(dataType, fieldExtractPreference); IntFunction loader = s -> getBlockLoaderFor(s, attr, fieldExtractPreference); - fieldInfos.add(new ValuesSourceReaderOperator.FieldInfo(getFieldName(attr), elementType, loader)); + String fieldName = getFieldName(attr); + boolean nullsFiltered = nullsFilteredFields.contains(fieldName); + fieldInfos.add(new 
ValuesSourceReaderOperator.FieldInfo(fieldName, elementType, nullsFiltered, loader)); } return fieldInfos; } + /** + * Returns the set of fields that are guaranteed to be dense after the source query. + */ + static Set nullsFilteredFieldsAfterSourceQuery(QueryBuilder sourceQuery) { + return switch (sourceQuery) { + case ExistsQueryBuilder q -> Set.of(q.fieldName()); + case TermQueryBuilder q -> Set.of(q.fieldName()); + case TermsQueryBuilder q -> Set.of(q.fieldName()); + case RangeQueryBuilder q -> Set.of(q.fieldName()); + case ConstantScoreQueryBuilder q -> nullsFilteredFieldsAfterSourceQuery(q.innerQuery()); + // TODO: support SingleValueQuery + case BoolQueryBuilder q -> { + final Set fields = new HashSet<>(); + for (List clauses : List.of(q.must(), q.filter())) { + for (QueryBuilder c : clauses) { + fields.addAll(nullsFilteredFieldsAfterSourceQuery(c)); + } + } + // safe to ignore must_not and should clauses + yield fields; + } + default -> Set.of(); + }; + } + /** * Build a {@link SourceOperator.SourceOperatorFactory} that counts documents in the search index. 
*/ @@ -533,8 +573,8 @@ public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws } return new ColumnAtATimeReader() { @Override - public Block read(BlockFactory factory, Docs docs, int offset) throws IOException { - Block block = reader.read(factory, docs, offset); + public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException { + Block block = reader.read(factory, docs, offset, nullsFiltered); return typeConverter.convert((org.elasticsearch.compute.data.Block) block); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java new file mode 100644 index 0000000000000..31b720b65ebac --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java @@ -0,0 +1,185 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.planner; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.compute.lucene.DataPartitioning; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.cache.bitset.BitsetFilterCache; +import org.elasticsearch.index.fielddata.FieldDataContext; +import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.fielddata.IndexFieldDataCache; +import org.elasticsearch.index.mapper.KeywordFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.MapperMetrics; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.MappingLookup; +import org.elasticsearch.index.mapper.NestedLookup; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.ExistsQueryBuilder; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; 
+import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.mockito.Mockito; + +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +import static java.util.Collections.emptyMap; +import static org.hamcrest.Matchers.equalTo; + +public class EsPhysicalOperationProvidersTests extends ESTestCase { + + public void testNullsFilteredFieldInfos() { + record TestCase(QueryBuilder query, List nullsFilteredFields) { + + } + List testCases = List.of( + new TestCase(new MatchAllQueryBuilder(), List.of()), + new TestCase(null, List.of()), + new TestCase(new ExistsQueryBuilder("f1"), List.of("f1")), + new TestCase(new ExistsQueryBuilder("f2"), List.of("f2")), + new TestCase( + new BoolQueryBuilder().should(new ExistsQueryBuilder("f1")).should(new ExistsQueryBuilder("f2")).minimumShouldMatch(1), + List.of() + ), + new TestCase( + new BoolQueryBuilder().should(new ExistsQueryBuilder("f1")).should(new ExistsQueryBuilder("f2")).minimumShouldMatch(2), + List.of() + ), + new TestCase(new BoolQueryBuilder().filter(new ExistsQueryBuilder("f1")), List.of("f1")), + new TestCase(new BoolQueryBuilder().filter(new ExistsQueryBuilder("f1")), List.of("f1")), + new TestCase(new BoolQueryBuilder().filter(new ExistsQueryBuilder("f1")).should(new RangeQueryBuilder("f2")), List.of("f1")), + new TestCase(new BoolQueryBuilder().filter(new ExistsQueryBuilder("f2")).mustNot(new RangeQueryBuilder("f1")), List.of("f2")), + new TestCase(new TermQueryBuilder("f3", "v3"), List.of("f3")), + new TestCase(new BoolQueryBuilder().filter(new ExistsQueryBuilder("f1")).must(new TermQueryBuilder("f1", "v1")), List.of("f1")) + ); + EsPhysicalOperationProviders provider = new EsPhysicalOperationProviders( + FoldContext.small(), + List.of(new EsPhysicalOperationProviders.DefaultShardContext(0, () -> {}, createMockContext(), AliasFilter.EMPTY)), + null, + new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1)) + ); + for (TestCase testCase : testCases) { 
+ EsQueryExec queryExec = new EsQueryExec( + Source.EMPTY, + "test", + IndexMode.STANDARD, + Map.of(), + List.of(), + testCase.query, + null, + null, + 10 + ); + FieldExtractExec fieldExtractExec = new FieldExtractExec( + Source.EMPTY, + queryExec, + List.of( + new FieldAttribute( + Source.EMPTY, + "f1", + new EsField("f1", DataType.KEYWORD, Map.of(), false, EsField.TimeSeriesFieldType.NONE) + ), + new FieldAttribute( + Source.EMPTY, + "f2", + new EsField("f2", DataType.KEYWORD, Map.of(), false, EsField.TimeSeriesFieldType.NONE) + ), + new FieldAttribute( + Source.EMPTY, + "f3", + new EsField("f3", DataType.KEYWORD, Map.of(), false, EsField.TimeSeriesFieldType.NONE) + ), + new FieldAttribute( + Source.EMPTY, + "f4", + new EsField("f4", DataType.KEYWORD, Map.of(), false, EsField.TimeSeriesFieldType.NONE) + ) + ), + MappedFieldType.FieldExtractPreference.NONE + ); + var fieldInfos = provider.extractFields(fieldExtractExec); + for (var field : fieldInfos) { + assertThat( + "query: " + testCase.query + ", field: " + field.name(), + field.nullsFiltered(), + equalTo(testCase.nullsFilteredFields.contains(field.name())) + ); + } + } + } + + protected static SearchExecutionContext createMockContext() { + Index index = new Index(randomAlphaOfLengthBetween(1, 10), "_na_"); + IndexSettings idxSettings = IndexSettingsModule.newIndexSettings( + index, + Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build() + ); + BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, Mockito.mock(BitsetFilterCache.Listener.class)); + BiFunction> indexFieldDataLookup = (fieldType, fdc) -> { + IndexFieldData.Builder builder = fieldType.fielddataBuilder(fdc); + return builder.build(new IndexFieldDataCache.None(), null); + }; + MappingLookup lookup = MappingLookup.fromMapping(Mapping.EMPTY); + return new SearchExecutionContext( + 0, + 0, + idxSettings, + bitsetFilterCache, + indexFieldDataLookup, + null, + lookup, + null, + null, + null, + 
null, + null, + null, + () -> 0, + null, + null, + () -> true, + null, + emptyMap(), + MapperMetrics.NOOP + ) { + @Override + public MappedFieldType getFieldType(String name) { + return randomFrom( + new KeywordFieldMapper.KeywordFieldType(name), + new NumberFieldMapper.NumberFieldType(name, randomFrom(NumberFieldMapper.NumberType.values())) + ); + } + + @Override + public NestedLookup nestedLookup() { + return NestedLookup.EMPTY; + } + }; + } +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateMetricDoubleFieldMapper.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateMetricDoubleFieldMapper.java index 3658313642700..bb891a57064bd 100644 --- a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateMetricDoubleFieldMapper.java +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateMetricDoubleFieldMapper.java @@ -571,7 +571,7 @@ public String toString() { } @Override - public Block read(BlockFactory factory, Docs docs, int offset) throws IOException { + public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException { try (var builder = factory.aggregateMetricDoubleBuilder(docs.count() - offset)) { copyDoubleValuesToBuilder(docs, offset, builder.min(), minValues); copyDoubleValuesToBuilder(docs, offset, builder.max(), maxValues); diff --git a/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java b/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java index 1a94ca1b8d40a..d5405018dd64c 100644 --- 
a/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java +++ b/x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java @@ -286,7 +286,7 @@ public int count() { public int get(int i) { return 0; } - }, 0); + }, 0, false); assertThat(block.get(0), nullValue()); } }