Skip to content

Commit 2ebbad4

Browse files
authored
Defer unpromotable shard refreshes until index refresh blocks are cleared (#120642)
This update postpones unpromotable refreshes for indices with an active INDEX_REFRESH_BLOCK until the block is cleared. This ensures refresh operations proceed only when the index is no longer blocked. To avoid indefinite delays, the maximum wait time is governed by the bulk request timeout whereas for explicit refreshes it relies on the fact that the block will be removed eventually. Closes ES-10134
1 parent 776ebda commit 2ebbad4

File tree

10 files changed

+393
-21
lines changed

10 files changed

+393
-21
lines changed

docs/changelog/120642.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120642
2+
summary: Defer unpromotable shard refreshes until index refresh blocks are cleared
3+
area: Engine
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,14 @@ public TransportShardRefreshAction(
7474
ReplicaActionExecution.SubjectToCircuitBreaker
7575
);
7676
// registers the unpromotable version of shard refresh action
77-
new TransportUnpromotableShardRefreshAction(clusterService, transportService, shardStateAction, actionFilters, indicesService);
77+
new TransportUnpromotableShardRefreshAction(
78+
clusterService,
79+
transportService,
80+
shardStateAction,
81+
actionFilters,
82+
indicesService,
83+
threadPool
84+
);
7885
this.refreshExecutor = transportService.getThreadPool().executor(ThreadPool.Names.REFRESH);
7986
}
8087

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,31 @@
99

1010
package org.elasticsearch.action.admin.indices.refresh;
1111

12+
import org.elasticsearch.ElasticsearchTimeoutException;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.ActionResponse;
1415
import org.elasticsearch.action.support.ActionFilters;
1516
import org.elasticsearch.action.support.broadcast.unpromotable.TransportBroadcastUnpromotableAction;
17+
import org.elasticsearch.cluster.ClusterState;
18+
import org.elasticsearch.cluster.ClusterStateObserver;
1619
import org.elasticsearch.cluster.action.shard.ShardStateAction;
20+
import org.elasticsearch.cluster.block.ClusterBlockException;
21+
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
1722
import org.elasticsearch.cluster.service.ClusterService;
1823
import org.elasticsearch.common.io.stream.StreamInput;
24+
import org.elasticsearch.core.TimeValue;
1925
import org.elasticsearch.indices.IndicesService;
2026
import org.elasticsearch.injection.guice.Inject;
27+
import org.elasticsearch.node.NodeClosedException;
2128
import org.elasticsearch.tasks.Task;
2229
import org.elasticsearch.threadpool.ThreadPool;
2330
import org.elasticsearch.transport.TransportService;
2431

2532
import java.util.List;
33+
import java.util.Map;
34+
import java.util.Set;
35+
36+
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_REFRESH_BLOCK;
2637

2738
public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction<
2839
UnpromotableShardRefreshRequest,
@@ -36,14 +47,37 @@ public class TransportUnpromotableShardRefreshAction extends TransportBroadcastU
3647
}
3748

3849
private final IndicesService indicesService;
50+
private final ThreadPool threadPool;
51+
private final boolean useRefreshBlock;
3952

4053
@Inject
4154
public TransportUnpromotableShardRefreshAction(
4255
ClusterService clusterService,
4356
TransportService transportService,
4457
ShardStateAction shardStateAction,
4558
ActionFilters actionFilters,
46-
IndicesService indicesService
59+
IndicesService indicesService,
60+
ThreadPool threadPool
61+
) {
62+
this(
63+
clusterService,
64+
transportService,
65+
shardStateAction,
66+
actionFilters,
67+
indicesService,
68+
threadPool,
69+
MetadataCreateIndexService.useRefreshBlock(clusterService.getSettings())
70+
);
71+
}
72+
73+
public TransportUnpromotableShardRefreshAction(
74+
ClusterService clusterService,
75+
TransportService transportService,
76+
ShardStateAction shardStateAction,
77+
ActionFilters actionFilters,
78+
IndicesService indicesService,
79+
ThreadPool threadPool,
80+
boolean useRefreshBlock
4781
) {
4882
super(
4983
NAME,
@@ -55,6 +89,53 @@ public TransportUnpromotableShardRefreshAction(
5589
transportService.getThreadPool().executor(ThreadPool.Names.REFRESH)
5690
);
5791
this.indicesService = indicesService;
92+
this.threadPool = threadPool;
93+
this.useRefreshBlock = useRefreshBlock;
94+
}
95+
96+
@Override
97+
protected void doExecute(Task task, UnpromotableShardRefreshRequest request, ActionListener<ActionResponse.Empty> listener) {
98+
beforeDispatchingRequestToUnpromotableShards(request, listener.delegateFailure((l, unused) -> super.doExecute(task, request, l)));
99+
}
100+
101+
private void beforeDispatchingRequestToUnpromotableShards(UnpromotableShardRefreshRequest request, ActionListener<Void> listener) {
102+
if (useRefreshBlock == false) {
103+
listener.onResponse(null);
104+
return;
105+
}
106+
107+
var clusterStateObserver = new ClusterStateObserver(clusterService, request.getTimeout(), logger, threadPool.getThreadContext());
108+
109+
if (isIndexBlockedForRefresh(request.shardId().getIndexName(), clusterStateObserver.setAndGetObservedState()) == false) {
110+
listener.onResponse(null);
111+
return;
112+
}
113+
114+
clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
115+
@Override
116+
public void onNewClusterState(ClusterState state) {
117+
listener.onResponse(null);
118+
}
119+
120+
@Override
121+
public void onClusterServiceClose() {
122+
listener.onFailure(new NodeClosedException(clusterService.localNode()));
123+
}
124+
125+
@Override
126+
public void onTimeout(TimeValue timeout) {
127+
listener.onFailure(
128+
new ElasticsearchTimeoutException(
129+
"shard refresh timed out waiting for index block to be removed",
130+
new ClusterBlockException(Map.of(request.shardId().getIndexName(), Set.of(INDEX_REFRESH_BLOCK)))
131+
)
132+
);
133+
}
134+
}, clusterState -> isIndexBlockedForRefresh(request.shardId().getIndexName(), clusterState) == false);
135+
}
136+
137+
private static boolean isIndexBlockedForRefresh(String index, ClusterState state) {
138+
return state.blocks().hasIndexBlock(index, INDEX_REFRESH_BLOCK);
58139
}
59140

60141
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableShardRefreshRequest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.elasticsearch.common.Strings;
1717
import org.elasticsearch.common.io.stream.StreamInput;
1818
import org.elasticsearch.common.io.stream.StreamOutput;
19+
import org.elasticsearch.core.Nullable;
20+
import org.elasticsearch.core.TimeValue;
1921
import org.elasticsearch.index.engine.Engine;
2022

2123
import java.io.IOException;
@@ -26,22 +28,36 @@ public class UnpromotableShardRefreshRequest extends BroadcastUnpromotableReques
2628

2729
private final long primaryTerm;
2830
private final long segmentGeneration;
31+
private final TimeValue timeout;
2932

3033
public UnpromotableShardRefreshRequest(
3134
IndexShardRoutingTable indexShardRoutingTable,
3235
long primaryTerm,
3336
long segmentGeneration,
3437
boolean failShardOnError
38+
) {
39+
this(indexShardRoutingTable, primaryTerm, segmentGeneration, failShardOnError, null);
40+
}
41+
42+
public UnpromotableShardRefreshRequest(
43+
IndexShardRoutingTable indexShardRoutingTable,
44+
long primaryTerm,
45+
long segmentGeneration,
46+
boolean failShardOnError,
47+
@Nullable TimeValue timeout
3548
) {
3649
super(indexShardRoutingTable, failShardOnError);
3750
this.primaryTerm = primaryTerm;
3851
this.segmentGeneration = segmentGeneration;
52+
this.timeout = timeout;
3953
}
4054

4155
public UnpromotableShardRefreshRequest(StreamInput in) throws IOException {
4256
super(in);
4357
segmentGeneration = in.readVLong();
4458
primaryTerm = in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0) ? in.readVLong() : Engine.UNKNOWN_PRIMARY_TERM;
59+
// The timeout is only used by the request sender, therefore we don't write it over the wire
60+
timeout = null;
4561
}
4662

4763
@Override
@@ -70,6 +86,11 @@ public long getPrimaryTerm() {
7086
return primaryTerm;
7187
}
7288

89+
@Nullable
90+
public TimeValue getTimeout() {
91+
return timeout;
92+
}
93+
7394
@Override
7495
public String toString() {
7596
return Strings.format(

server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public void onFailure(Exception e) {
6565
}
6666
});
6767
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
68-
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
68+
if (indexShard.getReplicationGroup().getRoutingTable().allUnpromotableShards().size() > 0) {
6969
sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout);
7070
} else {
7171
l.onResponse(true);
@@ -136,7 +136,8 @@ private void sendUnpromotableRequests(
136136
indexShard.getReplicationGroup().getRoutingTable(),
137137
indexShard.getOperationPrimaryTerm(),
138138
generation,
139-
true
139+
true,
140+
postWriteRefreshTimeout
140141
);
141142
transportService.sendRequest(
142143
transportService.getLocalNode(),

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1761,7 +1761,7 @@ public static void validateStoreTypeSetting(Settings indexSettings) {
17611761
}
17621762
}
17631763

1764-
private static boolean useRefreshBlock(Settings settings) {
1764+
public static boolean useRefreshBlock(Settings settings) {
17651765
return DiscoveryNode.isStateless(settings) && settings.getAsBoolean(USE_INDEX_REFRESH_BLOCK_SETTING_NAME, false);
17661766
}
17671767

server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class IndexShardRoutingTable {
5252
final List<ShardRouting> replicas;
5353
final List<ShardRouting> activeShards;
5454
final List<ShardRouting> assignedShards;
55+
private final List<ShardRouting> assignedUnpromotableShards;
5556
private final List<ShardRouting> unpromotableShards;
5657
/**
5758
* The initializing list, including ones that are initializing on a target node because of relocation.
@@ -71,8 +72,9 @@ public class IndexShardRoutingTable {
7172
List<ShardRouting> replicas = new ArrayList<>();
7273
List<ShardRouting> activeShards = new ArrayList<>();
7374
List<ShardRouting> assignedShards = new ArrayList<>();
74-
List<ShardRouting> unpromotableShards = new ArrayList<>();
75+
List<ShardRouting> assignedUnpromotableShards = new ArrayList<>();
7576
List<ShardRouting> allInitializingShards = new ArrayList<>();
77+
List<ShardRouting> unpromotableShards = new ArrayList<>();
7678
boolean allShardsStarted = true;
7779
int activeSearchShardCount = 0;
7880
int totalSearchShardCount = 0;
@@ -95,20 +97,24 @@ public class IndexShardRoutingTable {
9597
if (shard.initializing()) {
9698
allInitializingShards.add(shard);
9799
}
100+
if (shard.isPromotableToPrimary() == false) {
101+
unpromotableShards.add(shard);
102+
}
98103
if (shard.relocating()) {
99104
// create the target initializing shard routing on the node the shard is relocating to
100105
allInitializingShards.add(shard.getTargetRelocatingShard());
101106
assert shard.assignedToNode() : "relocating from unassigned " + shard;
102107
assert shard.getTargetRelocatingShard().assignedToNode() : "relocating to unassigned " + shard.getTargetRelocatingShard();
103108
assignedShards.add(shard.getTargetRelocatingShard());
104109
if (shard.getTargetRelocatingShard().isPromotableToPrimary() == false) {
110+
assignedUnpromotableShards.add(shard.getTargetRelocatingShard());
105111
unpromotableShards.add(shard.getTargetRelocatingShard());
106112
}
107113
}
108114
if (shard.assignedToNode()) {
109115
assignedShards.add(shard);
110116
if (shard.isPromotableToPrimary() == false) {
111-
unpromotableShards.add(shard);
117+
assignedUnpromotableShards.add(shard);
112118
}
113119
}
114120
if (shard.state() != ShardRoutingState.STARTED) {
@@ -117,10 +123,13 @@ public class IndexShardRoutingTable {
117123
}
118124
assert shards.isEmpty() == false : "cannot have an empty shard routing table";
119125
assert primary != null : shards;
126+
assert unpromotableShards.containsAll(assignedUnpromotableShards)
127+
: unpromotableShards + " does not contain all assigned unpromotable shards " + assignedUnpromotableShards;
120128
this.primary = primary;
121129
this.replicas = CollectionUtils.wrapUnmodifiableOrEmptySingleton(replicas);
122130
this.activeShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(activeShards);
123131
this.assignedShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(assignedShards);
132+
this.assignedUnpromotableShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(assignedUnpromotableShards);
124133
this.unpromotableShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(unpromotableShards);
125134
this.allInitializingShards = CollectionUtils.wrapUnmodifiableOrEmptySingleton(allInitializingShards);
126135
this.allShardsStarted = allShardsStarted;
@@ -185,6 +194,15 @@ public List<ShardRouting> assignedShards() {
185194
* @return a {@link List} of shards
186195
*/
187196
public List<ShardRouting> unpromotableShards() {
197+
return this.assignedUnpromotableShards;
198+
}
199+
200+
/**
201+
* Returns a {@link List} of all unpromotable shards, including unassigned shards
202+
*
203+
* @return a {@link List} of shards
204+
*/
205+
public List<ShardRouting> allUnpromotableShards() {
188206
return this.unpromotableShards;
189207
}
190208

0 commit comments

Comments
 (0)