Skip to content

Commit af78439

Browse files
authored
Merge branch 'main' into moreGeoFix
2 parents ce11df1 + 19c035f commit af78439

File tree

17 files changed

+611
-110
lines changed

17 files changed

+611
-110
lines changed

build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/ElasticsearchTestBasePlugin.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -330,24 +330,22 @@ private static void configureEntitlements(Project project) {
330330
.matching(test -> TEST_TASKS_WITH_ENTITLEMENTS.contains(test.getName()))
331331
.configureEach(test -> {
332332
// See also SystemJvmOptions.maybeAttachEntitlementAgent.
333+
SystemPropertyCommandLineArgumentProvider nonInputSystemProperties = test.getExtensions()
334+
.getByType(SystemPropertyCommandLineArgumentProvider.class);
333335

334336
// Agent
335-
if (agentFiles.isEmpty() == false) {
336-
test.getInputs().files(agentFiles);
337-
test.systemProperty("es.entitlement.agentJar", agentFiles.getAsPath());
338-
test.systemProperty("jdk.attach.allowAttachSelf", true);
339-
}
337+
test.getInputs().files(agentFiles).optional(true);
338+
nonInputSystemProperties.systemProperty("es.entitlement.agentJar", agentFiles::getAsPath);
339+
nonInputSystemProperties.systemProperty("jdk.attach.allowAttachSelf", () -> agentFiles.isEmpty() ? "false" : "true");
340340

341341
// Bridge
342-
if (bridgeFiles.isEmpty() == false) {
343-
String modulesContainingEntitlementInstrumentation = "java.logging,java.net.http,java.naming,jdk.net";
344-
test.getInputs().files(bridgeFiles);
345-
// Tests may not be modular, but the JDK still is
346-
test.jvmArgs(
347-
"--add-exports=java.base/org.elasticsearch.entitlement.bridge=ALL-UNNAMED,"
348-
+ modulesContainingEntitlementInstrumentation
349-
);
350-
}
342+
String modulesContainingEntitlementInstrumentation = "java.logging,java.net.http,java.naming,jdk.net";
343+
test.getInputs().files(bridgeFiles).optional(true);
344+
// Tests may not be modular, but the JDK still is
345+
test.jvmArgs(
346+
"--add-exports=java.base/org.elasticsearch.entitlement.bridge=ALL-UNNAMED,"
347+
+ modulesContainingEntitlementInstrumentation
348+
);
351349
});
352350
}
353351

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -648,9 +648,6 @@ tests:
648648
- class: org.elasticsearch.compute.data.BasicBlockTests
649649
method: testFloatBlock
650650
issue: https://github.com/elastic/elasticsearch/issues/133621
651-
- class: org.elasticsearch.xpack.esql.action.TimeSeriesIT
652-
method: testProfile
653-
issue: https://github.com/elastic/elasticsearch/issues/133624
654651

655652
# Examples:
656653
#

server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@
2222
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
2323
import org.elasticsearch.client.internal.node.NodeClient;
2424
import org.elasticsearch.cluster.ClusterState;
25+
import org.elasticsearch.cluster.ProjectState;
2526
import org.elasticsearch.cluster.metadata.IndexMetadata;
2627
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2728
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2829
import org.elasticsearch.cluster.project.ProjectResolver;
29-
import org.elasticsearch.cluster.routing.IndexRoutingTable;
30-
import org.elasticsearch.cluster.routing.RoutingTable;
30+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
31+
import org.elasticsearch.cluster.routing.OperationRouting;
3132
import org.elasticsearch.cluster.service.ClusterService;
3233
import org.elasticsearch.common.io.stream.Writeable;
3334
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -39,6 +40,7 @@
3940

4041
import java.util.ArrayList;
4142
import java.util.Arrays;
43+
import java.util.Iterator;
4244
import java.util.List;
4345
import java.util.Map;
4446
import java.util.concurrent.Executor;
@@ -98,8 +100,9 @@ public void accept(ActionListener<Response> listener) {
98100
assert totalShardCopyCount == 0 && successShardCopyCount == 0 && allFailures.isEmpty() : "shouldn't call this twice";
99101

100102
final ClusterState clusterState = clusterService.state();
101-
final ProjectMetadata project = projectResolver.getProjectMetadata(clusterState);
102-
final List<ShardId> shards = shards(request, project, clusterState.routingTable(project.id()));
103+
final ProjectState projectState = projectResolver.getProjectState(clusterState);
104+
final ProjectMetadata project = projectState.metadata();
105+
final List<ShardId> shards = shards(request, projectState);
103106
final Map<String, IndexMetadata> indexMetadataByName = project.indices();
104107

105108
try (var refs = new RefCountingRunnable(() -> finish(listener))) {
@@ -185,17 +188,17 @@ protected void shardExecute(Task task, Request request, ShardId shardId, ActionL
185188
/**
186189
* @return all shard ids the request should run on
187190
*/
188-
protected List<ShardId> shards(Request request, ProjectMetadata project, RoutingTable indexRoutingTables) {
191+
protected List<ShardId> shards(Request request, ProjectState projectState) {
189192
assert Transports.assertNotTransportThread("may hit all the shards");
190193
List<ShardId> shardIds = new ArrayList<>();
191-
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(project, request);
194+
195+
OperationRouting operationRouting = clusterService.operationRouting();
196+
197+
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(projectState.metadata(), request);
192198
for (String index : concreteIndices) {
193-
IndexMetadata indexMetadata = project.indices().get(index);
194-
if (indexMetadata != null) {
195-
final IndexRoutingTable indexRoutingTable = indexRoutingTables.indicesRouting().get(index);
196-
for (int i = 0; i < indexRoutingTable.size(); i++) {
197-
shardIds.add(indexRoutingTable.shard(i).shardId());
198-
}
199+
Iterator<IndexShardRoutingTable> iterator = operationRouting.allWritableShards(projectState, index);
200+
while (iterator.hasNext()) {
201+
shardIds.add(iterator.next().shardId());
199202
}
200203
}
201204
return shardIds;

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,18 @@ public void checkIndexSplitAllowed() {}
159159
* @param shardId shardId to which the current document is routed based on hashing
160160
* @return Updated shardId
161161
*/
162-
protected final int rerouteIfResharding(int shardId) {
162+
protected final int rerouteWritesIfResharding(int shardId) {
163+
return rerouteFromSplitTargetShard(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF);
164+
}
165+
166+
protected final int rerouteSearchIfResharding(int shardId) {
167+
return rerouteFromSplitTargetShard(shardId, IndexReshardingState.Split.TargetShardState.SPLIT);
168+
}
169+
170+
private int rerouteFromSplitTargetShard(int shardId, IndexReshardingState.Split.TargetShardState minimumRequiredState) {
171+
assert indexReshardingMetadata == null || indexReshardingMetadata.isSplit() : "Index resharding state is not a split";
163172
if (indexReshardingMetadata != null && indexReshardingMetadata.getSplit().isTargetShard(shardId)) {
164-
assert indexReshardingMetadata.isSplit() : "Index resharding state is not a split";
165-
if (indexReshardingMetadata.getSplit()
166-
.targetStateAtLeast(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF) == false) {
173+
if (indexReshardingMetadata.getSplit().targetStateAtLeast(shardId, minimumRequiredState) == false) {
167174
return indexReshardingMetadata.getSplit().sourceShard(shardId);
168175
}
169176
}
@@ -217,21 +224,21 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy
217224
}
218225
checkRoutingRequired(id, routing);
219226
int shardId = shardId(id, routing);
220-
return rerouteIfResharding(shardId);
227+
return rerouteWritesIfResharding(shardId);
221228
}
222229

223230
@Override
224231
public int updateShard(String id, @Nullable String routing) {
225232
checkRoutingRequired(id, routing);
226233
int shardId = shardId(id, routing);
227-
return rerouteIfResharding(shardId);
234+
return rerouteWritesIfResharding(shardId);
228235
}
229236

230237
@Override
231238
public int deleteShard(String id, @Nullable String routing) {
232239
checkRoutingRequired(id, routing);
233240
int shardId = shardId(id, routing);
234-
return rerouteIfResharding(shardId);
241+
return rerouteWritesIfResharding(shardId);
235242
}
236243

237244
@Override
@@ -262,7 +269,7 @@ protected int shardId(String id, @Nullable String routing) {
262269

263270
@Override
264271
public void collectSearchShards(String routing, IntConsumer consumer) {
265-
consumer.accept(hashToShardId(effectiveRoutingToHash(routing)));
272+
consumer.accept(rerouteSearchIfResharding(hashToShardId(effectiveRoutingToHash(routing))));
266273
}
267274
}
268275

@@ -290,7 +297,7 @@ protected int shardId(String id, @Nullable String routing) {
290297
public void collectSearchShards(String routing, IntConsumer consumer) {
291298
int hash = effectiveRoutingToHash(routing);
292299
for (int i = 0; i < routingPartitionSize; i++) {
293-
consumer.accept(hashToShardId(hash + i));
300+
consumer.accept(rerouteSearchIfResharding(hashToShardId(hash + i)));
294301
}
295302
}
296303
}
@@ -339,7 +346,7 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy
339346
checkNoRouting(routing);
340347
hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
341348
int shardId = hashToShardId(hash);
342-
return (rerouteIfResharding(shardId));
349+
return (rerouteWritesIfResharding(shardId));
343350
}
344351

345352
public String createId(XContentType sourceType, BytesReference source, byte[] suffix) {
@@ -480,14 +487,14 @@ public int updateShard(String id, @Nullable String routing) {
480487
public int deleteShard(String id, @Nullable String routing) {
481488
checkNoRouting(routing);
482489
int shardId = idToHash(id);
483-
return (rerouteIfResharding(shardId));
490+
return (rerouteWritesIfResharding(shardId));
484491
}
485492

486493
@Override
487494
public int getShard(String id, @Nullable String routing) {
488495
checkNoRouting(routing);
489496
int shardId = idToHash(id);
490-
return (rerouteIfResharding(shardId));
497+
return (rerouteWritesIfResharding(shardId));
491498
}
492499

493500
private void checkNoRouting(@Nullable String routing) {

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
import org.elasticsearch.cluster.ProjectState;
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
15+
import org.elasticsearch.cluster.metadata.IndexReshardingState;
1416
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1517
import org.elasticsearch.cluster.node.DiscoveryNodes;
1618
import org.elasticsearch.common.Strings;
@@ -26,6 +28,7 @@
2628
import java.util.Arrays;
2729
import java.util.Collections;
2830
import java.util.HashSet;
31+
import java.util.Iterator;
2932
import java.util.List;
3033
import java.util.Map;
3134
import java.util.Set;
@@ -112,6 +115,10 @@ public List<ShardIterator> searchShards(
112115
return res;
113116
}
114117

118+
public Iterator<IndexShardRoutingTable> allWritableShards(ProjectState projectState, String index) {
119+
return allWriteAddressableShards(projectState, index);
120+
}
121+
115122
public static ShardIterator getShards(RoutingTable routingTable, ShardId shardId) {
116123
final IndexShardRoutingTable shard = routingTable.shardRoutingTable(shardId);
117124
return shard.activeInitializingShardsRandomIt();
@@ -125,7 +132,7 @@ private static Set<IndexShardRoutingTable> computeTargetedShards(
125132
// we use set here and not list since we might get duplicates
126133
final Set<IndexShardRoutingTable> set = new HashSet<>();
127134
if (routing == null || routing.isEmpty()) {
128-
collectTargetShardsNoRouting(projectState.routingTable(), concreteIndices, set);
135+
collectTargetShardsNoRouting(projectState, concreteIndices, set);
129136
} else {
130137
collectTargetShardsWithRouting(projectState, concreteIndices, routing, set);
131138
}
@@ -147,20 +154,64 @@ private static void collectTargetShardsWithRouting(
147154
indexRouting.collectSearchShards(r, s -> set.add(RoutingTable.shardRoutingTable(indexRoutingTable, s)));
148155
}
149156
} else {
150-
for (int i = 0; i < indexRoutingTable.size(); i++) {
151-
set.add(indexRoutingTable.shard(i));
157+
Iterator<IndexShardRoutingTable> iterator = allSearchAddressableShards(projectState, index);
158+
while (iterator.hasNext()) {
159+
set.add(iterator.next());
152160
}
153161
}
154162
}
155163
}
156164

157-
private static void collectTargetShardsNoRouting(RoutingTable routingTable, String[] concreteIndices, Set<IndexShardRoutingTable> set) {
165+
private static void collectTargetShardsNoRouting(ProjectState projectState, String[] concreteIndices, Set<IndexShardRoutingTable> set) {
158166
for (String index : concreteIndices) {
159-
final IndexRoutingTable indexRoutingTable = indexRoutingTable(routingTable, index);
160-
for (int i = 0; i < indexRoutingTable.size(); i++) {
161-
set.add(indexRoutingTable.shard(i));
167+
Iterator<IndexShardRoutingTable> iterator = allSearchAddressableShards(projectState, index);
168+
while (iterator.hasNext()) {
169+
set.add(iterator.next());
170+
}
171+
}
172+
}
173+
174+
/**
175+
* Returns an iterator of shards that can possibly serve searches. A shard may not be addressable during processes like resharding.
176+
* This logic is not related to shard state or a recovery process. A shard returned here may f.e. be unassigned.
177+
*/
178+
private static Iterator<IndexShardRoutingTable> allSearchAddressableShards(ProjectState projectState, String index) {
179+
return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.SPLIT);
180+
}
181+
182+
/**
183+
* Returns an iterator of shards that can possibly serve writes. A shard may not be addressable during processes like resharding.
184+
* This logic is not related to shard state or a recovery process. A shard returned here may f.e. be unassigned.
185+
*/
186+
private static Iterator<IndexShardRoutingTable> allWriteAddressableShards(ProjectState projectState, String index) {
187+
return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.HANDOFF);
188+
}
189+
190+
/**
191+
* Filters shards based on their state in resharding metadata. If resharing metadata is not present returns all shards.
192+
*/
193+
private static Iterator<IndexShardRoutingTable> allShardsExceptSplitTargetsInStateBefore(
194+
ProjectState projectState,
195+
String index,
196+
IndexReshardingState.Split.TargetShardState targetShardState
197+
) {
198+
final IndexRoutingTable indexRoutingTable = indexRoutingTable(projectState.routingTable(), index);
199+
final IndexMetadata indexMetadata = indexMetadata(projectState.metadata(), index);
200+
if (indexMetadata.getReshardingMetadata() == null) {
201+
return indexRoutingTable.allShards().iterator();
202+
}
203+
204+
final IndexReshardingMetadata indexReshardingMetadata = indexMetadata.getReshardingMetadata();
205+
assert indexReshardingMetadata.isSplit();
206+
final IndexReshardingState.Split splitState = indexReshardingMetadata.getSplit();
207+
208+
var shards = new ArrayList<IndexShardRoutingTable>();
209+
for (int i = 0; i < indexRoutingTable.size(); i++) {
210+
if (splitState.isTargetShard(i) == false || splitState.targetStateAtLeast(i, targetShardState)) {
211+
shards.add(indexRoutingTable.shard(i));
162212
}
163213
}
214+
return shards.iterator();
164215
}
165216

166217
private ShardIterator preferenceActiveShardIterator(

server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsWriter.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.lucene.store.IndexInput;
2929
import org.apache.lucene.store.IndexOutput;
3030
import org.apache.lucene.store.RandomAccessInput;
31-
import org.apache.lucene.store.ReadAdvice;
3231
import org.apache.lucene.util.LongValues;
3332
import org.apache.lucene.util.VectorUtil;
3433
import org.elasticsearch.core.IOUtils;
@@ -300,13 +299,8 @@ private void mergeOneFieldIVF(FieldInfo fieldInfo, MergeState mergeState) throws
300299
// Even when the file might be sample, the reads will be always in increase order, therefore we set the ReadAdvice to SEQUENTIAL
301300
// so the OS can optimize read ahead in low memory situations.
302301
try (
303-
IndexInput vectors = mergeState.segmentInfo.dir.openInput(
304-
tempRawVectorsFileName,
305-
IOContext.DEFAULT.withReadAdvice(ReadAdvice.SEQUENTIAL)
306-
);
307-
IndexInput docs = docsFileName == null
308-
? null
309-
: mergeState.segmentInfo.dir.openInput(docsFileName, IOContext.DEFAULT.withReadAdvice(ReadAdvice.SEQUENTIAL))
302+
IndexInput vectors = mergeState.segmentInfo.dir.openInput(tempRawVectorsFileName, IOContext.READONCE);
303+
IndexInput docs = docsFileName == null ? null : mergeState.segmentInfo.dir.openInput(docsFileName, IOContext.READONCE)
310304
) {
311305
final FloatVectorValues floatVectorValues = getFloatVectorValues(fieldInfo, docs, vectors, numVectors);
312306

server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,7 @@ public void testShardsList() throws InterruptedException, ExecutionException {
248248
logger.debug("--> using initial state:\n{}", clusterService.state());
249249
List<ShardId> shards = broadcastReplicationAction.shards(
250250
new DummyBroadcastRequest().indices(shardId.getIndexName()),
251-
clusterState.metadata().getProject(projectId),
252-
clusterState.routingTable(projectId)
251+
clusterState.projectState(projectId)
253252
);
254253
assertThat(shards.size(), equalTo(1));
255254
assertThat(shards.get(0), equalTo(shardId));

0 commit comments

Comments
 (0)