Skip to content

Commit 8c13be2

Browse files
committed
Merge branch 'main' into data-stream-settings-transport-action
2 parents 912de59 + edc8da4 commit 8c13be2

File tree

40 files changed

+1549
-201
lines changed

40 files changed

+1549
-201
lines changed

docs/changelog/126091.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126091
2+
summary: Allow balancing weights to be set per tier
3+
area: Allocation
4+
type: enhancement
5+
issues: []

docs/changelog/127225.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 127225
2+
summary: Fix count optimization with pushable union types
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 127200

muted-tests.yml

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -408,9 +408,6 @@ tests:
408408
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
409409
method: test {rerank.Reranker before a limit ASYNC}
410410
issue: https://github.com/elastic/elasticsearch/issues/127051
411-
- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecIT
412-
method: test {rrf.SimpleRrf ASYNC}
413-
issue: https://github.com/elastic/elasticsearch/issues/127063
414411
- class: org.elasticsearch.packaging.test.DockerTests
415412
method: test026InstallBundledRepositoryPlugins
416413
issue: https://github.com/elastic/elasticsearch/issues/127081
@@ -426,9 +423,6 @@ tests:
426423
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT
427424
method: test
428425
issue: https://github.com/elastic/elasticsearch/issues/127157
429-
- class: org.elasticsearch.xpack.esql.qa.single_node.EsqlSpecIT
430-
method: test {fork.ForkWithWhereSortDescAndLimit SYNC}
431-
issue: https://github.com/elastic/elasticsearch/issues/127326
432426
- class: org.elasticsearch.geometry.utils.SpatialEnvelopeVisitorTests
433427
method: testVisitGeoPointsWrapping
434428
issue: https://github.com/elastic/elasticsearch/issues/123425
@@ -441,6 +435,18 @@ tests:
441435
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackIT
442436
method: testLookupExplosionNoFetch
443437
issue: https://github.com/elastic/elasticsearch/issues/127365
438+
- class: org.elasticsearch.action.admin.cluster.state.TransportClusterStateActionDisruptionIT
439+
method: testNonLocalRequestAlwaysFindsMasterAndWaitsForMetadata
440+
issue: https://github.com/elastic/elasticsearch/issues/127422
441+
- class: org.elasticsearch.xpack.esql.qa.single_node.PushQueriesIT
442+
method: testPushCaseInsensitiveEqualityOnDefaults
443+
issue: https://github.com/elastic/elasticsearch/issues/127431
444+
- class: org.elasticsearch.action.admin.cluster.state.TransportClusterStateActionDisruptionIT
445+
method: testLocalRequestAlwaysSucceeds
446+
issue: https://github.com/elastic/elasticsearch/issues/127423
447+
- class: org.elasticsearch.action.admin.cluster.state.TransportClusterStateActionDisruptionIT
448+
method: testLocalRequestWaitsForMetadata
449+
issue: https://github.com/elastic/elasticsearch/issues/127466
444450

445451
# Examples:
446452
#

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@
3838
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
3939
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
4040
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
41+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory;
4142
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
4243
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction;
44+
import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory;
4345
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
4446
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
4547
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
@@ -146,11 +148,20 @@ public ClusterModule(
146148
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
147149
this.allocationDeciders = new AllocationDeciders(deciderList);
148150
final BalancerSettings balancerSettings = new BalancerSettings(clusterService.getClusterSettings());
149-
var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator(writeLoadForecaster, balancerSettings);
151+
final BalancingWeightsFactory balancingWeightsFactory = getBalancingWeightsFactory(
152+
clusterPlugins,
153+
balancerSettings,
154+
clusterService.getClusterSettings()
155+
);
156+
var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator(
157+
writeLoadForecaster,
158+
balancingWeightsFactory
159+
);
150160
this.shardsAllocator = createShardsAllocator(
151161
settings,
152162
clusterService.getClusterSettings(),
153163
balancerSettings,
164+
balancingWeightsFactory,
154165
threadPool,
155166
clusterPlugins,
156167
clusterService,
@@ -203,6 +214,22 @@ public ShardRouting.Role newEmptyRole(int copyIndex) {
203214
};
204215
}
205216

217+
static BalancingWeightsFactory getBalancingWeightsFactory(
218+
List<ClusterPlugin> clusterPlugins,
219+
BalancerSettings balancerSettings,
220+
ClusterSettings clusterSettings
221+
) {
222+
final var strategies = clusterPlugins.stream()
223+
.map(pl -> pl.getBalancingWeightsFactory(balancerSettings, clusterSettings))
224+
.filter(Objects::nonNull)
225+
.toList();
226+
return switch (strategies.size()) {
227+
case 0 -> new GlobalBalancingWeightsFactory(balancerSettings);
228+
case 1 -> strategies.getFirst();
229+
default -> throw new IllegalArgumentException("multiple plugins define balancing weights factories, which is not permitted");
230+
};
231+
}
232+
206233
private ClusterState reconcile(ClusterState clusterState, RerouteStrategy rerouteStrategy) {
207234
return allocationService.executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", rerouteStrategy);
208235
}
@@ -439,6 +466,7 @@ private static ShardsAllocator createShardsAllocator(
439466
Settings settings,
440467
ClusterSettings clusterSettings,
441468
BalancerSettings balancerSettings,
469+
BalancingWeightsFactory balancingWeightsFactory,
442470
ThreadPool threadPool,
443471
List<ClusterPlugin> clusterPlugins,
444472
ClusterService clusterService,
@@ -448,12 +476,15 @@ private static ShardsAllocator createShardsAllocator(
448476
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
449477
) {
450478
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
451-
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster));
479+
allocators.put(
480+
BALANCED_ALLOCATOR,
481+
() -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory)
482+
);
452483
allocators.put(
453484
DESIRED_BALANCE_ALLOCATOR,
454485
() -> new DesiredBalanceShardsAllocator(
455486
clusterSettings,
456-
new BalancedShardsAllocator(balancerSettings, writeLoadForecaster),
487+
new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory),
457488
threadPool,
458489
clusterService,
459490
reconciler,

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,16 @@ private SnapshotsInProgress(Map<ProjectRepo, ByRepo> entries, Set<String> nodesI
135135

136136
@FixForMultiProject
137137
@Deprecated(forRemoval = true)
138-
public SnapshotsInProgress withUpdatedEntriesForRepo(String repository, List<Entry> updatedEntries) {
139-
return withUpdatedEntriesForRepo(Metadata.DEFAULT_PROJECT_ID, repository, updatedEntries);
138+
public SnapshotsInProgress createCopyWithUpdatedEntriesForRepo(String repository, List<Entry> updatedEntries) {
139+
return createCopyWithUpdatedEntriesForRepo(Metadata.DEFAULT_PROJECT_ID, repository, updatedEntries);
140140
}
141141

142-
public SnapshotsInProgress withUpdatedEntriesForRepo(ProjectId projectId, String repository, List<Entry> updatedEntries) {
142+
public SnapshotsInProgress createCopyWithUpdatedEntriesForRepo(ProjectId projectId, String repository, List<Entry> updatedEntries) {
143143
if (updatedEntries.equals(forRepo(projectId, repository))) {
144+
// No changes to apply, return the current object.
144145
return this;
145146
}
147+
146148
final Map<ProjectRepo, ByRepo> copy = new HashMap<>(this.entries);
147149
final var projectRepo = new ProjectRepo(projectId, repository);
148150
if (updatedEntries.isEmpty()) {
@@ -153,13 +155,14 @@ public SnapshotsInProgress withUpdatedEntriesForRepo(ProjectId projectId, String
153155
} else {
154156
copy.put(projectRepo, new ByRepo(updatedEntries));
155157
}
158+
156159
return new SnapshotsInProgress(copy, nodesIdsForRemoval);
157160
}
158161

159162
public SnapshotsInProgress withAddedEntry(Entry entry) {
160163
final List<Entry> forRepo = new ArrayList<>(forRepo(entry.projectId(), entry.repository()));
161164
forRepo.add(entry);
162-
return withUpdatedEntriesForRepo(entry.projectId(), entry.repository(), forRepo);
165+
return createCopyWithUpdatedEntriesForRepo(entry.projectId(), entry.repository(), forRepo);
163166
}
164167

165168
/**

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.cluster.routing.IndexRoutingTable;
3434
import org.elasticsearch.cluster.routing.RerouteService;
3535
import org.elasticsearch.cluster.routing.ShardRouting;
36+
import org.elasticsearch.cluster.routing.ShardRoutingState;
3637
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3738
import org.elasticsearch.cluster.routing.allocation.FailedShard;
3839
import org.elasticsearch.cluster.routing.allocation.StaleShard;
@@ -634,10 +635,11 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
634635
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(batchExecutionContext.taskContexts().size());
635636
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
636637
final Map<Index, ClusterStateTimeRanges> updatedTimestampRanges = new HashMap<>();
638+
637639
final ClusterState initialState = batchExecutionContext.initialState();
638640
for (var taskContext : batchExecutionContext.taskContexts()) {
639-
final var task = taskContext.getTask();
640-
final StartedShardEntry startedShardEntry = task.getEntry();
641+
final ShardStateAction.StartedShardUpdateTask task = taskContext.getTask();
642+
final StartedShardEntry startedShardEntry = task.getStartedShardEntry();
641643
final Optional<ProjectMetadata> project = initialState.metadata().lookupProject(startedShardEntry.shardId.getIndex());
642644
final ShardRouting matched = project.map(ProjectMetadata::id)
643645
.map(id -> initialState.routingTable(id).getByAllocationId(startedShardEntry.shardId, startedShardEntry.allocationId))
@@ -917,9 +919,16 @@ public int hashCode() {
917919
}
918920
}
919921

922+
/**
923+
* Task that runs on the master node. Handles responding to the request listener with the result of the update request.
924+
* Task is created when the master node receives a data node request to mark a shard as {@link ShardRoutingState#STARTED}.
925+
*
926+
* @param entry Information about the newly sharted shard.
927+
* @param listener Channel listener with which to respond to the data node.
928+
*/
920929
public record StartedShardUpdateTask(StartedShardEntry entry, ActionListener<Void> listener) implements ClusterStateTaskListener {
921930

922-
public StartedShardEntry getEntry() {
931+
public StartedShardEntry getStartedShardEntry() {
923932
return entry;
924933
}
925934

server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
import org.elasticsearch.cluster.routing.RoutingNode;
1616
import org.elasticsearch.cluster.routing.RoutingNodes;
1717
import org.elasticsearch.cluster.routing.ShardRouting;
18-
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
18+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeights;
19+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory;
1920
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
2021
import org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction;
2122
import org.elasticsearch.common.util.Maps;
@@ -28,7 +29,7 @@
2829
*/
2930
public class NodeAllocationStatsAndWeightsCalculator {
3031
private final WriteLoadForecaster writeLoadForecaster;
31-
private final BalancerSettings balancerSettings;
32+
private final BalancingWeightsFactory balancingWeightsFactory;
3233

3334
/**
3435
* Node shard allocation stats and the total node weight.
@@ -42,9 +43,12 @@ public record NodeAllocationStatsAndWeight(
4243
float currentNodeWeight
4344
) {}
4445

45-
public NodeAllocationStatsAndWeightsCalculator(WriteLoadForecaster writeLoadForecaster, BalancerSettings balancerSettings) {
46+
public NodeAllocationStatsAndWeightsCalculator(
47+
WriteLoadForecaster writeLoadForecaster,
48+
BalancingWeightsFactory balancingWeightsFactory
49+
) {
4650
this.writeLoadForecaster = writeLoadForecaster;
47-
this.balancerSettings = balancerSettings;
51+
this.balancingWeightsFactory = balancingWeightsFactory;
4852
}
4953

5054
/**
@@ -60,18 +64,14 @@ public Map<String, NodeAllocationStatsAndWeight> nodesAllocationStatsAndWeights(
6064
// must not use licensed features when just starting up
6165
writeLoadForecaster.refreshLicense();
6266
}
63-
var weightFunction = new WeightFunction(
64-
balancerSettings.getShardBalanceFactor(),
65-
balancerSettings.getIndexBalanceFactor(),
66-
balancerSettings.getWriteLoadBalanceFactor(),
67-
balancerSettings.getDiskUsageBalanceFactor()
68-
);
67+
final BalancingWeights balancingWeights = balancingWeightsFactory.create();
6968
var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
7069
var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
7170
var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes);
7271

7372
var nodeAllocationStatsAndWeights = Maps.<String, NodeAllocationStatsAndWeight>newMapWithExpectedSize(routingNodes.size());
7473
for (RoutingNode node : routingNodes) {
74+
WeightFunction weightFunction = balancingWeights.weightFunctionForNode(node);
7575
int shards = 0;
7676
int undesiredShards = 0;
7777
double forecastedWriteLoad = 0.0;

0 commit comments

Comments
 (0)