Commit 0108d23

Merge branch 'main' into cuvs-snapshot-update
2 parents: fb4f37f + c19d4d7

File tree

12 files changed: +712, -568 lines


muted-tests.yml

Lines changed: 3 additions & 0 deletions
@@ -726,6 +726,9 @@ tests:
 - class: org.elasticsearch.xpack.esql.ccq.MultiClusterSpecIT
   method: test {csv-spec:fork.ForkWithFiltersOnConstantValues}
   issue: https://github.com/elastic/elasticsearch/issues/136031
+- class: org.elasticsearch.xpack.security.authz.microsoft.MicrosoftGraphAuthzPluginIT
+  method: testConcurrentAuthentication
+  issue: https://github.com/elastic/elasticsearch/issues/135777
 
 # Examples:
 #

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java

Lines changed: 71 additions & 19 deletions
@@ -28,6 +28,7 @@
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
+import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
 import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -54,14 +55,18 @@
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.stream.StreamSupport;
 
 import static java.util.stream.IntStream.range;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -130,7 +135,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
         setUpMockTransportIndicesStatsResponse(
             harness.firstDiscoveryNode,
             indexMetadata.getNumberOfShards(),
-            createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId)
+            createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId)
         );
         setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of());
         setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of());
@@ -235,7 +240,7 @@ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() {
         setUpMockTransportIndicesStatsResponse(
             harness.firstDiscoveryNode,
             indexMetadata.getNumberOfShards(),
-            createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId)
+            createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId)
         );
         setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of());
         setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of());
@@ -333,7 +338,7 @@ public void testCanRemainNotPreferredIsIgnoredWhenAllOtherNodesReturnNotPreferre
         setUpMockTransportIndicesStatsResponse(
             harness.firstDiscoveryNode,
             indexMetadata.getNumberOfShards(),
-            createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId)
+            createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId)
         );
         setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of());
         setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of());
@@ -429,15 +434,12 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() {
          * will show that all shards have non-empty write load stats (so that the WriteLoadDecider will evaluate assigning them to a node).
          */
 
-        IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
-            .state()
-            .getMetadata()
-            .getProject()
-            .index(harness.indexName);
+        final ClusterState originalClusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
+        final IndexMetadata indexMetadata = originalClusterState.getMetadata().getProject().index(harness.indexName);
         setUpMockTransportIndicesStatsResponse(
             harness.firstDiscoveryNode,
             indexMetadata.getNumberOfShards(),
-            createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId)
+            createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId)
         );
         setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of());
         setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of());
@@ -483,6 +485,7 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() {
                 harness.randomNumberOfShards,
                 countShardsStillAssignedToFirstNode + 1
             );
+            assertThatTheBestShardWasMoved(harness, originalClusterState, desiredBalanceResponse);
         } catch (AssertionError error) {
             ClusterState state = client().admin()
                 .cluster()
@@ -498,6 +501,36 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() {
         }
     }
 
+    /**
+     * Determine which shard was moved and check that it's the "best" according to
+     * {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator}
+     */
+    private void assertThatTheBestShardWasMoved(
+        TestHarness harness,
+        ClusterState originalClusterState,
+        DesiredBalanceResponse desiredBalanceResponse
+    ) {
+        int movedShardId = desiredBalanceResponse.getRoutingTable().get(harness.indexName).entrySet().stream().filter(e -> {
+            Set<String> desiredNodeIds = e.getValue().desired().nodeIds();
+            return desiredNodeIds.contains(harness.secondDiscoveryNode.getId())
+                || desiredNodeIds.contains(harness.thirdDiscoveryNode.getId());
+        }).findFirst().map(Map.Entry::getKey).orElseThrow(() -> new AssertionError("No shard was moved to a non-hot-spotting node"));
+
+        final BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator comparator =
+            new BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator(
+                desiredBalanceResponse.getClusterInfo(),
+                originalClusterState.getRoutingNodes().node(harness.firstDataNodeId)
+            );
+
+        final List<ShardRouting> bestShardsToMove = StreamSupport.stream(
+            originalClusterState.getRoutingNodes().node(harness.firstDataNodeId).spliterator(),
+            false
+        ).sorted(comparator).toList();
+
+        // The moved shard should be at the head of the sorted list
+        assertThat(movedShardId, equalTo(bestShardsToMove.get(0).shardId().id()));
+    }
+
     public void testMaxQueueLatencyMetricIsPublished() {
         final Settings settings = Settings.builder()
             .put(
@@ -659,16 +692,35 @@ private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools(
     }
 
     /**
-     * Helper to create a list of dummy {@link ShardStats} for the given index, each shard reporting a {@code peakShardWriteLoad} stat.
+     * Helper to create a list of dummy {@link ShardStats} for the given index, each shard being randomly allocated a peak write load
+     * between 0 and {@code maximumShardWriteLoad}. There will always be at least one shard reporting the specified
+     * {@code maximumShardWriteLoad}.
     */
    private List<ShardStats> createShardStatsResponseForIndex(
        IndexMetadata indexMetadata,
-        float peakShardWriteLoad,
+        float maximumShardWriteLoad,
        String assignedShardNodeId
    ) {
-        List<ShardStats> shardStats = new ArrayList<>(indexMetadata.getNumberOfShards());
+        // Randomly distribute shards' peak write-loads so that we can check later that shard movements are prioritized correctly
+        final double writeLoadThreshold = maximumShardWriteLoad
+            * BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator.THRESHOLD_RATIO;
+        final List<Double> shardPeakWriteLoads = new ArrayList<>();
+        // Need at least one with the maximum write-load
+        shardPeakWriteLoads.add((double) maximumShardWriteLoad);
+        final int remainingShards = indexMetadata.getNumberOfShards() - 1;
+        // Some over-threshold, some under
+        for (int i = 0; i < remainingShards; ++i) {
+            if (randomBoolean()) {
+                shardPeakWriteLoads.add(randomDoubleBetween(writeLoadThreshold, maximumShardWriteLoad, true));
+            } else {
+                shardPeakWriteLoads.add(randomDoubleBetween(0.0, writeLoadThreshold, true));
+            }
+        }
+        assertThat(shardPeakWriteLoads, hasSize(indexMetadata.getNumberOfShards()));
+        Collections.shuffle(shardPeakWriteLoads, random());
+        final List<ShardStats> shardStats = new ArrayList<>(indexMetadata.getNumberOfShards());
        for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
-            shardStats.add(createShardStats(indexMetadata, i, peakShardWriteLoad, assignedShardNodeId));
+            shardStats.add(createShardStats(indexMetadata, i, shardPeakWriteLoads.get(i), assignedShardNodeId));
        }
        return shardStats;
    }
@@ -719,7 +771,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
        int randomUtilizationThresholdPercent = randomIntBetween(50, 100);
        int randomNumberOfWritePoolThreads = randomIntBetween(2, 20);
        long randomQueueLatencyThresholdMillis = randomLongBetween(1, 20_000);
-        float randomShardWriteLoad = randomFloatBetween(0.0f, 0.01f, false);
+        float maximumShardWriteLoad = randomFloatBetween(0.0f, 0.01f, false);
        Settings settings = enabledWriteLoadDeciderSettings(randomUtilizationThresholdPercent, randomQueueLatencyThresholdMillis);
 
        internalCluster().startMasterOnlyNode(settings);
@@ -756,8 +808,8 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
                + randomUtilizationThresholdPercent
                + ", write threads: "
                + randomNumberOfWritePoolThreads
-                + ", individual shard write loads: "
-                + randomShardWriteLoad
+                + ", maximum shard write load: "
+                + maximumShardWriteLoad
        );
 
        /**
@@ -775,7 +827,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
 
        // Calculate the maximum utilization a node can report while still being able to accept all relocating shards
        int shardWriteLoadOverhead = shardLoadUtilizationOverhead(
-            randomShardWriteLoad * randomNumberOfShards,
+            maximumShardWriteLoad * randomNumberOfShards,
            randomNumberOfWritePoolThreads
        );
        int maxUtilBelowThresholdThatAllowsAllShardsToRelocate = randomUtilizationThresholdPercent - shardWriteLoadOverhead - 1;
@@ -819,7 +871,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
            randomUtilizationThresholdPercent,
            randomNumberOfWritePoolThreads,
            randomQueueLatencyThresholdMillis,
-            randomShardWriteLoad,
+            maximumShardWriteLoad,
            indexName,
            randomNumberOfShards,
            maxUtilBelowThresholdThatAllowsAllShardsToRelocate
@@ -842,7 +894,7 @@ record TestHarness(
        int randomUtilizationThresholdPercent,
        int randomNumberOfWritePoolThreads,
        long randomQueueLatencyThresholdMillis,
-        float randomShardWriteLoad,
+        float maxShardWriteLoad,
        String indexName,
        int randomNumberOfShards,
        int maxUtilBelowThresholdThatAllowsAllShardsToRelocate
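
The new assertion above recovers the moved shard from the desired-balance response and checks that it sits at the head of the list produced by PrioritiseByShardWriteLoadComparator for the hot-spotted node. Below is a rough standalone sketch of that idea, assuming for illustration that the comparator simply prefers shards with higher peak write load; the class name, the 0.9 threshold value, and the plain Comparator are stand-ins, not the real allocator code.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

public class WriteLoadPrioritySketch {
    public static void main(String[] args) {
        Random random = new Random();
        double maximumShardWriteLoad = 0.01;                      // mirrors the harness's maximumShardWriteLoad
        double writeLoadThreshold = maximumShardWriteLoad * 0.9;  // stand-in for THRESHOLD_RATIO (assumed value)

        // Build per-shard peak write loads: exactly one shard at the maximum, the rest split
        // between over-threshold and under-threshold values, then shuffled (as the test helper does).
        List<double[]> shards = new ArrayList<>(); // each entry: { shardId, peakWriteLoad }
        shards.add(new double[] { 0, maximumShardWriteLoad });
        for (int id = 1; id < 5; id++) {
            double load = random.nextBoolean()
                ? writeLoadThreshold + random.nextDouble() * (maximumShardWriteLoad - writeLoadThreshold)
                : random.nextDouble() * writeLoadThreshold;
            shards.add(new double[] { id, load });
        }
        Collections.shuffle(shards, random);

        // Illustrative ordering: hotter shards first. The shard the allocator moves is expected at the head.
        List<double[]> sorted = new ArrayList<>(shards);
        sorted.sort(Comparator.comparingDouble((double[] s) -> s[1]).reversed());

        System.out.println("best shard to move first: id=" + (int) sorted.get(0)[0]
            + ", peak write load=" + sorted.get(0)[1]);
    }
}

In the actual test the comparator is also handed the ClusterInfo and the hot-spotted RoutingNode, so its ordering can take more into account than raw write load; the sketch only shows the head-of-sorted-list assertion pattern.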

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 0 additions & 2 deletions
@@ -53,8 +53,6 @@ static TransportVersion def(int id) {
     }
 
     // TODO: ES-10337 we can remove all transport versions earlier than 8.18
-    public static final TransportVersion V_7_1_0 = def(7_01_00_99);
-    public static final TransportVersion V_7_2_0 = def(7_02_00_99);
     public static final TransportVersion V_7_3_0 = def(7_03_00_99);
     public static final TransportVersion V_7_3_2 = def(7_03_02_99);
     public static final TransportVersion V_7_4_0 = def(7_04_00_99);

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

Lines changed: 20 additions & 37 deletions
@@ -9,6 +9,7 @@
 
 package org.elasticsearch.action.support.replication;
 
+import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.LegacyActionRequest;
@@ -40,6 +41,11 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
 
     public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(1);
 
+    // superseded
+    private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary");
+    // bumped to use VInt instead of Int
+    private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small");
+
     /**
      * Target shard the request should execute on. In case of index and delete requests,
      * shard id gets resolved by the transport action before performing request operation
@@ -51,41 +57,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
     protected String index;
 
     /**
-     * The reshardSplitShardCountSummary has been added to accommodate the Resharding feature.
-     * This is populated when the coordinator is deciding which shards a request applies to.
-     * For example, {@link org.elasticsearch.action.bulk.BulkOperation} splits
-     * an incoming bulk request into shard level {@link org.elasticsearch.action.bulk.BulkShardRequest}
-     * based on its cluster state view of the number of shards that are ready for indexing.
-     * The purpose of this metadata is to reconcile the cluster state visible at the coordinating
-     * node with that visible at the source shard node. (w.r.t resharding).
-     * When an index is being split, there is a point in time when the newly created shard (target shard)
-     * takes over its portion of the document space from the original shard (source shard).
-     * Although the handoff is atomic at the original (source shard) and new shards (target shard),
-     * there is a window of time between the coordinating node creating a shard request and the shard receiving and processing it.
-     * This field is used by the original shard (source shard) when it processes the request to detect whether
-     * the coordinator's view of the new shard's state when it created the request matches the shard's current state,
-     * or whether the request must be reprocessed taking into account the current shard states.
-     *
-     * Note that we are able to get away with a single number, instead of an array of target shard states,
-     * because we only allow splits in increments of 2x.
-     *
-     * Example 1:
-     * Suppose we are resharding an index from 2 -> 4 shards. While splitting a bulk request, the coordinator observes
-     * that target shards are not ready for indexing. So requests that are meant for shard 0 and 2 are bundled together,
-     * sent to shard 0 with “reshardSplitShardCountSummary” 2 in the request.
-     * Requests that are meant for shard 1 and 3 are bundled together,
-     * sent to shard 1 with “reshardSplitShardCountSummary” 2 in the request.
-     *
-     * Example 2:
-     * Suppose we are resharding an index from 4 -> 8 shards. While splitting a bulk request, the coordinator observes
-     * that source shard 0 has completed HANDOFF but source shards 1, 2, 3 have not completed handoff.
-     * So, the shard-bulk-request it sends to shard 0 and 4 has the "reshardSplitShardCountSummary" 8,
-     * while the shard-bulk-request it sends to shard 1,2,3 has the "reshardSplitShardCountSummary" 4.
-     * Note that in this case no shard-bulk-request is sent to shards 5, 6, 7 and the requests that were meant for these target shards
-     * are bundled together with and sent to their source shards.
-     *
-     * A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with INVALID reshardSplitShardCountSummary
-     * will be treated as a Summary mismatch on the source shard node.
+     * The reshardSplitShardCountSummary has been added to support in-place resharding.
+     * See {@link SplitShardCountSummary} for details.
      */
     protected final SplitShardCountSummary reshardSplitShardCountSummary;
 
@@ -128,7 +101,13 @@ public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary resh
         if (thinRead) {
             this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
         } else {
-            this.reshardSplitShardCountSummary = new SplitShardCountSummary(in);
+            if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
+                this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
+            } else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
+                this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readInt());
+            } else {
+                this.reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
+            }
         }
     }
 
@@ -257,7 +236,11 @@ public void writeTo(StreamOutput out) throws IOException {
         out.writeTimeValue(timeout);
         out.writeString(index);
         out.writeVLong(routedBasedOnClusterVersion);
-        reshardSplitShardCountSummary.writeTo(out);
+        if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
+            out.writeVInt(reshardSplitShardCountSummary.asInt());
+        } else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
+            out.writeInt(reshardSplitShardCountSummary.asInt());
+        }
     }
 
     /**
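
The read and write paths above gate the shard-count summary on two named transport versions: peers on the newest version exchange it as a VInt, peers that only know the original version exchange a fixed-width int, and anything older omits the field entirely, so the reader falls back to UNSET. A minimal self-contained sketch of that pattern follows; the version constants, class names, and the VInt encoding are illustrative stand-ins, not Elasticsearch's StreamInput/StreamOutput implementation.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionGatedWireFormatSketch {

    // Hypothetical version ids, ordered like the named transport versions in the diff.
    static final int SHARDCOUNT_SUMMARY_VERSION = 100; // field first added, as a fixed-width int
    static final int SHARDCOUNT_SMALL_VERSION = 200;   // later bumped to a variable-length int

    static final int UNSET = 0; // treated as a summary mismatch by the source shard

    static void writeShardCountSummary(DataOutputStream out, int peerVersion, int summary) throws IOException {
        if (peerVersion >= SHARDCOUNT_SMALL_VERSION) {
            writeVInt(out, summary);   // newest peers: compact encoding, small values fit in one byte
        } else if (peerVersion >= SHARDCOUNT_SUMMARY_VERSION) {
            out.writeInt(summary);     // intermediate peers: original four-byte encoding
        }                              // older peers: field is not on the wire at all
    }

    static int readShardCountSummary(DataInputStream in, int peerVersion) throws IOException {
        if (peerVersion >= SHARDCOUNT_SMALL_VERSION) {
            return readVInt(in);
        } else if (peerVersion >= SHARDCOUNT_SUMMARY_VERSION) {
            return in.readInt();
        }
        return UNSET;                  // nothing was sent; fall back to the "unset" sentinel
    }

    // A simple 7-bits-per-byte variable-length int, to show why a VInt saves space for small counts.
    static void writeVInt(DataOutputStream out, int value) throws IOException {
        while ((value & ~0x7F) != 0) {
            out.writeByte((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.writeByte(value);
    }

    static int readVInt(DataInputStream in) throws IOException {
        int value = 0;
        for (int shift = 0; ; shift += 7) {
            int b = in.readUnsignedByte();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
        }
    }
}

Both sides branch on the same negotiated version, which is what keeps the two encodings compatible during a rolling upgrade.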
