Skip to content

Commit d3fcead

Browse files
authored
Revert fast refresh using search shards (#115019)
As this induces ES-8275 and makes fleet time outs for some APIs. Relates ES-9573
1 parent 73ca4f5 commit d3fcead

File tree

9 files changed

+74
-63
lines changed

9 files changed

+74
-63
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.cluster.service.ClusterService;
2424
import org.elasticsearch.common.io.stream.StreamInput;
2525
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.index.IndexSettings;
2627
import org.elasticsearch.index.shard.IndexShard;
2728
import org.elasticsearch.indices.IndicesService;
2829
import org.elasticsearch.injection.guice.Inject;
@@ -119,18 +120,27 @@ public void onPrimaryOperationComplete(
119120
ActionListener<Void> listener
120121
) {
121122
assert replicaRequest.primaryRefreshResult.refreshed() : "primary has not refreshed";
122-
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
123-
indexShardRoutingTable,
124-
replicaRequest.primaryRefreshResult.primaryTerm(),
125-
replicaRequest.primaryRefreshResult.generation(),
126-
false
127-
);
128-
transportService.sendRequest(
129-
transportService.getLocalNode(),
130-
TransportUnpromotableShardRefreshAction.NAME,
131-
unpromotableReplicaRequest,
132-
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
123+
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(
124+
clusterService.state().metadata().index(indexShardRoutingTable.shardId().getIndex()).getSettings()
133125
);
126+
127+
// Indices marked with fast refresh do not rely on refreshing the unpromotables
128+
if (fastRefresh) {
129+
listener.onResponse(null);
130+
} else {
131+
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
132+
indexShardRoutingTable,
133+
replicaRequest.primaryRefreshResult.primaryTerm(),
134+
replicaRequest.primaryRefreshResult.generation(),
135+
false
136+
);
137+
transportService.sendRequest(
138+
transportService.getLocalNode(),
139+
TransportUnpromotableShardRefreshAction.NAME,
140+
unpromotableReplicaRequest,
141+
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
142+
);
143+
}
134144
}
135145
}
136146
}

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@
2424

2525
import java.util.List;
2626

27-
import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
28-
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
29-
3027
public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction<
3128
UnpromotableShardRefreshRequest,
3229
ActionResponse.Empty> {
@@ -76,18 +73,6 @@ protected void unpromotableShardOperation(
7673
return;
7774
}
7875

79-
// During an upgrade to FAST_REFRESH_RCO, we expect search shards to be first upgraded before the primary is upgraded. Thus,
80-
// when the primary is upgraded, and starts to deliver unpromotable refreshes, we expect the search shards to be upgraded already.
81-
// Note that the fast refresh setting is final.
82-
// TODO: remove assertion (ES-9563)
83-
assert INDEX_FAST_REFRESH_SETTING.get(shard.indexSettings().getSettings()) == false
84-
|| transportService.getLocalNodeConnection().getTransportVersion().onOrAfter(FAST_REFRESH_RCO)
85-
: "attempted to refresh a fast refresh search shard "
86-
+ shard
87-
+ " on transport version "
88-
+ transportService.getLocalNodeConnection().getTransportVersion()
89-
+ " (before FAST_REFRESH_RCO)";
90-
9176
ActionListener.run(responseListener, listener -> {
9277
shard.waitForPrimaryTermAndGeneration(
9378
request.getPrimaryTerm(),

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,12 @@ protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionLi
126126
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
127127
IndexShard indexShard = indexService.getShard(shardId.id());
128128
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
129+
// TODO: Re-evaluate assertion (ES-8227)
130+
// assert indexShard.indexSettings().isFastRefresh() == false
131+
// : "a search shard should not receive a TransportGetAction for an index with fast refresh";
129132
handleGetOnUnpromotableShard(request, indexShard, listener);
130133
return;
131134
}
132-
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
133135
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
134136
: "in Stateless a promotable to primary shard can receive a TransportGetAction only if an index has the fast refresh setting";
135137
if (request.realtime()) { // we are not tied to a refresh cycle here anyway

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,12 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId
124124
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
125125
IndexShard indexShard = indexService.getShard(shardId.id());
126126
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
127+
// TODO: Re-evaluate assertion (ES-8227)
128+
// assert indexShard.indexSettings().isFastRefresh() == false
129+
// : "a search shard should not receive a TransportShardMultiGetAction for an index with fast refresh";
127130
handleMultiGetOnUnpromotableShard(request, indexShard, listener);
128131
return;
129132
}
130-
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
131133
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
132134
: "in Stateless a promotable to primary shard can receive a TransportShardMultiGetAction only if an index has "
133135
+ "the fast refresh setting";

server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.util.concurrent.ThreadContext;
2020
import org.elasticsearch.core.Nullable;
2121
import org.elasticsearch.core.TimeValue;
22+
import org.elasticsearch.index.IndexSettings;
2223
import org.elasticsearch.index.engine.Engine;
2324
import org.elasticsearch.index.shard.IndexShard;
2425
import org.elasticsearch.index.translog.Translog;
@@ -52,7 +53,9 @@ public void refreshShard(
5253
case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
5354
@Override
5455
public void onResponse(Boolean forced) {
55-
if (location != null && indexShard.routingEntry().isSearchable() == false) {
56+
// Fast refresh indices do not depend on the unpromotables being refreshed
57+
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
58+
if (location != null && (indexShard.routingEntry().isSearchable() == false && fastRefresh == false)) {
5659
refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout);
5760
} else {
5861
listener.onResponse(forced);
@@ -65,7 +68,9 @@ public void onFailure(Exception e) {
6568
}
6669
});
6770
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
68-
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
71+
// Fast refresh indices do not depend on the unpromotables being refreshed
72+
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
73+
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0 && fastRefresh == false) {
6974
sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout);
7075
} else {
7176
l.onResponse(true);

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.Set;
3333
import java.util.stream.Collectors;
3434

35-
import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
3635
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
3736

3837
public class OperationRouting {
@@ -306,14 +305,8 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null
306305
}
307306

308307
public static boolean canSearchShard(ShardRouting shardRouting, ClusterState clusterState) {
309-
// TODO: remove if and always return isSearchable (ES-9563)
310308
if (INDEX_FAST_REFRESH_SETTING.get(clusterState.metadata().index(shardRouting.index()).getSettings())) {
311-
// Until all the cluster is upgraded, we send searches/gets to the primary (even if it has been upgraded) to execute locally.
312-
if (clusterState.getMinTransportVersion().onOrAfter(FAST_REFRESH_RCO)) {
313-
return shardRouting.isSearchable();
314-
} else {
315-
return shardRouting.isPromotableToPrimary();
316-
}
309+
return shardRouting.isPromotableToPrimary();
317310
} else {
318311
return shardRouting.isSearchable();
319312
}

server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
import java.util.concurrent.ExecutionException;
5959
import java.util.concurrent.Executor;
6060

61+
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
62+
6163
/**
6264
* This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time.
6365
* <p>
@@ -103,7 +105,10 @@ static boolean shouldLoadRandomAccessFiltersEagerly(IndexSettings settings) {
103105
boolean loadFiltersEagerlySetting = settings.getValue(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);
104106
boolean isStateless = DiscoveryNode.isStateless(settings.getNodeSettings());
105107
if (isStateless) {
106-
return loadFiltersEagerlySetting && DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE);
108+
return loadFiltersEagerlySetting
109+
&& (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE)
110+
|| (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.INDEX_ROLE)
111+
&& INDEX_FAST_REFRESH_SETTING.get(settings.getSettings())));
107112
} else {
108113
return loadFiltersEagerlySetting;
109114
}

server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.cluster.routing;
1111

12-
import org.elasticsearch.TransportVersion;
1312
import org.elasticsearch.cluster.ClusterState;
1413
import org.elasticsearch.common.UUIDs;
1514
import org.elasticsearch.common.settings.Settings;
@@ -20,7 +19,6 @@
2019

2120
import java.util.List;
2221

23-
import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
2422
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
2523
import static org.mockito.ArgumentMatchers.any;
2624
import static org.mockito.Mockito.mock;
@@ -29,22 +27,16 @@
2927
public class IndexRoutingTableTests extends ESTestCase {
3028

3129
public void testReadyForSearch() {
32-
innerReadyForSearch(false, false);
33-
innerReadyForSearch(false, true);
34-
innerReadyForSearch(true, false);
35-
innerReadyForSearch(true, true);
30+
innerReadyForSearch(false);
31+
innerReadyForSearch(true);
3632
}
3733

38-
// TODO: remove if (fastRefresh && beforeFastRefreshRCO) branches (ES-9563)
39-
private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshRCO) {
34+
private void innerReadyForSearch(boolean fastRefresh) {
4035
Index index = new Index(randomIdentifier(), UUIDs.randomBase64UUID());
4136
ClusterState clusterState = mock(ClusterState.class, Mockito.RETURNS_DEEP_STUBS);
4237
when(clusterState.metadata().index(any(Index.class)).getSettings()).thenReturn(
4338
Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh).build()
4439
);
45-
when(clusterState.getMinTransportVersion()).thenReturn(
46-
beforeFastRefreshRCO ? TransportVersion.fromId(FAST_REFRESH_RCO.id() - 1_00_0) : TransportVersion.current()
47-
);
4840
// 2 primaries that are search and index
4941
ShardId p1 = new ShardId(index, 0);
5042
IndexShardRoutingTable shardTable1 = new IndexShardRoutingTable(
@@ -63,7 +55,7 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
6355
shardTable1 = new IndexShardRoutingTable(p1, List.of(getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
6456
shardTable2 = new IndexShardRoutingTable(p2, List.of(getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
6557
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
66-
if (fastRefresh && beforeFastRefreshRCO) {
58+
if (fastRefresh) {
6759
assertTrue(indexRoutingTable.readyForSearch(clusterState));
6860
} else {
6961
assertFalse(indexRoutingTable.readyForSearch(clusterState));
@@ -99,7 +91,7 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
9991
)
10092
);
10193
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
102-
if (fastRefresh && beforeFastRefreshRCO) {
94+
if (fastRefresh) {
10395
assertTrue(indexRoutingTable.readyForSearch(clusterState));
10496
} else {
10597
assertFalse(indexRoutingTable.readyForSearch(clusterState));
@@ -126,6 +118,8 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
126118
assertTrue(indexRoutingTable.readyForSearch(clusterState));
127119

128120
// 2 unassigned primaries that are index only with some replicas that are all available
121+
// Fast refresh indices do not support replicas so this can not practically happen. If we add support we will want to ensure
122+
// that readyForSearch allows for searching replicas when the index shard is not available.
129123
shardTable1 = new IndexShardRoutingTable(
130124
p1,
131125
List.of(
@@ -143,8 +137,8 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
143137
)
144138
);
145139
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
146-
if (fastRefresh && beforeFastRefreshRCO) {
147-
assertFalse(indexRoutingTable.readyForSearch(clusterState));
140+
if (fastRefresh) {
141+
assertFalse(indexRoutingTable.readyForSearch(clusterState)); // if we support replicas for fast refreshes this needs to change
148142
} else {
149143
assertTrue(indexRoutingTable.readyForSearch(clusterState));
150144
}

server/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.concurrent.atomic.AtomicLong;
4949

5050
import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
51+
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
5152
import static org.elasticsearch.index.cache.bitset.BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING;
5253
import static org.hamcrest.Matchers.equalTo;
5354
import static org.hamcrest.Matchers.greaterThan;
@@ -272,21 +273,35 @@ public void testShouldLoadRandomAccessFiltersEagerly() {
272273
for (var hasIndexRole : values) {
273274
for (var loadFiltersEagerly : values) {
274275
for (var isStateless : values) {
275-
boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
276-
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly)
277-
);
278-
if (isStateless) {
279-
assertEquals(loadFiltersEagerly && hasIndexRole == false, result);
280-
} else {
281-
assertEquals(loadFiltersEagerly, result);
276+
for (var fastRefresh : values) {
277+
if (isStateless == false && fastRefresh) {
278+
// fast refresh is only relevant for stateless indices
279+
continue;
280+
}
281+
282+
boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
283+
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly, fastRefresh)
284+
);
285+
if (isStateless) {
286+
assertEquals(loadFiltersEagerly && ((hasIndexRole && fastRefresh) || hasIndexRole == false), result);
287+
} else {
288+
assertEquals(loadFiltersEagerly, result);
289+
}
282290
}
283291
}
284292
}
285293
}
286294
}
287295

288-
private IndexSettings bitsetFilterCacheSettings(boolean isStateless, boolean hasIndexRole, boolean loadFiltersEagerly) {
289-
var indexSettingsBuilder = Settings.builder().put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly);
296+
private IndexSettings bitsetFilterCacheSettings(
297+
boolean isStateless,
298+
boolean hasIndexRole,
299+
boolean loadFiltersEagerly,
300+
boolean fastRefresh
301+
) {
302+
var indexSettingsBuilder = Settings.builder()
303+
.put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly)
304+
.put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh);
290305

291306
var nodeSettingsBuilder = Settings.builder()
292307
.putList(

0 commit comments

Comments
 (0)