Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
e0b4e4b
Calculate search load per shard in multiproject env
drempapis May 30, 2025
6d9b8bf
export package for transport action
drempapis May 30, 2025
2d9b8b6
merge main
drempapis May 30, 2025
c058a33
update transport version
drempapis May 30, 2025
254f739
Merge branch 'main' into search_load_per_index_multiproject
drempapis May 30, 2025
6614cf9
update license headers
drempapis May 30, 2025
20574d4
Merge branch 'search_load_per_index_multiproject' of github.com:dremp…
drempapis May 30, 2025
aa0c5b2
[CI] Auto commit changes from spotless
May 30, 2025
c8fb560
Register action name in constants
drempapis May 30, 2025
062fb9f
Merge branch 'main' into search_load_per_index_multiproject
drempapis May 30, 2025
182f3b6
update after review
drempapis Jun 2, 2025
0c7bf62
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 2, 2025
102bdc1
Removing the allocationId
drempapis Jun 3, 2025
31b63b0
[CI] Auto commit changes from spotless
Jun 3, 2025
76d1540
merge main and update after review
drempapis Jun 4, 2025
695d57b
update after review
drempapis Jun 4, 2025
b276035
update after review
drempapis Jun 4, 2025
6e73c2e
remove stale package from .info file
drempapis Jun 4, 2025
ebb6ed0
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 4, 2025
7cfec46
update after review
drempapis Jun 4, 2025
2f3597b
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 4, 2025
d7fab3b
merge main
drempapis Jun 5, 2025
8f5ea6b
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 5, 2025
73d93a5
update after review
drempapis Jun 6, 2025
8f36172
update after review
drempapis Jun 6, 2025
bf6ae86
merge main and resolve conflicts
drempapis Jun 6, 2025
8b1f24d
add brief documentation for searchLoad
drempapis Jun 6, 2025
7c16279
[CI] Auto commit changes from spotless
Jun 6, 2025
91ef808
Added yaml file for the search load / shard level
drempapis Jun 6, 2025
02100c2
Merge branch 'search_load_per_index_multiproject' of github.com:dremp…
drempapis Jun 6, 2025
ca429e6
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 6, 2025
06145ee
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 6, 2025
bd9c5ea
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 6, 2025
30ad337
update after review
drempapis Jun 10, 2025
c7b322a
Update server/src/main/java/org/elasticsearch/index/search/stats/Sear…
drempapis Jun 10, 2025
877c62e
update after review
drempapis Jun 10, 2025
6ced664
Merge branch 'search_load_per_index_multiproject' of github.com:dremp…
drempapis Jun 10, 2025
011e654
[CI] Auto commit changes from spotless
Jun 10, 2025
17b5611
update after review
drempapis Jun 10, 2025
a43ae30
Merge branch 'search_load_per_index_multiproject' of github.com:dremp…
drempapis Jun 10, 2025
56ce7b0
merge main to branch
drempapis Jun 10, 2025
0a3ab4c
merge main
drempapis Jun 10, 2025
7af8949
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 10, 2025
34d7b6d
Update after review
drempapis Jun 10, 2025
cb1088a
Merge branch 'search_load_per_index_multiproject' of github.com:dremp…
drempapis Jun 10, 2025
e4e82f7
[CI] Auto commit changes from spotless
Jun 10, 2025
5efd07a
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 10, 2025
7c0462e
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 10, 2025
ccd37ba
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 10, 2025
d5e2751
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 10, 2025
eeb558d
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 11, 2025
cb48991
Update for serverless checks
drempapis Jun 11, 2025
5bb7b9e
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 11, 2025
b439f89
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 11, 2025
8f30723
Merge branch 'main' into search_load_per_index_multiproject
drempapis Jun 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.search.stats.SearchStatsSettings;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.TestTranslog;
Expand Down Expand Up @@ -638,7 +639,8 @@ public static final IndexShard newIndexShard(
System::nanoTime,
null,
MapperMetrics.NOOP,
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings())
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ static TransportVersion def(int id) {
public static final TransportVersion JOIN_ON_ALIASES = def(9_088_0_00);
public static final TransportVersion ILM_ADD_SKIP_SETTING = def(9_089_0_00);
public static final TransportVersion ML_INFERENCE_MISTRAL_CHAT_COMPLETION_ADDED = def(9_090_0_00);
public static final TransportVersion SEARCH_LOAD_PER_INDEX_STATS = def(9_091_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
import org.elasticsearch.index.search.stats.SearchStatsSettings;
import org.elasticsearch.index.shard.IndexingStatsSettings;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesQueryCache;
Expand Down Expand Up @@ -636,6 +637,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME,
DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING,
IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING,
SearchStatsSettings.RECENT_READ_LOAD_HALF_LIFE_SETTING,
TransportGetAllocationStatsAction.CACHE_TTL_SETTING
);
}
9 changes: 7 additions & 2 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.MapperRegistry;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.search.stats.SearchStatsSettings;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.IndexingStatsSettings;
Expand Down Expand Up @@ -179,6 +180,7 @@ public interface DirectoryWrapper {
private final SetOnce<Engine.IndexCommitListener> indexCommitListener = new SetOnce<>();
private final MapperMetrics mapperMetrics;
private final IndexingStatsSettings indexingStatsSettings;
private final SearchStatsSettings searchStatsSettings;

/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
Expand All @@ -200,7 +202,8 @@ public IndexModule(
final SlowLogFieldProvider slowLogFieldProvider,
final MapperMetrics mapperMetrics,
final List<SearchOperationListener> searchOperationListeners,
final IndexingStatsSettings indexingStatsSettings
final IndexingStatsSettings indexingStatsSettings,
final SearchStatsSettings searchStatsSettings
) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
Expand All @@ -216,6 +219,7 @@ public IndexModule(
this.recoveryStateFactories = recoveryStateFactories;
this.mapperMetrics = mapperMetrics;
this.indexingStatsSettings = indexingStatsSettings;
this.searchStatsSettings = searchStatsSettings;
}

/**
Expand Down Expand Up @@ -552,7 +556,8 @@ public IndexService newIndexService(
indexCommitListener.get(),
mapperMetrics,
queryRewriteInterceptor,
indexingStatsSettings
indexingStatsSettings,
searchStatsSettings
);
success = true;
return indexService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.query.SearchIndexNameMatcher;
import org.elasticsearch.index.search.stats.SearchStatsSettings;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.shard.GlobalCheckpointSyncer;
import org.elasticsearch.index.shard.IndexEventListener;
Expand Down Expand Up @@ -170,6 +171,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final MapperMetrics mapperMetrics;
private final QueryRewriteInterceptor queryRewriteInterceptor;
private final IndexingStatsSettings indexingStatsSettings;
private final SearchStatsSettings searchStatsSettings;

@SuppressWarnings("this-escape")
public IndexService(
Expand Down Expand Up @@ -207,7 +209,8 @@ public IndexService(
Engine.IndexCommitListener indexCommitListener,
MapperMetrics mapperMetrics,
QueryRewriteInterceptor queryRewriteInterceptor,
IndexingStatsSettings indexingStatsSettings
IndexingStatsSettings indexingStatsSettings,
SearchStatsSettings searchStatsSettings
) {
super(indexSettings);
assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS
Expand Down Expand Up @@ -293,6 +296,7 @@ public IndexService(
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
}
this.indexingStatsSettings = indexingStatsSettings;
this.searchStatsSettings = searchStatsSettings;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -583,7 +587,8 @@ public synchronized IndexShard createShard(
System::nanoTime,
indexCommitListener,
mapperMetrics,
indexingStatsSettings
indexingStatsSettings,
searchStatsSettings
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public static class Stats implements Writeable, ToXContentFragment {
private long queryFailure;
private long fetchFailure;

private double recentSearchLoad;

private Stats() {
// for internal use, initializes all counts to 0
}
Expand All @@ -67,7 +69,8 @@ public Stats(
long scrollCurrent,
long suggestCount,
long suggestTimeInMillis,
long suggestCurrent
long suggestCurrent,
double recentSearchLoad
) {
this.queryCount = queryCount;
this.queryTimeInMillis = queryTimeInMillis;
Expand All @@ -86,6 +89,9 @@ public Stats(
this.suggestCount = suggestCount;
this.suggestTimeInMillis = suggestTimeInMillis;
this.suggestCurrent = suggestCurrent;

this.recentSearchLoad = recentSearchLoad;

}

private Stats(StreamInput in) throws IOException {
Expand All @@ -109,6 +115,10 @@ private Stats(StreamInput in) throws IOException {
queryFailure = in.readVLong();
fetchFailure = in.readVLong();
}

if (in.getTransportVersion().onOrAfter(TransportVersions.SEARCH_LOAD_PER_INDEX_STATS)) {
recentSearchLoad = in.readDouble();
}
}

@Override
Expand All @@ -133,6 +143,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(queryFailure);
out.writeVLong(fetchFailure);
}

if (out.getTransportVersion().onOrAfter(TransportVersions.SEARCH_LOAD_PER_INDEX_STATS)) {
out.writeDouble(recentSearchLoad);
}
}

public void add(Stats stats) {
Expand All @@ -153,6 +167,8 @@ public void add(Stats stats) {
suggestCount += stats.suggestCount;
suggestTimeInMillis += stats.suggestTimeInMillis;
suggestCurrent += stats.suggestCurrent;

recentSearchLoad += stats.recentSearchLoad;
}

public void addForClosingShard(Stats stats) {
Expand All @@ -171,6 +187,8 @@ public void addForClosingShard(Stats stats) {

suggestCount += stats.suggestCount;
suggestTimeInMillis += stats.suggestTimeInMillis;

recentSearchLoad += stats.recentSearchLoad;
}

public long getQueryCount() {
Expand Down Expand Up @@ -245,6 +263,10 @@ public long getSuggestCurrent() {
return suggestCurrent;
}

public double getSearchLoadRate() {
return recentSearchLoad;
}

public static Stats readStats(StreamInput in) throws IOException {
return new Stats(in);
}
Expand All @@ -269,6 +291,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime());
builder.field(Fields.SUGGEST_CURRENT, suggestCurrent);

builder.field(Fields.EMW_RATE, recentSearchLoad);

return builder;
}

Expand All @@ -290,7 +314,8 @@ public boolean equals(Object o) {
&& scrollCurrent == that.scrollCurrent
&& suggestCount == that.suggestCount
&& suggestTimeInMillis == that.suggestTimeInMillis
&& suggestCurrent == that.suggestCurrent;
&& suggestCurrent == that.suggestCurrent
&& recentSearchLoad == that.recentSearchLoad;
}

@Override
Expand All @@ -309,7 +334,8 @@ public int hashCode() {
scrollCurrent,
suggestCount,
suggestTimeInMillis,
suggestCurrent
suggestCurrent,
recentSearchLoad
);
}
}
Expand Down Expand Up @@ -427,6 +453,7 @@ static final class Fields {
static final String SUGGEST_TIME = "suggest_time";
static final String SUGGEST_TIME_IN_MILLIS = "suggest_time_in_millis";
static final String SUGGEST_CURRENT = "suggest_current";
static final String EMW_RATE = "ewm_rate";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.search.stats;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;

/**
* Container for cluster settings
*/
public class SearchStatsSettings {

public static final TimeValue RECENT_READ_LOAD_HALF_LIFE_DEFAULT = TimeValue.timeValueMinutes(5);
static final TimeValue RECENT_READ_LOAD_HALF_LIFE_MIN = TimeValue.timeValueSeconds(1); // A sub-second half-life makes no sense
static final TimeValue RECENT_READ_LOAD_HALF_LIFE_MAX = TimeValue.timeValueDays(100_000); // Long.MAX_VALUE nanos, rounded down

/**
* A cluster setting giving the half-life, in seconds, to use for the Exponentially Weighted Moving Rate calculation used for the
* recency-weighted read load
*
* <p>This is dynamic, but changes only apply to newly-opened shards.
*/
public static final Setting<TimeValue> RECENT_READ_LOAD_HALF_LIFE_SETTING = Setting.timeSetting(
"indices.stats.recent_read_load.half_life",
RECENT_READ_LOAD_HALF_LIFE_DEFAULT,
RECENT_READ_LOAD_HALF_LIFE_MIN,
RECENT_READ_LOAD_HALF_LIFE_MAX,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private volatile TimeValue recentReadLoadHalfLifeForNewShards = RECENT_READ_LOAD_HALF_LIFE_SETTING.getDefault(Settings.EMPTY);

public SearchStatsSettings(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(RECENT_READ_LOAD_HALF_LIFE_SETTING, value -> recentReadLoadHalfLifeForNewShards = value);
}

public TimeValue getRecentReadLoadHalfLifeForNewShards() {
return recentReadLoadHalfLifeForNewShards;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.index.search.stats;

import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.ExponentiallyWeightedMovingRate;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.CollectionUtils;
Expand All @@ -26,9 +27,15 @@

public final class ShardSearchStats implements SearchOperationListener {

private final StatsHolder totalStats = new StatsHolder();
private final StatsHolder totalStats;
private final CounterMetric openContexts = new CounterMetric();
private volatile Map<String, StatsHolder> groupsStats = emptyMap();
private final SearchStatsSettings searchStatsSettings;

public ShardSearchStats(SearchStatsSettings searchStatsSettings) {
this.searchStatsSettings = searchStatsSettings;
this.totalStats = new StatsHolder(searchStatsSettings);
}

/**
* Returns the stats, including group specific stats. If the groups are null/0 length, then nothing
Expand Down Expand Up @@ -78,9 +85,11 @@ public void onFailedQueryPhase(SearchContext searchContext) {
@Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
computeStats(searchContext, searchContext.hasOnlySuggest() ? statsHolder -> {
statsHolder.recentSearchLoad.addIncrement(tookInNanos, System.nanoTime());
statsHolder.suggestMetric.inc(tookInNanos);
statsHolder.suggestCurrent.dec();
} : statsHolder -> {
statsHolder.recentSearchLoad.addIncrement(tookInNanos, System.nanoTime());
statsHolder.queryMetric.inc(tookInNanos);
statsHolder.queryCurrent.dec();
});
Expand All @@ -102,6 +111,7 @@ public void onFailedFetchPhase(SearchContext searchContext) {
@Override
public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
computeStats(searchContext, statsHolder -> {
statsHolder.recentSearchLoad.addIncrement(tookInNanos, System.nanoTime());
statsHolder.fetchMetric.inc(tookInNanos);
statsHolder.fetchCurrent.dec();
});
Expand All @@ -123,7 +133,7 @@ private StatsHolder groupStats(String group) {
synchronized (this) {
stats = groupsStats.get(group);
if (stats == null) {
stats = new StatsHolder();
stats = new StatsHolder(searchStatsSettings);
groupsStats = Maps.copyMapWithAddedEntry(groupsStats, group, stats);
}
}
Expand Down Expand Up @@ -173,6 +183,13 @@ static final class StatsHolder {
final CounterMetric queryFailure = new CounterMetric();
final CounterMetric fetchFailure = new CounterMetric();

final ExponentiallyWeightedMovingRate recentSearchLoad;

StatsHolder(SearchStatsSettings searchStatsSettings) {
double lambdaInInverseNanos = Math.log(2.0) / searchStatsSettings.getRecentReadLoadHalfLifeForNewShards().nanos();
this.recentSearchLoad = new ExponentiallyWeightedMovingRate(lambdaInInverseNanos, System.nanoTime());
}

SearchStats.Stats stats() {
return new SearchStats.Stats(
queryMetric.count(),
Expand All @@ -188,7 +205,8 @@ SearchStats.Stats stats() {
scrollCurrent.count(),
suggestMetric.count(),
TimeUnit.NANOSECONDS.toMillis(suggestMetric.sum()),
suggestCurrent.count()
suggestCurrent.count(),
recentSearchLoad.getRate(System.nanoTime())
);
}
}
Expand Down
Loading