Skip to content

Commit cc7bbe4

Browse files
authored
Skip search on indices with INDEX_REFRESH_BLOCK (elastic#129132)
elastic#117543 introduced a cluster block that is added to new indices in stateless and removed when at least one replica is ready. A search against those indices should be skipped during that time.
1 parent bea994a commit cc7bbe4

File tree

3 files changed

+226
-0
lines changed

3 files changed

+226
-0
lines changed
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.search;
11+
12+
import org.elasticsearch.action.index.IndexRequestBuilder;
13+
import org.elasticsearch.action.search.ClosePointInTimeRequest;
14+
import org.elasticsearch.action.search.OpenPointInTimeRequest;
15+
import org.elasticsearch.action.search.SearchRequest;
16+
import org.elasticsearch.action.search.SearchShardsGroup;
17+
import org.elasticsearch.action.search.SearchShardsRequest;
18+
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
19+
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
20+
import org.elasticsearch.action.search.TransportSearchShardsAction;
21+
import org.elasticsearch.action.support.IndicesOptions;
22+
import org.elasticsearch.cluster.ClusterState;
23+
import org.elasticsearch.cluster.block.ClusterBlocks;
24+
import org.elasticsearch.cluster.metadata.IndexMetadata;
25+
import org.elasticsearch.cluster.metadata.ProjectId;
26+
import org.elasticsearch.cluster.node.DiscoveryNode;
27+
import org.elasticsearch.cluster.service.ClusterService;
28+
import org.elasticsearch.common.bytes.BytesReference;
29+
import org.elasticsearch.core.TimeValue;
30+
import org.elasticsearch.index.query.MatchAllQueryBuilder;
31+
import org.elasticsearch.index.query.QueryBuilders;
32+
import org.elasticsearch.search.builder.PointInTimeBuilder;
33+
import org.elasticsearch.search.builder.SearchSourceBuilder;
34+
import org.elasticsearch.test.ESIntegTestCase;
35+
36+
import java.util.ArrayList;
37+
import java.util.HashMap;
38+
import java.util.List;
39+
import java.util.Map;
40+
import java.util.Objects;
41+
42+
import static org.elasticsearch.cluster.block.ClusterBlocks.EMPTY_CLUSTER_BLOCK;
43+
import static org.elasticsearch.test.ClusterServiceUtils.setState;
44+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
45+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
46+
47+
public class SearchWithIndexBlocksIT extends ESIntegTestCase {
48+
49+
public void testSearchIndicesWithIndexRefreshBlocks() {
50+
List<String> indices = createIndices();
51+
Map<String, Integer> numDocsPerIndex = indexDocuments(indices);
52+
List<String> unblockedIndices = addIndexRefreshBlockToSomeIndices(indices);
53+
54+
int expectedHits = 0;
55+
for (String index : unblockedIndices) {
56+
expectedHits += numDocsPerIndex.get(index);
57+
}
58+
59+
assertHitCount(prepareSearch().setQuery(QueryBuilders.matchAllQuery()), expectedHits);
60+
}
61+
62+
public void testOpenPITOnIndicesWithIndexRefreshBlocks() {
63+
List<String> indices = createIndices();
64+
Map<String, Integer> numDocsPerIndex = indexDocuments(indices);
65+
List<String> unblockedIndices = addIndexRefreshBlockToSomeIndices(indices);
66+
67+
int expectedHits = 0;
68+
for (String index : unblockedIndices) {
69+
expectedHits += numDocsPerIndex.get(index);
70+
}
71+
72+
BytesReference pitId = null;
73+
try {
74+
OpenPointInTimeRequest openPITRequest = new OpenPointInTimeRequest(indices.toArray(new String[0])).keepAlive(
75+
TimeValue.timeValueSeconds(10)
76+
).allowPartialSearchResults(true);
77+
pitId = client().execute(TransportOpenPointInTimeAction.TYPE, openPITRequest).actionGet().getPointInTimeId();
78+
SearchRequest searchRequest = new SearchRequest().source(
79+
new SearchSourceBuilder().pointInTimeBuilder(new PointInTimeBuilder(pitId).setKeepAlive(TimeValue.timeValueSeconds(10)))
80+
);
81+
assertHitCount(client().search(searchRequest), expectedHits);
82+
} finally {
83+
if (pitId != null) {
84+
client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pitId)).actionGet();
85+
}
86+
}
87+
}
88+
89+
public void testMultiSearchIndicesWithIndexRefreshBlocks() {
90+
List<String> indices = createIndices();
91+
Map<String, Integer> numDocsPerIndex = indexDocuments(indices);
92+
List<String> unblockedIndices = addIndexRefreshBlockToSomeIndices(indices);
93+
94+
int expectedHits = 0;
95+
for (String index : unblockedIndices) {
96+
expectedHits += numDocsPerIndex.get(index);
97+
}
98+
99+
final long expectedHitsL = expectedHits;
100+
assertResponse(
101+
client().prepareMultiSearch()
102+
.add(prepareSearch().setQuery(QueryBuilders.matchAllQuery()))
103+
.add(prepareSearch().setQuery(QueryBuilders.termQuery("field", "blah"))),
104+
response -> {
105+
assertHitCount(Objects.requireNonNull(response.getResponses()[0].getResponse()), expectedHitsL);
106+
assertHitCount(Objects.requireNonNull(response.getResponses()[1].getResponse()), 0);
107+
}
108+
);
109+
}
110+
111+
public void testSearchShardsOnIndicesWithIndexRefreshBlocks() {
112+
List<String> indices = createIndices();
113+
indexDocuments(indices);
114+
List<String> unblockedIndices = addIndexRefreshBlockToSomeIndices(indices);
115+
116+
var resp = client().execute(
117+
TransportSearchShardsAction.TYPE,
118+
new SearchShardsRequest(
119+
indices.toArray(new String[0]),
120+
IndicesOptions.DEFAULT,
121+
new MatchAllQueryBuilder(),
122+
null,
123+
null,
124+
true,
125+
null
126+
)
127+
).actionGet();
128+
for (SearchShardsGroup group : resp.getGroups()) {
129+
assertTrue(unblockedIndices.contains(group.shardId().getIndex().getName()));
130+
}
131+
}
132+
133+
private List<String> createIndices() {
134+
int numIndices = randomIntBetween(1, 3);
135+
List<String> indices = new ArrayList<>();
136+
for (int i = 0; i < numIndices; i++) {
137+
indices.add("test" + i);
138+
createIndex("test" + i);
139+
}
140+
return indices;
141+
}
142+
143+
private Map<String, Integer> indexDocuments(List<String> indices) {
144+
Map<String, Integer> numDocsPerIndex = new HashMap<>();
145+
List<IndexRequestBuilder> indexRequests = new ArrayList<>();
146+
for (String index : indices) {
147+
int numDocs = randomIntBetween(0, 10);
148+
numDocsPerIndex.put(index, numDocs);
149+
for (int i = 0; i < numDocs; i++) {
150+
indexRequests.add(prepareIndex(index).setId(String.valueOf(i)).setSource("field", "value"));
151+
}
152+
}
153+
indexRandom(true, indexRequests);
154+
155+
return numDocsPerIndex;
156+
}
157+
158+
private List<String> addIndexRefreshBlockToSomeIndices(List<String> indices) {
159+
List<String> unblockedIndices = new ArrayList<>();
160+
var blocksBuilder = ClusterBlocks.builder().blocks(EMPTY_CLUSTER_BLOCK);
161+
for (String index : indices) {
162+
boolean blockIndex = randomBoolean();
163+
if (blockIndex) {
164+
blocksBuilder.addIndexBlock(ProjectId.DEFAULT, index, IndexMetadata.INDEX_REFRESH_BLOCK);
165+
} else {
166+
unblockedIndices.add(index);
167+
}
168+
}
169+
170+
var dataNodes = clusterService().state().getNodes().getAllNodes();
171+
for (DiscoveryNode dataNode : dataNodes) {
172+
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, dataNode.getName());
173+
ClusterState currentState = clusterService.state();
174+
ClusterState newState = ClusterState.builder(currentState).blocks(blocksBuilder).build();
175+
setState(clusterService, newState);
176+
}
177+
178+
return unblockedIndices;
179+
}
180+
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1950,6 +1950,7 @@ List<SearchShardIterator> getLocalShardsIterator(
19501950
Set<ResolvedExpression> indicesAndAliases,
19511951
String[] concreteIndices
19521952
) {
1953+
concreteIndices = ignoreBlockedIndices(projectState, concreteIndices);
19531954
var routingMap = indexNameExpressionResolver.resolveSearchRouting(
19541955
projectState.metadata(),
19551956
searchRequest.routing(),
@@ -1982,6 +1983,20 @@ List<SearchShardIterator> getLocalShardsIterator(
19821983
return Arrays.asList(list);
19831984
}
19841985

1986+
static String[] ignoreBlockedIndices(ProjectState projectState, String[] concreteIndices) {
1987+
// optimization: mostly we do not have any blocks so there's no point in the expensive per-index checking
1988+
boolean hasIndexBlocks = projectState.blocks().indices(projectState.projectId()).isEmpty() == false;
1989+
if (hasIndexBlocks) {
1990+
return Arrays.stream(concreteIndices)
1991+
.filter(
1992+
index -> projectState.blocks()
1993+
.hasIndexBlock(projectState.projectId(), index, IndexMetadata.INDEX_REFRESH_BLOCK) == false
1994+
)
1995+
.toArray(String[]::new);
1996+
}
1997+
return concreteIndices;
1998+
}
1999+
19852000
private interface TelemetryListener {
19862001
void setRemotes(int count);
19872002

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1831,4 +1831,35 @@ public void onFailure(Exception ex) {
18311831
assertTrue(ESTestCase.terminate(threadPool));
18321832
}
18331833
}
1834+
1835+
public void testIgnoreIndicesWithIndexRefreshBlock() {
1836+
int numIndices = randomIntBetween(1, 10);
1837+
String[] concreteIndices = new String[numIndices];
1838+
for (int i = 0; i < numIndices; i++) {
1839+
concreteIndices[i] = "index" + i;
1840+
}
1841+
1842+
List<String> shuffledIndices = Arrays.asList(concreteIndices);
1843+
Collections.shuffle(shuffledIndices, random());
1844+
concreteIndices = shuffledIndices.toArray(new String[0]);
1845+
1846+
final ProjectId projectId = randomProjectIdOrDefault();
1847+
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder();
1848+
int numBlockedIndices = randomIntBetween(0, numIndices);
1849+
for (int i = 0; i < numBlockedIndices; i++) {
1850+
blocksBuilder.addIndexBlock(projectId, concreteIndices[i], IndexMetadata.INDEX_REFRESH_BLOCK);
1851+
}
1852+
final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
1853+
.putProjectMetadata(ProjectMetadata.builder(projectId).build())
1854+
.blocks(blocksBuilder)
1855+
.build();
1856+
final ProjectState projectState = clusterState.projectState(projectId);
1857+
1858+
String[] actual = TransportSearchAction.ignoreBlockedIndices(projectState, concreteIndices);
1859+
String[] expected = Arrays.stream(concreteIndices)
1860+
.filter(index -> clusterState.blocks().hasIndexBlock(projectId, index, IndexMetadata.INDEX_REFRESH_BLOCK) == false)
1861+
.toArray(String[]::new);
1862+
1863+
assertThat(Arrays.asList(actual), containsInAnyOrder(expected));
1864+
}
18341865
}

0 commit comments

Comments
 (0)