Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.mapper.NumberFieldMapper.NumberFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType;
import org.elasticsearch.index.mapper.SourceFieldMetrics;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.plugins.PluginsLoader;
Expand Down Expand Up @@ -90,7 +92,7 @@ public class ScriptScoreBenchmark {
private final SearchLookup lookup = new SearchLookup(
fieldTypes::get,
(mft, lookup, fdo) -> mft.fielddataBuilder(FieldDataContext.noRuntimeFields("benchmark")).build(fieldDataCache, breakerService),
SourceProvider.fromStoredFields()
SourceProvider.fromLookup(MappingLookup.EMPTY, null, SourceFieldMetrics.NOOP)
);

@Param({ "expression", "metal", "painless_cast", "painless_def" })
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/128213.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128213
summary: Refactor `SourceProvider` creation to consistently use `MappingLookup`
area: Mapping
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ protected void assertFetch(MapperService mapperService, String field, Object val
ValueFetcher nativeFetcher = ft.valueFetcher(searchExecutionContext, format);
ParsedDocument doc = mapperService.documentMapper().parse(source);
withLuceneIndex(mapperService, iw -> iw.addDocuments(doc.docs()), ir -> {
Source s = SourceProvider.fromStoredFields().getSource(ir.leaves().get(0), 0);
Source s = SourceProvider.fromLookup(mapperService.mappingLookup(), null, mapperService.getMapperMetrics().sourceFieldMetrics())
.getSource(ir.leaves().get(0), 0);
docValueFetcher.setNextReader(ir.leaves().get(0));
nativeFetcher.setNextReader(ir.leaves().get(0));
List<Object> fromDocValues = docValueFetcher.fetchValues(s, 0, new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public static StoredFieldLoader create(boolean loadSource, Set<String> fields) {
* otherwise, uses the heuristic defined in {@link StoredFieldLoader#reader(LeafReaderContext, int[])}.
*/
public static StoredFieldLoader create(boolean loadSource, Set<String> fields, boolean forceSequentialReader) {
if (loadSource == false && fields.isEmpty()) {
return StoredFieldLoader.empty();
}
List<String> fieldsToLoad = fieldsToLoad(loadSource, fields);
return new StoredFieldLoader() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
Expand All @@ -39,6 +40,7 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.lookup.LeafFieldLookupProvider;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.lookup.SourceFilter;
import org.elasticsearch.search.lookup.SourceProvider;
import org.elasticsearch.xcontent.XContentParserConfiguration;

Expand Down Expand Up @@ -162,8 +164,8 @@ public boolean isSourceSynthetic() {
}

@Override
public SourceLoader newSourceLoader(boolean forceSyntheticSource) {
return in.newSourceLoader(forceSyntheticSource);
public SourceLoader newSourceLoader(@Nullable SourceFilter filter, boolean forceSyntheticSource) {
return in.newSourceLoader(filter, forceSyntheticSource);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexSortConfig;
Expand Down Expand Up @@ -57,6 +58,7 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.lookup.LeafFieldLookupProvider;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.lookup.SourceFilter;
import org.elasticsearch.search.lookup.SourceProvider;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xcontent.XContentParserConfiguration;
Expand Down Expand Up @@ -439,15 +441,15 @@ public boolean isSourceSynthetic() {
/**
* Build something to load source {@code _source}.
*/
public SourceLoader newSourceLoader(boolean forceSyntheticSource) {
public SourceLoader newSourceLoader(@Nullable SourceFilter filter, boolean forceSyntheticSource) {
if (forceSyntheticSource) {
return new SourceLoader.Synthetic(
null,
filter,
() -> mappingLookup.getMapping().syntheticFieldLoader(null),
mapperMetrics.sourceFieldMetrics()
);
}
return mappingLookup.newSourceLoader(null, mapperMetrics.sourceFieldMetrics());
return mappingLookup.newSourceLoader(filter, mapperMetrics.sourceFieldMetrics());
}

/**
Expand Down Expand Up @@ -506,9 +508,7 @@ public SearchLookup lookup() {
}

public SourceProvider createSourceProvider() {
return isSourceSynthetic()
? SourceProvider.fromSyntheticSource(mappingLookup.getMapping(), null, mapperMetrics.sourceFieldMetrics())
: SourceProvider.fromStoredFields();
return SourceProvider.fromLookup(mappingLookup, null, mapperMetrics.sourceFieldMetrics());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.lookup.SourceFilter;
import org.elasticsearch.search.profile.Profilers;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.rank.context.QueryPhaseRankShardContext;
Expand Down Expand Up @@ -943,8 +944,8 @@ public ReaderContext readerContext() {
}

@Override
public SourceLoader newSourceLoader() {
return searchExecutionContext.newSourceLoader(request.isForceSyntheticSource());
public SourceLoader newSourceLoader(@Nullable SourceFilter filter) {
return searchExecutionContext.newSourceLoader(filter, request.isForceSyntheticSource());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public class FetchContext {
/**
* Create a FetchContext based on a SearchContext
*/
public FetchContext(SearchContext searchContext) {
public FetchContext(SearchContext searchContext, SourceLoader sourceLoader) {
this.searchContext = searchContext;
this.sourceLoader = searchContext.newSourceLoader();
this.sourceLoader = sourceLoader;
this.storedFieldsContext = buildStoredFieldsContext(searchContext);
this.fetchSourceContext = buildFetchSourceContext(searchContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,8 @@ public Source getSource(LeafReaderContext ctx, int doc) {
}

private SearchHits buildSearchHits(SearchContext context, int[] docIdsToLoad, Profiler profiler, RankDocShardInfo rankDocs) {

FetchContext fetchContext = new FetchContext(context);
SourceLoader sourceLoader = context.newSourceLoader();
SourceLoader sourceLoader = context.newSourceLoader(null);
FetchContext fetchContext = new FetchContext(context, sourceLoader);

PreloadedSourceProvider sourceProvider = new PreloadedSourceProvider();
PreloadedFieldLookupProvider fieldLookupProvider = new PreloadedFieldLookupProvider();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.mapper.IdLoader;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.search.fetch.subphase.InnerHitsContext;
import org.elasticsearch.search.fetch.subphase.ScriptFieldsContext;
import org.elasticsearch.search.fetch.subphase.highlight.SearchHighlightContext;
import org.elasticsearch.search.lookup.SourceFilter;
import org.elasticsearch.search.profile.Profilers;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.rank.context.QueryPhaseRankShardContext;
Expand Down Expand Up @@ -453,8 +455,8 @@ public ReaderContext readerContext() {
}

@Override
public SourceLoader newSourceLoader() {
return in.newSourceLoader();
public SourceLoader newSourceLoader(@Nullable SourceFilter filter) {
return in.newSourceLoader(filter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.search.fetch.subphase.InnerHitsContext;
import org.elasticsearch.search.fetch.subphase.ScriptFieldsContext;
import org.elasticsearch.search.fetch.subphase.highlight.SearchHighlightContext;
import org.elasticsearch.search.lookup.SourceFilter;
import org.elasticsearch.search.profile.Profilers;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchResult;
Expand Down Expand Up @@ -441,7 +442,7 @@ public String toString() {
/**
* Build something to load source {@code _source}.
*/
public abstract SourceLoader newSourceLoader();
public abstract SourceLoader newSourceLoader(@Nullable SourceFilter sourceFilter);

public abstract IdLoader newIdLoader();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.search.lookup;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader;
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
import org.elasticsearch.index.mapper.SourceLoader;

import java.io.IOException;
import java.util.Map;

/**
* A {@link SourceProvider} that loads _source from a concurrent search.
*
* NOTE: This is written under the assumption that individual segments are accessed by a single
* thread, even if separate segments may be searched concurrently. If we ever implement
* within-segment concurrency this will have to work entirely differently.
* **/
class ConcurrentSegmentSourceProvider implements SourceProvider {
private final SourceLoader sourceLoader;
private final StoredFieldLoader storedFieldLoader;
private final Map<Object, Leaf> leaves = ConcurrentCollections.newConcurrentMap();

ConcurrentSegmentSourceProvider(SourceLoader loader, boolean loadSource) {
this.sourceLoader = loader;
// we force a sequential reader here since it is used during query execution where documents are scanned sequentially
this.storedFieldLoader = StoredFieldLoader.create(loadSource, sourceLoader.requiredStoredFields(), true);
}

@Override
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
final Object id = ctx.id();
var leaf = leaves.get(id);
if (leaf == null) {
leaf = new Leaf(sourceLoader.leaf(ctx.reader(), null), storedFieldLoader.getLoader(ctx, null));
var existing = leaves.put(id, leaf);
assert existing == null : "unexpected source provider [" + existing + "]";
}
return leaf.getSource(ctx, doc);
}

private static class Leaf implements SourceProvider {
private final SourceLoader.Leaf sourceLoader;
private final LeafStoredFieldLoader storedFieldLoader;
int doc = -1;
Source source = null;

private Leaf(SourceLoader.Leaf sourceLoader, LeafStoredFieldLoader storedFieldLoader) {
this.sourceLoader = sourceLoader;
this.storedFieldLoader = storedFieldLoader;
}

@Override
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
if (this.doc == doc) {
return source;
}
this.doc = doc;
storedFieldLoader.advanceTo(doc);
return source = sourceLoader.source(storedFieldLoader, doc);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
package org.elasticsearch.search.lookup;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.mapper.SourceFieldMetrics;
import org.elasticsearch.index.mapper.SourceLoader;

import java.io.IOException;

Expand All @@ -28,27 +26,14 @@ public interface SourceProvider {
Source getSource(LeafReaderContext ctx, int doc) throws IOException;

/**
* A SourceProvider that loads source from stored fields
* A SourceProvider that delegate loading source to the provided {@link MappingLookup}.
*
* The returned SourceProvider is thread-safe across segments, in that it may be
* safely used by a searcher that searches different segments on different threads,
* but it is not safe to use this to access documents from the same segment across
* multiple threads.
*/
static SourceProvider fromStoredFields() {
StoredFieldLoader storedFieldLoader = StoredFieldLoader.sequentialSource();
return new StoredFieldSourceProvider(storedFieldLoader);
}

/**
* A SourceProvider that loads source from synthetic source
*
* The returned SourceProvider is thread-safe across segments, in that it may be
* safely used by a searcher that searches different segments on different threads,
* but it is not safe to use this to access documents from the same segment across
* multiple threads.
*/
static SourceProvider fromSyntheticSource(Mapping mapping, SourceFilter filter, SourceFieldMetrics metrics) {
return new SyntheticSourceProvider(new SourceLoader.Synthetic(filter, () -> mapping.syntheticFieldLoader(filter), metrics));
static SourceProvider fromLookup(MappingLookup lookup, SourceFilter filter, SourceFieldMetrics metrics) {
return new ConcurrentSegmentSourceProvider(lookup.newSourceLoader(filter, metrics), lookup.isSourceSynthetic() == false);
}
}

This file was deleted.

Loading
Loading