Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
126 commits
Select commit Hold shift + click to select a range
9d9b6af
unmute tests
drempapis Jan 3, 2025
02ddf82
revert
drempapis Jan 3, 2025
bf79ef3
Merge remote-tracking branch 'upstream/main'
drempapis Jan 3, 2025
f1c91bd
Merge remote-tracking branch 'upstream/main'
drempapis Jan 7, 2025
ba67bff
Merge remote-tracking branch 'upstream/main'
drempapis Jan 7, 2025
2c3654a
Merge remote-tracking branch 'upstream/main'
drempapis Jan 8, 2025
58d4762
Merge remote-tracking branch 'upstream/main'
drempapis Jan 8, 2025
bc38090
Merge remote-tracking branch 'upstream/main'
drempapis Jan 9, 2025
ef0447b
Merge remote-tracking branch 'upstream/main'
drempapis Jan 9, 2025
fe009d7
Merge remote-tracking branch 'upstream/main'
drempapis Jan 10, 2025
a747a40
Merge remote-tracking branch 'upstream/main'
drempapis Jan 10, 2025
f3e47ae
Merge remote-tracking branch 'upstream/main'
drempapis Jan 10, 2025
2bc0107
Merge remote-tracking branch 'upstream/main'
drempapis Jan 13, 2025
f3b3d00
Merge remote-tracking branch 'upstream/main'
drempapis Jan 13, 2025
f52789e
Merge remote-tracking branch 'upstream/main'
drempapis Jan 14, 2025
ec243b7
Merge remote-tracking branch 'upstream/main'
drempapis Jan 14, 2025
f93eb9b
Merge remote-tracking branch 'upstream/main'
drempapis Jan 16, 2025
2777916
Merge remote-tracking branch 'upstream/main'
drempapis Jan 16, 2025
623bd7b
Merge remote-tracking branch 'upstream/main'
drempapis Jan 16, 2025
3fee6af
Merge remote-tracking branch 'upstream/main'
drempapis Jan 16, 2025
af3fff9
Merge remote-tracking branch 'upstream/main'
drempapis Jan 17, 2025
a41bbad
Merge remote-tracking branch 'upstream/main'
drempapis Jan 17, 2025
8115a61
Merge remote-tracking branch 'upstream/main'
drempapis Jan 20, 2025
6b2361e
Merge remote-tracking branch 'upstream/main'
drempapis Jan 20, 2025
41dcc1c
Merge remote-tracking branch 'upstream/main'
drempapis Jan 21, 2025
2df7f62
Merge remote-tracking branch 'upstream/main'
drempapis Jan 21, 2025
6b95b0c
Merge remote-tracking branch 'upstream/main'
drempapis Jan 22, 2025
eadf8cf
Merge remote-tracking branch 'upstream/main'
drempapis Jan 23, 2025
41dc557
Merge remote-tracking branch 'upstream/main'
drempapis Jan 24, 2025
e0e1740
Merge remote-tracking branch 'upstream/main'
drempapis Jan 24, 2025
1ba2eaa
Merge remote-tracking branch 'upstream/main'
drempapis Jan 24, 2025
6820c35
Merge remote-tracking branch 'upstream/main'
drempapis Jan 24, 2025
8280559
Merge remote-tracking branch 'upstream/main'
drempapis Jan 24, 2025
e82375e
Merge remote-tracking branch 'upstream/main'
drempapis Jan 27, 2025
ce4c1c9
Merge remote-tracking branch 'upstream/main'
drempapis Jan 27, 2025
ff92f92
Merge remote-tracking branch 'upstream/main'
drempapis Jan 28, 2025
c0f4d18
Merge remote-tracking branch 'upstream/main'
drempapis Jan 28, 2025
02ac377
Merge remote-tracking branch 'upstream/main'
drempapis Jan 29, 2025
42aa647
Merge remote-tracking branch 'upstream/main'
drempapis Jan 30, 2025
bd258ab
Merge remote-tracking branch 'upstream/main'
drempapis Jan 30, 2025
0d5b0d3
Merge remote-tracking branch 'upstream/main'
drempapis Jan 31, 2025
a6f543a
Merge remote-tracking branch 'upstream/main'
drempapis Jan 31, 2025
89a6aca
Merge remote-tracking branch 'upstream/main'
drempapis Jan 31, 2025
19b0900
Merge remote-tracking branch 'upstream/main'
drempapis Jan 31, 2025
9f1af25
Merge remote-tracking branch 'upstream/main'
drempapis Feb 3, 2025
9cedf78
Merge remote-tracking branch 'upstream/main'
drempapis Feb 7, 2025
2404c81
WIP
piergm Feb 10, 2025
a23541b
Merge remote-tracking branch 'upstream/main'
drempapis Feb 11, 2025
37dec8b
Dimi's work :D
piergm Feb 11, 2025
2ce21d8
small changes and java docs
piergm Feb 11, 2025
28c9677
add method to stop traking index execution
piergm Feb 11, 2025
4851a68
update
drempapis Feb 11, 2025
6858dfc
[CI] Auto commit changes from spotless
Feb 11, 2025
fa3a2fd
Update docs/changelog/122262.yaml
piergm Feb 11, 2025
088c1e5
update
drempapis Feb 12, 2025
9cddba5
Add smoke test for new ThreadPoolExecutor
drempapis Feb 12, 2025
c1f0b3d
Rework executor shutdown method
drempapis Feb 12, 2025
e50af6f
Delete unusesd import
drempapis Feb 12, 2025
23ad8c6
[CI] Auto commit changes from spotless
Feb 12, 2025
6a83b07
Add some metric logs
drempapis Feb 12, 2025
7af835a
[CI] Auto commit changes from spotless
Feb 12, 2025
9f0ec33
update
drempapis Feb 13, 2025
0d58081
[CI] Auto commit changes from spotless
Feb 13, 2025
dab0de9
revert logging level
drempapis Feb 13, 2025
585d8f7
update:
drempapis Feb 13, 2025
15311a3
update
drempapis Feb 13, 2025
5125276
[CI] Auto commit changes from spotless
Feb 13, 2025
b8a7294
Merge branch 'elastic:main' into mp_search_load-per-index
piergm Feb 13, 2025
406af11
Merge remote-tracking branch 'upstream/main'
drempapis Feb 14, 2025
d324d5f
Merge remote-tracking branch 'upstream/main'
drempapis Feb 14, 2025
fc2b041
Merge remote-tracking branch 'upstream/main'
drempapis Feb 14, 2025
206454b
Merge remote-tracking branch 'upstream/main'
drempapis Feb 14, 2025
f36ed2a
Merge remote-tracking branch 'upstream/main'
drempapis Feb 17, 2025
acc28b5
Merge remote-tracking branch 'upstream/main'
drempapis Feb 17, 2025
f8d3ce0
Merge remote-tracking branch 'upstream/main'
drempapis Feb 17, 2025
c34cc87
Merge remote-tracking branch 'upstream/main'
drempapis Feb 17, 2025
e2c36f8
Merge remote-tracking branch 'upstream/main'
drempapis Feb 17, 2025
8398854
Merge remote-tracking branch 'upstream/main'
drempapis Feb 18, 2025
037eedd
Merge remote-tracking branch 'upstream/main'
drempapis Feb 18, 2025
2a8a7d3
Merge remote-tracking branch 'upstream/main'
drempapis Feb 19, 2025
83b8fab
Merge remote-tracking branch 'upstream/main'
drempapis Feb 19, 2025
b6a0df7
Merge remote-tracking branch 'upstream/main'
drempapis Feb 19, 2025
c33891e
Merge remote-tracking branch 'upstream/main'
drempapis Feb 21, 2025
0c64b54
Merge remote-tracking branch 'upstream/main'
drempapis Feb 21, 2025
e7ba6ee
Merge remote-tracking branch 'upstream/main'
drempapis Feb 21, 2025
66453b4
Merge remote-tracking branch 'upstream/main'
drempapis Feb 24, 2025
a345f39
Merge remote-tracking branch 'upstream/main'
drempapis Feb 25, 2025
4700d59
Merge branch 'main' into mp_search_load-per-index
piergm Mar 3, 2025
f563dcd
removes unnecessary debug output
piergm Mar 3, 2025
a8a5efa
[CI] Auto commit changes from spotless
Mar 3, 2025
3454904
iter
piergm Mar 3, 2025
68c78f2
[CI] Auto commit changes from spotless
Mar 3, 2025
d8477e9
iter
piergm Mar 3, 2025
5c574e4
Merge branch 'main' into mp_search_load-per-index
piergm Mar 3, 2025
1e47dde
[CI] Auto commit changes from spotless
Mar 3, 2025
e9dac7a
Merge remote-tracking branch 'upstream/main'
drempapis Mar 4, 2025
936a08d
Update code after review
drempapis Mar 5, 2025
2cb1681
Merge remote-tracking branch 'upstream/main'
drempapis Mar 5, 2025
d823c4f
Merge branch 'main' into mp_search_load-per-index_review
drempapis Mar 5, 2025
a8debe3
update
drempapis Mar 5, 2025
b2cb778
update
drempapis Mar 5, 2025
610b0fe
update for debug
drempapis Mar 5, 2025
ff01846
update
drempapis Mar 6, 2025
d90c490
Merge remote-tracking branch 'upstream/main'
drempapis Mar 6, 2025
bfa9305
Merge branch 'main' into mp_search_load-per-index_review
drempapis Mar 6, 2025
6652730
update
drempapis Mar 6, 2025
23cc8ca
update
drempapis Mar 6, 2025
d38a1f5
update
drempapis Mar 7, 2025
202aef4
update
drempapis Mar 7, 2025
e044b26
update
drempapis Mar 7, 2025
887ec8e
update
drempapis Mar 7, 2025
1eb1381
update
drempapis Mar 7, 2025
c725c0c
update
drempapis Mar 7, 2025
7cfcf5c
update
drempapis Mar 7, 2025
76cdebc
update
drempapis Mar 7, 2025
a2e879e
revert code
drempapis Mar 7, 2025
a08adf4
update
drempapis Mar 7, 2025
66011ec
update
drempapis Mar 7, 2025
0188bb9
revert code
drempapis Mar 7, 2025
387caa3
[CI] Auto commit changes from spotless
Mar 7, 2025
491a91e
update for review
drempapis Mar 7, 2025
c71baf9
Merge branch 'mp_search_load-per-index' of github.com:piergm/elastics…
drempapis Mar 7, 2025
2d187be
revert config
drempapis Mar 7, 2025
b13c457
Merge branch 'main' into mp_search_load-per-index
drempapis Mar 7, 2025
02b19f9
Fix missing newline at end of XML file
drempapis Mar 7, 2025
e9002db
Fix newline at end of XML file
drempapis Mar 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .idea/runConfigurations/Debug_Elasticsearch__node_2_.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .idea/runConfigurations/Debug_Elasticsearch__node_3_.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions docs/changelog/122262.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 122262
summary: Measure search load per index
area: Search
type: feature
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.coordination;

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.search.stats.ShardSearchPerIndexTimeTrackingMetrics;

/**
* Service responsible for cleaning up task execution time tracking for deleted indices.
* Implements the ClusterStateListener interface to listen for cluster state changes.
*/
public class SearchIndexTimeTrackingCleanupService implements ClusterStateListener {

private ShardSearchPerIndexTimeTrackingMetrics listener;

/**
* Constructor.
*
* @param listener the listener for shard search time tracking metrics
*/
public SearchIndexTimeTrackingCleanupService(ShardSearchPerIndexTimeTrackingMetrics listener) {
this.listener = listener;
}

/**
* Called when the cluster state changes. Stops tracking execution time for deleted indices.
*
* @param event the cluster changed event
*/
@Override
public void clusterChanged(ClusterChangedEvent event) {
for (Index index : event.indicesDeleted()) {
listener.stopTrackingIndex(index.getName());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.search.stats;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.SearchContext;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/**
* This class implements the {@link SearchOperationListener} interface to track the execution time of search operations
* on a per-index basis. It uses a {@link ConcurrentHashMap} to store the execution times and an
* {@link ExponentiallyWeightedMovingAverage} to calculate the exponentially weighted moving average (EWMA) of the
* execution times.
*/
public final class ShardSearchPerIndexTimeTrackingMetrics implements SearchOperationListener {

private static final Logger logger = LogManager.getLogger(ShardSearchPerIndexTimeTrackingMetrics.class);

private final ConcurrentHashMap<String, Tuple<LongAdder, ExponentiallyWeightedMovingAverage>> indexExecutionTime;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry for saying it in such a straightforward manner, but do we really want to add more logic based on this class?

Especially on a per-shard basis, using the math in ExponentiallyWeightedMovingAverage seems questionable.

We calculate newValue = alpha * lastValue + (1 - alpha) * currentValue. In a large number of use-cases you may see the fetch and query times be an order of magnitude apart. So now, assuming the query always matches, we will essentially flap between two values constantly for a shard?

Why not just use the existing metrics we have in org.elasticsearch.index.search.stats.ShardSearchStats? EWMA makes no sense here, if anything isn't total query time and it's derivative what we care about?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, we left it in because we where not sure about it and I asked Dimi to leave a comment on the PR about this.

Copy link
Contributor

@drempapis drempapis Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All right, let's check how it can be adapted into the ShardSearchStats


private final double ewmaAlpha;

/**
* Constructs a new ShardSearchPerIndexTimeTrackingMetrics instance with the specified EWMA alpha value.
*
* @param ewmaAlpha the alpha value for the EWMA calculation
*/
public ShardSearchPerIndexTimeTrackingMetrics(double ewmaAlpha) {
this.indexExecutionTime = new ConcurrentHashMap<>();
this.ewmaAlpha = ewmaAlpha;
}

/**
* Tracks the execution time of the query phase of a search operation.
*
* @param searchContext the search context
* @param tookInNanos the time taken in nanoseconds
*/
@Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
trackExecutionTime(searchContext, tookInNanos);
}

/**
* Tracks the execution time of a failed query phase of a search operation.
*
* @param searchContext the search context
* @param tookInNanos the time taken in nanoseconds
*/
@Override
public void onFailedQueryPhase(SearchContext searchContext, long tookInNanos) {
trackExecutionTime(searchContext, tookInNanos);
}

/**
* Tracks the execution time of the fetch phase of a search operation.
*
* @param searchContext the search context
* @param tookInNanos the time taken in nanoseconds
*/
@Override
public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
trackExecutionTime(searchContext, tookInNanos);
}

/**
* Tracks the execution time of a failed fetch phase of a search operation.
*
* @param searchContext the search context
* @param tookInNanos the time taken in nanoseconds
*/
@Override
public void onFailedFetchPhase(SearchContext searchContext, long tookInNanos) {
trackExecutionTime(searchContext, tookInNanos);
}

/**
* Tracks the execution time of a search operation.
*
* @param searchContext the search context
* @param tookInNanos the time taken in nanoseconds
*/
private void trackExecutionTime(SearchContext searchContext, long tookInNanos) {
IndexShard indexShard = searchContext.indexShard();
if (indexShard != null && indexShard.isSystem() == false) {
String indexName = indexShard.shardId().getIndexName();
if (indexName != null) {
Tuple<LongAdder, ExponentiallyWeightedMovingAverage> t = indexExecutionTime.computeIfAbsent(
indexName,
k -> new Tuple<>(new LongAdder(), new ExponentiallyWeightedMovingAverage(ewmaAlpha, 0))
);
t.v1().add(tookInNanos);
t.v2().addValue(tookInNanos);
}
}
}

/**
* Gets the total execution time for tasks associated with a specific index.
*
* @param indexName the name of the index
* @return the total execution time for the index
*/
public long getSearchLoadPerIndex(String indexName) {
Tuple<LongAdder, ExponentiallyWeightedMovingAverage> t = indexExecutionTime.get(indexName);
return (t != null) ? t.v1().sum() : 0;
}

/**
* Gets the exponentially weighted moving average (EWMA) of the execution time for tasks associated with a specific index name.
*
* @param indexName the name of the index
* @return the EWMA of the execution time for the index
*/
public double getLoadEMWAPerIndex(String indexName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The EMWA is still under consideration if we need to calculate and export it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe some more context here:
We are calculating load on a per-index basis, loads are then collected and summed up (TODO) in the master node. With this information we will need to calculate which of the indices where under the most load and act based on that. The idea is to then normalize the "global" index load and act on the normalized values.
That said the per-node EMWA is not really suitable to be summed across nodes in our opinion.
That's why this comment.

Tuple<LongAdder, ExponentiallyWeightedMovingAverage> t = indexExecutionTime.get(indexName);
return (t != null) ? t.v2().getAverage() : 0;
}

/**
* Stops tracking the execution time for tasks associated with a specific index.
*
* @param indexName the name of the index
*/
public void stopTrackingIndex(String indexName) {
if (indexExecutionTime.containsKey(indexName)) {
indexExecutionTime.remove(indexName);
} else {
logger.debug("Trying to stop tracking index [{}] that was never tracked", indexName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void onPreQueryPhase(SearchContext searchContext) {
}

@Override
public void onFailedQueryPhase(SearchContext searchContext) {
public void onFailedQueryPhase(SearchContext searchContext, long tookInNanos) {
computeStats(searchContext, statsHolder -> {
if (searchContext.hasOnlySuggest()) {
statsHolder.suggestCurrent.dec();
Expand Down Expand Up @@ -92,7 +92,7 @@ public void onPreFetchPhase(SearchContext searchContext) {
}

@Override
public void onFailedFetchPhase(SearchContext searchContext) {
public void onFailedFetchPhase(SearchContext searchContext, long tookInNanos) {
computeStats(searchContext, statsHolder -> {
statsHolder.fetchCurrent.dec();
statsHolder.fetchFailure.inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@ default void onPreQueryPhase(SearchContext searchContext) {}
/**
* Executed if a query phased failed.
* @param searchContext the current search context
* @param tookInNanos the number of nanoseconds the query execution took
*/
default void onFailedQueryPhase(SearchContext searchContext) {}
default void onFailedQueryPhase(SearchContext searchContext, long tookInNanos) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to track the execution time when a phase fails.


/**
* Executed after the query phase successfully finished.
* Note: this is not invoked if the query phase execution failed.
* @param searchContext the current search context
* @param tookInNanos the number of nanoseconds the query execution took
*
* @see #onFailedQueryPhase(SearchContext)
* @see #onFailedQueryPhase(SearchContext, long)
*/
default void onQueryPhase(SearchContext searchContext, long tookInNanos) {}

Expand All @@ -52,16 +53,17 @@ default void onPreFetchPhase(SearchContext searchContext) {}
/**
* Executed if a fetch phased failed.
* @param searchContext the current search context
* @param tookInNanos the number of nanoseconds the query execution took
*/
default void onFailedFetchPhase(SearchContext searchContext) {}
default void onFailedFetchPhase(SearchContext searchContext, long tookInNanos) {}

/**
* Executed after the fetch phase successfully finished.
* Note: this is not invoked if the fetch phase execution failed.
* @param searchContext the current search context
* @param tookInNanos the number of nanoseconds the fetch execution took
*
* @see #onFailedFetchPhase(SearchContext)
* @see #onFailedFetchPhase(SearchContext, long)
*/
default void onFetchPhase(SearchContext searchContext, long tookInNanos) {}

Expand Down Expand Up @@ -128,10 +130,10 @@ public void onPreQueryPhase(SearchContext searchContext) {
}

@Override
public void onFailedQueryPhase(SearchContext searchContext) {
public void onFailedQueryPhase(SearchContext searchContext, long tookInNanos) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFailedQueryPhase(searchContext);
listener.onFailedQueryPhase(searchContext, tookInNanos);
} catch (Exception e) {
logger.warn(() -> "onFailedQueryPhase listener [" + listener + "] failed", e);
}
Expand Down Expand Up @@ -161,10 +163,10 @@ public void onPreFetchPhase(SearchContext searchContext) {
}

@Override
public void onFailedFetchPhase(SearchContext searchContext) {
public void onFailedFetchPhase(SearchContext searchContext, long tookInNanos) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFailedFetchPhase(searchContext);
listener.onFailedFetchPhase(searchContext, tookInNanos);
} catch (Exception e) {
logger.warn(() -> "onFailedFetchPhase listener [" + listener + "] failed", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.MasterHistoryService;
import org.elasticsearch.cluster.coordination.SearchIndexTimeTrackingCleanupService;
import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService;
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
Expand Down Expand Up @@ -82,6 +83,7 @@
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.SuppressForbidden;
Expand Down Expand Up @@ -120,6 +122,7 @@
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.SourceFieldMetrics;
import org.elasticsearch.index.search.stats.ShardSearchPerIndexTimeTrackingMetrics;
import org.elasticsearch.index.search.stats.ShardSearchPhaseAPMMetrics;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.indices.ExecutorSelector;
Expand Down Expand Up @@ -818,10 +821,17 @@ private void construct(
threadPool::relativeTimeInMillis
);
MapperMetrics mapperMetrics = new MapperMetrics(sourceFieldMetrics);

ShardSearchPerIndexTimeTrackingMetrics listener = new ShardSearchPerIndexTimeTrackingMetrics(
EsExecutors.TaskTrackingConfig.DEFAULT.getEwmaAlpha()
);
final List<SearchOperationListener> searchOperationListeners = List.of(
new ShardSearchPhaseAPMMetrics(telemetryProvider.getMeterRegistry())
new ShardSearchPhaseAPMMetrics(telemetryProvider.getMeterRegistry()),
listener
);

clusterService.addListener(new SearchIndexTimeTrackingCleanupService(listener));

List<? extends SlowLogFieldProvider> slowLogFieldProviders = pluginsService.loadServiceProviders(SlowLogFieldProvider.class);
// NOTE: the response of index/search slow log fields below must be calculated dynamically on every call
// because the responses may change dynamically at runtime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2018,9 +2018,9 @@ public void close() {
}
} else {
if (fetch) {
listener.onFailedFetchPhase(context);
listener.onFailedFetchPhase(context, System.nanoTime() - time);
} else {
listener.onFailedQueryPhase(context);
listener.onFailedQueryPhase(context, System.nanoTime() - time);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void onPreQueryPhase(SearchContext searchContext) {
}

@Override
public void onFailedQueryPhase(SearchContext searchContext) {
public void onFailedQueryPhase(SearchContext searchContext, long tookInNanos) {
assertNotNull(searchContext);
failedQuery.incrementAndGet();
}
Expand All @@ -71,7 +71,7 @@ public void onPreFetchPhase(SearchContext searchContext) {
}

@Override
public void onFailedFetchPhase(SearchContext searchContext) {
public void onFailedFetchPhase(SearchContext searchContext, long tookInNanos) {
assertNotNull(searchContext);
failedFetch.incrementAndGet();
}
Expand Down Expand Up @@ -191,7 +191,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, freeScrollContext.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFailedFetchPhase(ctx);
compositeListener.onFailedFetchPhase(ctx, -1);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, failedFetch.get());
Expand All @@ -204,7 +204,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, freeScrollContext.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFailedQueryPhase(ctx);
compositeListener.onFailedQueryPhase(ctx, -1);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, failedFetch.get());
Expand Down
Loading