Add node write load to the ClusterInfo #130411
Merged: DiannaHohensee merged 13 commits into elastic:main from DiannaHohensee:2025/07/01/cluster-info-service on Jul 10, 2025.
Changes shown from 1 commit. All 13 commits:
f81f9f9  Add node write load to the ClusterInfo (DiannaHohensee)
ca5c9f6  Merge branch 'main' into 2025/07/01/cluster-info-service (DiannaHohensee)
ac350d9  change names and types (DiannaHohensee)
9c7e04c  change NodeLoad data structure (DiannaHohensee)
5d41e3d  Merge branch 'main' into 2025/07/01/cluster-info-service (DiannaHohensee)
a93d678  renaming (DiannaHohensee)
5783217  rename collector (DiannaHohensee)
b064071  tidying up rename (DiannaHohensee)
90858f8  fix serialization bug (DiannaHohensee)
b777c8f  Merge branch 'main' into 2025/07/01/cluster-info-service (DiannaHohensee)
c280c19  Merge branch 'main' into 2025/07/01/cluster-info-service (DiannaHohensee)
d918b56  Merge branch 'main' into 2025/07/01/cluster-info-service (DiannaHohensee)
a08509b  Merge branch 'main' into 2025/07/01/cluster-info-service (DiannaHohensee)
...ernalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.WriteLoadCollector (10 additions, 0 deletions)
```diff
@@ -0,0 +1,10 @@
+#
+# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+# or more contributor license agreements. Licensed under the "Elastic License
+# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+# Public License v 1"; you may not use this file except in compliance with, at
+# your election, the "Elastic License 2.0", the "GNU Affero General Public
+# License v3.0 only", or the "Server Side Public License, v 1".
+#
+
+org.elasticsearch.index.shard.IndexShardIT$BogusWriteLoadCollector
```
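For context on the file above: it uses Java's standard service provider registration, where a classpath resource named META-INF/services/<fully qualified interface name> lists concrete implementations to be discovered at runtime (Elasticsearch wires this through its plugin service, as discussed in the conversation below). A minimal, self-contained sketch of the underlying mechanism follows; the nested Collector interface and its describe() method are invented here for illustration, since the real WriteLoadCollector API is not part of this diff:

```java
import java.util.ServiceLoader;

// Minimal sketch of the service provider mechanism the META-INF/services file
// above relies on. The nested Collector interface is hypothetical; the real
// org.elasticsearch.cluster.WriteLoadCollector API is not shown in this diff.
public class ServiceLoaderSketch {

    public interface Collector {
        String describe(); // invented method, purely for illustration
    }

    public static void main(String[] args) {
        // ServiceLoader scans every META-INF/services/<interface binary name>
        // resource on the classpath and instantiates the listed implementations.
        for (Collector collector : ServiceLoader.load(Collector.class)) {
            System.out.println(collector.getClass().getName() + ": " + collector.describe());
        }
    }
}
```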
server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
```diff
@@ -58,9 +58,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
     final Map<NodeAndShard, String> dataPath;
     final Map<NodeAndPath, ReservedSpace> reservedSpace;
     final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
+    final Map<String, NodeWriteLoad> nodeWriteLoads;

     protected ClusterInfo() {
-        this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
+        this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
     }

     /**
```
```diff
@@ -73,6 +74,7 @@ protected ClusterInfo() {
      * @param dataPath the shard routing to datapath mapping
      * @param reservedSpace reserved space per shard broken down by node and data path
      * @param estimatedHeapUsages estimated heap usage broken down by node
+     * @param nodeWriteLoads estimated node-level write load broken down by node
      * @see #shardIdentifierFromRouting
      */
     public ClusterInfo(
```
```diff
@@ -82,7 +84,8 @@ public ClusterInfo(
         Map<ShardId, Long> shardDataSetSizes,
         Map<NodeAndShard, String> dataPath,
         Map<NodeAndPath, ReservedSpace> reservedSpace,
-        Map<String, EstimatedHeapUsage> estimatedHeapUsages
+        Map<String, EstimatedHeapUsage> estimatedHeapUsages,
+        Map<String, NodeWriteLoad> nodeWriteLoads
     ) {
         this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
         this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
```
```diff
@@ -91,6 +94,7 @@ public ClusterInfo(
         this.dataPath = Map.copyOf(dataPath);
         this.reservedSpace = Map.copyOf(reservedSpace);
         this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
+        this.nodeWriteLoads = Map.copyOf(nodeWriteLoads);
     }

     public ClusterInfo(StreamInput in) throws IOException {
```
```diff
@@ -107,6 +111,11 @@ public ClusterInfo(StreamInput in) throws IOException {
         } else {
             this.estimatedHeapUsages = Map.of();
         }
+        if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_WRITE_LOAD_IN_CLUSTER_INFO)) {
+            this.nodeWriteLoads = in.readImmutableMap(NodeWriteLoad::new);
+        } else {
+            this.nodeWriteLoads = Map.of();
+        }
     }

     @Override
```
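The in.readImmutableMap(NodeWriteLoad::new) call above works because NodeWriteLoad follows Elasticsearch's standard Writeable pattern: a constructor taking a StreamInput mirrors writeTo field by field. NodeWriteLoad's actual fields are not visible in this diff; the sketch below invents a nodeId and a writeUtilization value purely to illustrate the pattern:

```java
import java.io.IOException;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

// Hypothetical sketch of the Writeable pattern that readImmutableMap relies on.
// The real NodeWriteLoad fields are not part of this diff; nodeId and
// writeUtilization are invented for illustration.
public record NodeWriteLoadSketch(String nodeId, double writeUtilization) implements Writeable {

    // A StreamInput constructor like this is what the NodeWriteLoad::new method
    // reference passed to readImmutableMap resolves to.
    public NodeWriteLoadSketch(StreamInput in) throws IOException {
        this(in.readString(), in.readDouble());
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // Write order must mirror the read order in the StreamInput constructor.
        out.writeString(nodeId);
        out.writeDouble(writeUtilization);
    }
}
```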
```diff
@@ -124,6 +133,9 @@ public void writeTo(StreamOutput out) throws IOException {
         if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
             out.writeMap(this.estimatedHeapUsages, StreamOutput::writeWriteable);
         }
+        if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_WRITE_LOAD_IN_CLUSTER_INFO)) {
+            out.writeMap(this.nodeWriteLoads, StreamOutput::writeWriteable);
+        }
     }

     /**
```
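The read and write sides gate on the same TransportVersions constant because a mixed-version cluster (e.g. during a rolling upgrade) negotiates a shared transport version per connection: an older peer neither sends nor expects the new map, so a newer node skips writing it and falls back to Map.of() when reading. A hedged round-trip sketch, not a test from this PR, using Elasticsearch's stream wrappers:

```java
import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;

// Hedged sketch (not from this PR): serializing a ClusterInfo at an older
// negotiated transport version silently drops nodeWriteLoads instead of
// corrupting the stream, which is exactly what the version gates guarantee.
public class ClusterInfoBwcSketch {

    static ClusterInfo roundTrip(ClusterInfo clusterInfo, TransportVersion wireVersion) throws Exception {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.setTransportVersion(wireVersion);   // simulate the negotiated connection version
            clusterInfo.writeTo(out);               // nodeWriteLoads is skipped on older versions
            StreamInput in = out.bytes().streamInput();
            in.setTransportVersion(wireVersion);    // the reader must use the same version
            return new ClusterInfo(in);             // nodeWriteLoads becomes Map.of() on older versions
        }
    }
}
```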
```diff
@@ -204,8 +216,8 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
                 return builder.endObject(); // NodeAndPath
             }),
             endArray() // end "reserved_sizes"
-            // NOTE: We don't serialize estimatedHeapUsages at this stage, to avoid
-            // committing to API payloads until the feature is settled
+            // NOTE: We don't serialize estimatedHeapUsages/nodeWriteLoads at this stage, to avoid
+            // committing to API payloads until the features are settled
         );
     }
```

Review comment on this change: I'm not entirely sure what the toXContentChunked is used for, but figured I should follow suit.
```diff
@@ -220,6 +232,13 @@ public Map<String, EstimatedHeapUsage> getEstimatedHeapUsages() {
         return estimatedHeapUsages;
     }

+    /**
+     * Returns a map containing the node-level write load estimate for each node by node ID.
+     */
+    public Map<String, NodeWriteLoad> getNodeWriteLoads() {
+        return nodeWriteLoads;
+    }
+
     /**
      * Returns a node id to disk usage mapping for the path that has the least available space on the node.
      * Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space.
```
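A downstream consumer, for example a future write-load-aware allocation decider or balancer, would read these values off a ClusterInfo snapshot. A hypothetical sketch (nothing below is from this PR; writeUtilization() is the invented accessor from the NodeWriteLoad sketch earlier, and the threshold parameter is likewise made up):

```java
import org.elasticsearch.cluster.ClusterInfo;

// Hypothetical consumer sketch (not part of this PR): a write-load-aware
// component could read the new per-node estimates off a ClusterInfo snapshot.
public class WriteHotSpotCheck {

    // writeUtilization() is an assumed accessor on NodeWriteLoad (its real
    // fields are not in this diff); utilizationThreshold is invented too.
    static boolean isWriteHotSpot(ClusterInfo clusterInfo, String nodeId, double utilizationThreshold) {
        var load = clusterInfo.getNodeWriteLoads().get(nodeId);
        if (load == null) {
            // Absent entry: the node predates the feature or stats were not collected yet.
            return false;
        }
        return load.writeUtilization() > utilizationThreshold;
    }
}
```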
```diff
@@ -311,12 +330,21 @@ public boolean equals(Object o) {
             && shardSizes.equals(that.shardSizes)
             && shardDataSetSizes.equals(that.shardDataSetSizes)
             && dataPath.equals(that.dataPath)
-            && reservedSpace.equals(that.reservedSpace);
+            && reservedSpace.equals(that.reservedSpace)
+            && nodeWriteLoads.equals(that.nodeWriteLoads);
     }

     @Override
     public int hashCode() {
-        return Objects.hash(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, shardDataSetSizes, dataPath, reservedSpace);
+        return Objects.hash(
+            leastAvailableSpaceUsage,
+            mostAvailableSpaceUsage,
+            shardSizes,
+            shardDataSetSizes,
+            dataPath,
+            reservedSpace,
+            nodeWriteLoads
+        );
     }

     @Override
```
Conversations
Comment: I originally thought we'd be collecting the node write load stats in the stateless code. Now it's looking like we may not have a dependency on stateless code to collect the stats, but it's not yet decided. As I've already got this implemented, I'd like to go ahead with it as is, if folks agree. It would only be a testing change later, if needed.
Reply: I think it will be more than a testing change if we don't rely on stateless for collecting stats? We wouldn't need to use the service provider pattern in that case, i.e. no need for the pluginsService.loadSingletonServiceProvider call in the NodeServiceProvider class. That said, I am OK with us going with this for now. We can remove it later if it turns out to be unnecessary.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reply: The team discussion seems to have decided that we will either extend the NodeStats API or add a new transport action for collection purposes. So maybe we can remove it in this PR already?
Reply: Yes, I'd have to change the NodeServiceProvider line, too 👍

My inclination is to ship this as is, without revisiting the testing, so I don't block myself or others from picking up the next pieces of project work. And I'm also not 100% confident the proposals will survive contact with code, haha.