@@ -40,7 +40,8 @@
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
 import org.elasticsearch.compute.operator.topn.TopNOperator;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.index.IndexSettings;
@@ -371,7 +372,7 @@ public void benchmark() {
             throw new AssertionError("[" + layout + "][" + name + "] expected [" + expected + "] but was [" + sum + "]");
         }
         boolean foundStoredFieldLoader = false;
-        ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) op.status();
+        ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status();
         for (Map.Entry<String, Integer> e : status.readersBuilt().entrySet()) {
             if (e.getKey().indexOf("stored_fields") >= 0) {
                 foundStoredFieldLoader = true;
@@ -255,6 +255,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60);
     public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61);
     public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN_8_19 = def(8_841_0_62);
+    public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_8_19 = def(8_841_0_63);
 
     /*
      * STOP! READ THIS FIRST! No, really,
1 change: 1 addition & 0 deletions x-pack/plugin/esql/compute/src/main/java/module-info.java
@@ -36,4 +36,5 @@
     exports org.elasticsearch.compute.aggregation.table;
     exports org.elasticsearch.compute.data.sort;
     exports org.elasticsearch.compute.querydsl.query;
+    exports org.elasticsearch.compute.lucene.read;
 }

This file was deleted.

@@ -0,0 +1,45 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.compute.lucene.read;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.core.Releasable;

class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable {
    private final int pageSize;
    private Block nullBlock;

    ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) {
        super(factory);
        this.pageSize = pageSize;
    }

    @Override
    public Block constantNulls() {
        if (nullBlock == null) {
            nullBlock = factory.newConstantNullBlock(pageSize);
        }
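        // hand out one reference per caller; each caller releases its own copy,
        // and close() below drops the factory's original reference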
        nullBlock.incRef();
        return nullBlock;
    }

    @Override
    public void close() {
        if (nullBlock != null) {
            nullBlock.close();
        }
    }

    @Override
    public BytesRefBlock constantBytes(BytesRef value) {
        return factory.newConstantBytesRefBlockWith(value, pageSize);
    }
}
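
The caching in constantNulls() above leans on the reference-counted block lifecycle: the factory builds one shared constant-null block, hands an extra reference to every caller, and drops its own reference in close(). A dependency-free sketch of that contract (all names here are hypothetical, not Elasticsearch API):

// SharedResource.java - standalone model of the incRef()/close() contract
final class SharedResource implements AutoCloseable {
    private int refs = 1; // the creating factory holds the first reference

    SharedResource acquire() { // plays the role of nullBlock.incRef()
        refs++;
        return this;
    }

    @Override
    public void close() { // every holder closes; the last close releases
        if (--refs == 0) {
            System.out.println("released");
        }
    }

    public static void main(String[] args) {
        SharedResource shared = new SharedResource(); // like building the null block
        try (SharedResource a = shared.acquire(); SharedResource b = shared.acquire()) {
            // two fields share the same constant-null block
        }
        shared.close(); // the factory's close(): drops the last reference -> "released"
    }
}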
@@ -0,0 +1,93 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.compute.lucene.read;

import org.apache.lucene.index.SortedDocValues;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.SingletonOrdinalsBuilder;
import org.elasticsearch.index.mapper.BlockLoader;

public abstract class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory {
    protected final BlockFactory factory;

    protected DelegatingBlockLoaderFactory(BlockFactory factory) {
        this.factory = factory;
    }

    @Override
    public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) {
        return factory.newBooleanBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
    }

    @Override
    public BlockLoader.BooleanBuilder booleans(int expectedCount) {
        return factory.newBooleanBlockBuilder(expectedCount);
    }

    @Override
    public BlockLoader.BytesRefBuilder bytesRefsFromDocValues(int expectedCount) {
        return factory.newBytesRefBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING);
    }

    @Override
    public BlockLoader.BytesRefBuilder bytesRefs(int expectedCount) {
        return factory.newBytesRefBlockBuilder(expectedCount);
    }

    @Override
    public BlockLoader.DoubleBuilder doublesFromDocValues(int expectedCount) {
        return factory.newDoubleBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
    }

    @Override
    public BlockLoader.DoubleBuilder doubles(int expectedCount) {
        return factory.newDoubleBlockBuilder(expectedCount);
    }

    @Override
    public BlockLoader.FloatBuilder denseVectors(int expectedVectorsCount, int dimensions) {
        return factory.newFloatBlockBuilder(expectedVectorsCount * dimensions);
    }

    @Override
    public BlockLoader.IntBuilder intsFromDocValues(int expectedCount) {
        return factory.newIntBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
    }

    @Override
    public BlockLoader.IntBuilder ints(int expectedCount) {
        return factory.newIntBlockBuilder(expectedCount);
    }

    @Override
    public BlockLoader.LongBuilder longsFromDocValues(int expectedCount) {
        return factory.newLongBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
    }

    @Override
    public BlockLoader.LongBuilder longs(int expectedCount) {
        return factory.newLongBlockBuilder(expectedCount);
    }

    @Override
    public BlockLoader.Builder nulls(int expectedCount) {
        return ElementType.NULL.newBlockBuilder(expectedCount, factory);
    }

    @Override
    public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
        return new SingletonOrdinalsBuilder(factory, ordinals, count);
    }

    @Override
    public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count) {
        return factory.newAggregateMetricDoubleBlockBuilder(count);
    }
}
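
This class adapts the compute engine's BlockFactory to the mapper-side BlockLoader.BlockFactory interface. The only difference between each plain builder and its ...FromDocValues twin is the MvOrdering hint: doc values hand back each document's values pre-sorted (and, for ordinals, deduplicated), so blocks built from them can advertise that ordering and downstream operators can skip re-sorting. A schematic read loop against those builder contracts; readLongs and the long[][] input are made up for illustration, only the builder calls come from BlockLoader:

// Schematic only: filling a builder from already-sorted doc values.
static BlockLoader.Block readLongs(BlockLoader.BlockFactory factory, long[][] rows) {
    BlockLoader.LongBuilder builder = factory.longsFromDocValues(rows.length);
    for (long[] values : rows) { // one row per document
        if (values.length == 1) {
            builder.appendLong(values[0]);
        } else {
            builder.beginPositionEntry(); // multivalued row; values arrive ascending
            for (long v : values) {
                builder.appendLong(v);
            }
            builder.endPositionEntry();
        }
    }
    return builder.build();
}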
@@ -0,0 +1,166 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.compute.lucene.read;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.DocVector;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.search.fetch.StoredFieldsSpec;

import java.io.IOException;

/**
 * Loads values from many leaves. Much less efficient than {@link ValuesFromSingleReader}.
 */
class ValuesFromManyReader extends ValuesReader {
    private final int[] forwards;
    private final int[] backwards;
    private final BlockLoader.RowStrideReader[] rowStride;

    private BlockLoaderStoredFieldsFromLeafLoader storedFields;

    ValuesFromManyReader(ValuesSourceReaderOperator operator, DocVector docs) {
        super(operator, docs);
        forwards = docs.shardSegmentDocMapForwards();
        backwards = docs.shardSegmentDocMapBackwards();
        rowStride = new BlockLoader.RowStrideReader[operator.fields.length];
    }

    @Override
    protected void load(Block[] target, int offset) throws IOException {
        try (Run run = new Run(target)) {
            run.run(offset);
        }
    }

    class Run implements Releasable {
        private final Block[] target;
        private final Block.Builder[][] builders;
        private final BlockLoader[][] converters;
        private final Block.Builder[] fieldTypeBuilders;

        Run(Block[] target) {
            this.target = target;
            fieldTypeBuilders = new Block.Builder[target.length];
            builders = new Block.Builder[target.length][operator.shardContexts.size()];
            converters = new BlockLoader[target.length][operator.shardContexts.size()];
        }

        void run(int offset) throws IOException {
            assert offset == 0; // TODO allow non-0 offset to support splitting pages
            for (int f = 0; f < operator.fields.length; f++) {
                /*
                 * Important note: each field has a desired type, which might not match the mapped type (in the case of union-types).
                 * We create the final block builders using the desired type, one for each field, but then also use inner builders
                 * (one for each field and shard), and converters (again one for each field and shard) to actually perform the field
                 * loading in a way that is correct for the mapped field type, and then convert between that type and the desired type.
                 */
                fieldTypeBuilders[f] = operator.fields[f].info.type().newBlockBuilder(docs.getPositionCount(), operator.blockFactory);
                builders[f] = new Block.Builder[operator.shardContexts.size()];
                converters[f] = new BlockLoader[operator.shardContexts.size()];
            }
            try (
                ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory, docs.getPositionCount())
            ) {
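                // visit documents in `forwards` order (sorted by shard, segment, doc)
                // so each segment is positioned at most once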
                int p = forwards[offset];
                int shard = docs.shards().getInt(p);
                int segment = docs.segments().getInt(p);
                int firstDoc = docs.docs().getInt(p);
                operator.positionFieldWork(shard, segment, firstDoc);
                LeafReaderContext ctx = operator.ctx(shard, segment);
                fieldsMoved(ctx, shard);
                verifyBuilders(loaderBlockFactory, shard);
                read(firstDoc, shard);

                int i = offset + 1;
                while (i < forwards.length) {
                    p = forwards[i];
                    shard = docs.shards().getInt(p);
                    segment = docs.segments().getInt(p);
                    boolean changedSegment = operator.positionFieldWorkDocGuaranteedAscending(shard, segment);
                    if (changedSegment) {
                        ctx = operator.ctx(shard, segment);
                        fieldsMoved(ctx, shard);
                    }
                    verifyBuilders(loaderBlockFactory, shard);
                    read(docs.docs().getInt(p), shard);
                    i++;
                }
                buildBlocks();
            }
        }

        private void buildBlocks() {
            for (int f = 0; f < target.length; f++) {
                for (int s = 0; s < operator.shardContexts.size(); s++) {
                    if (builders[f][s] != null) {
                        try (Block orig = (Block) converters[f][s].convert(builders[f][s].build())) {
                            fieldTypeBuilders[f].copyFrom(orig, 0, orig.getPositionCount());
                        }
                    }
                }
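                // values were appended in sorted order; filtering by `backwards`
                // restores the page's original row order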
                try (Block targetBlock = fieldTypeBuilders[f].build()) {
                    target[f] = targetBlock.filter(backwards);
                }
                operator.sanityCheckBlock(rowStride[f], backwards.length, target[f], f);
            }
        }

        private void verifyBuilders(ComputeBlockLoaderFactory loaderBlockFactory, int shard) {
            for (int f = 0; f < operator.fields.length; f++) {
                if (builders[f][shard] == null) {
                    // Note that this relies on field.newShard() to set the loader and converter correctly for the current shard
                    builders[f][shard] = (Block.Builder) operator.fields[f].loader.builder(loaderBlockFactory, docs.getPositionCount());
                    converters[f][shard] = operator.fields[f].loader;
                }
            }
        }

        private void read(int doc, int shard) throws IOException {
            storedFields.advanceTo(doc);
            for (int f = 0; f < builders.length; f++) {
                rowStride[f].read(doc, storedFields, builders[f][shard]);
            }
        }

        @Override
        public void close() {
            Releasables.closeExpectNoException(fieldTypeBuilders);
            for (int f = 0; f < operator.fields.length; f++) {
                Releasables.closeExpectNoException(builders[f]);
            }
        }
    }

    private void fieldsMoved(LeafReaderContext ctx, int shard) throws IOException {
        StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS;
        for (int f = 0; f < operator.fields.length; f++) {
            ValuesSourceReaderOperator.FieldWork field = operator.fields[f];
            rowStride[f] = field.rowStride(ctx);
            storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec());
        }
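        // if any field needs _source, wire in a SourceLoader and also request
        // the stored fields that loader depends on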
        SourceLoader sourceLoader = null;
        if (storedFieldsSpec.requiresSource()) {
            sourceLoader = operator.shardContexts.get(shard).newSourceLoader().get();
            storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields()));
        }
        storedFields = new BlockLoaderStoredFieldsFromLeafLoader(
            StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(ctx, null),
            sourceLoader != null ? sourceLoader.leaf(ctx.reader(), null) : null
        );
        if (false == storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) {
            operator.trackStoredFields(storedFieldsSpec, false);
        }
    }
}
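
The heart of this class is the pair of permutations taken from DocVector: `forwards` sorts row positions by (shard, segment, doc) so each segment is positioned exactly once, and `backwards` is the inverse permutation that `filter(backwards)` uses to put the loaded values back into the page's original row order. A standalone demonstration of the same index arithmetic (plain Java, no Elasticsearch types; the doc ids are made up):

import java.util.Comparator;
import java.util.stream.IntStream;

class DocMapDemo {
    public static void main(String[] args) {
        int[] docs = { 7, 2, 9, 4 }; // request order, not sorted
        Integer[] forwards = IntStream.range(0, docs.length)
            .boxed()
            .sorted(Comparator.comparingInt(p -> docs[p]))
            .toArray(Integer[]::new); // positions reordered for sequential reading

        String[] loaded = new String[docs.length];
        for (int i = 0; i < forwards.length; i++) {
            loaded[i] = "value@" + docs[forwards[i]]; // cache-friendly ascending reads
        }

        int[] backwards = new int[docs.length];
        for (int i = 0; i < forwards.length; i++) {
            backwards[forwards[i]] = i; // invert the permutation
        }
        for (int p = 0; p < docs.length; p++) {
            // loaded[backwards[p]] is the value for the row originally at position p
            System.out.println("row " + p + " (doc " + docs[p] + ") -> " + loaded[backwards[p]]);
        }
    }
}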