Skip to content

Commit 1793a4d

Browse files
authored
Merge branch 'main' into handle-9/deprecate-node-attrs-v2-follow-up
2 parents 8f7d322 + 623a6af commit 1793a4d

File tree

26 files changed

+713
-189
lines changed

26 files changed

+713
-189
lines changed

docs/changelog/120957.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120957
2+
summary: Introduce `AllocationBalancingRoundSummaryService`
3+
area: Allocation
4+
type: enhancement
5+
issues: []

libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/policy/PolicyManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,9 @@ private ModuleEntitlements computeEntitlements(Class<?> requestingClass) {
398398
var pluginName = pluginResolver.apply(requestingClass);
399399
if (pluginName != null) {
400400
var pluginEntitlements = pluginsEntitlements.get(pluginName);
401-
if (pluginEntitlements != null) {
401+
if (pluginEntitlements == null) {
402+
return ModuleEntitlements.NONE;
403+
} else {
402404
final String scopeName;
403405
if (requestingModule.isNamed() == false) {
404406
scopeName = ALL_UNNAMED;

libs/entitlement/src/test/java/org/elasticsearch/entitlement/runtime/policy/PolicyManagerTests.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ public void testAgentsEntitlements() throws IOException, ClassNotFoundException
271271
createEmptyTestServerPolicy(),
272272
List.of(new CreateClassLoaderEntitlement()),
273273
Map.of(),
274-
c -> "test",
274+
c -> c.getPackageName().startsWith(TEST_AGENTS_PACKAGE_NAME) ? null : "test",
275275
TEST_AGENTS_PACKAGE_NAME,
276276
NO_ENTITLEMENTS_MODULE
277277
);
@@ -357,6 +357,22 @@ public void testDuplicateFlagEntitlements() {
357357
);
358358
}
359359

360+
/**
361+
* If the plugin resolver tells us a class is in a plugin, don't conclude that it's in an agent.
362+
*/
363+
public void testPluginResolverOverridesAgents() {
364+
var policyManager = new PolicyManager(
365+
createEmptyTestServerPolicy(),
366+
List.of(new CreateClassLoaderEntitlement()),
367+
Map.of(),
368+
c -> "test", // Insist that the class is in a plugin
369+
TEST_AGENTS_PACKAGE_NAME,
370+
NO_ENTITLEMENTS_MODULE
371+
);
372+
ModuleEntitlements notAgentsEntitlements = policyManager.getEntitlements(TestAgent.class);
373+
assertThat(notAgentsEntitlements.hasEntitlement(CreateClassLoaderEntitlement.class), is(false));
374+
}
375+
360376
private static Class<?> makeClassInItsOwnModule() throws IOException, ClassNotFoundException {
361377
final Path home = createTempDir();
362378
Path jar = createMockPluginJar(home);

muted-tests.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,9 @@ tests:
449449
- class: org.elasticsearch.xpack.security.CoreWithSecurityClientYamlTestSuiteIT
450450
method: test {yaml=cat.aliases/10_basic/Complex alias}
451451
issue: https://github.com/elastic/elasticsearch/issues/121513
452+
- class: org.elasticsearch.xpack.security.CoreWithSecurityClientYamlTestSuiteIT
453+
method: test {yaml=snapshot.create/10_basic/Create a snapshot for missing index}
454+
issue: https://github.com/elastic/elasticsearch/issues/121536
452455

453456
# Examples:
454457
#

server/src/main/java/org/elasticsearch/ReleaseVersions.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,10 @@ public static IntFunction<String> generateVersionsLookup(Class<?> versionContain
7878
// replace all version lists with the smallest & greatest versions
7979
versions.replaceAll((k, v) -> {
8080
if (v.size() == 1) {
81-
return List.of(v.get(0));
81+
return List.of(v.getFirst());
8282
} else {
8383
v.sort(Comparator.naturalOrder());
84-
return List.of(v.get(0), v.get(v.size() - 1));
84+
return List.of(v.getFirst(), v.getLast());
8585
}
8686
});
8787

@@ -100,14 +100,14 @@ private static IntFunction<String> lookupFunction(NavigableMap<Integer, List<Ver
100100

101101
String lowerBound, upperBound;
102102
if (versionRange != null) {
103-
lowerBound = versionRange.get(0).toString();
104-
upperBound = lastItem(versionRange).toString();
103+
lowerBound = versionRange.getFirst().toString();
104+
upperBound = versionRange.getLast().toString();
105105
} else {
106106
// infer the bounds from the surrounding entries
107107
var lowerRange = versions.lowerEntry(id);
108108
if (lowerRange != null) {
109109
// the next version is just a guess - might be a newer revision, might be a newer minor or major...
110-
lowerBound = nextVersion(lastItem(lowerRange.getValue())).toString();
110+
lowerBound = nextVersion(lowerRange.getValue().getLast()).toString();
111111
} else {
112112
// a really old version we don't have a record for
113113
// assume it's an old version id - we can just return it directly
@@ -122,7 +122,7 @@ private static IntFunction<String> lookupFunction(NavigableMap<Integer, List<Ver
122122
var upperRange = versions.higherEntry(id);
123123
if (upperRange != null) {
124124
// too hard to guess what version this id might be for using the next version - just use it directly
125-
upperBound = upperRange.getValue().get(0).toString();
125+
upperBound = upperRange.getValue().getFirst().toString();
126126
} else {
127127
// a newer version than all we know about? Can't map it...
128128
upperBound = "[" + id + "]";
@@ -133,10 +133,6 @@ private static IntFunction<String> lookupFunction(NavigableMap<Integer, List<Ver
133133
};
134134
}
135135

136-
private static <T> T lastItem(List<T> list) {
137-
return list.get(list.size() - 1);
138-
}
139-
140136
private static Version nextVersion(Version version) {
141137
return new Version(version.id + 100); // +1 to revision
142138
}
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.common.settings.ClusterSettings;
15+
import org.elasticsearch.common.settings.Setting;
16+
import org.elasticsearch.core.TimeValue;
17+
import org.elasticsearch.threadpool.Scheduler;
18+
import org.elasticsearch.threadpool.ThreadPool;
19+
20+
import java.util.ArrayList;
21+
import java.util.concurrent.ConcurrentLinkedQueue;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
24+
/**
25+
* Manages the lifecycle of a series of {@link BalancingRoundSummary} results from allocation balancing rounds and creates reports thereof.
26+
* Reporting balancer round summary results will provide information with which to do cost-benefit analyses of the work that shard
27+
* allocation rebalancing executes.
28+
*
29+
* Any successfully added summary via {@link #addBalancerRoundSummary(BalancingRoundSummary)} will eventually be collected/drained and
30+
* reported. This should still be done in the event of the node stepping down from master, on the assumption that all summaries are only
31+
* added while master and should be drained for reporting. There is no need to start/stop this service with master election/stepdown because
32+
* balancer rounds will no longer be supplied when not master. It will simply drain the last summaries and then have nothing more to do.
33+
* This does have the tradeoff that non-master nodes will run a task to check for summaries to report every
34+
* {@link #BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING} (a time interval; 10 seconds by default).
35+
*/
36+
public class AllocationBalancingRoundSummaryService {
37+
38+
/** Turns on or off balancing round summary reporting. */
39+
public static final Setting<Boolean> ENABLE_BALANCER_ROUND_SUMMARIES_SETTING = Setting.boolSetting(
40+
"cluster.routing.allocation.desired_balance.enable_balancer_round_summaries",
41+
false,
42+
Setting.Property.NodeScope,
43+
Setting.Property.Dynamic
44+
);
45+
46+
/** Controls how frequently in time balancer round summaries are logged. */
47+
public static final Setting<TimeValue> BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING = Setting.timeSetting(
48+
"cluster.routing.allocation.desired_balance.balanace_round_summaries_interval",
49+
TimeValue.timeValueSeconds(10),
50+
TimeValue.ZERO,
51+
Setting.Property.NodeScope,
52+
Setting.Property.Dynamic
53+
);
54+
55+
private static final Logger logger = LogManager.getLogger(AllocationBalancingRoundSummaryService.class);
56+
private final ThreadPool threadPool;
57+
private volatile boolean enableBalancerRoundSummaries;
58+
private volatile TimeValue summaryReportInterval;
59+
60+
/**
61+
* A concurrency-safe list of balancing round summaries. Balancer rounds are run and added here serially, so the queue will naturally
62+
* progress from newer to older results.
63+
*/
64+
private final ConcurrentLinkedQueue<BalancingRoundSummary> summaries = new ConcurrentLinkedQueue<>();
65+
66+
/** This reference is set when reporting is scheduled. If it is null, then reporting is inactive. */
67+
private final AtomicReference<Scheduler.Cancellable> scheduledReportFuture = new AtomicReference<>();
68+
69+
public AllocationBalancingRoundSummaryService(ThreadPool threadPool, ClusterSettings clusterSettings) {
70+
this.threadPool = threadPool;
71+
// Initialize the local setting values to avoid a null access when ClusterSettings#initializeAndWatch is called on each setting:
72+
// updating enableBalancerRoundSummaries accesses summaryReportInterval.
73+
this.enableBalancerRoundSummaries = clusterSettings.get(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING);
74+
this.summaryReportInterval = clusterSettings.get(BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING);
75+
76+
clusterSettings.initializeAndWatch(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING, value -> {
77+
this.enableBalancerRoundSummaries = value;
78+
updateBalancingRoundSummaryReporting();
79+
});
80+
clusterSettings.initializeAndWatch(BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING, value -> {
81+
// The new value will get picked up the next time that the summary report task reschedules itself on the thread pool.
82+
this.summaryReportInterval = value;
83+
});
84+
}
85+
86+
/**
87+
* Adds the summary of a balancing round. If summaries are enabled, this will eventually be reported (logging, etc.). If balancer round
88+
* summaries are not enabled in the cluster, then the summary is immediately discarded (so as not to fill up a data structure that will
89+
* never be drained).
90+
*/
91+
public void addBalancerRoundSummary(BalancingRoundSummary summary) {
92+
if (enableBalancerRoundSummaries == false) {
93+
return;
94+
}
95+
96+
summaries.add(summary);
97+
}
98+
99+
/**
100+
* Reports on all the balancer round summaries added since the last call to this method, if there are any. Then reschedules itself per
101+
* the {@link #ENABLE_BALANCER_ROUND_SUMMARIES_SETTING} and {@link #BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING} settings.
102+
*/
103+
private void reportSummariesAndThenReschedule() {
104+
drainAndReportSummaries();
105+
rescheduleReporting();
106+
}
107+
108+
/**
109+
* Drains all the waiting balancer round summaries (if there are any) and reports them.
110+
*/
111+
private void drainAndReportSummaries() {
112+
var combinedSummaries = drainSummaries();
113+
if (combinedSummaries == CombinedBalancingRoundSummary.EMPTY_RESULTS) {
114+
return;
115+
}
116+
117+
logger.info("Balancing round summaries: " + combinedSummaries);
118+
}
119+
120+
/**
121+
* Returns a combined summary of all unreported allocation round summaries: may summarize a single balancer round, multiple, or none.
122+
*
123+
* @return {@link CombinedBalancingRoundSummary#EMPTY_RESULTS} if there are no balancing round summaries waiting to be reported.
124+
*/
125+
private CombinedBalancingRoundSummary drainSummaries() {
126+
ArrayList<BalancingRoundSummary> batchOfSummaries = new ArrayList<>();
127+
while (summaries.isEmpty() == false) {
128+
batchOfSummaries.add(summaries.poll());
129+
}
130+
return CombinedBalancingRoundSummary.combine(batchOfSummaries);
131+
}
132+
133+
/**
134+
* Schedules a periodic task to drain and report the latest balancer round summaries, or cancels the already running task, if the latest
135+
* setting values dictate a change to enable or disable reporting. A change to {@link #BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING}
136+
* will only take effect when the periodic task completes and reschedules itself.
137+
*/
138+
private void updateBalancingRoundSummaryReporting() {
139+
if (this.enableBalancerRoundSummaries) {
140+
startReporting(this.summaryReportInterval);
141+
} else {
142+
cancelReporting();
143+
// Clear the data structure so that we don't retain unnecessary memory.
144+
drainSummaries();
145+
}
146+
}
147+
148+
/**
149+
* Schedules a reporting task, if one is not already scheduled. The reporting task will reschedule itself going forward.
150+
*/
151+
private void startReporting(TimeValue intervalValue) {
152+
if (scheduledReportFuture.get() == null) {
153+
scheduleReporting(intervalValue);
154+
}
155+
}
156+
157+
/**
158+
* Cancels the future reporting task and resets {@link #scheduledReportFuture} to null.
159+
*
160+
* Note that this is best-effort: cancellation can race with {@link #rescheduleReporting}. But that is okay because the subsequent
161+
* {@link #rescheduleReporting} will use the latest settings and choose to cancel reporting if appropriate.
162+
*/
163+
private void cancelReporting() {
164+
var future = scheduledReportFuture.getAndSet(null);
165+
if (future != null) {
166+
future.cancel();
167+
}
168+
}
169+
170+
private void scheduleReporting(TimeValue intervalValue) {
171+
scheduledReportFuture.set(
172+
threadPool.schedule(this::reportSummariesAndThenReschedule, intervalValue, threadPool.executor(ThreadPool.Names.GENERIC))
173+
);
174+
}
175+
176+
/**
177+
* Looks at the given setting values and decides whether to schedule another reporting task or cancel reporting now.
178+
*/
179+
private void rescheduleReporting() {
180+
if (this.enableBalancerRoundSummaries) {
181+
// It's possible that this races with a concurrent call to cancel reporting, but that's okay. The next rescheduleReporting call
182+
// will check the latest settings and cancel.
183+
scheduleReporting(this.summaryReportInterval);
184+
} else {
185+
cancelReporting();
186+
}
187+
}
188+
189+
// @VisibleForTesting
190+
protected void verifyNumberOfSummaries(int numberOfSummaries) {
191+
assert numberOfSummaries == summaries.size();
192+
}
193+
194+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
/**
13+
* Summarizes the impact to the cluster as a result of a rebalancing round.
14+
*
15+
* @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one.
16+
*/
17+
public record BalancingRoundSummary(long numberOfShardsToMove) {

    /**
     * Renders the summary in the {@code BalancingRoundSummary{numberOfShardsToMove=N}} form, replacing the default record format.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("BalancingRoundSummary{");
        text.append("numberOfShardsToMove=").append(numberOfShardsToMove);
        text.append('}');
        return text.toString();
    }

}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
import java.util.List;
13+
14+
/**
15+
* Holds combined {@link BalancingRoundSummary} results. Essentially holds a list of the balancing events and the summed up changes
16+
* across all those events: what allocation work was done across some period of time.
17+
* TODO: WIP ES-10341
18+
*
19+
* Note that each balancing round summary is the difference between the desired balance that was latest at the time and the previous desired balance.
20+
* Each summary represents a step towards the next desired balance, which is based on presuming the previous desired balance is reached. So
21+
* combining them is roughly the difference between the first summary's previous desired balance and the last summary's latest desired
22+
* balance.
23+
*
24+
* @param numberOfBalancingRounds How many balancing round summaries are combined in this report.
25+
* @param numberOfShardMoves The sum of shard moves for each balancing round being combined into a single summary.
26+
*/
27+
public record CombinedBalancingRoundSummary(int numberOfBalancingRounds, long numberOfShardMoves) {
28+
29+
public static final CombinedBalancingRoundSummary EMPTY_RESULTS = new CombinedBalancingRoundSummary(0, 0);
30+
31+
public static CombinedBalancingRoundSummary combine(List<BalancingRoundSummary> summaries) {
32+
if (summaries.isEmpty()) {
33+
return EMPTY_RESULTS;
34+
}
35+
36+
int numSummaries = 0;
37+
long numberOfShardMoves = 0;
38+
for (BalancingRoundSummary summary : summaries) {
39+
++numSummaries;
40+
numberOfShardMoves += summary.numberOfShardsToMove();
41+
}
42+
return new CombinedBalancingRoundSummary(numSummaries, numberOfShardMoves);
43+
}
44+
45+
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@
2424
* strictly increasing sequence number. A new master term restarts the index values from zero. The balancer,
2525
* which runs async to reroute, uses the latest request's data to compute the desired balance.
2626
* @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated
27-
* @param weightsPerNode The node weights calculated based on
28-
* {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#calculateNodeWeight}
27+
* @param weightsPerNode The node weights calculated based on {@link WeightFunction#calculateNodeWeight}
2928
*/
3029
public record DesiredBalance(
3130
long lastConvergedIndex,

0 commit comments

Comments
 (0)