Skip to content

Commit 2743ade

Browse files
authored
Merge branch 'main' into one-categorize-blockhash
2 parents 3d06571 + 045f6a3 commit 2743ade

File tree

11 files changed

+239
-17
lines changed

11 files changed

+239
-17
lines changed

muted-tests.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,9 @@ tests:
225225
- class: org.elasticsearch.xpack.inference.InferenceCrudIT
226226
method: testSupportedStream
227227
issue: https://github.com/elastic/elasticsearch/issues/117745
228+
- class: org.elasticsearch.xpack.esql.ccq.MultiClusterSpecIT
229+
method: test {scoring.QstrWithFieldAndScoringSortedEval}
230+
issue: https://github.com/elastic/elasticsearch/issues/117751
228231

229232
# Examples:
230233
#

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ static TransportVersion def(int id) {
211211
public static final TransportVersion ESQL_REMOVE_NODE_LEVEL_PLAN = def(8_800_00_0);
212212
public static final TransportVersion LOGSDB_TELEMETRY_CUSTOM_CUTOFF_DATE = def(8_801_00_0);
213213
public static final TransportVersion SOURCE_MODE_TELEMETRY = def(8_802_00_0);
214+
public static final TransportVersion NEW_REFRESH_CLUSTER_BLOCK = def(8_803_00_0);
214215

215216
/*
216217
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.cluster.block;
1111

12+
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.common.io.stream.StreamInput;
1314
import org.elasticsearch.common.io.stream.StreamOutput;
1415
import org.elasticsearch.common.io.stream.Writeable;
@@ -21,6 +22,7 @@
2122
import java.util.EnumSet;
2223
import java.util.Locale;
2324
import java.util.Objects;
25+
import java.util.function.Predicate;
2426

2527
public class ClusterBlock implements Writeable, ToXContentFragment {
2628

@@ -142,7 +144,12 @@ public void writeTo(StreamOutput out) throws IOException {
142144
out.writeVInt(id);
143145
out.writeOptionalString(uuid);
144146
out.writeString(description);
145-
out.writeEnumSet(levels);
147+
if (out.getTransportVersion().onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK)) {
148+
out.writeEnumSet(levels);
149+
} else {
150+
// do not send ClusterBlockLevel.REFRESH to old nodes
151+
out.writeEnumSet(filterLevels(levels, level -> ClusterBlockLevel.REFRESH.equals(level) == false));
152+
}
146153
out.writeBoolean(retryable);
147154
out.writeBoolean(disableStatePersistence);
148155
RestStatus.writeTo(out, status);
@@ -185,4 +192,19 @@ public int hashCode() {
185192
public boolean isAllowReleaseResources() {
186193
return allowReleaseResources;
187194
}
195+
196+
static EnumSet<ClusterBlockLevel> filterLevels(EnumSet<ClusterBlockLevel> levels, Predicate<ClusterBlockLevel> predicate) {
197+
assert levels != null;
198+
int size = levels.size();
199+
if (size == 0 || (size == 1 && predicate.test(levels.iterator().next()))) {
200+
return levels;
201+
}
202+
var filteredLevels = EnumSet.noneOf(ClusterBlockLevel.class);
203+
for (ClusterBlockLevel level : levels) {
204+
if (predicate.test(level)) {
205+
filteredLevels.add(level);
206+
}
207+
}
208+
return filteredLevels;
209+
}
188210
}

server/src/main/java/org/elasticsearch/cluster/block/ClusterBlockLevel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ public enum ClusterBlockLevel {
1515
READ,
1616
WRITE,
1717
METADATA_READ,
18-
METADATA_WRITE;
18+
METADATA_WRITE,
19+
REFRESH;
1920

2021
public static final EnumSet<ClusterBlockLevel> ALL = EnumSet.allOf(ClusterBlockLevel.class);
2122
public static final EnumSet<ClusterBlockLevel> READ_WRITE = EnumSet.of(READ, WRITE);

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,15 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
140140
RestStatus.TOO_MANY_REQUESTS,
141141
EnumSet.of(ClusterBlockLevel.WRITE)
142142
);
143+
public static final ClusterBlock INDEX_REFRESH_BLOCK = new ClusterBlock(
144+
14,
145+
"index refresh blocked, waiting for shard(s) to be started",
146+
true,
147+
false,
148+
false,
149+
RestStatus.REQUEST_TIMEOUT,
150+
EnumSet.of(ClusterBlockLevel.REFRESH)
151+
);
143152

144153
// 'event.ingested' (part of Elastic Common Schema) range is tracked in cluster state, along with @timestamp
145154
public static final String EVENT_INGESTED_FIELD_NAME = "event.ingested";

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2929
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3030
import org.elasticsearch.cluster.block.ClusterBlocks;
31+
import org.elasticsearch.cluster.node.DiscoveryNode;
3132
import org.elasticsearch.cluster.node.DiscoveryNodes;
3233
import org.elasticsearch.cluster.routing.IndexRoutingTable;
3334
import org.elasticsearch.cluster.routing.RoutingTable;
@@ -127,6 +128,16 @@ public class MetadataCreateIndexService {
127128

128129
public static final int MAX_INDEX_NAME_BYTES = 255;
129130

131+
/**
132+
* Name of the setting used to allow blocking refreshes on newly created indices.
133+
*/
134+
public static final String USE_INDEX_REFRESH_BLOCK_SETTING_NAME = "stateless.indices.use_refresh_block_upon_index_creation";
135+
136+
@FunctionalInterface
137+
interface ClusterBlocksTransformer {
138+
void apply(ClusterBlocks.Builder clusterBlocks, IndexMetadata indexMetadata, TransportVersion minClusterTransportVersion);
139+
}
140+
130141
private final Settings settings;
131142
private final ClusterService clusterService;
132143
private final IndicesService indicesService;
@@ -139,6 +150,7 @@ public class MetadataCreateIndexService {
139150
private final boolean forbidPrivateIndexSettings;
140151
private final Set<IndexSettingProvider> indexSettingProviders;
141152
private final ThreadPool threadPool;
153+
private final ClusterBlocksTransformer blocksTransformerUponIndexCreation;
142154

143155
public MetadataCreateIndexService(
144156
final Settings settings,
@@ -166,6 +178,7 @@ public MetadataCreateIndexService(
166178
this.shardLimitValidator = shardLimitValidator;
167179
this.indexSettingProviders = indexSettingProviders.getIndexSettingProviders();
168180
this.threadPool = threadPool;
181+
this.blocksTransformerUponIndexCreation = createClusterBlocksTransformerForIndexCreation(settings);
169182
}
170183

171184
/**
@@ -540,8 +553,10 @@ private ClusterState applyCreateIndexWithTemporaryService(
540553
currentState,
541554
indexMetadata,
542555
metadataTransformer,
556+
blocksTransformerUponIndexCreation,
543557
allocationService.getShardRoutingRoleStrategy()
544558
);
559+
assert assertHasRefreshBlock(indexMetadata, updated, updated.getMinTransportVersion());
545560
if (request.performReroute()) {
546561
updated = allocationService.reroute(updated, "index [" + indexMetadata.getIndex().getName() + "] created", rerouteListener);
547562
}
@@ -1294,6 +1309,7 @@ static ClusterState clusterStateCreateIndex(
12941309
ClusterState currentState,
12951310
IndexMetadata indexMetadata,
12961311
BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer,
1312+
ClusterBlocksTransformer blocksTransformer,
12971313
ShardRoutingRoleStrategy shardRoutingRoleStrategy
12981314
) {
12991315
final Metadata newMetadata;
@@ -1307,6 +1323,9 @@ static ClusterState clusterStateCreateIndex(
13071323

13081324
var blocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());
13091325
blocksBuilder.updateBlocks(indexMetadata);
1326+
if (blocksTransformer != null) {
1327+
blocksTransformer.apply(blocksBuilder, indexMetadata, currentState.getMinTransportVersion());
1328+
}
13101329

13111330
var routingTableBuilder = RoutingTable.builder(shardRoutingRoleStrategy, currentState.routingTable())
13121331
.addAsNew(newMetadata.index(indexMetadata.getIndex().getName()));
@@ -1745,4 +1764,39 @@ public static void validateStoreTypeSetting(Settings indexSettings) {
17451764
);
17461765
}
17471766
}
1767+
1768+
private static boolean useRefreshBlock(Settings settings) {
1769+
return DiscoveryNode.isStateless(settings) && settings.getAsBoolean(USE_INDEX_REFRESH_BLOCK_SETTING_NAME, false);
1770+
}
1771+
1772+
static ClusterBlocksTransformer createClusterBlocksTransformerForIndexCreation(Settings settings) {
1773+
if (useRefreshBlock(settings) == false) {
1774+
return (clusterBlocks, indexMetadata, minClusterTransportVersion) -> {};
1775+
}
1776+
logger.debug("applying refresh block on index creation");
1777+
return (clusterBlocks, indexMetadata, minClusterTransportVersion) -> {
1778+
if (applyRefreshBlock(indexMetadata, minClusterTransportVersion)) {
1779+
// Applies the INDEX_REFRESH_BLOCK to the index. This block will remain in cluster state until an unpromotable shard is
1780+
// started or a configurable delay is elapsed.
1781+
clusterBlocks.addIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK);
1782+
}
1783+
};
1784+
}
1785+
1786+
private static boolean applyRefreshBlock(IndexMetadata indexMetadata, TransportVersion minClusterTransportVersion) {
1787+
return 0 < indexMetadata.getNumberOfReplicas() // index has replicas
1788+
&& indexMetadata.getResizeSourceIndex() == null // index is not a split/shrink index
1789+
&& indexMetadata.getInSyncAllocationIds().values().stream().allMatch(Set::isEmpty) // index is a new index
1790+
&& minClusterTransportVersion.onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK);
1791+
}
1792+
1793+
private boolean assertHasRefreshBlock(IndexMetadata indexMetadata, ClusterState clusterState, TransportVersion minTransportVersion) {
1794+
var hasRefreshBlock = clusterState.blocks().hasIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK);
1795+
if (useRefreshBlock(settings) == false || applyRefreshBlock(indexMetadata, minTransportVersion) == false) {
1796+
assert hasRefreshBlock == false : indexMetadata.getIndex();
1797+
} else {
1798+
assert hasRefreshBlock : indexMetadata.getIndex();
1799+
}
1800+
return true;
1801+
}
17481802
}

server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ public void testToXContent() throws IOException {
167167
"read",
168168
"write",
169169
"metadata_read",
170-
"metadata_write"
170+
"metadata_write",
171+
"refresh"
171172
]
172173
}
173174
},
@@ -180,7 +181,8 @@ public void testToXContent() throws IOException {
180181
"read",
181182
"write",
182183
"metadata_read",
183-
"metadata_write"
184+
"metadata_write",
185+
"refresh"
184186
]
185187
}
186188
}
@@ -440,7 +442,8 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
440442
"read",
441443
"write",
442444
"metadata_read",
443-
"metadata_write"
445+
"metadata_write",
446+
"refresh"
444447
]
445448
}
446449
},
@@ -453,7 +456,8 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
453456
"read",
454457
"write",
455458
"metadata_read",
456-
"metadata_write"
459+
"metadata_write",
460+
"refresh"
457461
]
458462
}
459463
}
@@ -712,7 +716,8 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
712716
"read",
713717
"write",
714718
"metadata_read",
715-
"metadata_write"
719+
"metadata_write",
720+
"refresh"
716721
]
717722
}
718723
},
@@ -725,7 +730,8 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
725730
"read",
726731
"write",
727732
"metadata_read",
728-
"metadata_write"
733+
"metadata_write",
734+
"refresh"
729735
]
730736
}
731737
}

server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,22 @@
1010
package org.elasticsearch.cluster.block;
1111

1212
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.TransportVersions;
1314
import org.elasticsearch.common.UUIDs;
1415
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1516
import org.elasticsearch.common.io.stream.StreamInput;
1617
import org.elasticsearch.rest.RestStatus;
1718
import org.elasticsearch.test.ESTestCase;
1819

19-
import java.util.Arrays;
2020
import java.util.Collections;
21-
import java.util.List;
21+
import java.util.EnumSet;
2222
import java.util.Map;
2323

2424
import static java.util.EnumSet.copyOf;
25+
import static org.elasticsearch.test.TransportVersionUtils.getFirstVersion;
26+
import static org.elasticsearch.test.TransportVersionUtils.getPreviousVersion;
2527
import static org.elasticsearch.test.TransportVersionUtils.randomVersion;
28+
import static org.elasticsearch.test.TransportVersionUtils.randomVersionBetween;
2629
import static org.hamcrest.CoreMatchers.endsWith;
2730
import static org.hamcrest.CoreMatchers.equalTo;
2831
import static org.hamcrest.CoreMatchers.not;
@@ -36,7 +39,7 @@ public void testSerialization() throws Exception {
3639
int iterations = randomIntBetween(5, 20);
3740
for (int i = 0; i < iterations; i++) {
3841
TransportVersion version = randomVersion(random());
39-
ClusterBlock clusterBlock = randomClusterBlock();
42+
ClusterBlock clusterBlock = randomClusterBlock(version);
4043

4144
BytesStreamOutput out = new BytesStreamOutput();
4245
out.setTransportVersion(version);
@@ -50,13 +53,41 @@ public void testSerialization() throws Exception {
5053
}
5154
}
5255

56+
public void testSerializationBwc() throws Exception {
57+
var out = new BytesStreamOutput();
58+
out.setTransportVersion(
59+
randomVersionBetween(random(), getFirstVersion(), getPreviousVersion(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK))
60+
);
61+
62+
var clusterBlock = randomClusterBlock(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK);
63+
clusterBlock.writeTo(out);
64+
65+
var in = out.bytes().streamInput();
66+
in.setTransportVersion(randomVersion());
67+
68+
assertClusterBlockEquals(
69+
new ClusterBlock(
70+
clusterBlock.id(),
71+
clusterBlock.uuid(),
72+
clusterBlock.description(),
73+
clusterBlock.retryable(),
74+
clusterBlock.disableStatePersistence(),
75+
clusterBlock.isAllowReleaseResources(),
76+
clusterBlock.status(),
77+
// ClusterBlockLevel.REFRESH should not be sent over the wire to nodes with version < NEW_REFRESH_CLUSTER_BLOCK
78+
ClusterBlock.filterLevels(clusterBlock.levels(), level -> ClusterBlockLevel.REFRESH.equals(level) == false)
79+
),
80+
new ClusterBlock(in)
81+
);
82+
}
83+
5384
public void testToStringDanglingComma() {
54-
final ClusterBlock clusterBlock = randomClusterBlock();
85+
final ClusterBlock clusterBlock = randomClusterBlock(randomVersion(random()));
5586
assertThat(clusterBlock.toString(), not(endsWith(",")));
5687
}
5788

5889
public void testGlobalBlocksCheckedIfNoIndicesSpecified() {
59-
ClusterBlock globalBlock = randomClusterBlock();
90+
ClusterBlock globalBlock = randomClusterBlock(randomVersion(random()));
6091
ClusterBlocks clusterBlocks = new ClusterBlocks(Collections.singleton(globalBlock), Map.of());
6192
ClusterBlockException exception = clusterBlocks.indicesBlockedException(randomFrom(globalBlock.levels()), new String[0]);
6293
assertNotNull(exception);
@@ -113,9 +144,13 @@ public void testGetIndexBlockWithId() {
113144
assertThat(builder.build().getIndexBlockWithId("index", randomValueOtherThan(blockId, ESTestCase::randomInt)), nullValue());
114145
}
115146

116-
private static ClusterBlock randomClusterBlock() {
147+
private static ClusterBlock randomClusterBlock(TransportVersion version) {
117148
final String uuid = randomBoolean() ? UUIDs.randomBase64UUID() : null;
118-
final List<ClusterBlockLevel> levels = Arrays.asList(ClusterBlockLevel.values());
149+
final EnumSet<ClusterBlockLevel> levels = ClusterBlock.filterLevels(
150+
EnumSet.allOf(ClusterBlockLevel.class),
151+
// Filter out ClusterBlockLevel.REFRESH for versions < TransportVersions.NEW_REFRESH_CLUSTER_BLOCK
152+
level -> ClusterBlockLevel.REFRESH.equals(level) == false || version.onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK)
153+
);
119154
return new ClusterBlock(
120155
randomInt(),
121156
uuid,

0 commit comments

Comments
 (0)