Skip to content

Commit 5806e64

Browse files
committed
Return sum of all primary shards' number of segments
1 parent fabc3c0 commit 5806e64

File tree

2 files changed

+36
-41
lines changed

2 files changed

+36
-41
lines changed

x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.elasticsearch.core.Nullable;
2525
import org.elasticsearch.core.TimeValue;
2626
import org.elasticsearch.test.rest.ESRestTestCase;
27+
import org.elasticsearch.test.rest.ObjectPath;
28+
import org.elasticsearch.test.rest.Stash;
2729
import org.elasticsearch.xcontent.ToXContent;
2830
import org.elasticsearch.xcontent.XContentBuilder;
2931
import org.elasticsearch.xcontent.XContentType;
@@ -46,9 +48,7 @@
4648
import java.util.List;
4749
import java.util.Locale;
4850
import java.util.Map;
49-
import java.util.Optional;
5051
import java.util.concurrent.TimeUnit;
51-
import java.util.stream.Collectors;
5252

5353
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
5454
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLengthBetween;
@@ -373,35 +373,22 @@ private static void ensureGreen(String index) throws IOException {
373373
@SuppressWarnings("unchecked")
374374
public static Integer getNumberOfPrimarySegments(RestClient client, String index) throws IOException {
375375
Response response = client.performRequest(new Request("GET", index + "/_segments"));
376-
XContentType entityContentType = XContentType.fromMediaType(response.getEntity().getContentType().getValue());
377-
final Map<String, Object> originalResponseEntity = XContentHelper.convertToMap(
378-
entityContentType.xContent(),
379-
response.getEntity().getContent(),
380-
false
381-
);
382-
if (logger.isTraceEnabled()) {
383-
logger.trace(
384-
"segments response for {}: {}",
385-
index,
386-
originalResponseEntity.keySet()
387-
.stream()
388-
.map(key -> key + "=" + originalResponseEntity.get(key))
389-
.collect(Collectors.joining(", ", "{", "}"))
390-
);
391-
}
392-
Map<String, Object> responseEntity = (Map<String, Object>) originalResponseEntity.get("indices");
393-
responseEntity = (Map<String, Object>) responseEntity.get(index);
394-
responseEntity = (Map<String, Object>) responseEntity.get("shards");
395-
List<Map<String, Object>> shards = (List<Map<String, Object>>) responseEntity.get("0");
396-
// We want to mamke sure to get the primary shard because there is a chance the replica doesn't have data yet:
397-
Optional<Map<String, Object>> shardOptional = shards.stream()
398-
.filter(shard -> ((Map<String, Object>) shard.get("routing")).get("primary").equals(true))
399-
.findAny();
400-
if (shardOptional.isPresent()) {
401-
return (Integer) shardOptional.get().get("num_search_segments");
402-
} else {
403-
throw new RuntimeException("No primary shard found for index " + index);
404-
}
376+
final Map<String, Object> originalResponseEntity = ESRestTestCase.entityAsMap(response);
377+
logger.trace("segments response for {}: {}", index, originalResponseEntity);
378+
// We need to use a stash here because the index name is likely a data stream index with a dot.
379+
Stash stash = new Stash();
380+
stash.stashValue("index", index);
381+
Map<String, Object> responseEntity = new ObjectPath(originalResponseEntity).evaluate("indices.${index}.shards", stash);
382+
return responseEntity.values()
383+
.stream()
384+
.mapToInt(
385+
shardList -> ((List<Map<String, Object>>) shardList).stream()
386+
.filter(shard -> ((Map<String, Object>) shard.get("routing")).get("primary").equals(true))
387+
.findFirst()
388+
.map(shard -> (Integer) shard.get("num_search_segments"))
389+
.orElseThrow(() -> new IllegalStateException("no primary shard found in " + shardList))
390+
)
391+
.sum();
405392
}
406393

407394
public static void updatePolicy(RestClient client, String indexName, String policy) throws IOException {

x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public void testSearchableSnapshotForceMergesClonedIndex() throws Exception {
156156
configureClusterAllocation(true);
157157
}
158158

159-
assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries > 1, true);
159+
assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries, true);
160160
} catch (Exception | AssertionError e) {
161161
// Make sure we re-enable allocation in case of failure so that the remaining tests in the suite are not affected.
162162
configureClusterAllocation(true);
@@ -172,12 +172,13 @@ public void testSearchableSnapshotForceMergesSourceIndex() throws Exception {
172172
// Data streams have 1 primary shard by default.
173173
// The test suite runs with 4 nodes, so we can have up to 3 (allocated) replicas.
174174
final String phase = randomBoolean() ? "cold" : "frozen";
175-
final String backingIndexName = prepareDataStreamWithDocs(phase, 1, 0);
175+
final int numberOfPrimaries = 1;
176+
final String backingIndexName = prepareDataStreamWithDocs(phase, numberOfPrimaries, 0);
176177

177178
// Enable/start ILM on the data stream.
178179
updateIndexSettings(dataStream, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy));
179180

180-
assertForceMergedSnapshotDone(phase, backingIndexName, false, false);
181+
assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries, false);
181182
}
182183

183184
/**
@@ -209,7 +210,7 @@ public void testSearchableSnapshotForceMergesClonedIndexAfterRetry() throws Exce
209210

210211
configureClusterAllocation(true);
211212

212-
assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries > 1, true);
213+
assertForceMergedSnapshotDone(phase, backingIndexName, numberOfPrimaries, true);
213214
} catch (Exception | AssertionError e) {
214215
// Make sure we re-enable allocation in case of failure so that the remaining tests in the suite are not affected.
215216
configureClusterAllocation(true);
@@ -1156,6 +1157,13 @@ private Step.StepKey getKeyForIndex(Response response, String indexName) throws
11561157
* first generation backing index.
11571158
*/
11581159
private String prepareDataStreamWithDocs(String phase, int numberOfPrimaries, int numberOfReplicas) throws Exception {
1160+
logger.info(
1161+
"--> running [{}] with [{}] primaries, [{}] replicas, in phase [{}}",
1162+
getTestName(),
1163+
numberOfPrimaries,
1164+
numberOfReplicas,
1165+
phase
1166+
);
11591167
createSnapshotRepo(client(), snapshotRepo, randomBoolean());
11601168
createNewSingletonPolicy(client(), policy, phase, new SearchableSnapshotAction(snapshotRepo, true));
11611169

@@ -1223,10 +1231,10 @@ private static void assertForceMergeCloneIndexSettings(String backingIndexName,
12231231
*
12241232
* @param phase The phase of the ILM policy that the searchable snapshot action runs in.
12251233
* @param backingIndexName The original backing index name.
1226-
* @param multiplePrimaries True if the original backing index had multiple primaries, affecting segment assertions.
1234+
* @param numberOfPrimaries The number of primaries that the original backing index had, affecting segment count assertions.
12271235
* @param withReplicas True if the original backing index had one or more replicas, affecting snapshot index naming assertions.
12281236
*/
1229-
private void assertForceMergedSnapshotDone(String phase, String backingIndexName, boolean multiplePrimaries, boolean withReplicas)
1237+
private void assertForceMergedSnapshotDone(String phase, String backingIndexName, int numberOfPrimaries, boolean withReplicas)
12301238
throws Exception {
12311239
final String prefix = phase.equals("cold")
12321240
? SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX
@@ -1240,11 +1248,11 @@ private void assertForceMergedSnapshotDone(String phase, String backingIndexName
12401248
// Regardless of whether we force merged the backing index or a clone, the cloned index should not exist (anymore).
12411249
awaitIndexDoesNotExist(FORCE_MERGE_CLONE_INDEX_PREFIX + "-*-" + backingIndexName);
12421250

1243-
// Retrieve the number of segments in the first (random) shard of the backing index.
1251+
// Retrieve the total number of segments across all primary shards of the restored index.
12441252
final Integer numberOfPrimarySegments = getNumberOfPrimarySegments(client(), restoredIndexName);
1245-
// If the backing index had multiple primaries, some primaries might be empty, but others should have no more than 1 segment
1246-
if (multiplePrimaries || phase.equals("frozen")) {
1247-
assertThat(numberOfPrimarySegments, lessThanOrEqualTo(1));
1253+
// If the backing index had multiple primaries, some primaries might be empty, but others should have no more than 1 segment.
1254+
if (numberOfPrimaries > 1 || phase.equals("frozen")) {
1255+
assertThat(numberOfPrimarySegments, lessThanOrEqualTo(numberOfPrimaries));
12481256
} else {
12491257
// If the backing index had only one primary, we expect exactly 1 segment after force merging.
12501258
assertThat(numberOfPrimarySegments, equalTo(1));

0 commit comments

Comments
 (0)