Skip to content

Commit 04dd9c2

Browse files
authored
Make fast refresh ineffective for search routing (#117455)
Re-introduction of ES PR #114619. Now, fast refresh indices route searches/gets to search shards in stateless. Thus, this PR removes unnecessary code and simplifies some things. Relates ES-9563
1 parent c11e3c2 commit 04dd9c2

File tree

7 files changed

+15
-82
lines changed

7 files changed

+15
-82
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@
2424

2525
import java.util.List;
2626

27-
import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO_2;
28-
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
29-
3027
public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction<
3128
UnpromotableShardRefreshRequest,
3229
ActionResponse.Empty> {
@@ -76,18 +73,6 @@ protected void unpromotableShardOperation(
7673
return;
7774
}
7875

79-
// During an upgrade to FAST_REFRESH_RCO_2, we expect search shards to be first upgraded before the primary is upgraded. Thus,
80-
// when the primary is upgraded, and starts to deliver unpromotable refreshes, we expect the search shards to be upgraded already.
81-
// Note that the fast refresh setting is final.
82-
// TODO: remove assertion (ES-9563)
83-
assert INDEX_FAST_REFRESH_SETTING.get(shard.indexSettings().getSettings()) == false
84-
|| transportService.getLocalNodeConnection().getTransportVersion().onOrAfter(FAST_REFRESH_RCO_2)
85-
: "attempted to refresh a fast refresh search shard "
86-
+ shard
87-
+ " on transport version "
88-
+ transportService.getLocalNodeConnection().getTransportVersion()
89-
+ " (before FAST_REFRESH_RCO_2)";
90-
9176
ActionListener.run(responseListener, listener -> {
9277
shard.waitForPrimaryTermAndGeneration(
9378
request.getPrimaryTerm(),

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import org.elasticsearch.cluster.ClusterStateObserver;
2929
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3030
import org.elasticsearch.cluster.node.DiscoveryNode;
31-
import org.elasticsearch.cluster.routing.OperationRouting;
3231
import org.elasticsearch.cluster.routing.PlainShardIterator;
3332
import org.elasticsearch.cluster.routing.ShardIterator;
33+
import org.elasticsearch.cluster.routing.ShardRouting;
3434
import org.elasticsearch.cluster.service.ClusterService;
3535
import org.elasticsearch.common.io.stream.Writeable;
3636
import org.elasticsearch.core.TimeValue;
@@ -109,10 +109,7 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
109109
if (iterator == null) {
110110
return null;
111111
}
112-
return new PlainShardIterator(
113-
iterator.shardId(),
114-
iterator.getShardRoutings().stream().filter(shardRouting -> OperationRouting.canSearchShard(shardRouting, state)).toList()
115-
);
112+
return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList());
116113
}
117114

118115
@Override
@@ -129,9 +126,8 @@ protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionLi
129126
handleGetOnUnpromotableShard(request, indexShard, listener);
130127
return;
131128
}
132-
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
133-
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
134-
: "in Stateless a promotable to primary shard can receive a TransportGetAction only if an index has the fast refresh setting";
129+
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false
130+
: "in Stateless a promotable to primary shard should not receive a TransportGetAction";
135131
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
136132
asyncGet(request, shardId, listener);
137133
} else {

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import org.elasticsearch.cluster.ClusterStateObserver;
2929
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3030
import org.elasticsearch.cluster.node.DiscoveryNode;
31-
import org.elasticsearch.cluster.routing.OperationRouting;
3231
import org.elasticsearch.cluster.routing.PlainShardIterator;
3332
import org.elasticsearch.cluster.routing.ShardIterator;
33+
import org.elasticsearch.cluster.routing.ShardRouting;
3434
import org.elasticsearch.cluster.service.ClusterService;
3535
import org.elasticsearch.common.io.stream.Writeable;
3636
import org.elasticsearch.core.TimeValue;
@@ -113,10 +113,7 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
113113
if (iterator == null) {
114114
return null;
115115
}
116-
return new PlainShardIterator(
117-
iterator.shardId(),
118-
iterator.getShardRoutings().stream().filter(shardRouting -> OperationRouting.canSearchShard(shardRouting, state)).toList()
119-
);
116+
return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList());
120117
}
121118

122119
@Override
@@ -128,10 +125,8 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId
128125
handleMultiGetOnUnpromotableShard(request, indexShard, listener);
129126
return;
130127
}
131-
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
132-
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
133-
: "in Stateless a promotable to primary shard can receive a TransportShardMultiGetAction only if an index has "
134-
+ "the fast refresh setting";
128+
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false
129+
: "in Stateless a promotable to primary shard should not receive a TransportShardMultiGetAction";
135130
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
136131
asyncShardMultiGet(request, shardId, listener);
137132
} else {

server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ public boolean readyForSearch(ClusterState clusterState) {
241241
boolean found = false;
242242
for (int idx = 0; idx < shardRoutingTable.size(); idx++) {
243243
ShardRouting shardRouting = shardRoutingTable.shard(idx);
244-
if (shardRouting.active() && OperationRouting.canSearchShard(shardRouting, clusterState)) {
244+
if (shardRouting.active() && shardRouting.isSearchable()) {
245245
found = true;
246246
break;
247247
}

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,6 @@
3232
import java.util.Set;
3333
import java.util.stream.Collectors;
3434

35-
import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO_2;
36-
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
37-
3835
public class OperationRouting {
3936

4037
public static final Setting<Boolean> USE_ADAPTIVE_REPLICA_SELECTION_SETTING = Setting.boolSetting(
@@ -151,7 +148,7 @@ private static List<ShardRouting> statefulShardsThatHandleSearches(ShardIterator
151148
}
152149

153150
private static List<ShardRouting> statelessShardsThatHandleSearches(ClusterState clusterState, ShardIterator iterator) {
154-
return iterator.getShardRoutings().stream().filter(shardRouting -> canSearchShard(shardRouting, clusterState)).toList();
151+
return iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList();
155152
}
156153

157154
public static ShardIterator getShards(ClusterState clusterState, ShardId shardId) {
@@ -304,18 +301,4 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null
304301
IndexMetadata indexMetadata = indexMetadata(clusterState, index);
305302
return new ShardId(indexMetadata.getIndex(), IndexRouting.fromIndexMetadata(indexMetadata).getShard(id, routing));
306303
}
307-
308-
public static boolean canSearchShard(ShardRouting shardRouting, ClusterState clusterState) {
309-
// TODO: remove if and always return isSearchable (ES-9563)
310-
if (INDEX_FAST_REFRESH_SETTING.get(clusterState.metadata().index(shardRouting.index()).getSettings())) {
311-
// Until all the cluster is upgraded, we send searches/gets to the primary (even if it has been upgraded) to execute locally.
312-
if (clusterState.getMinTransportVersion().onOrAfter(FAST_REFRESH_RCO_2)) {
313-
return shardRouting.isSearchable();
314-
} else {
315-
return shardRouting.isPromotableToPrimary();
316-
}
317-
} else {
318-
return shardRouting.isSearchable();
319-
}
320-
}
321304
}

server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -935,8 +935,7 @@ public boolean isPromotableToPrimary() {
935935
}
936936

937937
/**
938-
* Determine if role searchable. Consumers should prefer {@link OperationRouting#canSearchShard(ShardRouting, ClusterState)} to
939-
* determine if a shard can be searched and {@link IndexRoutingTable#readyForSearch(ClusterState)} to determine if an index
938+
* Determine if role searchable. Consumers should prefer {@link IndexRoutingTable#readyForSearch(ClusterState)} to determine if an index
940939
* is ready to be searched.
941940
*/
942941
public boolean isSearchable() {

server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.cluster.routing;
1111

12-
import org.elasticsearch.TransportVersion;
1312
import org.elasticsearch.cluster.ClusterState;
1413
import org.elasticsearch.common.UUIDs;
1514
import org.elasticsearch.common.settings.Settings;
@@ -20,7 +19,6 @@
2019

2120
import java.util.List;
2221

23-
import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO_2;
2422
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
2523
import static org.mockito.ArgumentMatchers.any;
2624
import static org.mockito.Mockito.mock;
@@ -29,21 +27,10 @@
2927
public class IndexRoutingTableTests extends ESTestCase {
3028

3129
public void testReadyForSearch() {
32-
innerReadyForSearch(false, false);
33-
innerReadyForSearch(false, true);
34-
innerReadyForSearch(true, false);
35-
innerReadyForSearch(true, true);
36-
}
37-
38-
// TODO: remove if (fastRefresh && beforeFastRefreshRCO) branches (ES-9563)
39-
private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshRCO) {
4030
Index index = new Index(randomIdentifier(), UUIDs.randomBase64UUID());
4131
ClusterState clusterState = mock(ClusterState.class, Mockito.RETURNS_DEEP_STUBS);
4232
when(clusterState.metadata().index(any(Index.class)).getSettings()).thenReturn(
43-
Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh).build()
44-
);
45-
when(clusterState.getMinTransportVersion()).thenReturn(
46-
beforeFastRefreshRCO ? TransportVersion.fromId(FAST_REFRESH_RCO_2.id() - 1_00_0) : TransportVersion.current()
33+
Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), randomBoolean()).build()
4734
);
4835
// 2 primaries that are search and index
4936
ShardId p1 = new ShardId(index, 0);
@@ -63,11 +50,7 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
6350
shardTable1 = new IndexShardRoutingTable(p1, List.of(getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
6451
shardTable2 = new IndexShardRoutingTable(p2, List.of(getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
6552
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
66-
if (fastRefresh && beforeFastRefreshRCO) {
67-
assertTrue(indexRoutingTable.readyForSearch(clusterState));
68-
} else {
69-
assertFalse(indexRoutingTable.readyForSearch(clusterState));
70-
}
53+
assertFalse(indexRoutingTable.readyForSearch(clusterState));
7154

7255
// 2 unassigned primaries that are index only
7356
shardTable1 = new IndexShardRoutingTable(
@@ -99,11 +82,7 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
9982
)
10083
);
10184
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
102-
if (fastRefresh && beforeFastRefreshRCO) {
103-
assertTrue(indexRoutingTable.readyForSearch(clusterState));
104-
} else {
105-
assertFalse(indexRoutingTable.readyForSearch(clusterState));
106-
}
85+
assertFalse(indexRoutingTable.readyForSearch(clusterState));
10786

10887
// 2 primaries that are index only with some replicas that are all available
10988
shardTable1 = new IndexShardRoutingTable(
@@ -143,11 +122,7 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
143122
)
144123
);
145124
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
146-
if (fastRefresh && beforeFastRefreshRCO) {
147-
assertFalse(indexRoutingTable.readyForSearch(clusterState));
148-
} else {
149-
assertTrue(indexRoutingTable.readyForSearch(clusterState));
150-
}
125+
assertTrue(indexRoutingTable.readyForSearch(clusterState));
151126

152127
// 2 primaries that are index only with at least 1 replica per primary that is available
153128
shardTable1 = new IndexShardRoutingTable(

0 commit comments

Comments
 (0)