Skip to content

Commit 673b24f

Browse files
authored
Fast refresh indices to use search shards (#116658)
The changes of PR #115019 were reverted because it induced ES-8275. Now that the ticket is done, this PR re-introduces the reverted changes. Fast refresh indices should now behave like non fast refresh indices in how they execute (m)gets and searches. I.e., they should use the search shards. For BWC, we define a new transport version. We expect search shards to be upgraded first, before promotable shards. Until the cluster is fully upgraded, the promotable shards (whether upgraded or not) will still receive and execute gets/searches locally. Relates ES-9573
1 parent db63a28 commit 673b24f

File tree

10 files changed

+64
-74
lines changed

10 files changed

+64
-74
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ static TransportVersion def(int id) {
201201
public static final TransportVersion QUERY_RULES_LIST_INCLUDES_TYPES = def(8_792_00_0);
202202
public static final TransportVersion INDEX_STATS_ADDITIONAL_FIELDS = def(8_793_00_0);
203203
public static final TransportVersion INDEX_STATS_ADDITIONAL_FIELDS_REVERT = def(8_794_00_0);
204+
public static final TransportVersion FAST_REFRESH_RCO_2 = def(8_795_00_0);
204205

205206
/*
206207
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.cluster.service.ClusterService;
2424
import org.elasticsearch.common.io.stream.StreamInput;
2525
import org.elasticsearch.common.settings.Settings;
26-
import org.elasticsearch.index.IndexSettings;
2726
import org.elasticsearch.index.shard.IndexShard;
2827
import org.elasticsearch.indices.IndicesService;
2928
import org.elasticsearch.injection.guice.Inject;
@@ -120,27 +119,18 @@ public void onPrimaryOperationComplete(
120119
ActionListener<Void> listener
121120
) {
122121
assert replicaRequest.primaryRefreshResult.refreshed() : "primary has not refreshed";
123-
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(
124-
clusterService.state().metadata().index(indexShardRoutingTable.shardId().getIndex()).getSettings()
122+
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
123+
indexShardRoutingTable,
124+
replicaRequest.primaryRefreshResult.primaryTerm(),
125+
replicaRequest.primaryRefreshResult.generation(),
126+
false
127+
);
128+
transportService.sendRequest(
129+
transportService.getLocalNode(),
130+
TransportUnpromotableShardRefreshAction.NAME,
131+
unpromotableReplicaRequest,
132+
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
125133
);
126-
127-
// Indices marked with fast refresh do not rely on refreshing the unpromotables
128-
if (fastRefresh) {
129-
listener.onResponse(null);
130-
} else {
131-
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
132-
indexShardRoutingTable,
133-
replicaRequest.primaryRefreshResult.primaryTerm(),
134-
replicaRequest.primaryRefreshResult.generation(),
135-
false
136-
);
137-
transportService.sendRequest(
138-
transportService.getLocalNode(),
139-
TransportUnpromotableShardRefreshAction.NAME,
140-
unpromotableReplicaRequest,
141-
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
142-
);
143-
}
144134
}
145135
}
146136
}

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424

2525
import java.util.List;
2626

27+
import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO_2;
28+
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
29+
2730
public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction<
2831
UnpromotableShardRefreshRequest,
2932
ActionResponse.Empty> {
@@ -73,6 +76,18 @@ protected void unpromotableShardOperation(
7376
return;
7477
}
7578

79+
// During an upgrade to FAST_REFRESH_RCO_2, we expect search shards to be first upgraded before the primary is upgraded. Thus,
80+
// when the primary is upgraded, and starts to deliver unpromotable refreshes, we expect the search shards to be upgraded already.
81+
// Note that the fast refresh setting is final.
82+
// TODO: remove assertion (ES-9563)
83+
assert INDEX_FAST_REFRESH_SETTING.get(shard.indexSettings().getSettings()) == false
84+
|| transportService.getLocalNodeConnection().getTransportVersion().onOrAfter(FAST_REFRESH_RCO_2)
85+
: "attempted to refresh a fast refresh search shard "
86+
+ shard
87+
+ " on transport version "
88+
+ transportService.getLocalNodeConnection().getTransportVersion()
89+
+ " (before FAST_REFRESH_RCO_2)";
90+
7691
ActionListener.run(responseListener, listener -> {
7792
shard.waitForPrimaryTermAndGeneration(
7893
request.getPrimaryTerm(),

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,10 @@ protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionLi
126126
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
127127
IndexShard indexShard = indexService.getShard(shardId.id());
128128
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
129-
// TODO: Re-evaluate assertion (ES-8227)
130-
// assert indexShard.indexSettings().isFastRefresh() == false
131-
// : "a search shard should not receive a TransportGetAction for an index with fast refresh";
132129
handleGetOnUnpromotableShard(request, indexShard, listener);
133130
return;
134131
}
132+
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
135133
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
136134
: "in Stateless a promotable to primary shard can receive a TransportGetAction only if an index has the fast refresh setting";
137135
if (request.realtime()) { // we are not tied to a refresh cycle here anyway

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,10 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId
124124
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
125125
IndexShard indexShard = indexService.getShard(shardId.id());
126126
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
127-
// TODO: Re-evaluate assertion (ES-8227)
128-
// assert indexShard.indexSettings().isFastRefresh() == false
129-
// : "a search shard should not receive a TransportShardMultiGetAction for an index with fast refresh";
130127
handleMultiGetOnUnpromotableShard(request, indexShard, listener);
131128
return;
132129
}
130+
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
133131
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
134132
: "in Stateless a promotable to primary shard can receive a TransportShardMultiGetAction only if an index has "
135133
+ "the fast refresh setting";

server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.common.util.concurrent.ThreadContext;
2020
import org.elasticsearch.core.Nullable;
2121
import org.elasticsearch.core.TimeValue;
22-
import org.elasticsearch.index.IndexSettings;
2322
import org.elasticsearch.index.engine.Engine;
2423
import org.elasticsearch.index.shard.IndexShard;
2524
import org.elasticsearch.index.translog.Translog;
@@ -53,9 +52,7 @@ public void refreshShard(
5352
case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
5453
@Override
5554
public void onResponse(Boolean forced) {
56-
// Fast refresh indices do not depend on the unpromotables being refreshed
57-
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
58-
if (location != null && (indexShard.routingEntry().isSearchable() == false && fastRefresh == false)) {
55+
if (location != null && indexShard.routingEntry().isSearchable() == false) {
5956
refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout);
6057
} else {
6158
listener.onResponse(forced);
@@ -68,9 +65,7 @@ public void onFailure(Exception e) {
6865
}
6966
});
7067
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
71-
// Fast refresh indices do not depend on the unpromotables being refreshed
72-
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
73-
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0 && fastRefresh == false) {
68+
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
7469
sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout);
7570
} else {
7671
l.onResponse(true);

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Set;
3333
import java.util.stream.Collectors;
3434

35+
import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO_2;
3536
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
3637

3738
public class OperationRouting {
@@ -305,8 +306,14 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null
305306
}
306307

307308
public static boolean canSearchShard(ShardRouting shardRouting, ClusterState clusterState) {
309+
// TODO: remove if and always return isSearchable (ES-9563)
308310
if (INDEX_FAST_REFRESH_SETTING.get(clusterState.metadata().index(shardRouting.index()).getSettings())) {
309-
return shardRouting.isPromotableToPrimary();
311+
// Until all the cluster is upgraded, we send searches/gets to the primary (even if it has been upgraded) to execute locally.
312+
if (clusterState.getMinTransportVersion().onOrAfter(FAST_REFRESH_RCO_2)) {
313+
return shardRouting.isSearchable();
314+
} else {
315+
return shardRouting.isPromotableToPrimary();
316+
}
310317
} else {
311318
return shardRouting.isSearchable();
312319
}

server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@
5858
import java.util.concurrent.ExecutionException;
5959
import java.util.concurrent.Executor;
6060

61-
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
62-
6361
/**
6462
* This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time.
6563
* <p>
@@ -105,10 +103,7 @@ static boolean shouldLoadRandomAccessFiltersEagerly(IndexSettings settings) {
105103
boolean loadFiltersEagerlySetting = settings.getValue(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);
106104
boolean isStateless = DiscoveryNode.isStateless(settings.getNodeSettings());
107105
if (isStateless) {
108-
return loadFiltersEagerlySetting
109-
&& (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE)
110-
|| (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.INDEX_ROLE)
111-
&& INDEX_FAST_REFRESH_SETTING.get(settings.getSettings())));
106+
return loadFiltersEagerlySetting && DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE);
112107
} else {
113108
return loadFiltersEagerlySetting;
114109
}

server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.cluster.routing;
1111

12+
import org.elasticsearch.TransportVersion;
1213
import org.elasticsearch.cluster.ClusterState;
1314
import org.elasticsearch.common.UUIDs;
1415
import org.elasticsearch.common.settings.Settings;
@@ -19,6 +20,7 @@
1920

2021
import java.util.List;
2122

23+
import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO_2;
2224
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
2325
import static org.mockito.ArgumentMatchers.any;
2426
import static org.mockito.Mockito.mock;
@@ -27,16 +29,22 @@
2729
public class IndexRoutingTableTests extends ESTestCase {
2830

2931
public void testReadyForSearch() {
30-
innerReadyForSearch(false);
31-
innerReadyForSearch(true);
32+
innerReadyForSearch(false, false);
33+
innerReadyForSearch(false, true);
34+
innerReadyForSearch(true, false);
35+
innerReadyForSearch(true, true);
3236
}
3337

34-
private void innerReadyForSearch(boolean fastRefresh) {
38+
// TODO: remove if (fastRefresh && beforeFastRefreshRCO) branches (ES-9563)
39+
private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshRCO) {
3540
Index index = new Index(randomIdentifier(), UUIDs.randomBase64UUID());
3641
ClusterState clusterState = mock(ClusterState.class, Mockito.RETURNS_DEEP_STUBS);
3742
when(clusterState.metadata().index(any(Index.class)).getSettings()).thenReturn(
3843
Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh).build()
3944
);
45+
when(clusterState.getMinTransportVersion()).thenReturn(
46+
beforeFastRefreshRCO ? TransportVersion.fromId(FAST_REFRESH_RCO_2.id() - 1_00_0) : TransportVersion.current()
47+
);
4048
// 2 primaries that are search and index
4149
ShardId p1 = new ShardId(index, 0);
4250
IndexShardRoutingTable shardTable1 = new IndexShardRoutingTable(
@@ -55,7 +63,7 @@ private void innerReadyForSearch(boolean fastRefresh) {
5563
shardTable1 = new IndexShardRoutingTable(p1, List.of(getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
5664
shardTable2 = new IndexShardRoutingTable(p2, List.of(getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
5765
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
58-
if (fastRefresh) {
66+
if (fastRefresh && beforeFastRefreshRCO) {
5967
assertTrue(indexRoutingTable.readyForSearch(clusterState));
6068
} else {
6169
assertFalse(indexRoutingTable.readyForSearch(clusterState));
@@ -91,7 +99,7 @@ private void innerReadyForSearch(boolean fastRefresh) {
9199
)
92100
);
93101
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
94-
if (fastRefresh) {
102+
if (fastRefresh && beforeFastRefreshRCO) {
95103
assertTrue(indexRoutingTable.readyForSearch(clusterState));
96104
} else {
97105
assertFalse(indexRoutingTable.readyForSearch(clusterState));
@@ -118,8 +126,6 @@ private void innerReadyForSearch(boolean fastRefresh) {
118126
assertTrue(indexRoutingTable.readyForSearch(clusterState));
119127

120128
// 2 unassigned primaries that are index only with some replicas that are all available
121-
// Fast refresh indices do not support replicas so this can not practically happen. If we add support we will want to ensure
122-
// that readyForSearch allows for searching replicas when the index shard is not available.
123129
shardTable1 = new IndexShardRoutingTable(
124130
p1,
125131
List.of(
@@ -137,8 +143,8 @@ private void innerReadyForSearch(boolean fastRefresh) {
137143
)
138144
);
139145
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
140-
if (fastRefresh) {
141-
assertFalse(indexRoutingTable.readyForSearch(clusterState)); // if we support replicas for fast refreshes this needs to change
146+
if (fastRefresh && beforeFastRefreshRCO) {
147+
assertFalse(indexRoutingTable.readyForSearch(clusterState));
142148
} else {
143149
assertTrue(indexRoutingTable.readyForSearch(clusterState));
144150
}

server/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import java.util.concurrent.atomic.AtomicLong;
4949

5050
import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
51-
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
5251
import static org.elasticsearch.index.cache.bitset.BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING;
5352
import static org.hamcrest.Matchers.equalTo;
5453
import static org.hamcrest.Matchers.greaterThan;
@@ -253,35 +252,21 @@ public void testShouldLoadRandomAccessFiltersEagerly() {
253252
for (var hasIndexRole : values) {
254253
for (var loadFiltersEagerly : values) {
255254
for (var isStateless : values) {
256-
for (var fastRefresh : values) {
257-
if (isStateless == false && fastRefresh) {
258-
// fast refresh is only relevant for stateless indices
259-
continue;
260-
}
261-
262-
boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
263-
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly, fastRefresh)
264-
);
265-
if (isStateless) {
266-
assertEquals(loadFiltersEagerly && ((hasIndexRole && fastRefresh) || hasIndexRole == false), result);
267-
} else {
268-
assertEquals(loadFiltersEagerly, result);
269-
}
255+
boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
256+
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly)
257+
);
258+
if (isStateless) {
259+
assertEquals(loadFiltersEagerly && hasIndexRole == false, result);
260+
} else {
261+
assertEquals(loadFiltersEagerly, result);
270262
}
271263
}
272264
}
273265
}
274266
}
275267

276-
private IndexSettings bitsetFilterCacheSettings(
277-
boolean isStateless,
278-
boolean hasIndexRole,
279-
boolean loadFiltersEagerly,
280-
boolean fastRefresh
281-
) {
282-
var indexSettingsBuilder = Settings.builder()
283-
.put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly)
284-
.put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh);
268+
private IndexSettings bitsetFilterCacheSettings(boolean isStateless, boolean hasIndexRole, boolean loadFiltersEagerly) {
269+
var indexSettingsBuilder = Settings.builder().put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly);
285270

286271
var nodeSettingsBuilder = Settings.builder()
287272
.putList(

0 commit comments

Comments
 (0)