Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ca0eb27
Add DfsPhase metrics
drempapis Apr 3, 2025
4d7e8b2
Add missing code
drempapis Apr 3, 2025
1b3917c
Merge branch 'main' into ShardSearchStats_usage_search_load_per_index
drempapis Apr 4, 2025
f7d62f1
Add testing
drempapis Apr 4, 2025
463e995
Add tests
drempapis Apr 7, 2025
86254f9
[CI] Auto commit changes from spotless
Apr 7, 2025
8cb0de3
update spot
drempapis Apr 7, 2025
7df5739
Merge branch 'ShardSearchStats_usage_search_load_per_index' of github…
drempapis Apr 7, 2025
18d4e8c
Merge branch 'main' into ShardSearchStats_usage_search_load_per_index
drempapis Apr 7, 2025
b29e627
Merge branch 'main' into ShardSearchStats_usage_search_load_per_index
drempapis Apr 7, 2025
667e627
Add version for bwc
drempapis Apr 7, 2025
3331988
Merge branch 'main' into ShardSearchStats_usage_search_load_per_index
drempapis Apr 8, 2025
f03499b
Merge branch 'main' into ShardSearchStats_usage_search_load_per_index
drempapis Apr 8, 2025
3ca903a
Update after review
drempapis Apr 8, 2025
934a983
Merge branch 'main' into ShardSearchStats_usage_search_load_per_index
drempapis Apr 8, 2025
7034507
Merge branch 'ShardSearchStats_usage_search_load_per_index' of gith…
drempapis Apr 8, 2025
d1c9475
Merge branch 'main' into ShardSearchStats_usage_search_load_per_index
drempapis Apr 9, 2025
50cc46a
merge main
drempapis Jun 30, 2025
e1c095d
[CI] Auto commit changes from spotless
Jun 30, 2025
d1709ea
Merge branch 'main' into ShardSearchStats_usage_search_load_per_index
drempapis Jun 30, 2025
14dfccc
update after review - revert code
drempapis Jul 7, 2025
fc5195f
update after review - revert code
drempapis Jul 7, 2025
9ec9ffa
update after review - revert code
drempapis Jul 7, 2025
49f096e
update after review - revert code
drempapis Jul 7, 2025
14a813e
Merge branch 'main' into ShardSearchStats_usage_search_load_per_index
drempapis Jul 7, 2025
7d87db6
Merge branch 'main' into ShardSearchStats_usage_search_load_per_index
drempapis Jul 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -567,10 +567,12 @@ public void testSimpleStats() throws Exception {
// make sure that number of requests in progress is 0
assertThat(stats.getIndex("test1").getTotal().getIndexing().getTotal().getIndexCurrent(), equalTo(0L));
assertThat(stats.getIndex("test1").getTotal().getIndexing().getTotal().getDeleteCurrent(), equalTo(0L));
assertThat(stats.getIndex("test1").getTotal().getSearch().getTotal().getDfsCurrent(), equalTo(0L));
assertThat(stats.getIndex("test1").getTotal().getSearch().getTotal().getFetchCurrent(), equalTo(0L));
assertThat(stats.getIndex("test1").getTotal().getSearch().getTotal().getQueryCurrent(), equalTo(0L));
assertThat(stats.getIndex("test2").getTotal().getIndexing().getTotal().getIndexCurrent(), equalTo(0L));
assertThat(stats.getIndex("test2").getTotal().getIndexing().getTotal().getDeleteCurrent(), equalTo(0L));
assertThat(stats.getIndex("test2").getTotal().getSearch().getTotal().getDfsCurrent(), equalTo(0L));
assertThat(stats.getIndex("test2").getTotal().getSearch().getTotal().getFetchCurrent(), equalTo(0L));
assertThat(stats.getIndex("test2").getTotal().getSearch().getTotal().getQueryCurrent(), equalTo(0L));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
static final String STANDARD_INDEX_COUNT = "es.indices.standard.total";
static final String STANDARD_BYTES_SIZE = "es.indices.standard.size";
static final String STANDARD_DOCS_COUNT = "es.indices.standard.docs.total";
static final String STANDARD_DFS_COUNT = "es.indices.standard.dfs.total";
static final String STANDARD_DFS_TIME = "es.indices.standard.dfs.time";
static final String STANDARD_DFS_FAILURE = "es.indices.standard.dfs.failure.total";
static final String STANDARD_QUERY_COUNT = "es.indices.standard.query.total";
static final String STANDARD_QUERY_TIME = "es.indices.standard.query.time";
static final String STANDARD_QUERY_FAILURE = "es.indices.standard.query.failure.total";
Expand Down Expand Up @@ -245,6 +248,10 @@ public void testIndicesMetrics() {
telemetry,
1,
Map.of(
STANDARD_DFS_COUNT,
equalTo(search1.getDfsCount()),
STANDARD_DFS_TIME,
equalTo(search1.getDfsTimeInMillis()),
STANDARD_QUERY_COUNT,
equalTo(numStandardIndices),
STANDARD_QUERY_TIME,
Expand Down Expand Up @@ -334,6 +341,8 @@ public void testIndicesMetrics() {
telemetry,
4,
Map.of(
STANDARD_DFS_FAILURE,
equalTo(0L),
STANDARD_QUERY_FAILURE,
equalTo(0L),
STANDARD_FETCH_FAILURE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ public void testSimpleStats() throws Exception {

IndicesStatsResponse indicesStats = indicesAdmin().prepareStats().get();
logger.debug("###### indices search stats: {}", indicesStats.getTotal().getSearch());
assertThat(indicesStats.getTotal().getSearch().getTotal().getDfsCount(), equalTo(0L));
assertThat(indicesStats.getTotal().getSearch().getTotal().getDfsTimeInMillis(), equalTo(0L));
assertThat(indicesStats.getTotal().getSearch().getTotal().getQueryCount(), greaterThan(0L));
assertThat(indicesStats.getTotal().getSearch().getTotal().getQueryTimeInMillis(), greaterThan(0L));
assertThat(indicesStats.getTotal().getSearch().getTotal().getFetchCount(), greaterThan(0L));
Expand All @@ -138,6 +140,8 @@ public void testSimpleStats() throws Exception {

indicesStats = indicesAdmin().prepareStats().setGroups("group1").get();
assertThat(indicesStats.getTotal().getSearch().getGroupStats(), notNullValue());
assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getDfsCount(), equalTo(0L));
assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getDfsTimeInMillis(), equalTo(0L));
assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getQueryCount(), greaterThan(0L));
assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getQueryTimeInMillis(), greaterThan(0L));
assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getFetchCount(), greaterThan(0L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00);
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
public static final TransportVersion DFS_STATS = def(9_113_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public class SearchStats implements Writeable, ToXContentFragment {

public static class Stats implements Writeable, ToXContentFragment {

private long dfsCount;
private long dfsTimeInMillis;
private long dfsCurrent;

private long queryCount;
private long queryTimeInMillis;
private long queryCurrent;
Expand All @@ -46,6 +50,7 @@ public static class Stats implements Writeable, ToXContentFragment {
private long suggestTimeInMillis;
private long suggestCurrent;

private long dfsFailure;
private long queryFailure;
private long fetchFailure;

Expand All @@ -58,6 +63,10 @@ private Stats() {
}

public Stats(
long dfsCount,
long dfsTimeInMillis,
long dfsCurrent,
long dfsFailure,
long queryCount,
long queryTimeInMillis,
long queryCurrent,
Expand All @@ -74,6 +83,11 @@ public Stats(
long suggestCurrent,
double recentSearchLoad
) {
this.dfsCount = dfsCount;
this.dfsTimeInMillis = dfsTimeInMillis;
this.dfsCurrent = dfsCurrent;
this.dfsFailure = dfsFailure;

this.queryCount = queryCount;
this.queryTimeInMillis = queryTimeInMillis;
this.queryCurrent = queryCurrent;
Expand All @@ -97,6 +111,13 @@ public Stats(
}

private Stats(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.DFS_STATS)) {
dfsCount = in.readVLong();
dfsTimeInMillis = in.readVLong();
dfsCurrent = in.readVLong();
dfsFailure = in.readVLong();
}

queryCount = in.readVLong();
queryTimeInMillis = in.readVLong();
queryCurrent = in.readVLong();
Expand Down Expand Up @@ -125,6 +146,13 @@ private Stats(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.DFS_STATS)) {
out.writeVLong(dfsCount);
out.writeVLong(dfsTimeInMillis);
out.writeVLong(dfsCurrent);
out.writeVLong(dfsFailure);
}

out.writeVLong(queryCount);
out.writeVLong(queryTimeInMillis);
out.writeVLong(queryCurrent);
Expand Down Expand Up @@ -152,6 +180,11 @@ public void writeTo(StreamOutput out) throws IOException {
}

public void add(Stats stats) {
dfsCount += stats.dfsCount;
dfsTimeInMillis += stats.dfsTimeInMillis;
dfsCurrent += stats.dfsCurrent;
dfsFailure += stats.dfsFailure;

queryCount += stats.queryCount;
queryTimeInMillis += stats.queryTimeInMillis;
queryCurrent += stats.queryCurrent;
Expand All @@ -174,6 +207,10 @@ public void add(Stats stats) {
}

public void addForClosingShard(Stats stats) {
dfsCount += stats.dfsCount;
dfsTimeInMillis += stats.dfsTimeInMillis;
dfsFailure += stats.dfsFailure;

queryCount += stats.queryCount;
queryTimeInMillis += stats.queryTimeInMillis;
queryFailure += stats.queryFailure;
Expand All @@ -193,6 +230,26 @@ public void addForClosingShard(Stats stats) {
recentSearchLoad += stats.recentSearchLoad;
}

/**
 * Returns the total number of DFS phase executions that have completed.
 */
public long getDfsCount() {
return dfsCount;
}

/**
 * Returns the cumulative time spent in the DFS phase as a {@link TimeValue}
 * (wraps {@link #getDfsTimeInMillis()}).
 */
public TimeValue getDfsTime() {
return new TimeValue(dfsTimeInMillis);
}

/**
 * Returns the cumulative time spent in the DFS phase, in milliseconds.
 */
public long getDfsTimeInMillis() {
return dfsTimeInMillis;
}

/**
 * Returns the number of DFS phase executions currently in flight.
 */
public long getDfsCurrent() {
return dfsCurrent;
}

/**
 * Returns the total number of DFS phase executions that failed.
 */
public long getDfsFailure() {
return dfsFailure;
}

public long getQueryCount() {
return queryCount;
}
Expand Down Expand Up @@ -275,6 +332,11 @@ public static Stats readStats(StreamInput in) throws IOException {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.DFS_TOTAL, dfsCount);
builder.humanReadableField(Fields.DFS_TIME_IN_MILLIS, Fields.DFS_TIME, getDfsTime());
builder.field(Fields.DFS_CURRENT, dfsCurrent);
builder.field(Fields.DFS_FAILURE, dfsFailure);

builder.field(Fields.QUERY_TOTAL, queryCount);
builder.humanReadableField(Fields.QUERY_TIME_IN_MILLIS, Fields.QUERY_TIME, getQueryTime());
builder.field(Fields.QUERY_CURRENT, queryCurrent);
Expand Down Expand Up @@ -303,7 +365,11 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Stats that = (Stats) o;
return queryCount == that.queryCount
return dfsCount == that.dfsCount
&& dfsTimeInMillis == that.dfsTimeInMillis
&& dfsCurrent == that.dfsCurrent
&& dfsFailure == that.dfsFailure
&& queryCount == that.queryCount
&& queryTimeInMillis == that.queryTimeInMillis
&& queryCurrent == that.queryCurrent
&& queryFailure == that.queryFailure
Expand All @@ -323,6 +389,10 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hash(
dfsCount,
dfsTimeInMillis,
dfsCurrent,
dfsFailure,
queryCount,
queryTimeInMillis,
queryCurrent,
Expand Down Expand Up @@ -437,6 +507,11 @@ static final class Fields {
static final String SEARCH = "search";
static final String OPEN_CONTEXTS = "open_contexts";
static final String GROUPS = "groups";
static final String DFS_TOTAL = "dfs_total";
static final String DFS_TIME = "dfs_time";
static final String DFS_TIME_IN_MILLIS = "dfs_time_in_millis";
static final String DFS_CURRENT = "dfs_current";
static final String DFS_FAILURE = "dfs_failure";
static final String QUERY_TOTAL = "query_total";
static final String QUERY_TIME = "query_time";
static final String QUERY_TIME_IN_MILLIS = "query_time_in_millis";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,28 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
});
}

@Override
public void onPreDfsPhase(SearchContext searchContext) {
    // A DFS phase is starting: bump the in-flight counter for the total
    // stats and for any per-group stats attached to this search context.
    computeStats(searchContext, holder -> holder.dfsCurrent.inc());
}

@Override
public void onDfsPhase(SearchContext searchContext, long tookInNanos) {
    // A DFS phase finished successfully: fold its duration into the rolling
    // search-load estimate, accumulate count/time, and release the
    // in-flight counter incremented in onPreDfsPhase.
    computeStats(searchContext, holder -> {
        holder.recentSearchLoad.addIncrement(tookInNanos, System.nanoTime());
        holder.dfsMetric.inc(tookInNanos);
        holder.dfsCurrent.dec();
    });
}

@Override
public void onFailedDfsPhase(SearchContext searchContext) {
    // A DFS phase failed: release the in-flight counter and record the
    // failure. Note the duration of failed executions is not accumulated.
    computeStats(searchContext, holder -> {
        holder.dfsCurrent.dec();
        holder.dfsFailure.inc();
    });
}

private void computeStats(SearchContext searchContext, Consumer<StatsHolder> consumer) {
consumer.accept(totalStats);
var groupStats = searchContext.groupStats();
Expand Down Expand Up @@ -164,6 +186,7 @@ public void onFreeScrollContext(ReaderContext readerContext) {
}

static final class StatsHolder {
final MeanMetric dfsMetric = new MeanMetric();
final MeanMetric queryMetric = new MeanMetric();
final MeanMetric fetchMetric = new MeanMetric();
/* We store scroll statistics in microseconds because with nanoseconds we run the risk of overflowing the total stats if there are
Expand All @@ -175,11 +198,13 @@ static final class StatsHolder {
*/
final MeanMetric scrollMetric = new MeanMetric();
final MeanMetric suggestMetric = new MeanMetric();
final CounterMetric dfsCurrent = new CounterMetric();
final CounterMetric queryCurrent = new CounterMetric();
final CounterMetric fetchCurrent = new CounterMetric();
final CounterMetric scrollCurrent = new CounterMetric();
final CounterMetric suggestCurrent = new CounterMetric();

final CounterMetric dfsFailure = new CounterMetric();
final CounterMetric queryFailure = new CounterMetric();
final CounterMetric fetchFailure = new CounterMetric();

Expand All @@ -192,6 +217,10 @@ static final class StatsHolder {

SearchStats.Stats stats() {
return new SearchStats.Stats(
dfsMetric.count(),
TimeUnit.NANOSECONDS.toMillis(dfsMetric.sum()),
dfsCurrent.count(),
dfsFailure.count(),
queryMetric.count(),
TimeUnit.NANOSECONDS.toMillis(queryMetric.sum()),
queryCurrent.count(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,28 @@ default void onFailedFetchPhase(SearchContext searchContext) {}
*/
default void onFetchPhase(SearchContext searchContext, long tookInNanos) {}

/**
 * Executed before the DFS phase starts.
 * @param searchContext the current search context
 */
default void onPreDfsPhase(SearchContext searchContext) {}

/**
 * Executed after the DFS phase successfully finished.
 * Note: this is not invoked if the DFS phase execution failed.
 * @param searchContext the current search context
 * @param tookInNanos the number of nanoseconds the DFS phase execution took
 *
 * @see #onFailedDfsPhase(SearchContext)
 */
default void onDfsPhase(SearchContext searchContext, long tookInNanos) {}

/**
 * Executed if the DFS phase failed.
 * @param searchContext the current search context
 */
default void onFailedDfsPhase(SearchContext searchContext) {}

/**
* Executed when a new reader context was created
* @param readerContext the created context
Expand Down Expand Up @@ -182,6 +204,39 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
}
}

@Override
public void onPreDfsPhase(SearchContext searchContext) {
    // Fan out to every registered listener; a failure in one listener is
    // logged and must not prevent the remaining listeners from running.
    for (SearchOperationListener delegate : listeners) {
        try {
            delegate.onPreDfsPhase(searchContext);
        } catch (Exception e) {
            logger.warn(() -> "onPreDfsPhase listener [" + delegate + "] failed", e);
        }
    }
}

@Override
public void onFailedDfsPhase(SearchContext searchContext) {
    // Fan out to every registered listener; a failure in one listener is
    // logged and must not prevent the remaining listeners from running.
    for (SearchOperationListener delegate : listeners) {
        try {
            delegate.onFailedDfsPhase(searchContext);
        } catch (Exception e) {
            logger.warn(() -> "onFailedDfsPhase listener [" + delegate + "] failed", e);
        }
    }
}

@Override
public void onDfsPhase(SearchContext searchContext, long tookInNanos) {
    // Fan out to every registered listener; a failure in one listener is
    // logged and must not prevent the remaining listeners from running.
    for (SearchOperationListener delegate : listeners) {
        try {
            delegate.onDfsPhase(searchContext, tookInNanos);
        } catch (Exception e) {
            logger.warn(() -> "onDfsPhase listener [" + delegate + "] failed", e);
        }
    }
}

@Override
public void onNewReaderContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
Expand Down
Loading