Skip to content

Commit f26b0d4

Browse files
Merge branch 'main' into brothermich/ES-10264
2 parents 0addc02 + 3af0568 commit f26b0d4

File tree

30 files changed

+619
-151
lines changed

30 files changed

+619
-151
lines changed

plugins/examples/custom-suggester/src/main/java/org/elasticsearch/example/customsuggester/CustomSuggester.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
import org.apache.lucene.search.IndexSearcher;
1313
import org.apache.lucene.util.CharsRefBuilder;
14-
import org.elasticsearch.common.text.Text;
14+
import org.elasticsearch.xcontent.Text;
1515
import org.elasticsearch.search.suggest.Suggest;
1616
import org.elasticsearch.search.suggest.Suggester;
1717

plugins/examples/custom-suggester/src/main/java/org/elasticsearch/example/customsuggester/CustomSuggestion.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
14-
import org.elasticsearch.common.text.Text;
14+
import org.elasticsearch.xcontent.Text;
1515
import org.elasticsearch.search.suggest.Suggest;
1616
import org.elasticsearch.xcontent.ParseField;
1717
import org.elasticsearch.xcontent.XContentBuilder;

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.elasticsearch.core.Tuple;
6666
import org.elasticsearch.index.Index;
6767
import org.elasticsearch.index.IndexNotFoundException;
68+
import org.elasticsearch.index.IndexReshardService;
6869
import org.elasticsearch.index.IndexVersion;
6970
import org.elasticsearch.index.shard.IndexLongFieldRange;
7071
import org.elasticsearch.index.shard.ShardId;
@@ -352,6 +353,12 @@ static ClusterState addIndexClosedBlocks(
352353
);
353354
}
354355

356+
// Check if index closing conflicts with ongoing resharding
357+
Set<Index> reshardingIndices = IndexReshardService.reshardingIndices(currentProjectState, indicesToClose);
358+
if (reshardingIndices.isEmpty() == false) {
359+
throw new IllegalArgumentException("Cannot close indices that are being resharded: " + reshardingIndices);
360+
}
361+
355362
final ClusterBlocks.Builder blocks = ClusterBlocks.builder(currentState.blocks());
356363

357364
for (Index index : indicesToClose) {
@@ -940,6 +947,22 @@ static Tuple<ClusterState, List<IndexResult>> closeRoutingTable(
940947
continue;
941948
}
942949

950+
// Check if index closing conflicts with ongoing resharding
951+
Set<Index> reshardingIndices = IndexReshardService.reshardingIndices(currentProjectState, Set.of(index));
952+
if (reshardingIndices.isEmpty() == false) {
953+
closingResults.put(
954+
result.getKey(),
955+
new IndexResult(
956+
result.getKey(),
957+
new IllegalStateException(
958+
"verification of shards before closing " + index + " succeeded but index is being resharded in the meantime"
959+
)
960+
)
961+
);
962+
logger.debug("verification of shards before closing {} succeeded but index is being resharded in the meantime", index);
963+
continue;
964+
}
965+
943966
blocks.removeIndexBlockWithId(projectId, index.getName(), INDEX_CLOSED_BLOCK_ID);
944967
blocks.addIndexBlock(projectId, index.getName(), INDEX_CLOSED_BLOCK);
945968
final IndexMetadata.Builder updatedMetadata = IndexMetadata.builder(indexMetadata).state(IndexMetadata.State.CLOSE);
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index;
11+
12+
import org.elasticsearch.cluster.ProjectState;
13+
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
15+
import java.util.HashSet;
16+
import java.util.Set;
17+
18+
/**
19+
* Resharding is currently only implemented in Serverless. This service only exposes minimal metadata about resharding
20+
* needed by other services.
21+
*/
22+
public class IndexReshardService {
23+
/**
24+
* Returns the indices from the provided set that are currently being resharded.
25+
*/
26+
public static Set<Index> reshardingIndices(final ProjectState projectState, final Set<Index> indicesToCheck) {
27+
final Set<Index> indices = new HashSet<>();
28+
for (Index index : indicesToCheck) {
29+
IndexMetadata indexMetadata = projectState.metadata().index(index);
30+
31+
if (indexMetadata != null && indexMetadata.getReshardingMetadata() != null) {
32+
indices.add(index);
33+
}
34+
}
35+
return indices;
36+
}
37+
}

server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.cluster.routing.UnassignedInfo;
2828
import org.elasticsearch.common.settings.Settings;
2929
import org.elasticsearch.core.Nullable;
30+
import org.elasticsearch.core.Tuple;
3031
import org.elasticsearch.index.Index;
3132
import org.elasticsearch.index.IndexNotFoundException;
3233
import org.elasticsearch.index.IndexVersion;
@@ -45,6 +46,7 @@
4546
import java.util.Collections;
4647
import java.util.HashMap;
4748
import java.util.HashSet;
49+
import java.util.List;
4850
import java.util.Map;
4951
import java.util.Set;
5052
import java.util.stream.Collectors;
@@ -164,6 +166,38 @@ public void testCloseRoutingTableWithSnapshottedIndex() {
164166
assertThat(updatedState.blocks().hasIndexBlockWithId(projectId, index.getName(), INDEX_CLOSED_BLOCK_ID), is(true));
165167
}
166168

169+
public void testCloseRoutingTableWithReshardingIndex() {
170+
ClusterState state = stateWithProject("testCloseRoutingTableWithReshardingIndex", projectId);
171+
172+
String indexName = "resharding-index";
173+
ClusterBlock block = MetadataIndexStateService.createIndexClosingBlock();
174+
state = addOpenedIndex(projectId, indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state);
175+
176+
var updatedMetadata = IndexMetadata.builder(state.metadata().getProject(projectId).index(indexName))
177+
.reshardingMetadata(IndexReshardingMetadata.newSplitByMultiple(randomIntBetween(1, 5), 2))
178+
.build();
179+
state = ClusterState.builder(state)
180+
.putProjectMetadata(ProjectMetadata.builder(state.metadata().getProject(projectId)).put(updatedMetadata, true))
181+
.build();
182+
183+
state = ClusterState.builder(state)
184+
.blocks(ClusterBlocks.builder().blocks(state.blocks()).addIndexBlock(projectId, indexName, block))
185+
.build();
186+
187+
final Index index = state.metadata().getProject(projectId).index(indexName).getIndex();
188+
final Tuple<ClusterState, List<IndexResult>> result = MetadataIndexStateService.closeRoutingTable(
189+
state,
190+
projectId,
191+
Map.of(index, block),
192+
Map.of(index, new IndexResult(index)),
193+
TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY
194+
);
195+
final ClusterState updatedState = result.v1();
196+
assertIsOpened(index.getName(), updatedState, projectId);
197+
assertThat(updatedState.blocks().hasIndexBlockWithId(projectId, index.getName(), INDEX_CLOSED_BLOCK_ID), is(true));
198+
assertThat(result.v2().get(0).getException(), notNullValue());
199+
}
200+
167201
public void testAddIndexClosedBlocks() {
168202
final ClusterState initialState = stateWithProject("testAddIndexClosedBlocks", projectId);
169203
{
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index;
11+
12+
import org.elasticsearch.cluster.ClusterName;
13+
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.metadata.IndexMetadata;
15+
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
16+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
17+
import org.elasticsearch.test.ESTestCase;
18+
19+
import java.util.HashSet;
20+
import java.util.Set;
21+
22+
public class IndexReshardServiceTests extends ESTestCase {
23+
public void testReshardingIndices() {
24+
var indexMetadata1 = IndexMetadata.builder(randomAlphaOfLength(5)).settings(indexSettings(IndexVersion.current(), 1, 0)).build();
25+
var indexMetadata2 = IndexMetadata.builder(indexMetadata1.getIndex().getName() + "2")
26+
.settings(indexSettings(IndexVersion.current(), 1, 0))
27+
.reshardingMetadata(IndexReshardingMetadata.newSplitByMultiple(1, 2))
28+
.build();
29+
30+
var projectId = randomProjectIdOrDefault();
31+
32+
var projectMetadataBuilder = ProjectMetadata.builder(projectId);
33+
projectMetadataBuilder.put(indexMetadata1, false);
34+
projectMetadataBuilder.put(indexMetadata2, false);
35+
36+
final ClusterState clusterState = ClusterState.builder(new ClusterName("testReshardingIndices"))
37+
.putProjectMetadata(projectMetadataBuilder)
38+
.build();
39+
40+
Set<Index> indicesToCheck = new HashSet<>();
41+
indicesToCheck.add(indexMetadata1.getIndex());
42+
indicesToCheck.add(indexMetadata2.getIndex());
43+
44+
Set<Index> reshardingIndices = IndexReshardService.reshardingIndices(clusterState.projectState(projectId), indicesToCheck);
45+
46+
assertEquals(1, reshardingIndices.size());
47+
assertTrue(reshardingIndices.contains(indexMetadata2.getIndex()));
48+
}
49+
}

test/framework/src/main/java/org/elasticsearch/indices/CrankyCircuitBreakerService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class CrankyCircuitBreakerService extends CircuitBreakerService {
2929
*/
3030
public static final String ERROR_MESSAGE = "cranky breaker";
3131

32-
private final CircuitBreaker breaker = new CircuitBreaker() {
32+
public static final class CrankyCircuitBreaker implements CircuitBreaker {
3333
private final AtomicLong used = new AtomicLong();
3434

3535
@Override
@@ -82,7 +82,9 @@ public Durability getDurability() {
8282
public void setLimitAndOverhead(long limit, double overhead) {
8383

8484
}
85-
};
85+
}
86+
87+
private final CrankyCircuitBreaker breaker = new CrankyCircuitBreaker();
8688

8789
@Override
8890
public CircuitBreaker getBreaker(String name) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/LocalCircuitBreaker.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public final class LocalCircuitBreaker implements CircuitBreaker, Releasable {
2828
private final long maxOverReservedBytes;
2929
private long reservedBytes;
3030
private final AtomicBoolean closed = new AtomicBoolean(false);
31+
private volatile Thread activeThread;
3132

3233
public record SizeSettings(long overReservedBytes, long maxOverReservedBytes) {
3334
public SizeSettings(Settings settings) {
@@ -57,6 +58,7 @@ public void circuitBreak(String fieldName, long bytesNeeded) {
5758

5859
@Override
5960
public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
61+
assert assertSingleThread();
6062
if (bytes <= reservedBytes) {
6163
reservedBytes -= bytes;
6264
maybeReduceReservedBytes();
@@ -68,6 +70,7 @@ public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws Circu
6870

6971
@Override
7072
public void addWithoutBreaking(long bytes) {
73+
assert assertSingleThread();
7174
if (bytes <= reservedBytes) {
7275
reservedBytes -= bytes;
7376
maybeReduceReservedBytes();
@@ -130,6 +133,7 @@ public void setLimitAndOverhead(long limit, double overhead) {
130133

131134
@Override
132135
public void close() {
136+
assert assertSingleThread();
133137
if (closed.compareAndSet(false, true)) {
134138
breaker.addWithoutBreaking(-reservedBytes);
135139
}
@@ -139,4 +143,34 @@ public void close() {
139143
public String toString() {
140144
return "LocalCircuitBreaker[" + reservedBytes + "/" + overReservedBytes + ":" + maxOverReservedBytes + "]";
141145
}
146+
147+
private boolean assertSingleThread() {
148+
Thread activeThread = this.activeThread;
149+
Thread currentThread = Thread.currentThread();
150+
assert activeThread == null || activeThread == currentThread
151+
: "Local breaker must be accessed by a single thread at a time: expected ["
152+
+ activeThread
153+
+ "] != actual ["
154+
+ currentThread
155+
+ "]";
156+
return true;
157+
}
158+
159+
/**
160+
* Marks the beginning of a run loop for assertion purposes.
161+
* Sets the current thread as the only thread allowed to access this breaker.
162+
*/
163+
public boolean assertBeginRunLoop() {
164+
activeThread = Thread.currentThread();
165+
return true;
166+
}
167+
168+
/**
169+
* Marks the end of a run loop for assertion purposes.
170+
* Clears the active thread to allow other threads to access this breaker.
171+
*/
172+
public boolean assertEndRunLoop() {
173+
activeThread = null;
174+
return true;
175+
}
142176
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,10 @@ public interface ShardContext {
5353
* Returns something to load values from this field into a {@link Block}.
5454
*/
5555
BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference);
56+
57+
/**
58+
* Returns the {@link MappedFieldType} for the given field name.
59+
* By default, this delegate to {@link org.elasticsearch.index.query.SearchExecutionContext#getFieldType(String)}
60+
*/
61+
MappedFieldType fieldType(String name);
5662
}

0 commit comments

Comments
 (0)