Skip to content

Commit 4990276

Browse files
authored
Fast refresh indices should use search shards (#113478)
Fast refresh indices should now behave like non fast refresh indices in how they execute (m)gets and searches. I.e., they should use the search shards. For BWC, we define a new transport version. We expect search shards to be upgraded first, before promotable shards. Until the cluster is fully upgraded, the promotable shards (whether upgraded or not) will still receive and execute gets/searches locally. Relates ES-9573 Relates ES-9579
1 parent 8f24f43 commit 4990276

File tree

10 files changed

+56
-44
lines changed

10 files changed

+56
-44
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ static TransportVersion def(int id) {
235235
public static final TransportVersion SEARCH_FAILURE_STATS = def(8_759_00_0);
236236
public static final TransportVersion INGEST_GEO_DATABASE_PROVIDERS = def(8_760_00_0);
237237
public static final TransportVersion DATE_TIME_DOC_VALUES_LOCALES = def(8_761_00_0);
238+
public static final TransportVersion FAST_REFRESH_RCO = def(8_762_00_0);
238239

239240
/*
240241
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.cluster.service.ClusterService;
2424
import org.elasticsearch.common.io.stream.StreamInput;
2525
import org.elasticsearch.common.settings.Settings;
26-
import org.elasticsearch.index.IndexSettings;
2726
import org.elasticsearch.index.shard.IndexShard;
2827
import org.elasticsearch.indices.IndicesService;
2928
import org.elasticsearch.injection.guice.Inject;
@@ -120,27 +119,18 @@ public void onPrimaryOperationComplete(
120119
ActionListener<Void> listener
121120
) {
122121
assert replicaRequest.primaryRefreshResult.refreshed() : "primary has not refreshed";
123-
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(
124-
clusterService.state().metadata().index(indexShardRoutingTable.shardId().getIndex()).getSettings()
122+
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
123+
indexShardRoutingTable,
124+
replicaRequest.primaryRefreshResult.primaryTerm(),
125+
replicaRequest.primaryRefreshResult.generation(),
126+
false
127+
);
128+
transportService.sendRequest(
129+
transportService.getLocalNode(),
130+
TransportUnpromotableShardRefreshAction.NAME,
131+
unpromotableReplicaRequest,
132+
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
125133
);
126-
127-
// Indices marked with fast refresh do not rely on refreshing the unpromotables
128-
if (fastRefresh) {
129-
listener.onResponse(null);
130-
} else {
131-
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
132-
indexShardRoutingTable,
133-
replicaRequest.primaryRefreshResult.primaryTerm(),
134-
replicaRequest.primaryRefreshResult.generation(),
135-
false
136-
);
137-
transportService.sendRequest(
138-
transportService.getLocalNode(),
139-
TransportUnpromotableShardRefreshAction.NAME,
140-
unpromotableReplicaRequest,
141-
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
142-
);
143-
}
144134
}
145135
}
146136
}

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424

2525
import java.util.List;
2626

27+
import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
28+
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
29+
2730
public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction<
2831
UnpromotableShardRefreshRequest,
2932
ActionResponse.Empty> {
@@ -73,6 +76,18 @@ protected void unpromotableShardOperation(
7376
return;
7477
}
7578

79+
// During an upgrade to FAST_REFRESH_RCO, we expect search shards to be first upgraded before the primary is upgraded. Thus,
80+
// when the primary is upgraded, and starts to deliver unpromotable refreshes, we expect the search shards to be upgraded already.
81+
// Note that the fast refresh setting is final.
82+
// TODO: remove assertion (ES-9563)
83+
assert INDEX_FAST_REFRESH_SETTING.get(shard.indexSettings().getSettings()) == false
84+
|| transportService.getLocalNodeConnection().getTransportVersion().onOrAfter(FAST_REFRESH_RCO)
85+
: "attempted to refresh a fast refresh search shard "
86+
+ shard
87+
+ " on transport version "
88+
+ transportService.getLocalNodeConnection().getTransportVersion()
89+
+ " (before FAST_REFRESH_RCO)";
90+
7691
ActionListener.run(responseListener, listener -> {
7792
shard.waitForPrimaryTermAndGeneration(
7893
request.getPrimaryTerm(),

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,10 @@ protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionLi
125125
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
126126
IndexShard indexShard = indexService.getShard(shardId.id());
127127
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
128-
assert indexShard.indexSettings().isFastRefresh() == false
129-
: "a search shard should not receive a TransportGetAction for an index with fast refresh";
130128
handleGetOnUnpromotableShard(request, indexShard, listener);
131129
return;
132130
}
131+
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
133132
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
134133
: "in Stateless a promotable to primary shard can receive a TransportGetAction only if an index has the fast refresh setting";
135134
if (request.realtime()) { // we are not tied to a refresh cycle here anyway

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,10 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId
124124
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
125125
IndexShard indexShard = indexService.getShard(shardId.id());
126126
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
127-
assert indexShard.indexSettings().isFastRefresh() == false
128-
: "a search shard should not receive a TransportShardMultiGetAction for an index with fast refresh";
129127
handleMultiGetOnUnpromotableShard(request, indexShard, listener);
130128
return;
131129
}
130+
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
132131
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
133132
: "in Stateless a promotable to primary shard can receive a TransportShardMultiGetAction only if an index has "
134133
+ "the fast refresh setting";

server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.common.util.concurrent.ThreadContext;
2020
import org.elasticsearch.core.Nullable;
2121
import org.elasticsearch.core.TimeValue;
22-
import org.elasticsearch.index.IndexSettings;
2322
import org.elasticsearch.index.engine.Engine;
2423
import org.elasticsearch.index.shard.IndexShard;
2524
import org.elasticsearch.index.translog.Translog;
@@ -53,9 +52,7 @@ public void refreshShard(
5352
case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
5453
@Override
5554
public void onResponse(Boolean forced) {
56-
// Fast refresh indices do not depend on the unpromotables being refreshed
57-
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
58-
if (location != null && (indexShard.routingEntry().isSearchable() == false && fastRefresh == false)) {
55+
if (location != null && indexShard.routingEntry().isSearchable() == false) {
5956
refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout);
6057
} else {
6158
listener.onResponse(forced);
@@ -68,9 +65,7 @@ public void onFailure(Exception e) {
6865
}
6966
});
7067
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
71-
// Fast refresh indices do not depend on the unpromotables being refreshed
72-
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
73-
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0 && fastRefresh == false) {
68+
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
7469
sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout);
7570
} else {
7671
l.onResponse(true);

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Set;
3333
import java.util.stream.Collectors;
3434

35+
import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
3536
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
3637

3738
public class OperationRouting {
@@ -305,8 +306,14 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null
305306
}
306307

307308
public static boolean canSearchShard(ShardRouting shardRouting, ClusterState clusterState) {
309+
// TODO: remove if and always return isSearchable (ES-9563)
308310
if (INDEX_FAST_REFRESH_SETTING.get(clusterState.metadata().index(shardRouting.index()).getSettings())) {
309-
return shardRouting.isPromotableToPrimary();
311+
// Until all the cluster is upgraded, we send searches/gets to the primary (even if it has been upgraded) to execute locally.
312+
if (clusterState.getMinTransportVersion().onOrAfter(FAST_REFRESH_RCO)) {
313+
return shardRouting.isSearchable();
314+
} else {
315+
return shardRouting.isPromotableToPrimary();
316+
}
310317
} else {
311318
return shardRouting.isSearchable();
312319
}

server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ static boolean shouldLoadRandomAccessFiltersEagerly(IndexSettings settings) {
105105
boolean loadFiltersEagerlySetting = settings.getValue(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);
106106
boolean isStateless = DiscoveryNode.isStateless(settings.getNodeSettings());
107107
if (isStateless) {
108-
return DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.INDEX_ROLE)
108+
return DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE)
109109
&& loadFiltersEagerlySetting
110110
&& INDEX_FAST_REFRESH_SETTING.get(settings.getSettings());
111111
} else {

server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.cluster.routing;
1111

12+
import org.elasticsearch.TransportVersion;
1213
import org.elasticsearch.cluster.ClusterState;
1314
import org.elasticsearch.common.UUIDs;
1415
import org.elasticsearch.common.settings.Settings;
@@ -19,6 +20,7 @@
1920

2021
import java.util.List;
2122

23+
import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
2224
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
2325
import static org.mockito.ArgumentMatchers.any;
2426
import static org.mockito.Mockito.mock;
@@ -27,16 +29,22 @@
2729
public class IndexRoutingTableTests extends ESTestCase {
2830

2931
public void testReadyForSearch() {
30-
innerReadyForSearch(false);
31-
innerReadyForSearch(true);
32+
innerReadyForSearch(false, false);
33+
innerReadyForSearch(false, true);
34+
innerReadyForSearch(true, false);
35+
innerReadyForSearch(true, true);
3236
}
3337

34-
private void innerReadyForSearch(boolean fastRefresh) {
38+
// TODO: remove if (fastRefresh && beforeFastRefreshRCO) branches (ES-9563)
39+
private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshRCO) {
3540
Index index = new Index(randomIdentifier(), UUIDs.randomBase64UUID());
3641
ClusterState clusterState = mock(ClusterState.class, Mockito.RETURNS_DEEP_STUBS);
3742
when(clusterState.metadata().index(any(Index.class)).getSettings()).thenReturn(
3843
Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh).build()
3944
);
45+
when(clusterState.getMinTransportVersion()).thenReturn(
46+
beforeFastRefreshRCO ? TransportVersion.fromId(FAST_REFRESH_RCO.id() - 1_00_0) : TransportVersion.current()
47+
);
4048
// 2 primaries that are search and index
4149
ShardId p1 = new ShardId(index, 0);
4250
IndexShardRoutingTable shardTable1 = new IndexShardRoutingTable(
@@ -55,7 +63,7 @@ private void innerReadyForSearch(boolean fastRefresh) {
5563
shardTable1 = new IndexShardRoutingTable(p1, List.of(getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
5664
shardTable2 = new IndexShardRoutingTable(p2, List.of(getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
5765
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
58-
if (fastRefresh) {
66+
if (fastRefresh && beforeFastRefreshRCO) {
5967
assertTrue(indexRoutingTable.readyForSearch(clusterState));
6068
} else {
6169
assertFalse(indexRoutingTable.readyForSearch(clusterState));
@@ -91,7 +99,7 @@ private void innerReadyForSearch(boolean fastRefresh) {
9199
)
92100
);
93101
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
94-
if (fastRefresh) {
102+
if (fastRefresh && beforeFastRefreshRCO) {
95103
assertTrue(indexRoutingTable.readyForSearch(clusterState));
96104
} else {
97105
assertFalse(indexRoutingTable.readyForSearch(clusterState));
@@ -118,8 +126,6 @@ private void innerReadyForSearch(boolean fastRefresh) {
118126
assertTrue(indexRoutingTable.readyForSearch(clusterState));
119127

120128
// 2 unassigned primaries that are index only with some replicas that are all available
121-
// Fast refresh indices do not support replicas so this can not practically happen. If we add support we will want to ensure
122-
// that readyForSearch allows for searching replicas when the index shard is not available.
123129
shardTable1 = new IndexShardRoutingTable(
124130
p1,
125131
List.of(
@@ -137,8 +143,8 @@ private void innerReadyForSearch(boolean fastRefresh) {
137143
)
138144
);
139145
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
140-
if (fastRefresh) {
141-
assertFalse(indexRoutingTable.readyForSearch(clusterState)); // if we support replicas for fast refreshes this needs to change
146+
if (fastRefresh && beforeFastRefreshRCO) {
147+
assertFalse(indexRoutingTable.readyForSearch(clusterState));
142148
} else {
143149
assertTrue(indexRoutingTable.readyForSearch(clusterState));
144150
}

server/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ public void testShouldLoadRandomAccessFiltersEagerly() {
276276
for (var isStateless : values) {
277277
if (isStateless) {
278278
assertEquals(
279-
loadFiltersEagerly && indexFastRefresh && hasIndexRole,
279+
loadFiltersEagerly && indexFastRefresh && hasIndexRole == false,
280280
BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
281281
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly, indexFastRefresh)
282282
)

0 commit comments

Comments
 (0)