Skip to content

Commit db9b450

Browse files
committed
Add multi-project support for health indicator data_stream_lifecycle
1 parent f79ff97 commit db9b450

File tree

11 files changed

+211
-47
lines changed

11 files changed

+211
-47
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ public Collection<?> createComponents(PluginServices services) {
216216
)
217217
);
218218
dataLifecycleInitialisationService.get().init();
219-
dataStreamLifecycleHealthIndicatorService.set(new DataStreamLifecycleHealthIndicatorService());
219+
dataStreamLifecycleHealthIndicatorService.set(new DataStreamLifecycleHealthIndicatorService(services.projectResolver()));
220220

221221
components.add(errorStoreInitialisationService.get());
222222
components.add(dataLifecycleInitialisationService.get());

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@
1414
import org.elasticsearch.cluster.metadata.ProjectId;
1515
import org.elasticsearch.common.Strings;
1616
import org.elasticsearch.core.Nullable;
17+
import org.elasticsearch.core.Tuple;
1718
import org.elasticsearch.health.node.DslErrorInfo;
19+
import org.elasticsearch.health.node.ProjectIndexName;
1820

21+
import java.util.Comparator;
1922
import java.util.List;
2023
import java.util.Map;
2124
import java.util.Set;
@@ -117,23 +120,38 @@ public Set<String> getAllIndices(ProjectId projectId) {
117120
* retries DSL attempted (descending order) and the number of entries will be limited according to the provided limit parameter.
118121
* Returns empty list if no entries are present in the error store or none satisfy the predicate.
119122
*/
120-
public List<DslErrorInfo> getErrorsInfo(ProjectId projectId, Predicate<ErrorEntry> errorEntryPredicate, int limit) {
121-
final var indexNameToError = projectMap.get(projectId);
122-
if (indexNameToError == null || indexNameToError.isEmpty()) {
123-
return List.of();
124-
}
125-
return indexNameToError.entrySet()
123+
public List<DslErrorInfo> getErrorsInfo(Predicate<ErrorEntry> errorEntryPredicate, int limit) {
124+
return projectMap.entrySet()
126125
.stream()
127-
.filter(keyValue -> errorEntryPredicate.test(keyValue.getValue()))
128-
.sorted(Map.Entry.comparingByValue())
126+
.flatMap(
127+
projectToIndexError -> projectToIndexError.getValue()
128+
.entrySet()
129+
.stream()
130+
.map(
131+
indexToError -> new Tuple<>(
132+
new ProjectIndexName(projectToIndexError.getKey(), indexToError.getKey()),
133+
indexToError.getValue()
134+
)
135+
)
136+
)
137+
.filter(projectIndexAndError -> errorEntryPredicate.test(projectIndexAndError.v2()))
138+
.sorted(Comparator.comparing(Tuple::v2))
129139
.limit(limit)
130140
.map(
131-
keyValue -> new DslErrorInfo(
132-
keyValue.getKey(),
133-
keyValue.getValue().firstOccurrenceTimestamp(),
134-
keyValue.getValue().retryCount()
141+
projectIndexAndError -> new DslErrorInfo(
142+
projectIndexAndError.v1().indexName(),
143+
projectIndexAndError.v2().firstOccurrenceTimestamp(),
144+
projectIndexAndError.v2().retryCount(),
145+
projectIndexAndError.v1().projectId()
135146
)
136147
)
137148
.collect(Collectors.toList());
138149
}
150+
151+
/**
152+
* Get the total number of error entries in the store
153+
*/
154+
public int getTotalErrorEntries() {
155+
return projectMap.values().stream().mapToInt(Map::size).sum();
156+
}
139157
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthIndicatorService.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.datastreams.lifecycle.health;
1111

12+
import org.elasticsearch.cluster.project.ProjectResolver;
1213
import org.elasticsearch.health.Diagnosis;
1314
import org.elasticsearch.health.HealthIndicatorDetails;
1415
import org.elasticsearch.health.HealthIndicatorImpact;
@@ -20,6 +21,7 @@
2021
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
2122
import org.elasticsearch.health.node.DslErrorInfo;
2223
import org.elasticsearch.health.node.HealthInfo;
24+
import org.elasticsearch.health.node.ProjectIndexName;
2325

2426
import java.util.HashMap;
2527
import java.util.LinkedHashMap;
@@ -54,6 +56,12 @@ public class DataStreamLifecycleHealthIndicatorService implements HealthIndicato
5456
DSL_EXPLAIN_HELP_URL
5557
);
5658

59+
private final ProjectResolver projectResolver;
60+
61+
public DataStreamLifecycleHealthIndicatorService(ProjectResolver projectResolver) {
62+
this.projectResolver = projectResolver;
63+
}
64+
5765
@Override
5866
public String name() {
5967
return NAME;
@@ -79,20 +87,20 @@ public HealthIndicatorResult calculate(boolean verbose, int maxAffectedResources
7987
return createIndicator(
8088
HealthStatus.GREEN,
8189
"Data streams are executing their lifecycles without issues",
82-
createDetails(verbose, dataStreamLifecycleHealthInfo),
90+
createDetails(verbose, dataStreamLifecycleHealthInfo, projectResolver.supportsMultipleProjects()),
8391
List.of(),
8492
List.of()
8593
);
8694
} else {
8795
List<String> affectedIndices = stagnatingBackingIndices.stream()
88-
.map(DslErrorInfo::indexName)
96+
.map(dslErrorInfo -> indexDisplayName(dslErrorInfo, projectResolver.supportsMultipleProjects()))
8997
.limit(Math.min(maxAffectedResourcesCount, stagnatingBackingIndices.size()))
9098
.collect(toList());
9199
return createIndicator(
92100
HealthStatus.YELLOW,
93101
(stagnatingBackingIndices.size() > 1 ? stagnatingBackingIndices.size() + " backing indices have" : "A backing index has")
94102
+ " repeatedly encountered errors whilst trying to advance in its lifecycle",
95-
createDetails(verbose, dataStreamLifecycleHealthInfo),
103+
createDetails(verbose, dataStreamLifecycleHealthInfo, projectResolver.supportsMultipleProjects()),
96104
STAGNATING_INDEX_IMPACT,
97105
verbose
98106
? List.of(
@@ -106,7 +114,8 @@ public HealthIndicatorResult calculate(boolean verbose, int maxAffectedResources
106114
}
107115
}
108116

109-
private static HealthIndicatorDetails createDetails(boolean verbose, DataStreamLifecycleHealthInfo dataStreamLifecycleHealthInfo) {
117+
private static HealthIndicatorDetails createDetails(boolean verbose, DataStreamLifecycleHealthInfo dataStreamLifecycleHealthInfo,
118+
boolean supportsMultipleProjects) {
110119
if (verbose == false) {
111120
return HealthIndicatorDetails.EMPTY;
112121
}
@@ -117,12 +126,16 @@ private static HealthIndicatorDetails createDetails(boolean verbose, DataStreamL
117126
if (dataStreamLifecycleHealthInfo.dslErrorsInfo().isEmpty() == false) {
118127
details.put("stagnating_backing_indices", dataStreamLifecycleHealthInfo.dslErrorsInfo().stream().map(dslError -> {
119128
LinkedHashMap<String, Object> errorDetails = new LinkedHashMap<>(3, 1L);
120-
errorDetails.put("index_name", dslError.indexName());
129+
errorDetails.put("index_name", indexDisplayName(dslError, supportsMultipleProjects));
121130
errorDetails.put("first_occurrence_timestamp", dslError.firstOccurrence());
122131
errorDetails.put("retry_count", dslError.retryCount());
123132
return errorDetails;
124133
}).toList());
125134
}
126135
return new SimpleHealthIndicatorDetails(details);
127136
}
137+
138+
private static String indexDisplayName(DslErrorInfo dslErrorInfo, boolean supportsMultipleProjects) {
139+
return new ProjectIndexName(dslErrorInfo.projectId(), dslErrorInfo.indexName()).toString(supportsMultipleProjects);
140+
}
128141
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisher.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,10 @@
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1616
import org.elasticsearch.client.internal.Client;
17-
import org.elasticsearch.cluster.metadata.Metadata;
1817
import org.elasticsearch.cluster.node.DiscoveryNode;
1918
import org.elasticsearch.cluster.service.ClusterService;
2019
import org.elasticsearch.common.settings.Setting;
2120
import org.elasticsearch.common.settings.Settings;
22-
import org.elasticsearch.core.FixForMultiProject;
2321
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
2422
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
2523
import org.elasticsearch.health.node.DslErrorInfo;
@@ -85,24 +83,22 @@ private void updateNumberOfErrorsToPublish(int newValue) {
8583
* {@link org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService#DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING}
8684
*/
8785
public void publishDslErrorEntries(ActionListener<AcknowledgedResponse> actionListener) {
88-
@FixForMultiProject(description = "Once the health API becomes project-aware, we shouldn't use the default project ID")
89-
final var projectId = Metadata.DEFAULT_PROJECT_ID;
90-
// fetching the entries that persist in the error store for more than the signalling retry interval
91-
// note that we're reporting this view into the error store on every publishing iteration
92-
List<DslErrorInfo> errorEntriesToSignal = errorStore.getErrorsInfo(
93-
projectId,
94-
entry -> entry.retryCount() >= signallingErrorRetryInterval,
95-
maxNumberOfErrorsToPublish
96-
);
9786
DiscoveryNode currentHealthNode = HealthNode.findHealthNode(clusterService.state());
9887
if (currentHealthNode != null) {
9988
String healthNodeId = currentHealthNode.getId();
89+
// fetching the entries that persist in the error store for more than the signalling retry interval
90+
// note that we're reporting this view into the error store on every publishing iteration
91+
List<DslErrorInfo> errorEntriesToSignal = errorStore.getErrorsInfo(
92+
entry -> entry.retryCount() >= signallingErrorRetryInterval,
93+
maxNumberOfErrorsToPublish
94+
);
95+
10096
logger.trace("reporting [{}] DSL error entries to to health node [{}]", errorEntriesToSignal.size(), healthNodeId);
10197
client.execute(
10298
UpdateHealthInfoCacheAction.INSTANCE,
10399
new UpdateHealthInfoCacheAction.Request(
104100
healthNodeId,
105-
new DataStreamLifecycleHealthInfo(errorEntriesToSignal, errorStore.getAllIndices(projectId).size())
101+
new DataStreamLifecycleHealthInfo(errorEntriesToSignal, errorStore.getTotalErrorEntries())
106102
),
107103
actionListener
108104
);

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStoreTests.java

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.test.ESTestCase;
1616
import org.junit.Before;
1717

18+
import java.io.IOException;
1819
import java.util.List;
1920
import java.util.Set;
2021
import java.util.stream.IntStream;
@@ -101,26 +102,94 @@ public void testGetFilteredEntries() {
101102
IntStream.range(0, 5).forEach(i -> errorStore.recordError(projectId, "test5", new NullPointerException("testing")));
102103

103104
{
104-
List<DslErrorInfo> entries = errorStore.getErrorsInfo(projectId, entry -> entry.retryCount() > 7, 100);
105+
List<DslErrorInfo> entries = errorStore.getErrorsInfo(entry -> entry.retryCount() > 7, 100);
105106
assertThat(entries.size(), is(1));
107+
assertThat(entries.getFirst().indexName(), is("test20"));
108+
assertThat(entries.getFirst().projectId(), is(projectId));
109+
}
110+
111+
{
112+
List<DslErrorInfo> entries = errorStore.getErrorsInfo(entry -> entry.retryCount() > 7, 0);
113+
assertThat(entries.size(), is(0));
114+
}
115+
116+
{
117+
List<DslErrorInfo> entries = errorStore.getErrorsInfo(entry -> entry.retryCount() > 50, 100);
118+
assertThat(entries.size(), is(0));
119+
}
120+
121+
{
122+
List<DslErrorInfo> entries = errorStore.getErrorsInfo(entry -> entry.retryCount() > 2, 100);
123+
assertThat(entries.size(), is(2));
106124
assertThat(entries.get(0).indexName(), is("test20"));
125+
assertThat(entries.get(0).projectId(), is(projectId));
126+
assertThat(entries.get(1).indexName(), is("test5"));
127+
assertThat(entries.get(1).projectId(), is(projectId));
107128
}
129+
}
130+
131+
public void testGetFilteredEntriesForMultipleProjects() {
132+
ProjectId projectId1 = randomProjectIdOrDefault();
133+
ProjectId projectId2 = randomUniqueProjectId();
134+
IntStream.range(0, 20).forEach(i -> errorStore.recordError(projectId1, "test20", new NullPointerException("testing")));
135+
IntStream.range(0, 5).forEach(i -> errorStore.recordError(projectId2, "test5", new NullPointerException("testing")));
108136

109137
{
110-
List<DslErrorInfo> entries = errorStore.getErrorsInfo(projectId, entry -> entry.retryCount() > 7, 0);
138+
List<DslErrorInfo> entries = errorStore.getErrorsInfo(entry -> entry.retryCount() > 7, 100);
139+
assertThat(entries.size(), is(1));
140+
assertThat(entries.getFirst().indexName(), is("test20"));
141+
assertThat(entries.getFirst().projectId(), is(projectId1));
142+
}
143+
144+
{
145+
List<DslErrorInfo> entries = errorStore.getErrorsInfo(entry -> entry.retryCount() > 7, 0);
111146
assertThat(entries.size(), is(0));
112147
}
113148

114149
{
115-
List<DslErrorInfo> entries = errorStore.getErrorsInfo(projectId, entry -> entry.retryCount() > 50, 100);
150+
List<DslErrorInfo> entries = errorStore.getErrorsInfo(entry -> entry.retryCount() > 50, 100);
116151
assertThat(entries.size(), is(0));
117152
}
118153

119154
{
120-
List<DslErrorInfo> entries = errorStore.getErrorsInfo(projectId, entry -> entry.retryCount() > 2, 100);
155+
List<DslErrorInfo> entries = errorStore.getErrorsInfo(entry -> entry.retryCount() > 2, 100);
121156
assertThat(entries.size(), is(2));
122157
assertThat(entries.get(0).indexName(), is("test20"));
158+
assertThat(entries.get(0).projectId(), is(projectId1));
123159
assertThat(entries.get(1).indexName(), is("test5"));
160+
assertThat(entries.get(1).projectId(), is(projectId2));
161+
}
162+
}
163+
164+
public void testTotalErrorCount() {
165+
ProjectId projectId1 = randomProjectIdOrDefault();
166+
ProjectId projectId2 = randomUniqueProjectId();
167+
168+
{
169+
// empty store
170+
assertThat(errorStore.getTotalErrorEntries(), is(0));
171+
}
172+
173+
{
174+
// single project multiple indices
175+
IntStream.range(1, 20).forEach(i -> errorStore.recordError(projectId1, "index1", new NullPointerException("testing")));
176+
IntStream.range(1, 5).forEach(i -> errorStore.recordError(projectId1, "index2", new NullPointerException("testing")));
177+
IntStream.range(1, 5).forEach(i -> errorStore.recordError(projectId1, "index2", new IOException("testing")));
178+
assertThat(errorStore.getTotalErrorEntries(), is(2));
179+
}
180+
181+
{
182+
// clear store
183+
errorStore.clearStore();
184+
assertThat(errorStore.getTotalErrorEntries(), is(0));
185+
}
186+
187+
{
188+
// multiple projects
189+
IntStream.range(1, 20).forEach(i -> errorStore.recordError(projectId1, "index1", new NullPointerException("testing")));
190+
IntStream.range(1, 5).forEach(i -> errorStore.recordError(projectId1, "index2", new IOException("testing")));
191+
IntStream.range(1, 5).forEach(i -> errorStore.recordError(projectId2, "index1", new NullPointerException("testing")));
192+
assertThat(errorStore.getTotalErrorEntries(), is(3));
124193
}
125194
}
126195
}

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthIndicatorServiceTests.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
package org.elasticsearch.datastreams.lifecycle.health;
1111

1212
import org.elasticsearch.cluster.metadata.DataStream;
13+
import org.elasticsearch.cluster.metadata.ProjectId;
14+
import org.elasticsearch.cluster.project.TestProjectResolvers;
1315
import org.elasticsearch.common.Strings;
1416
import org.elasticsearch.health.Diagnosis;
1517
import org.elasticsearch.health.HealthIndicatorDetails;
@@ -18,6 +20,7 @@
1820
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
1921
import org.elasticsearch.health.node.DslErrorInfo;
2022
import org.elasticsearch.health.node.HealthInfo;
23+
import org.elasticsearch.health.node.ProjectIndexName;
2124
import org.elasticsearch.test.ESTestCase;
2225
import org.junit.Before;
2326

@@ -38,7 +41,7 @@ public class DataStreamLifecycleHealthIndicatorServiceTests extends ESTestCase {
3841

3942
@Before
4043
public void setupService() {
41-
service = new DataStreamLifecycleHealthIndicatorService();
44+
service = new DataStreamLifecycleHealthIndicatorService(TestProjectResolvers.singleProjectOnly(randomProjectIdOrDefault()));
4245
}
4346

4447
public void testGreenWhenNoDSLHealthData() {
@@ -119,6 +122,52 @@ public void testSkippingFieldsWhenVerboseIsFalse() {
119122
assertThat(result.diagnosisList().isEmpty(), is(true));
120123
}
121124

125+
public void testMultiProject() {
126+
service = new DataStreamLifecycleHealthIndicatorService(TestProjectResolvers.allProjects());
127+
ProjectId projectId1 = randomProjectIdOrDefault();
128+
ProjectId projectId2 = randomUniqueProjectId();
129+
String index1 = DataStream.getDefaultBackingIndexName("foo", 1L);
130+
String index2 = DataStream.getDefaultBackingIndexName("boo", 1L);
131+
String index1DisplayName = projectId1 + ProjectIndexName.DELIMITER + index1;
132+
String index2DisplayName = projectId2 + ProjectIndexName.DELIMITER + index2;
133+
134+
HealthIndicatorResult result = service.calculate(
135+
true,
136+
constructHealthInfo(
137+
new DataStreamLifecycleHealthInfo(
138+
List.of(
139+
new DslErrorInfo(index1, 1L, 100, projectId1),
140+
new DslErrorInfo(index2, 3L, 100, projectId2)),
141+
15
142+
)
143+
)
144+
);
145+
146+
assertThat(result.status(), is(HealthStatus.YELLOW));
147+
assertThat(result.symptom(), is("2 backing indices have repeatedly encountered errors whilst trying to advance in its lifecycle"));
148+
assertThat(result.details(), is(not(HealthIndicatorDetails.EMPTY)));
149+
String detailsAsString = Strings.toString(result.details());
150+
assertThat(detailsAsString, containsString("\"total_backing_indices_in_error\":15"));
151+
assertThat(detailsAsString, containsString("\"stagnating_backing_indices_count\":2"));
152+
assertThat(
153+
detailsAsString,
154+
containsString(
155+
String.format(
156+
Locale.ROOT,
157+
"\"index_name\":\"%s\","
158+
+ "\"first_occurrence_timestamp\":1,\"retry_count\":100},{\"index_name\":\"%s\","
159+
+ "\"first_occurrence_timestamp\":3,\"retry_count\":100",
160+
index1DisplayName,
161+
index2DisplayName
162+
)
163+
)
164+
);
165+
assertThat(result.impacts(), is(STAGNATING_INDEX_IMPACT));
166+
Diagnosis diagnosis = result.diagnosisList().get(0);
167+
assertThat(diagnosis.definition(), is(STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF));
168+
assertThat(diagnosis.affectedResources().get(0).getValues(), containsInAnyOrder(index1DisplayName, index2DisplayName));
169+
}
170+
122171
private HealthInfo constructHealthInfo(DataStreamLifecycleHealthInfo dslHealthInfo) {
123172
return new HealthInfo(Map.of(), dslHealthInfo, Map.of());
124173
}

0 commit comments

Comments
 (0)