Skip to content

Commit c5a623a

Browse files
Mark read-only flush and verify (#119743) (#120522)
When marking read-only now flush and mark index as verified guaranteeing that we can upgrade safely to next version with N-1 indices (becoming N-2). Use this in the deprecation check.
1 parent 132350b commit c5a623a

File tree

15 files changed

+295
-21
lines changed

15 files changed

+295
-21
lines changed

docs/changelog/119743.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 119743
2+
summary: POC mark read-only
3+
area: Engine
4+
type: enhancement
5+
issues: []
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.upgrades;
11+
12+
import io.netty.handler.codec.http.HttpMethod;
13+
14+
import com.carrotsearch.randomizedtesting.annotations.Name;
15+
16+
import org.elasticsearch.TransportVersions;
17+
import org.elasticsearch.client.Request;
18+
import org.elasticsearch.client.ResponseException;
19+
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
20+
import org.hamcrest.Matchers;
21+
22+
import java.io.IOException;
23+
import java.util.Map;
24+
25+
public class AddIndexBlockRollingUpgradeIT extends AbstractRollingUpgradeTestCase {
26+
27+
private static final String INDEX_NAME = "test_add_block";
28+
29+
public AddIndexBlockRollingUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
30+
super(upgradedNodes);
31+
}
32+
33+
public void testAddBlock() throws Exception {
34+
if (isOldCluster()) {
35+
createIndex(INDEX_NAME);
36+
} else if (isMixedCluster()) {
37+
blockWrites();
38+
// this is used both for upgrading from 9.0.0 to current and from 8.18 to current.
39+
if (minimumTransportVersion().before(TransportVersions.ADD_INDEX_BLOCK_TWO_PHASE)) {
40+
assertNull(verifiedSettingValue());
41+
} else {
42+
assertThat(verifiedSettingValue(), Matchers.equalTo("true"));
43+
}
44+
} else {
45+
assertTrue(isUpgradedCluster());
46+
blockWrites();
47+
assertThat(verifiedSettingValue(), Matchers.equalTo("true"));
48+
}
49+
}
50+
51+
private static void blockWrites() throws IOException {
52+
client().performRequest(new Request(HttpMethod.PUT.name(), "/" + INDEX_NAME + "/_block/write"));
53+
54+
expectThrows(
55+
ResponseException.class,
56+
() -> client().performRequest(
57+
newXContentRequest(HttpMethod.PUT, "/" + INDEX_NAME + "/_doc/test", (builder, params) -> builder.field("test", "test"))
58+
)
59+
);
60+
}
61+
62+
@SuppressWarnings("unchecked")
63+
private static String verifiedSettingValue() throws IOException {
64+
final var settingsRequest = new Request(HttpMethod.GET.name(), "/" + INDEX_NAME + "/_settings?flat_settings");
65+
final Map<String, Object> settingsResponse = entityAsMap(client().performRequest(settingsRequest));
66+
return (String) ((Map<String, Object>) ((Map<String, Object>) settingsResponse.get(INDEX_NAME)).get("settings")).get(
67+
MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.getKey()
68+
);
69+
}
70+
}

server/src/internalClusterTest/java/org/elasticsearch/blocks/SimpleBlocksIT.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,21 @@
1616
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse;
1717
import org.elasticsearch.action.index.IndexRequestBuilder;
1818
import org.elasticsearch.action.support.ActiveShardCount;
19+
import org.elasticsearch.action.support.PlainActionFuture;
1920
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2021
import org.elasticsearch.cluster.ClusterState;
22+
import org.elasticsearch.cluster.ClusterStateTaskListener;
23+
import org.elasticsearch.cluster.SimpleBatchedExecutor;
2124
import org.elasticsearch.cluster.block.ClusterBlockException;
2225
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2326
import org.elasticsearch.cluster.metadata.IndexMetadata;
2427
import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock;
28+
import org.elasticsearch.cluster.metadata.Metadata;
29+
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
2530
import org.elasticsearch.cluster.routing.ShardRouting;
31+
import org.elasticsearch.common.Priority;
2632
import org.elasticsearch.common.settings.Settings;
33+
import org.elasticsearch.core.Tuple;
2734
import org.elasticsearch.index.IndexNotFoundException;
2835
import org.elasticsearch.test.BackgroundIndexer;
2936
import org.elasticsearch.test.ESIntegTestCase;
@@ -266,6 +273,74 @@ public void testAddIndexBlock() throws Exception {
266273
assertHitCount(prepareSearch(indexName).setSize(0), nbDocs);
267274
}
268275

276+
public void testReAddUnverifiedIndexBlock() {
277+
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
278+
createIndex(indexName);
279+
ensureGreen(indexName);
280+
281+
final int nbDocs = randomIntBetween(0, 50);
282+
indexRandom(
283+
randomBoolean(),
284+
false,
285+
randomBoolean(),
286+
IntStream.range(0, nbDocs).mapToObj(i -> prepareIndex(indexName).setId(String.valueOf(i)).setSource("num", i)).collect(toList())
287+
);
288+
289+
final APIBlock block = APIBlock.WRITE;
290+
try {
291+
AddIndexBlockResponse response = indicesAdmin().prepareAddBlock(block, indexName).get();
292+
assertTrue("Add block [" + block + "] to index [" + indexName + "] not acknowledged: " + response, response.isAcknowledged());
293+
assertIndexHasBlock(block, indexName);
294+
295+
removeVerified(indexName);
296+
297+
AddIndexBlockResponse response2 = indicesAdmin().prepareAddBlock(block, indexName).get();
298+
assertTrue("Add block [" + block + "] to index [" + indexName + "] not acknowledged: " + response, response2.isAcknowledged());
299+
assertIndexHasBlock(block, indexName);
300+
} finally {
301+
disableIndexBlock(indexName, block);
302+
}
303+
304+
}
305+
306+
private static void removeVerified(String indexName) {
307+
PlainActionFuture<Void> listener = new PlainActionFuture<>();
308+
internalCluster().clusterService(internalCluster().getMasterName())
309+
.createTaskQueue("test", Priority.NORMAL, new SimpleBatchedExecutor<>() {
310+
@Override
311+
public Tuple<ClusterState, Object> executeTask(
312+
ClusterStateTaskListener clusterStateTaskListener,
313+
ClusterState clusterState
314+
) {
315+
316+
IndexMetadata indexMetadata = clusterState.metadata().index(indexName);
317+
Settings.Builder settingsBuilder = Settings.builder().put(indexMetadata.getSettings());
318+
settingsBuilder.remove(MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.getKey());
319+
return Tuple.tuple(
320+
ClusterState.builder(clusterState)
321+
.metadata(
322+
Metadata.builder(clusterState.metadata())
323+
.put(
324+
IndexMetadata.builder(indexMetadata)
325+
.settings(settingsBuilder)
326+
.settingsVersion(indexMetadata.getSettingsVersion() + 1)
327+
)
328+
)
329+
.build(),
330+
null
331+
);
332+
}
333+
334+
@Override
335+
public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Object ignored) {
336+
listener.onResponse(null);
337+
}
338+
})
339+
.submitTask("test", e -> fail(e), null);
340+
341+
listener.actionGet();
342+
}
343+
269344
public void testSameBlockTwice() throws Exception {
270345
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
271346
createIndex(indexName);
@@ -452,6 +527,9 @@ static void assertIndexHasBlock(APIBlock block, final String... indices) {
452527
.count(),
453528
equalTo(1L)
454529
);
530+
if (block.getBlock().contains(ClusterBlockLevel.WRITE)) {
531+
assertThat(MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.get(indexSettings), is(true));
532+
}
455533
}
456534
}
457535

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ static TransportVersion def(int id) {
165165
public static final TransportVersion BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1 = def(8_825_00_0);
166166
public static final TransportVersion REVERT_BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1 = def(8_826_00_0);
167167
public static final TransportVersion ESQL_SKIP_ES_INDEX_SERIALIZATION = def(8_827_00_0);
168+
public static final TransportVersion ADD_INDEX_BLOCK_TWO_PHASE = def(8_828_00_0);
168169

169170
/*
170171
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockClusterStateUpdateRequest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public record AddIndexBlockClusterStateUpdateRequest(
2121
TimeValue masterNodeTimeout,
2222
TimeValue ackTimeout,
2323
APIBlock block,
24+
boolean markVerified,
2425
long taskId,
2526
Index[] indices
2627
) {

server/src/main/java/org/elasticsearch/action/admin/indices/readonly/AddIndexBlockRequest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action.admin.indices.readonly;
1111

12+
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.action.ActionRequestValidationException;
1314
import org.elasticsearch.action.IndicesRequest;
1415
import org.elasticsearch.action.support.IndicesOptions;
@@ -32,12 +33,18 @@ public class AddIndexBlockRequest extends AcknowledgedRequest<AddIndexBlockReque
3233
private final APIBlock block;
3334
private String[] indices;
3435
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
36+
private boolean markVerified = true;
3537

3638
public AddIndexBlockRequest(StreamInput in) throws IOException {
3739
super(in);
3840
indices = in.readStringArray();
3941
indicesOptions = IndicesOptions.readIndicesOptions(in);
4042
block = APIBlock.readFrom(in);
43+
if (in.getTransportVersion().onOrAfter(TransportVersions.ADD_INDEX_BLOCK_TWO_PHASE)) {
44+
markVerified = in.readBoolean();
45+
} else {
46+
markVerified = false;
47+
}
4148
}
4249

4350
/**
@@ -103,6 +110,15 @@ public AddIndexBlockRequest indicesOptions(IndicesOptions indicesOptions) {
103110
return this;
104111
}
105112

113+
public boolean markVerified() {
114+
return markVerified;
115+
}
116+
117+
public AddIndexBlockRequest markVerified(boolean markVerified) {
118+
this.markVerified = markVerified;
119+
return this;
120+
}
121+
106122
/**
107123
* Returns the block to be added
108124
*/
@@ -116,6 +132,9 @@ public void writeTo(StreamOutput out) throws IOException {
116132
out.writeStringArray(indices);
117133
indicesOptions.writeIndicesOptions(out);
118134
block.writeTo(out);
135+
if (out.getTransportVersion().onOrAfter(TransportVersions.ADD_INDEX_BLOCK_TWO_PHASE)) {
136+
out.writeBoolean(markVerified);
137+
}
119138
}
120139

121140
@Override
@@ -136,4 +155,5 @@ public int hashCode() {
136155
result = 31 * result + Arrays.hashCode(indices);
137156
return result;
138157
}
158+
139159
}

server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportAddIndexBlockAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ protected void masterOperation(
107107
request.masterNodeTimeout(),
108108
request.ackTimeout(),
109109
request.getBlock(),
110+
request.markVerified(),
110111
task.getId(),
111112
concreteIndices
112113
),

server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,18 @@
88
*/
99
package org.elasticsearch.action.admin.indices.readonly;
1010

11+
import org.elasticsearch.TransportVersions;
1112
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.ActionType;
14+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
1315
import org.elasticsearch.action.support.ActionFilters;
1416
import org.elasticsearch.action.support.replication.ReplicationOperation;
1517
import org.elasticsearch.action.support.replication.ReplicationRequest;
1618
import org.elasticsearch.action.support.replication.ReplicationResponse;
1719
import org.elasticsearch.action.support.replication.TransportReplicationAction;
1820
import org.elasticsearch.cluster.action.shard.ShardStateAction;
1921
import org.elasticsearch.cluster.block.ClusterBlock;
22+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2023
import org.elasticsearch.cluster.block.ClusterBlocks;
2124
import org.elasticsearch.cluster.service.ClusterService;
2225
import org.elasticsearch.common.io.stream.StreamInput;
@@ -121,7 +124,7 @@ protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard rep
121124
});
122125
}
123126

124-
private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {
127+
private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) throws IOException {
125128
final ShardId shardId = indexShard.shardId();
126129
if (indexShard.getActiveOperationsCount() != IndexShard.OPERATIONS_BLOCKED) {
127130
throw new IllegalStateException(
@@ -133,6 +136,15 @@ private void executeShardOperation(final ShardRequest request, final IndexShard
133136
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) {
134137
throw new IllegalStateException("index shard " + shardId + " has not applied block " + request.clusterBlock());
135138
}
139+
140+
// same pattern as in TransportVerifyShardBeforeCloseAction, but could also flush in phase1.
141+
if (request.phase1()) {
142+
indexShard.sync();
143+
} else {
144+
if (request.clusterBlock().contains(ClusterBlockLevel.WRITE)) {
145+
indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
146+
}
147+
}
136148
}
137149

138150
@Override
@@ -160,31 +172,45 @@ public void markShardCopyAsStaleIfNeeded(
160172
public static final class ShardRequest extends ReplicationRequest<ShardRequest> {
161173

162174
private final ClusterBlock clusterBlock;
175+
private final boolean phase1;
163176

164177
ShardRequest(StreamInput in) throws IOException {
165178
super(in);
166179
clusterBlock = new ClusterBlock(in);
180+
if (in.getTransportVersion().onOrAfter(TransportVersions.ADD_INDEX_BLOCK_TWO_PHASE)) {
181+
phase1 = in.readBoolean();
182+
} else {
183+
phase1 = true; // does not matter, not verified anyway
184+
}
167185
}
168186

169-
public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
187+
public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, boolean phase1, final TaskId parentTaskId) {
170188
super(shardId);
171189
this.clusterBlock = Objects.requireNonNull(clusterBlock);
190+
this.phase1 = phase1;
172191
setParentTask(parentTaskId);
173192
}
174193

175194
@Override
176195
public String toString() {
177-
return "verify shard " + shardId + " before block with " + clusterBlock;
196+
return "verify shard " + shardId + " before block with " + clusterBlock + " phase1=" + phase1;
178197
}
179198

180199
@Override
181200
public void writeTo(final StreamOutput out) throws IOException {
182201
super.writeTo(out);
183202
clusterBlock.writeTo(out);
203+
if (out.getTransportVersion().onOrAfter(TransportVersions.ADD_INDEX_BLOCK_TWO_PHASE)) {
204+
out.writeBoolean(phase1);
205+
}
184206
}
185207

186208
public ClusterBlock clusterBlock() {
187209
return clusterBlock;
188210
}
211+
212+
public boolean phase1() {
213+
return phase1;
214+
}
189215
}
190216
}

0 commit comments

Comments
 (0)