-
Couldn't load subscription status.
- Fork 25.6k
Skip search shards with INDEX_REFRESH_BLOCK #129132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 29 commits
2aa74e3
12b6b81
9c705cd
1c75721
1ecc447
cdb4bc1
b7ade2d
cd991c2
5f50d5c
3f86fb8
0edc27c
9de6f06
be37bf6
17706e2
8759a07
bf8a2be
8887609
0f0200a
7689263
598e906
4cdfbd0
76ecade
86c9a5d
7200edc
d72600b
61d40c4
51d5196
24f2770
b33a14f
e10d59b
31d7dce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,153 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| package org.elasticsearch.search; | ||
|
|
||
| import org.elasticsearch.action.index.IndexRequestBuilder; | ||
| import org.elasticsearch.action.search.ClosePointInTimeRequest; | ||
| import org.elasticsearch.action.search.OpenPointInTimeRequest; | ||
| import org.elasticsearch.action.search.SearchRequest; | ||
| import org.elasticsearch.action.search.TransportClosePointInTimeAction; | ||
| import org.elasticsearch.action.search.TransportOpenPointInTimeAction; | ||
| import org.elasticsearch.cluster.ClusterState; | ||
| import org.elasticsearch.cluster.block.ClusterBlocks; | ||
| import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
| import org.elasticsearch.cluster.metadata.ProjectId; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
| import org.elasticsearch.cluster.service.ClusterService; | ||
| import org.elasticsearch.common.bytes.BytesReference; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.index.query.QueryBuilders; | ||
| import org.elasticsearch.search.builder.PointInTimeBuilder; | ||
| import org.elasticsearch.search.builder.SearchSourceBuilder; | ||
| import org.elasticsearch.test.ESIntegTestCase; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
|
|
||
| import static org.elasticsearch.cluster.block.ClusterBlocks.EMPTY_CLUSTER_BLOCK; | ||
| import static org.elasticsearch.test.ClusterServiceUtils.setState; | ||
| import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; | ||
| import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; | ||
|
|
||
| public class SearchWithIndexBlocksIT extends ESIntegTestCase { | ||
|
|
||
| public void testSearchIndicesWithIndexRefreshBlocks() { | ||
| List<String> indices = createIndices(); | ||
| Map<String, Integer> numDocsPerIndex = indexDocuments(indices); | ||
| List<String> unblockedIndices = addIndexRefreshBlockToSomeIndices(indices); | ||
|
|
||
| int expectedHits = 0; | ||
| for (String index : unblockedIndices) { | ||
| expectedHits += numDocsPerIndex.get(index); | ||
| } | ||
|
|
||
| assertHitCount(prepareSearch().setQuery(QueryBuilders.matchAllQuery()), expectedHits); | ||
| } | ||
|
|
||
| public void testOpenPITOnIndicesWithIndexRefreshBlocks() { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What was your conclusion on open PIT? We do the same filtering, correct? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we filter just the same - that's verified in this test by asserting the hit count does not include docs from blocked indices. |
||
| List<String> indices = createIndices(); | ||
| Map<String, Integer> numDocsPerIndex = indexDocuments(indices); | ||
| List<String> unblockedIndices = addIndexRefreshBlockToSomeIndices(indices); | ||
|
|
||
| int expectedHits = 0; | ||
| for (String index : unblockedIndices) { | ||
| expectedHits += numDocsPerIndex.get(index); | ||
| } | ||
|
|
||
| BytesReference pitId = null; | ||
| try { | ||
| OpenPointInTimeRequest openPITRequest = new OpenPointInTimeRequest(indices.toArray(new String[0])).keepAlive( | ||
| TimeValue.timeValueSeconds(10) | ||
| ).allowPartialSearchResults(true); | ||
| pitId = client().execute(TransportOpenPointInTimeAction.TYPE, openPITRequest).actionGet().getPointInTimeId(); | ||
| SearchRequest searchRequest = new SearchRequest().source( | ||
| new SearchSourceBuilder().pointInTimeBuilder(new PointInTimeBuilder(pitId).setKeepAlive(TimeValue.timeValueSeconds(10))) | ||
| ); | ||
| assertHitCount(client().search(searchRequest), expectedHits); | ||
| } finally { | ||
| if (pitId != null) { | ||
| client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pitId)).actionGet(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public void testMultiSearchIndicesWithIndexRefreshBlocks() { | ||
| List<String> indices = createIndices(); | ||
| Map<String, Integer> numDocsPerIndex = indexDocuments(indices); | ||
| List<String> unblockedIndices = addIndexRefreshBlockToSomeIndices(indices); | ||
|
|
||
| int expectedHits = 0; | ||
| for (String index : unblockedIndices) { | ||
| expectedHits += numDocsPerIndex.get(index); | ||
| } | ||
|
|
||
| final long expectedHitsL = expectedHits; | ||
| assertResponse( | ||
| client().prepareMultiSearch() | ||
| .add(prepareSearch().setQuery(QueryBuilders.matchAllQuery())) | ||
| .add(prepareSearch().setQuery(QueryBuilders.termQuery("field", "blah"))), | ||
| response -> { | ||
| assertHitCount(Objects.requireNonNull(response.getResponses()[0].getResponse()), expectedHitsL); | ||
| assertHitCount(Objects.requireNonNull(response.getResponses()[1].getResponse()), 0); | ||
| } | ||
| ); | ||
| } | ||
|
|
||
| private List<String> createIndices() { | ||
| int numIndices = randomIntBetween(1, 3); | ||
| List<String> indices = new ArrayList<>(); | ||
| for (int i = 0; i < numIndices; i++) { | ||
| indices.add("test" + i); | ||
| createIndex("test" + i); | ||
| } | ||
| return indices; | ||
| } | ||
|
|
||
| private Map<String, Integer> indexDocuments(List<String> indices) { | ||
| Map<String, Integer> numDocsPerIndex = new HashMap<>(); | ||
| List<IndexRequestBuilder> indexRequests = new ArrayList<>(); | ||
| for (String index : indices) { | ||
| int numDocs = randomIntBetween(0, 10); | ||
| numDocsPerIndex.put(index, numDocs); | ||
| for (int i = 0; i < numDocs; i++) { | ||
| indexRequests.add(prepareIndex(index).setId(String.valueOf(i)).setSource("field", "value")); | ||
| } | ||
| } | ||
| indexRandom(true, indexRequests); | ||
|
|
||
| return numDocsPerIndex; | ||
| } | ||
|
|
||
| private List<String> addIndexRefreshBlockToSomeIndices(List<String> indices) { | ||
| List<String> unblockedIndices = new ArrayList<>(); | ||
| var blocksBuilder = ClusterBlocks.builder().blocks(EMPTY_CLUSTER_BLOCK); | ||
| for (String index : indices) { | ||
| boolean blockIndex = randomBoolean(); | ||
| if (blockIndex) { | ||
| blocksBuilder.addIndexBlock(ProjectId.DEFAULT, index, IndexMetadata.INDEX_REFRESH_BLOCK); | ||
| } else { | ||
| unblockedIndices.add(index); | ||
| } | ||
| } | ||
|
|
||
| var dataNodes = clusterService().state().getNodes().getAllNodes(); | ||
| for (DiscoveryNode dataNode : dataNodes) { | ||
| ClusterService clusterService = internalCluster().getInstance(ClusterService.class, dataNode.getName()); | ||
| ClusterState currentState = clusterService.state(); | ||
| ClusterState newState = ClusterState.builder(currentState).blocks(blocksBuilder).build(); | ||
| setState(clusterService, newState); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method is not intended to be used in integration test as it overrides the current data node cluster state. For testing the INDEX_REFRESH_BLOCK I think it makes sense to only have unit tests in stateful elasticsearch. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry @tlrx, can you explain more the risks of doing this? I like having fine-grained control over blocks so I can write tests that block some indices and allow others in one search - I think this is critical to test. If I can only 'set' the block by controlling active search nodes (like I do in the other PR), I can't think of a way to achieve what I want. |
||
| } | ||
|
|
||
| return unblockedIndices; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1864,6 +1864,7 @@ List<SearchShardIterator> getLocalShardsIterator( | |
| Set<ResolvedExpression> indicesAndAliases, | ||
| String[] concreteIndices | ||
| ) { | ||
| concreteIndices = ignoreBlockedIndices(projectState, concreteIndices); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do I understand correctly that search shards will inherit the new behaviour, as it calls getLocalShardsIterator? Are there other APIs that we need to update to look at this block? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, I've added a test to verify that search shards respects the block. |
||
| var routingMap = indexNameExpressionResolver.resolveSearchRouting( | ||
| projectState.metadata(), | ||
| searchRequest.routing(), | ||
|
|
@@ -1896,6 +1897,20 @@ List<SearchShardIterator> getLocalShardsIterator( | |
| return Arrays.asList(list); | ||
| } | ||
|
|
||
| static String[] ignoreBlockedIndices(ProjectState projectState, String[] concreteIndices) { | ||
| // optimization: mostly we do not have any blocks so there's no point in the expensive per-index checking | ||
| boolean hasIndexBlocks = projectState.blocks().indices(projectState.projectId()).isEmpty() == false; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: did we have a chance to incorporate this logic in buildPerIndexOriginalIndices perhaps, where we already look at blocks? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I want to filter concreteIndices as it's used in multiple places. However, it may be possible to move the block checks in buildPerIndexOriginalIndices to ignoreBlockedIndices. I will have to verify, then if so, I'll follow up with the change. |
||
| if (hasIndexBlocks) { | ||
| return Arrays.stream(concreteIndices) | ||
| .filter( | ||
| index -> projectState.blocks() | ||
| .hasIndexBlock(projectState.projectId(), index, IndexMetadata.INDEX_REFRESH_BLOCK) == false | ||
| ) | ||
| .toArray(String[]::new); | ||
| } | ||
| return concreteIndices; | ||
| } | ||
|
|
||
| private interface TelemetryListener { | ||
| void setRemotes(int count); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1812,4 +1812,35 @@ public void onFailure(Exception ex) { | |
| assertTrue(ESTestCase.terminate(threadPool)); | ||
| } | ||
| } | ||
|
|
||
| public void testIgnoreBlockedIndices() { | ||
|
||
| int numIndices = randomIntBetween(1, 10); | ||
| String[] concreteIndices = new String[numIndices]; | ||
| for (int i = 0; i < numIndices; i++) { | ||
| concreteIndices[i] = "index" + i; | ||
| } | ||
|
|
||
| List<String> shuffledIndices = Arrays.asList(concreteIndices); | ||
| Collections.shuffle(shuffledIndices, random()); | ||
| concreteIndices = shuffledIndices.toArray(new String[0]); | ||
|
|
||
| final ProjectId projectId = randomProjectIdOrDefault(); | ||
| ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder(); | ||
| int numBlockedIndices = randomIntBetween(0, numIndices); | ||
| for (int i = 0; i < numBlockedIndices; i++) { | ||
| blocksBuilder.addIndexBlock(projectId, concreteIndices[i], IndexMetadata.INDEX_REFRESH_BLOCK); | ||
| } | ||
| final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) | ||
| .putProjectMetadata(ProjectMetadata.builder(projectId).build()) | ||
| .blocks(blocksBuilder) | ||
| .build(); | ||
| final ProjectState projectState = clusterState.projectState(projectId); | ||
|
|
||
| String[] actual = TransportSearchAction.ignoreBlockedIndices(projectState, concreteIndices); | ||
| String[] expected = Arrays.stream(concreteIndices) | ||
| .filter(index -> clusterState.blocks().hasIndexBlock(projectId, index, IndexMetadata.INDEX_REFRESH_BLOCK) == false) | ||
| .toArray(String[]::new); | ||
|
|
||
| assertThat(Arrays.asList(actual), containsInAnyOrder(expected)); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we also want to have an ESQL test for this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm tracking that as a follow-up task.