Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,10 @@ public interface ShardContext {
* Returns something to load values from this field into a {@link Block}.
*/
BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference);

/**
* Returns the {@link MappedFieldType} for the given field name.
* By default, this delegate to {@link org.elasticsearch.index.query.SearchExecutionContext#getFieldType(String)}
*/
MappedFieldType fieldType(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,12 @@ public Page getCheckedOutput() throws IOException {
if (docCollector != null) {
blocks[blockIndex++] = docCollector.build().asBlock();
}
blocks[blockIndex++] = tsHashesBuilder.build().asBlock();
OrdinalBytesRefVector tsidVector = tsHashesBuilder.build();
blocks[blockIndex++] = tsidVector.asBlock();
tsHashesBuilder = new TsidBuilder(blockFactory, Math.min(remainingDocs, maxPageSize));
blocks[blockIndex++] = timestampsBuilder.build().asBlock();
timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize));
System.arraycopy(fieldsReader.buildBlocks(), 0, blocks, blockIndex, fieldsToExtracts.size());
System.arraycopy(fieldsReader.buildBlocks(tsidVector.getOrdinalsVector()), 0, blocks, blockIndex, fieldsToExtracts.size());
page = new Page(currentPagePos, blocks);
currentPagePos = 0;
}
Expand Down Expand Up @@ -217,6 +218,7 @@ void readDocsForNextPage() throws IOException {
}

private boolean readValuesForOneTsid(PriorityQueue<LeafIterator> sub) throws IOException {
boolean first = true;
do {
LeafIterator top = sub.top();
currentPagePos++;
Expand All @@ -226,7 +228,8 @@ private boolean readValuesForOneTsid(PriorityQueue<LeafIterator> sub) throws IOE
}
tsHashesBuilder.appendOrdinal();
timestampsBuilder.appendLong(top.timestamp);
fieldsReader.readValues(top.segmentOrd, top.docID);
fieldsReader.readValues(top.segmentOrd, top.docID, first == false);
first = false;
if (top.nextDoc()) {
sub.updateTop();
} else if (top.docID == DocIdSetIterator.NO_MORE_DOCS) {
Expand Down Expand Up @@ -350,6 +353,7 @@ static final class ShardLevelFieldsReader implements Releasable {
private final BlockLoaderFactory blockFactory;
private final SegmentLevelFieldsReader[] segments;
private final BlockLoader[] loaders;
private final boolean[] dimensions;
private final Block.Builder[] builders;
private final StoredFieldsSpec storedFieldsSpec;
private final SourceLoader sourceLoader;
Expand Down Expand Up @@ -377,10 +381,14 @@ static final class ShardLevelFieldsReader implements Releasable {
sourceLoader = null;
}
this.storedFieldsSpec = storedFieldsSpec;
this.dimensions = new boolean[fields.size()];
for (int i = 0; i < fields.size(); i++) {
dimensions[i] = shardContext.fieldType(fields.get(i).name()).isDimension();
}
}

void readValues(int segment, int docID) throws IOException {
segments[segment].read(docID, builders);
void readValues(int segment, int docID, boolean nonDimensionFieldsOnly) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: consider adding a comment here to highlight the optimizatino, i.e. dimensions need to be fetched only once per tsid, as opposed to metrics.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, added in e8d2a87

segments[segment].read(docID, builders, nonDimensionFieldsOnly, dimensions);
}

void prepareForReading(int estimatedSize) throws IOException {
Expand All @@ -396,12 +404,46 @@ void prepareForReading(int estimatedSize) throws IOException {
}
}

Block[] buildBlocks() {
Block[] blocks = Block.Builder.buildAll(builders);
Arrays.fill(builders, null);
Block[] buildBlocks(IntVector tsidOrdinals) {
final Block[] blocks = new Block[loaders.length];
try {
for (int i = 0; i < builders.length; i++) {
if (dimensions[i]) {
blocks[i] = buildBlockForDimensionField(builders[i], tsidOrdinals);
} else {
blocks[i] = builders[i].build();
}
}
Arrays.fill(builders, null);
} finally {
if (blocks.length > 0 && blocks[blocks.length - 1] == null) {
Releasables.close(blocks);
}
}
return blocks;
}

private Block buildBlockForDimensionField(Block.Builder builder, IntVector tsidOrdinals) {
try (var values = builder.build()) {
if (values.asVector() instanceof BytesRefVector bytes) {
tsidOrdinals.incRef();
values.incRef();
return new OrdinalBytesRefVector(tsidOrdinals, bytes).asBlock();
} else if (values.areAllValuesNull()) {
return blockFactory.factory.newConstantNullBlock(tsidOrdinals.getPositionCount());
} else {
final int positionCount = tsidOrdinals.getPositionCount();
try (var newBuilder = values.elementType().newBlockBuilder(positionCount, blockFactory.factory)) {
for (int p = 0; p < positionCount; p++) {
int pos = tsidOrdinals.getInt(p);
newBuilder.copyFrom(values, pos, pos + 1);
}
return newBuilder.build();
}
}
}
}

@Override
public void close() {
Releasables.close(builders);
Expand Down Expand Up @@ -435,10 +477,18 @@ private void reinitializeIfNeeded(SourceLoader sourceLoader, StoredFieldsSpec st
}
}

void read(int docId, Block.Builder[] builder) throws IOException {
void read(int docId, Block.Builder[] builder, boolean nonDimensionFieldsOnly, boolean[] dimensions) throws IOException {
storedFields.advanceTo(docId);
for (int i = 0; i < rowStride.length; i++) {
rowStride[i].read(docId, storedFields, builder[i]);
if (nonDimensionFieldsOnly) {
for (int i = 0; i < rowStride.length; i++) {
if (dimensions[i] == false) {
rowStride[i].read(docId, storedFields, builder[i]);
}
}
} else {
for (int i = 0; i < rowStride.length; i++) {
rowStride[i].read(docId, storedFields, builder[i]);
}
}
}
}
Expand Down Expand Up @@ -480,9 +530,9 @@ public void close() {
Releasables.close(dictBuilder, ordinalsBuilder);
}

BytesRefVector build() throws IOException {
OrdinalBytesRefVector build() throws IOException {
BytesRefVector dict = null;
BytesRefVector result = null;
OrdinalBytesRefVector result = null;
IntVector ordinals = null;
try {
dict = dictBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,5 +324,10 @@ public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
public String shardIdentifier() {
return "test";
}

@Override
public MappedFieldType fieldType(String name) {
throw new UnsupportedOperationException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,17 @@ public static TimeSeriesSourceOperatorFactory createTimeSeriesSourceOperator(
} catch (IOException e) {
throw new UncheckedIOException(e);
}
var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0) {
@Override
public MappedFieldType fieldType(String name) {
for (ExtractField e : extractFields) {
if (e.ft.name().equals(name)) {
return e.ft;
}
}
throw new IllegalArgumentException("Unknown field [" + name + "]");
}
};
Function<ShardContext, Query> queryFunction = c -> new MatchAllDocsQuery();

var fieldInfos = extractFields.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ private static class DefaultShardContextForUnmappedField extends DefaultShardCon
}

@Override
protected @Nullable MappedFieldType fieldType(String name) {
public @Nullable MappedFieldType fieldType(String name) {
var superResult = super.fieldType(name);
return superResult == null && name.equals(unmappedEsField.getName())
? new KeywordFieldMapper.KeywordFieldType(name, false /* isIndexed */, false /* hasDocValues */, Map.of() /* meta */)
Expand Down Expand Up @@ -459,7 +459,8 @@ public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() {
return loader;
}

protected @Nullable MappedFieldType fieldType(String name) {
@Override
public @Nullable MappedFieldType fieldType(String name) {
return ctx.getFieldType(name);
}

Expand Down