Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/135713.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 135713
summary: Add shard search subphase metrics for the fetch subphases
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public StoredFieldsSpec storedFieldsSpec() {
return StoredFieldsSpec.NO_REQUIREMENTS;
}

@Override
public String getName() {
    // The processor reports its owning sub-phase's name so metrics/stats are
    // attributed to the sub-phase rather than the per-request processor.
    return PercolatorHighlightSubFetchPhase.this.getName();
}

@Override
public void process(HitContext hit) throws IOException {
boolean singlePercolateQuery = percolateQueries.size() == 1;
Expand Down Expand Up @@ -138,6 +143,11 @@ public void process(HitContext hit) throws IOException {
};
}

@Override
public String getName() {
    // Stable identifier for this sub-phase; used for logging/stats and as the
    // key for the per-subphase fetch duration metric.
    return "percolator_highlight";
}

static List<PercolateQuery> locatePercolatorQuery(Query query) {
if (query == null) {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public StoredFieldsSpec storedFieldsSpec() {
return StoredFieldsSpec.NO_REQUIREMENTS;
}

@Override
public String getName() {
    // The processor reports its owning sub-phase's name so metrics/stats are
    // attributed to the sub-phase rather than the per-request processor.
    return PercolatorMatchedSlotSubFetchPhase.this.getName();
}

@Override
public void process(HitContext hitContext) throws IOException {
for (PercolateContext pc : percolateContexts) {
Expand Down Expand Up @@ -128,6 +133,11 @@ public void process(HitContext hitContext) throws IOException {
};
}

@Override
public String getName() {
    // Stable identifier for this sub-phase; used for logging/stats and as the
    // key for the per-subphase fetch duration metric.
    return "percolator_matched_slot";
}

static class PercolateContext {
final PercolateQuery percolateQuery;
final boolean singlePercolateQuery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
import org.elasticsearch.search.fetch.StoredFieldsSpec;
Expand Down Expand Up @@ -95,22 +96,38 @@ public List<FetchSubPhase> getFetchSubPhases(FetchPhaseConstructionContext conte
/**
 * Set up a fetch sub phase that throws an exception on indices whose names start with "boom".
 */
return Collections.singletonList(fetchContext -> new FetchSubPhaseProcessor() {
return Collections.singletonList(new FetchSubPhase() {
@Override
public void setNextReader(LeafReaderContext readerContext) {}

@Override
public StoredFieldsSpec storedFieldsSpec() {
return StoredFieldsSpec.NO_REQUIREMENTS;
public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) throws IOException {
return new FetchSubPhaseProcessor() {
@Override
public void setNextReader(LeafReaderContext readerContext) {}

@Override
public StoredFieldsSpec storedFieldsSpec() {
return StoredFieldsSpec.NO_REQUIREMENTS;
}

@Override
public String getName() {
return "test";
}

@Override
public void process(FetchSubPhase.HitContext hitContext) {
if (fetchContext.getIndexName().startsWith("boom")) {
throw new RuntimeException("boom");
}
}
};
}

@Override
public void process(FetchSubPhase.HitContext hitContext) {
if (fetchContext.getIndexName().startsWith("boom")) {
throw new RuntimeException("boom");
}
public String getName() {
return "test";
}
});

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
import org.elasticsearch.search.fetch.StoredFieldsSpec;
Expand Down Expand Up @@ -1377,31 +1378,44 @@ public void testScriptSorting() {
public static class FetchPlugin extends Plugin implements SearchPlugin {
@Override
public List<FetchSubPhase> getFetchSubPhases(FetchPhaseConstructionContext context) {
return Collections.singletonList(fetchContext -> {
if (fetchContext.getIndexName().equals("idx")) {
return new FetchSubPhaseProcessor() {

private LeafSearchLookup leafSearchLookup;

@Override
public void setNextReader(LeafReaderContext ctx) {
leafSearchLookup = fetchContext.getSearchExecutionContext().lookup().getLeafSearchLookup(ctx);
}

@Override
public void process(FetchSubPhase.HitContext hitContext) {
leafSearchLookup.setDocument(hitContext.docId());
FieldLookup fieldLookup = leafSearchLookup.fields().get("text");
hitContext.hit().setDocumentField(new DocumentField("text_stored_lookup", fieldLookup.getValues()));
}
return Collections.singletonList(new FetchSubPhase() {
@Override
public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) throws IOException {
if (fetchContext.getIndexName().equals("idx")) {
return new FetchSubPhaseProcessor() {

private LeafSearchLookup leafSearchLookup;

@Override
public void setNextReader(LeafReaderContext ctx) {
leafSearchLookup = fetchContext.getSearchExecutionContext().lookup().getLeafSearchLookup(ctx);
}

@Override
public void process(FetchSubPhase.HitContext hitContext) {
leafSearchLookup.setDocument(hitContext.docId());
FieldLookup fieldLookup = leafSearchLookup.fields().get("text");
hitContext.hit().setDocumentField(new DocumentField("text_stored_lookup", fieldLookup.getValues()));
}

@Override
public StoredFieldsSpec storedFieldsSpec() {
return StoredFieldsSpec.NO_REQUIREMENTS;
}

@Override
public String getName() {
return "test";
}
};
}
return null;
}

@Override
public StoredFieldsSpec storedFieldsSpec() {
return StoredFieldsSpec.NO_REQUIREMENTS;
}
};
@Override
public String getName() {
return "test";
}
return null;
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,23 @@ public StoredFieldsSpec storedFieldsSpec() {
return StoredFieldsSpec.NO_REQUIREMENTS;
}

@Override
public String getName() {
    // The processor reports the same identifier as its owning sub-phase.
    return NAME;
}

@Override
public void process(HitContext hitContext) throws IOException {
hitExecute(searchContext, hitContext);
}
};
}

@Override
public String getName() {
    // Sub-phase identifier; also the key looked up via getSearchExt(NAME)
    // in hitExecute below.
    return NAME;
}

private void hitExecute(FetchContext context, HitContext hitContext) throws IOException {
TermVectorsFetchBuilder fetchSubPhaseBuilder = (TermVectorsFetchBuilder) context.getSearchExt(NAME);
if (fetchSubPhaseBuilder == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public final class ShardSearchPhaseAPMMetrics implements SearchOperationListener {

public static final String QUERY_SEARCH_PHASE_METRIC = "es.search.shards.phases.query.duration.histogram";
public static final String FETCH_SEARCH_PHASE_METRIC = "es.search.shards.phases.fetch.duration.histogram";
public static final String FETCH_SUBPHASE_METRIC_FORMAT = "es.search.shards.phases.fetch.subphase.%s.duration.histogram";

private final LongHistogram queryPhaseMetric;
private final LongHistogram fetchPhaseMetric;
private final Map<String, LongHistogram> fetchSubPhaseMetrics;

public ShardSearchPhaseAPMMetrics(MeterRegistry meterRegistry) {
public ShardSearchPhaseAPMMetrics(MeterRegistry meterRegistry, List<String> fetchSubPhaseNames) {
this.queryPhaseMetric = meterRegistry.registerLongHistogram(
QUERY_SEARCH_PHASE_METRIC,
"Query search phase execution times at the shard level, expressed as a histogram",
Expand All @@ -39,6 +44,17 @@ public ShardSearchPhaseAPMMetrics(MeterRegistry meterRegistry) {
"Fetch search phase execution times at the shard level, expressed as a histogram",
"ms"
);
this.fetchSubPhaseMetrics = fetchSubPhaseNames.stream()
.collect(
Collectors.toMap(
name -> name,
name -> meterRegistry.registerLongHistogram(
String.format(Locale.ROOT, FETCH_SUBPHASE_METRIC_FORMAT, name),
"Fetch sub-phase " + name + " execution times at the shard level, expressed as a histogram",
"ms"
)
)
);
}

@Override
Expand All @@ -55,6 +71,16 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
recordPhaseLatency(fetchPhaseMetric, tookInNanos, searchContext.request(), rangeTimestampFrom);
}

@Override
public void onFetchSubPhase(SearchContext searchContext, String subPhaseName, long tookInNanos) {
    // Record the per-subphase latency. Resolve the histogram first so the
    // execution-context lookup is skipped entirely for sub-phase names that
    // have no registered metric (they are silently ignored).
    LongHistogram histogramMetric = fetchSubPhaseMetrics.get(subPhaseName);
    if (histogramMetric != null) {
        // NOTE(review): assumes getSearchExecutionContext() is a cheap,
        // side-effect-free accessor — confirm before relying on the reorder.
        Long rangeTimestampFrom = searchContext.getSearchExecutionContext().getRangeTimestampFrom();
        recordPhaseLatency(histogramMetric, tookInNanos, searchContext.request(), rangeTimestampFrom);
    }
}

private static void recordPhaseLatency(
LongHistogram histogramMetric,
long tookInNanos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ default void onFailedFetchPhase(SearchContext searchContext) {}
*/
default void onFetchPhase(SearchContext searchContext, long tookInNanos) {}

/**
 * Executed after a fetch sub phase successfully finished for all docs in a shard. Used for APM metrics.
 * @param searchContext the current search context
 * @param subPhaseName the name of the fetch subphase
 * @param tookInNanos the number of nanoseconds the fetch sub phase execution took
 */
default void onFetchSubPhase(SearchContext searchContext, String subPhaseName, long tookInNanos) {}

/**
* Executed before the DFS phase is executed
* @param searchContext the current search context
Expand Down Expand Up @@ -215,6 +223,17 @@ public void onPreDfsPhase(SearchContext searchContext) {
}
}

@Override
public void onFetchSubPhase(SearchContext searchContext, String subPhaseName, long tookInNanos) {
    // Fan out to every registered listener; a failure in one listener is
    // logged and must not prevent the remaining listeners from being notified.
    for (SearchOperationListener listener : listeners) {
        try {
            listener.onFetchSubPhase(searchContext, subPhaseName, tookInNanos);
        } catch (Exception e) {
            logger.warn(() -> "onFetchSubPhase listener [" + listener + "] failed", e);
        }
    }
}

@Override
public void onFailedDfsPhase(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchUtils;
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.shutdown.PluginShutdownService;
import org.elasticsearch.snapshots.IndexMetadataRestoreTransformer;
import org.elasticsearch.snapshots.IndexMetadataRestoreTransformer.NoOpRestoreTransformer;
Expand Down Expand Up @@ -855,7 +856,10 @@ private void construct(
MergeMetrics mergeMetrics = new MergeMetrics(telemetryProvider.getMeterRegistry());

final List<SearchOperationListener> searchOperationListeners = List.of(
new ShardSearchPhaseAPMMetrics(telemetryProvider.getMeterRegistry())
new ShardSearchPhaseAPMMetrics(
telemetryProvider.getMeterRegistry(),
searchModule.getFetchSubPhases().stream().map(FetchSubPhase::getName).toList()
)
);

List<? extends SlowLogFieldProvider> slowLogFieldProviders = pluginsService.loadServiceProviders(SlowLogFieldProvider.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
Expand Down Expand Up @@ -1293,4 +1294,7 @@ public FetchPhase getFetchPhase() {
return new FetchPhase(fetchSubPhases);
}

/**
 * Returns an unmodifiable <em>view</em> of the registered fetch sub phases
 * (not a snapshot — later additions to the backing list are visible).
 * Used e.g. to register per-subphase metrics by name.
 */
public List<FetchSubPhase> getFetchSubPhases() {
    return Collections.unmodifiableList(fetchSubPhases);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
Expand Down Expand Up @@ -172,6 +173,7 @@ && shouldExcludeInferenceFieldsFromSource(context.indexShard().indexSettings(),
boolean requiresSource = storedFieldsSpec.requiresSource();
final int[] locallyAccumulatedBytes = new int[1];
NestedDocuments nestedDocuments = context.getSearchExecutionContext().getNestedDocuments();
final Map<String, Long> subphaseAggregateDurations = new HashMap<>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Part of the complicating factor here is that we call the subphase processors on individual hits, so we need to aggregate across all of the hits for this shard. Please let me know if I'm not understanding this code properly.


FetchPhaseDocsIterator docsIterator = new FetchPhaseDocsIterator() {

Expand Down Expand Up @@ -229,7 +231,9 @@ protected SearchHit nextDoc(int doc) throws IOException {
sourceProvider.source = hit.source();
fieldLookupProvider.setPreloadedStoredFieldValues(hit.hit().getId(), hit.loadedFields());
for (FetchSubPhaseProcessor processor : processors) {
long phaseStartTime = System.nanoTime();
processor.process(hit);
subphaseAggregateDurations.merge(processor.getName(), System.nanoTime() - phaseStartTime, Long::sum);
}

BytesReference sourceRef = hit.hit().getSourceRef();
Expand All @@ -254,6 +258,10 @@ protected SearchHit nextDoc(int doc) throws IOException {
context.queryResult()
);

for (Map.Entry<String, Long> entry : subphaseAggregateDurations.entrySet()) {
context.indexShard().getSearchOperationListener().onFetchSubPhase(context, entry.getKey(), entry.getValue());
}

if (context.isCancelled()) {
for (SearchHit hit : hits) {
// release all hits that would otherwise become owned and eventually released by SearchHits below
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ public StoredFieldsSpec storedFieldsSpec() {
return delegate.storedFieldsSpec();
}

@Override
public String getName() {
    // This processor is a profiling wrapper: report the wrapped processor's
    // name so timings/metrics are attributed to the real sub-phase.
    return delegate.getName(); // since this implementation wraps the delegate, return its name
}

@Override
public void process(HitContext hitContext) throws IOException {
Timer timer = breakdown.getNewTimer(FetchSubPhaseTiming.PROCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,8 @@ public IndexReader topLevelReader() {
*/
FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) throws IOException;

/**
 * The name of this fetch sub phase. Used for logging and stats, and as the
 * key under which per-subphase APM duration metrics are registered.
 */
String getName();
}
Loading