Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.fieldvisitor;

import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.mapper.IgnoredSourceFieldMapper;
import org.elasticsearch.search.fetch.StoredFieldsSpec;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class IgnoredSourceFieldLoader extends StoredFieldLoader {
private final boolean forceSequentialReader;
private final Map<String, Set<String>> potentialFieldsInIgnoreSource;
private final Set<String> fieldNames;

IgnoredSourceFieldLoader(StoredFieldsSpec spec, boolean forceSequentialReader) {
assert IgnoredSourceFieldLoader.supports(spec);

fieldNames = new HashSet<>(spec.ignoredFieldsSpec().requiredIgnoredFields());
this.forceSequentialReader = forceSequentialReader;

HashMap<String, Set<String>> potentialFieldsInIgnoreSource = new HashMap<>();
for (String requiredIgnoredField : spec.ignoredFieldsSpec().requiredIgnoredFields()) {
for (String potentialStoredField : spec.ignoredFieldsSpec().format().requiredStoredFields(requiredIgnoredField)) {
potentialFieldsInIgnoreSource.computeIfAbsent(potentialStoredField, k -> new HashSet<>()).add(requiredIgnoredField);
}
}
Comment on lines +40 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hoping that we can optimize/simplify in a follow up change. This logic is now required for fields with dots and then we don't know which field part there is actually a stored field for.

Ideally we would know based from the mapping what stored field we need. For example if attributes.host.ip is requested and no attributes field is mapped, then the stored field should be _ignored_source.attributes. If attributes is mapped, but host isn't then the required stored field to load would be _ignored_source.attributes.host.

I also think requiredStoredFields(...) also always include the _doc field (via FallbackSyntheticSourceBlockLoader.splitIntoFieldPaths(...))?

Would be great if we need less maps and no sets in SFV.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, this can probably be optimized by using the mapping to figure out which stored field contains the value instead of just looking at all possible parent stored fields. But let's leave that as a follow-up.

this.potentialFieldsInIgnoreSource = potentialFieldsInIgnoreSource;
}

@Override
public LeafStoredFieldLoader getLoader(LeafReaderContext ctx, int[] docs) throws IOException {
var reader = forceSequentialReader ? sequentialReader(ctx) : reader(ctx, docs);
var visitor = new SFV(fieldNames, potentialFieldsInIgnoreSource);
return new LeafStoredFieldLoader() {

private int doc = -1;

@Override
public void advanceTo(int doc) throws IOException {
if (doc != this.doc) {
visitor.reset();
reader.accept(doc, visitor);
this.doc = doc;
}
}

@Override
public BytesReference source() {
assert false : "source() is not supported by IgnoredSourceFieldLoader";
return null;
}

@Override
public String id() {
assert false : "id() is not supported by IgnoredSourceFieldLoader";
return null;
}

@Override
public String routing() {
assert false : "routing() is not supported by IgnoredSourceFieldLoader";
return null;
}

@Override
public Map<String, List<Object>> storedFields() {
return visitor.values;
}
};
}

@Override
public List<String> fieldsToLoad() {
return potentialFieldsInIgnoreSource.keySet().stream().toList();
}

static class SFV extends StoredFieldVisitor {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future we can perhaps explore an implementation that is tuned for just fetching one stored field.

final Map<String, List<Object>> values = new HashMap<>();
final Set<String> fieldNames;
private final Set<String> unvisitedFields;
final Map<String, Set<String>> potentialFieldsInIgnoreSource;

SFV(Set<String> fieldNames, Map<String, Set<String>> potentialFieldsInIgnoreSource) {
this.fieldNames = fieldNames;
this.unvisitedFields = new HashSet<>(fieldNames);
this.potentialFieldsInIgnoreSource = potentialFieldsInIgnoreSource;
}

@Override
public Status needsField(FieldInfo fieldInfo) throws IOException {
if (unvisitedFields.isEmpty()) {
return Status.STOP;
}

Set<String> foundFields = potentialFieldsInIgnoreSource.get(fieldInfo.name);
if (foundFields == null) {
return Status.NO;
}

unvisitedFields.removeAll(foundFields);
return Status.YES;
}

@Override
public void binaryField(FieldInfo fieldInfo, byte[] value) {
values.computeIfAbsent(fieldInfo.name, k -> new ArrayList<>()).add(new BytesRef(value));
}

void reset() {
values.clear();
unvisitedFields.addAll(fieldNames);
}

}

static boolean supports(StoredFieldsSpec spec) {
return spec.onlyRequiresIgnoredFields()
&& spec.ignoredFieldsSpec().format() == IgnoredSourceFieldMapper.IgnoredSourceFormat.PER_FIELD_IGNORED_SOURCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.StoredFields;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -49,10 +50,26 @@ public abstract class StoredFieldLoader {
* Creates a new StoredFieldLoader using a StoredFieldsSpec
*/
public static StoredFieldLoader fromSpec(StoredFieldsSpec spec) {
    // Non-sequential variant; delegates with forceSequentialReader=false.
    return fromSpec(spec, false);
}

/**
 * Creates a new StoredFieldLoader using a StoredFieldsSpec that is optimized
 * for loading documents in order (forces a sequential stored-fields reader).
 */
public static StoredFieldLoader fromSpecSequential(StoredFieldsSpec spec) {
    return fromSpec(spec, true);
}

/**
 * Builds the loader for the given spec.
 *
 * @param spec                  what stored/ignored fields are required
 * @param forceSequentialReader whether to always use a sequential stored-fields reader
 */
private static StoredFieldLoader fromSpec(StoredFieldsSpec spec, boolean forceSequentialReader) {
    if (spec.noRequirements()) {
        return StoredFieldLoader.empty();
    }

    // Fast path: only per-field ignored-source fields are needed.
    if (IgnoredSourceFieldLoader.supports(spec)) {
        return new IgnoredSourceFieldLoader(spec, forceSequentialReader);
    }
    return create(spec.requiresSource(), spec.requiredStoredFields(), forceSequentialReader);
}

public static StoredFieldLoader create(boolean loadSource, Set<String> fields) {
Expand Down Expand Up @@ -85,28 +102,6 @@ public List<String> fieldsToLoad() {
};
}

/**
* Creates a new StoredFieldLoader using a StoredFieldsSpec that is optimized
* for loading documents in order.
*/
public static StoredFieldLoader fromSpecSequential(StoredFieldsSpec spec) {
if (spec.noRequirements()) {
return StoredFieldLoader.empty();
}
List<String> fieldsToLoad = fieldsToLoad(spec.requiresSource(), spec.requiredStoredFields());
return new StoredFieldLoader() {
@Override
public LeafStoredFieldLoader getLoader(LeafReaderContext ctx, int[] docs) throws IOException {
return new ReaderStoredFieldLoader(sequentialReader(ctx), spec.requiresSource(), spec.requiredStoredFields());
}

@Override
public List<String> fieldsToLoad() {
return fieldsToLoad;
}
};
}

/**
* Creates a StoredFieldLoader tuned for sequential reads of _source
*/
Expand Down Expand Up @@ -141,7 +136,8 @@ public List<String> fieldsToLoad() {
};
}

private static CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader(LeafReaderContext ctx, int[] docs) throws IOException {
protected static CheckedBiConsumer<Integer, StoredFieldVisitor, IOException> reader(LeafReaderContext ctx, int[] docs)
throws IOException {
LeafReader leafReader = ctx.reader();
if (docs != null && docs.length > 10 && hasSequentialDocs(docs)) {
return sequentialReader(ctx);
Expand All @@ -150,7 +146,8 @@ private static CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader(Lea
return storedFields::document;
}

private static CheckedBiConsumer<Integer, FieldsVisitor, IOException> sequentialReader(LeafReaderContext ctx) throws IOException {
protected static CheckedBiConsumer<Integer, StoredFieldVisitor, IOException> sequentialReader(LeafReaderContext ctx)
throws IOException {
LeafReader leafReader = ctx.reader();
if (leafReader instanceof SequentialStoredFieldsLeafReader lf) {
return lf.getSequentialStoredFieldsReader()::document;
Expand Down Expand Up @@ -201,7 +198,7 @@ public Map<String, List<Object>> storedFields() {

private static class ReaderStoredFieldLoader implements LeafStoredFieldLoader {

private final CheckedBiConsumer<Integer, FieldsVisitor, IOException> reader;
private final CheckedBiConsumer<Integer, StoredFieldVisitor, IOException> reader;
private final CustomFieldsVisitor visitor;
private int doc = -1;

Expand All @@ -221,7 +218,11 @@ public Status needsField(FieldInfo fieldInfo) {
return new CustomFieldsVisitor(fields, loadSource);
}

/**
 * @param reader     consumer that feeds a document's stored fields into a visitor
 * @param loadSource whether _source should be loaded as well
 * @param fields     the stored field names to collect
 */
ReaderStoredFieldLoader(
    CheckedBiConsumer<Integer, StoredFieldVisitor, IOException> reader,
    boolean loadSource,
    Set<String> fields
) {
    this.reader = reader;
    this.visitor = getFieldsVisitor(fields, loadSource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;

/**
* Block loader for fields that use fallback synthetic source implementation.
Expand Down Expand Up @@ -66,14 +65,7 @@ public RowStrideReader rowStrideReader(LeafReaderContext context) throws IOExcep

@Override
public StoredFieldsSpec rowStrideStoredFieldSpec() {
    // No _source and no doc values needed; the required stored fields are derived
    // from the ignored-fields spec (format decides which _ignored_source.* fields to load).
    return new StoredFieldsSpec(false, false, Set.of(), new IgnoredFieldsSpec(Set.of(fieldName), ignoredSourceFormat));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.mapper;

import org.elasticsearch.ElasticsearchException;

import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Defines which fields need to be loaded from _ignored_source during a fetch.
 *
 * @param requiredIgnoredFields the ignored field names that must be loaded
 * @param format                the on-disk format of the ignored source
 */
public record IgnoredFieldsSpec(Set<String> requiredIgnoredFields, IgnoredSourceFieldMapper.IgnoredSourceFormat format) {
    /** Spec that requires nothing from ignored source. */
    public static final IgnoredFieldsSpec NONE = new IgnoredFieldsSpec(
        Set.of(),
        IgnoredSourceFieldMapper.IgnoredSourceFormat.NO_IGNORED_SOURCE
    );

    /** @return true if no ignored fields need to be loaded */
    public boolean noRequirements() {
        return requiredIgnoredFields.isEmpty();
    }

    /**
     * Combines this spec with another, unioning the required fields.
     * A spec with {@code NO_IGNORED_SOURCE} format or no required fields defers to the other.
     *
     * @throws ElasticsearchException if both specs carry fields but disagree on format
     */
    public IgnoredFieldsSpec merge(IgnoredFieldsSpec other) {
        if (this.format == IgnoredSourceFieldMapper.IgnoredSourceFormat.NO_IGNORED_SOURCE) {
            return other;
        }
        if (other.format == IgnoredSourceFieldMapper.IgnoredSourceFormat.NO_IGNORED_SOURCE) {
            return this;
        }
        if (other.requiredIgnoredFields.isEmpty()) {
            return this;
        }
        if (this.requiredIgnoredFields.isEmpty()) {
            return other;
        }

        if (this.format != other.format) {
            throw new ElasticsearchException(
                "failed to merge IgnoredFieldsSpec with differing formats " + this.format.name() + "," + other.format.name()
            );
        }

        Set<String> mergedFields = new HashSet<>(requiredIgnoredFields);
        mergedFields.addAll(other.requiredIgnoredFields);
        return new IgnoredFieldsSpec(mergedFields, format);
    }

    /**
     * Get the set of stored fields required to load the specified fields from _ignored_source.
     */
    public Set<String> requiredStoredFields() {
        return requiredIgnoredFields.stream().flatMap(field -> format.requiredStoredFields(field).stream()).collect(Collectors.toSet());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
Expand Down Expand Up @@ -277,6 +278,11 @@ public Map<String, List<IgnoredSourceFieldMapper.NameValue>> loadSingleIgnoredFi
public void writeIgnoredFields(Collection<NameValue> ignoredFieldValues) {
assert false : "cannot write " + ignoredFieldValues.size() + " values with format NO_IGNORED_SOURCE";
}

@Override
public Set<String> requiredStoredFields(String fieldName) {
    // Nothing is stored in this format, so no stored fields are needed.
    return Set.of();
}
},
SINGLE_IGNORED_SOURCE {
@Override
Expand Down Expand Up @@ -327,6 +333,11 @@ public void writeIgnoredFields(Collection<NameValue> ignoredFieldValues) {
nameValue.doc().add(new StoredField(NAME, encode(nameValue)));
}
}

@Override
public Set<String> requiredStoredFields(String fieldName) {
    // All ignored values live in the single shared _ignored_source stored field.
    return Set.of(IgnoredSourceFieldMapper.NAME);
}
},
PER_FIELD_IGNORED_SOURCE {
@Override
Expand Down Expand Up @@ -403,6 +414,14 @@ public void writeIgnoredFields(Collection<NameValue> ignoredFieldValues) {
}
}
}

@Override
public Set<String> requiredStoredFields(String fieldName) {
    // The value may be stored under any prefix of a dotted path (e.g. for a.b.c:
    // _ignored_source.a, _ignored_source.a.b, _ignored_source.a.b.c), so request them all.
    return FallbackSyntheticSourceBlockLoader.splitIntoFieldPaths(fieldName)
        .stream()
        .map(IgnoredSourceFieldMapper::ignoredFieldName)
        .collect(Collectors.toSet());
}
};

public abstract Map<String, List<IgnoredSourceFieldMapper.NameValue>> loadAllIgnoredFields(
Expand All @@ -416,6 +435,11 @@ public abstract Map<String, List<IgnoredSourceFieldMapper.NameValue>> loadSingle
);

public abstract void writeIgnoredFields(Collection<NameValue> ignoredFieldValues);

/**
 * Get the set of stored fields needed to retrieve the value for fieldName.
 *
 * @param fieldName the (possibly dotted) ignored field name
 * @return names of stored fields that may contain the value; empty if the format stores nothing
 */
public abstract Set<String> requiredStoredFields(String fieldName);
}

public IgnoredSourceFormat ignoredSourceFormat() {
Expand Down
Loading