Skip to content

Commit 17fd729

Browse files
authored
Merge branch 'main' into log_on_no_balancing_progress
2 parents 061e2ad + 29f2d0c commit 17fd729

File tree

11 files changed

+134
-26
lines changed

11 files changed

+134
-26
lines changed

modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/210_conditional_processor.yml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,3 +221,44 @@ teardown:
221221
index: test
222222
id: "2"
223223
catch: missing
224+
225+
---
226+
"Test conditionals support params and statements":
227+
- do:
228+
ingest.put_pipeline:
229+
id: "my_pipeline"
230+
body: >
231+
{
232+
"description": "_description",
233+
"processors": [
234+
{
235+
"set" : {
236+
"if" : {
237+
"source": "def hit = params.success_codes.containsKey(ctx.code); return hit != null && hit == true;",
238+
"params": {
239+
"success_codes" : {
240+
"10": true,
241+
"20": true
242+
}
243+
}
244+
},
245+
"field" : "result",
246+
"value" : "success"
247+
}
248+
}
249+
]
250+
}
251+
- match: { acknowledged: true }
252+
253+
- do:
254+
index:
255+
index: test
256+
id: "1"
257+
pipeline: "my_pipeline"
258+
body: { code: "20" }
259+
260+
- do:
261+
get:
262+
index: test
263+
id: "1"
264+
- match: { _source.result: "success" }

muted-tests.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,9 @@ tests:
467467
- class: org.elasticsearch.xpack.inference.integration.CCMStorageServiceIT
468468
method: testStoreAndGetCCMModel
469469
issue: https://github.com/elastic/elasticsearch/issues/137630
470+
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeMetricsIT
471+
method: test
472+
issue: https://github.com/elastic/elasticsearch/issues/137655
470473

471474
# Examples:
472475
#

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -704,12 +704,18 @@ private boolean balanceByWeights(NodeSorter sorter) {
704704
lowIdx = 0;
705705
highIdx = relevantNodes - 1;
706706

707+
assert allocation.isSimulating() == false || routingNodes.getRelocatingShardCount() == 1
708+
: "unexpected relocation shard count ["
709+
+ routingNodes.getRelocatingShardCount()
710+
+ "] when balancing index ["
711+
+ index
712+
+ "], isSimulating=["
713+
+ allocation.isSimulating()
714+
+ "], earlyReturn=["
715+
+ completeEarlyOnShardAssignmentChange
716+
+ "]";
717+
707718
if (routingNodes.getRelocatingShardCount() > 0) {
708-
// ES-12955: Check routingNodes.getRelocatingShardCount() > 0 in case the first relocation is a THROTTLE.
709-
// This should rarely happen since in most cases, we don't throttle unless there is an existing relocation.
710-
// But it can happen in production for frozen indices when the cache is still being prepared. It can also
711-
// happen in tests because we have decider like RandomAllocationDecider that can randomly return THROTTLE
712-
// when there is no existing relocation.
713719
shardBalanced = true;
714720
}
715721
if (completeEarlyOnShardAssignmentChange && shardBalanced) {
@@ -821,6 +827,20 @@ public boolean moveShards() {
821827
shardRouting,
822828
bestNonPreferredShardMovementsTracker::shardIsBetterThanCurrent
823829
);
830+
// A THROTTLE allocation decision can happen when not simulating
831+
assert moveDecision.isDecisionTaken() == false
832+
|| allocation.isSimulating() == false
833+
|| moveDecision.getAllocationDecision() != AllocationDecision.THROTTLED
834+
: "unexpected allocation decision ["
835+
+ moveDecision.getAllocationDecision()
836+
+ "] (isSimulating="
837+
+ allocation.isSimulating()
838+
+ ") with "
839+
+ (shardMoved ? "" : "no ")
840+
+ "prior shard movements when moving shard ["
841+
+ shardRouting
842+
+ "]";
843+
824844
if (moveDecision.isDecisionTaken() && moveDecision.cannotRemainAndCanMove()) {
825845
// Defer moving of not-preferred until we've moved the NOs
826846
if (moveDecision.getCanRemainDecision().type() == Type.NOT_PREFERRED) {
@@ -1233,6 +1253,21 @@ private boolean allocateUnassigned() {
12331253
ShardRouting shard = primary[i];
12341254
final ProjectIndex index = projectIndex(shard);
12351255
final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(index, shard);
1256+
1257+
assert allocationDecision.isDecisionTaken() : "decision not taken for unassigned shard [" + shard + "]";
1258+
1259+
// If we see a THROTTLE decision, it's either:
1260+
// 1. Not simulating
1261+
// 2. Or, there is shard assigned before this one
1262+
assert allocation.isSimulating() == false
1263+
|| allocationDecision.getAllocationStatus() != AllocationStatus.DECIDERS_THROTTLED
1264+
|| shardAssignmentChanged
1265+
: "unexpected THROTTLE decision (isSimulating="
1266+
+ allocation.isSimulating()
1267+
+ ") with no prior assignment when allocating unassigned shard ["
1268+
+ shard
1269+
+ "]";
1270+
12361271
final String assignedNodeId = allocationDecision.getTargetNode() != null
12371272
? allocationDecision.getTargetNode().getId()
12381273
: null;
@@ -1269,9 +1304,6 @@ private boolean allocateUnassigned() {
12691304
assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED;
12701305
final long shardSize = getExpectedShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation);
12711306
minNode.addShard(projectIndex(shard), shard.initialize(minNode.getNodeId(), null, shardSize));
1272-
// If we see a throttle decision in simulation, there must be other shards that got assigned before it.
1273-
assert allocation.isSimulating() == false || shardAssignmentChanged
1274-
: "shard " + shard + " was throttled but no other shards were assigned";
12751307
} else {
12761308
if (logger.isTraceEnabled()) {
12771309
logger.trace("No Node found to assign shard [{}]", shard);

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,13 @@ public DesiredBalance compute(
486486
|| info.lastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED) : "Unexpected stats in: " + info;
487487

488488
if (hasChanges == false && info.lastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED) {
489+
// Unassigned ignored shards must be based on the provided set of ignoredShards
490+
assert ignoredShards.contains(discardAllocationStatus(shard))
491+
|| ignoredShards.stream().filter(ShardRouting::primary).anyMatch(primary -> primary.shardId().equals(shard.shardId()))
492+
: "ignored shard "
493+
+ shard
494+
+ " unexpectedly has THROTTLE status and no counterpart in the provided ignoredShards set "
495+
+ ignoredShards;
489496
// Simulation could not progress due to missing information in any of the deciders.
490497
// Currently, this could happen if `HasFrozenCacheAllocationDecider` is still fetching the data.
491498
// Progress would be made after the followup reroute call.

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ public Decision canRebalance(RoutingAllocation allocation) {
144144
int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount();
145145
int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
146146
if (allocation.isSimulating() && relocatingShards >= 2) {
147+
// This branch should no longer run after https://github.com/elastic/elasticsearch/pull/134786
148+
assert false : "allocation simulation should have returned earlier and not hit throttling";
147149
// BalancedShardAllocator is prone to perform unnecessary moves when cluster_concurrent_rebalance is set to high values (>2).
148150
// (See https://github.com/elastic/elasticsearch/issues/87279)
149151
// Above allocator is used in DesiredBalanceComputer. Since we do not move actual shard data during calculation

server/src/main/java/org/elasticsearch/search/crossproject/CrossProjectIndexExpressionsRewriter.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ public class CrossProjectIndexExpressionsRewriter {
3535
public static TransportVersion NO_MATCHING_PROJECT_EXCEPTION_VERSION = TransportVersion.fromName("no_matching_project_exception");
3636

3737
private static final Logger logger = LogManager.getLogger(CrossProjectIndexExpressionsRewriter.class);
38-
private static final String ORIGIN_PROJECT_KEY = "_origin";
3938
private static final String[] MATCH_ALL = new String[] { Metadata.ALL };
4039
private static final String EXCLUSION = "-";
4140
private static final String DATE_MATH = "<";
@@ -161,12 +160,12 @@ private static IndexRewriteResult rewriteQualifiedExpression(
161160
String indexExpression = splitResource[1];
162161
maybeThrowOnUnsupportedResource(indexExpression);
163162

164-
if (originProjectAlias != null && ORIGIN_PROJECT_KEY.equals(requestedProjectAlias)) {
163+
if (originProjectAlias != null && ProjectRoutingResolver.ORIGIN.equals(requestedProjectAlias)) {
165164
// handling case where we have a qualified expression like: _origin:indexName
166165
return new IndexRewriteResult(indexExpression);
167166
}
168167

169-
if (originProjectAlias == null && ORIGIN_PROJECT_KEY.equals(requestedProjectAlias)) {
168+
if (originProjectAlias == null && ProjectRoutingResolver.ORIGIN.equals(requestedProjectAlias)) {
170169
// handling case where we have a qualified expression like: _origin:indexName but no _origin project is set
171170
throw new NoMatchingProjectException(requestedProjectAlias);
172171
}

server/src/main/java/org/elasticsearch/search/crossproject/CrossProjectRoutingResolver.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
*/
2626
public class CrossProjectRoutingResolver implements ProjectRoutingResolver {
2727
private static final String ALIAS = "_alias:";
28-
private static final String ORIGIN = "_origin";
2928
private static final int ALIAS_LENGTH = ALIAS.length();
3029
private static final String ALIAS_MATCH_ALL = ALIAS + "*";
3130
private static final String ALIAS_MATCH_ORIGIN = ALIAS + ORIGIN;

server/src/main/java/org/elasticsearch/search/crossproject/ProjectRoutingResolver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414
*/
1515
public interface ProjectRoutingResolver {
1616

17+
/**
18+
* The reserved term for representing the origin project in project routing.
19+
*/
20+
String ORIGIN = "_origin";
21+
1722
/**
1823
* Filters the specified TargetProjects based on the provided project routing string
1924
* @param projectRouting the project_routing specified in the request object

server/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,29 +206,33 @@ public RandomAllocationDecider(Random random) {
206206

207207
@Override
208208
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
209-
return getRandomDecision();
209+
return getRandomDecision(allocation.isSimulating() == false || allocation.routingNodes().getRelocatingShardCount() > 0);
210210
}
211211

212-
private Decision getRandomDecision() {
212+
private Decision getRandomDecision(boolean canThrottle) {
213213
if (alwaysSayYes) {
214214
return Decision.YES;
215215
}
216216
return switch (random.nextInt(10)) {
217217
case 9, 8, 7, 6, 5 -> Decision.NO;
218-
case 4 -> Decision.THROTTLE;
218+
case 4 -> canThrottle ? Decision.THROTTLE : Decision.YES;
219219
case 3, 2, 1 -> Decision.YES;
220220
default -> Decision.ALWAYS;
221221
};
222222
}
223223

224224
@Override
225225
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
226-
return getRandomDecision();
226+
return getRandomDecision(
227+
allocation.isSimulating() == false
228+
|| allocation.routingNodes().getIncomingRecoveries(node.nodeId()) > 0
229+
|| allocation.routingNodes().getOutgoingRecoveries(node.nodeId()) > 0
230+
);
227231
}
228232

229233
@Override
230234
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
231-
return getRandomDecision();
235+
return getRandomDecision(false); // throttle does not make sense for canRemain
232236
}
233237

234238
}

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/decider/HasFrozenCacheAllocationDecider.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ public class HasFrozenCacheAllocationDecider extends AllocationDecider {
2828
"value of [" + SHARED_CACHE_SIZE_SETTING.getKey() + "] on this node is not known yet"
2929
);
3030

31+
private static final Decision NO_STILL_FETCHING = Decision.single(
32+
Decision.Type.NO,
33+
NAME,
34+
"Shard movement is not allowed in simulation when value of [" + SHARED_CACHE_SIZE_SETTING.getKey() + "] on this node is not known"
35+
);
36+
3137
private static final Decision HAS_FROZEN_CACHE = Decision.single(
3238
Decision.Type.YES,
3339
NAME,
@@ -62,26 +68,26 @@ public HasFrozenCacheAllocationDecider(FrozenCacheInfoService frozenCacheService
6268

6369
@Override
6470
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
65-
return canAllocateToNode(allocation.metadata().indexMetadata(shardRouting.index()), node.node());
71+
return canAllocateToNode(allocation.metadata().indexMetadata(shardRouting.index()), node.node(), allocation);
6672
}
6773

6874
@Override
6975
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
70-
return canAllocateToNode(indexMetadata, node.node());
76+
return canAllocateToNode(indexMetadata, node.node(), allocation);
7177
}
7278

7379
@Override
7480
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
75-
return canAllocateToNode(indexMetadata, node.node());
81+
return canAllocateToNode(indexMetadata, node.node(), allocation);
7682
}
7783

7884
@Override
7985
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
80-
return canAllocateToNode(indexMetadata, node);
86+
return canAllocateToNode(indexMetadata, node, allocation);
8187
}
8288

8389
// Package private for tests
84-
Decision canAllocateToNode(IndexMetadata indexMetadata, DiscoveryNode discoveryNode) {
90+
Decision canAllocateToNode(IndexMetadata indexMetadata, DiscoveryNode discoveryNode, RoutingAllocation allocation) {
8591
if (indexMetadata.isPartialSearchableSnapshot() == false) {
8692
return Decision.ALWAYS;
8793
}
@@ -90,7 +96,8 @@ Decision canAllocateToNode(IndexMetadata indexMetadata, DiscoveryNode discoveryN
9096
case HAS_CACHE -> HAS_FROZEN_CACHE;
9197
case NO_CACHE -> NO_FROZEN_CACHE;
9298
case FAILED -> UNKNOWN_FROZEN_CACHE;
93-
case FETCHING -> STILL_FETCHING;
99+
// TODO: considering returning NO as well for non-simulation ES-13378
100+
case FETCHING -> allocation.isSimulating() ? NO_STILL_FETCHING : STILL_FETCHING;
94101
case UNKNOWN -> NO_UNKNOWN_NODE;
95102
};
96103
}

0 commit comments

Comments
 (0)