Skip to content

Commit 201bcf0

Browse files
committed
Merge branch 'main' into fix-data-stream-lifecycle-service-it
2 parents 73f2aef + 7070f3f commit 201bcf0

File tree

13 files changed

+148
-88
lines changed

13 files changed

+148
-88
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
4343
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
4444
import org.elasticsearch.cluster.metadata.IndexMetadata;
45+
import org.elasticsearch.cluster.metadata.Metadata;
4546
import org.elasticsearch.cluster.metadata.Template;
4647
import org.elasticsearch.cluster.node.DiscoveryNode;
4748
import org.elasticsearch.cluster.service.ClusterService;
@@ -514,7 +515,7 @@ public void testErrorRecordingOnRollover() throws Exception {
514515
assertBusy(() -> {
515516
DataStreamLifecycleService lifecycleService = internalCluster().getCurrentMasterNodeInstance(DataStreamLifecycleService.class);
516517

517-
ErrorEntry writeIndexRolloverError = lifecycleService.getErrorStore().getError(writeIndexName);
518+
ErrorEntry writeIndexRolloverError = lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, writeIndexName);
518519
assertThat(writeIndexRolloverError, is(notNullValue()));
519520
assertThat(writeIndexRolloverError.error(), containsString("maximum normal shards open"));
520521

@@ -587,7 +588,7 @@ public void testErrorRecordingOnRollover() throws Exception {
587588
// let's check there's no error recorded against it anymore
588589
String previousWriteInddex = currentBackingIndices.get(1);
589590
DataStreamLifecycleService lifecycleService = internalCluster().getCurrentMasterNodeInstance(DataStreamLifecycleService.class);
590-
assertThat(lifecycleService.getErrorStore().getError(previousWriteInddex), nullValue());
591+
assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, previousWriteInddex), nullValue());
591592
});
592593

593594
// the error has been fixed so the health information shouldn't be reported anymore
@@ -657,7 +658,8 @@ public void testErrorRecordingOnRetention() throws Exception {
657658
DataStreamLifecycleService.class
658659
);
659660

660-
ErrorEntry recordedRetentionExecutionError = lifecycleService.getErrorStore().getError(firstGenerationIndex);
661+
ErrorEntry recordedRetentionExecutionError = lifecycleService.getErrorStore()
662+
.getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex);
661663
assertThat(recordedRetentionExecutionError, is(notNullValue()));
662664
assertThat(recordedRetentionExecutionError.retryCount(), greaterThanOrEqualTo(3));
663665
assertThat(recordedRetentionExecutionError.error(), containsString("blocked by: [FORBIDDEN/5/index read-only (api)"));
@@ -710,7 +712,7 @@ public void testErrorRecordingOnRetention() throws Exception {
710712
DataStreamLifecycleService lifecycleService = internalCluster().getCurrentMasterNodeInstance(
711713
DataStreamLifecycleService.class
712714
);
713-
assertThat(lifecycleService.getErrorStore().getError(firstGenerationIndex), nullValue());
715+
assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex), nullValue());
714716
});
715717

716718
// health info for DSL should be EMPTY as everything's healthy

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java

Lines changed: 23 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.ElasticsearchException;
1313
import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
14+
import org.elasticsearch.cluster.metadata.ProjectId;
1415
import org.elasticsearch.common.Strings;
1516
import org.elasticsearch.core.Nullable;
1617
import org.elasticsearch.health.node.DslErrorInfo;
@@ -34,7 +35,7 @@
3435
public class DataStreamLifecycleErrorStore {
3536

3637
public static final int MAX_ERROR_MESSAGE_LENGTH = 1000;
37-
private final ConcurrentMap<String, ErrorEntry> indexNameToError = new ConcurrentHashMap<>();
38+
private final ConcurrentMap<ProjectId, ConcurrentMap<String, ErrorEntry>> projectMap = new ConcurrentHashMap<>();
3839
private final LongSupplier nowSupplier;
3940

4041
public DataStreamLifecycleErrorStore(LongSupplier nowSupplier) {
@@ -48,12 +49,13 @@ public DataStreamLifecycleErrorStore(LongSupplier nowSupplier) {
4849
* Returns the previously recorded error for the provided index, or null otherwise.
4950
*/
5051
@Nullable
51-
public ErrorEntry recordError(String indexName, Exception e) {
52+
public ErrorEntry recordError(ProjectId projectId, String indexName, Exception e) {
5253
String exceptionToString = Strings.toString((builder, params) -> {
5354
ElasticsearchException.generateThrowableXContent(builder, EMPTY_PARAMS, e);
5455
return builder;
5556
});
5657
String newError = Strings.substring(exceptionToString, 0, MAX_ERROR_MESSAGE_LENGTH);
58+
final var indexNameToError = projectMap.computeIfAbsent(projectId, k -> new ConcurrentHashMap<>());
5759
ErrorEntry existingError = indexNameToError.get(indexName);
5860
long recordedTimestamp = nowSupplier.getAsLong();
5961
if (existingError == null) {
@@ -71,29 +73,41 @@ public ErrorEntry recordError(String indexName, Exception e) {
7173
/**
7274
* Clears the recorded error for the provided index (if any exists)
7375
*/
74-
public void clearRecordedError(String indexName) {
76+
public void clearRecordedError(ProjectId projectId, String indexName) {
77+
final var indexNameToError = projectMap.get(projectId);
78+
if (indexNameToError == null) {
79+
return;
80+
}
7581
indexNameToError.remove(indexName);
7682
}
7783

7884
/**
7985
* Clears all the errors recorded in the store.
8086
*/
8187
public void clearStore() {
82-
indexNameToError.clear();
88+
projectMap.clear();
8389
}
8490

8591
/**
8692
* Retrieves the recorded error for the provided index.
8793
*/
8894
@Nullable
89-
public ErrorEntry getError(String indexName) {
95+
public ErrorEntry getError(ProjectId projectId, String indexName) {
96+
final var indexNameToError = projectMap.get(projectId);
97+
if (indexNameToError == null) {
98+
return null;
99+
}
90100
return indexNameToError.get(indexName);
91101
}
92102

93103
/**
94104
* Return an immutable view (a snapshot) of the tracked indices at the moment this method is called.
95105
*/
96-
public Set<String> getAllIndices() {
106+
public Set<String> getAllIndices(ProjectId projectId) {
107+
final var indexNameToError = projectMap.get(projectId);
108+
if (indexNameToError == null) {
109+
return Set.of();
110+
}
97111
return Set.copyOf(indexNameToError.keySet());
98112
}
99113

@@ -103,8 +117,9 @@ public Set<String> getAllIndices() {
103117
* retries DSL attempted (descending order) and the number of entries will be limited according to the provided limit parameter.
104118
* Returns empty list if no entries are present in the error store or none satisfy the predicate.
105119
*/
106-
public List<DslErrorInfo> getErrorsInfo(Predicate<ErrorEntry> errorEntryPredicate, int limit) {
107-
if (indexNameToError.isEmpty()) {
120+
public List<DslErrorInfo> getErrorsInfo(ProjectId projectId, Predicate<ErrorEntry> errorEntryPredicate, int limit) {
121+
final var indexNameToError = projectMap.get(projectId);
122+
if (indexNameToError == null || indexNameToError.isEmpty()) {
108123
return List.of();
109124
}
110125
return indexNameToError.entrySet()

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 15 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -784,21 +784,22 @@ static List<Index> getTargetIndices(
784784
*/
785785
private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
786786
Metadata metadata = clusterService.state().metadata();
787-
for (String indexName : errorStore.getAllIndices()) {
787+
final var projectId = metadata.getProject().id();
788+
for (String indexName : errorStore.getAllIndices(projectId)) {
788789
IndexAbstraction indexAbstraction = metadata.getProject().getIndicesLookup().get(indexName);
789790
DataStream parentDataStream = indexAbstraction != null ? indexAbstraction.getParentDataStream() : null;
790791
if (indexAbstraction == null || parentDataStream == null) {
791792
logger.trace(
792793
"Clearing recorded error for index [{}] because the index doesn't exist or is not a data stream backing index anymore",
793794
indexName
794795
);
795-
errorStore.clearRecordedError(indexName);
796+
errorStore.clearRecordedError(projectId, indexName);
796797
} else if (parentDataStream.getName().equals(dataStream.getName())) {
797798
// we're only verifying the indices that pertain to this data stream
798799
IndexMetadata indexMeta = metadata.getProject().index(indexName);
799800
if (dataStream.isIndexManagedByDataStreamLifecycle(indexMeta.getIndex(), metadata.getProject()::index) == false) {
800801
logger.trace("Clearing recorded error for index [{}] because the index is not managed by DSL anymore", indexName);
801-
errorStore.clearRecordedError(indexName);
802+
errorStore.clearRecordedError(projectId, indexName);
802803
}
803804
}
804805
}
@@ -866,7 +867,7 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
866867
if (latestDataStream.getWriteIndex().getName().equals(currentRunWriteIndex.getName())) {
867868
// data stream has not been rolled over in the meantime so record the error against the write index we
868869
// attempted the rollover
869-
errorStore.recordError(currentRunWriteIndex.getName(), e);
870+
errorStore.recordError(clusterService.state().metadata().getProject().id(), currentRunWriteIndex.getName(), e);
870871
}
871872
}
872873
}
@@ -1074,7 +1075,7 @@ public void onFailure(Exception e) {
10741075
if (e instanceof IndexNotFoundException) {
10751076
// index was already deleted, treat this as a success
10761077
logger.trace("Clearing recorded error for index [{}] because the index was deleted", targetIndex);
1077-
errorStore.clearRecordedError(targetIndex);
1078+
errorStore.clearRecordedError(clusterService.state().metadata().getProject().id(), targetIndex);
10781079
listener.onResponse(null);
10791080
return;
10801081
}
@@ -1157,7 +1158,7 @@ public void onFailure(Exception e) {
11571158
if (e instanceof IndexNotFoundException) {
11581159
// index was already deleted, treat this as a success
11591160
logger.trace("Clearing recorded error for index [{}] because the index was deleted", targetIndex);
1160-
errorStore.clearRecordedError(targetIndex);
1161+
errorStore.clearRecordedError(clusterService.state().metadata().getProject().id(), targetIndex);
11611162
listener.onResponse(null);
11621163
return;
11631164
}
@@ -1193,7 +1194,7 @@ public void onFailure(Exception e) {
11931194
if (e instanceof IndexNotFoundException) {
11941195
logger.trace("Data stream lifecycle did not delete index [{}] as it was already deleted", targetIndex);
11951196
// index was already deleted, treat this as a success
1196-
errorStore.clearRecordedError(targetIndex);
1197+
errorStore.clearRecordedError(clusterService.state().metadata().getProject().id(), targetIndex);
11971198
listener.onResponse(null);
11981199
return;
11991200
}
@@ -1341,7 +1342,9 @@ static class ErrorRecordingActionListener implements ActionListener<Void> {
13411342
@Override
13421343
public void onResponse(Void unused) {
13431344
logger.trace("Clearing recorded error for index [{}] because the [{}] action was successful", targetIndex, actionName);
1344-
errorStore.clearRecordedError(targetIndex);
1345+
@FixForMultiProject(description = "Don't use default project ID")
1346+
final var projectId = Metadata.DEFAULT_PROJECT_ID;
1347+
errorStore.clearRecordedError(projectId, targetIndex);
13451348
}
13461349

13471350
@Override
@@ -1364,8 +1367,10 @@ static void recordAndLogError(
13641367
String logMessage,
13651368
int signallingErrorRetryThreshold
13661369
) {
1367-
ErrorEntry previousError = errorStore.recordError(targetIndex, e);
1368-
ErrorEntry currentError = errorStore.getError(targetIndex);
1370+
@FixForMultiProject(description = "Don't use default project ID")
1371+
final var projectId = Metadata.DEFAULT_PROJECT_ID;
1372+
ErrorEntry previousError = errorStore.recordError(projectId, targetIndex, e);
1373+
ErrorEntry currentError = errorStore.getError(projectId, targetIndex);
13691374
if (previousError == null || (currentError != null && previousError.error().equals(currentError.error()) == false)) {
13701375
logger.error(logMessage, e);
13711376
} else {

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -113,7 +113,7 @@ protected void masterOperation(
113113
rolloverInfo == null ? null : rolloverInfo.getTime(),
114114
generationDate,
115115
parentDataStream.getLifecycle(),
116-
errorStore.getError(index)
116+
errorStore.getError(state.projectId(), index)
117117
);
118118
explainIndices.add(explainIndexDataStreamLifecycle);
119119
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java

Lines changed: 15 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -10,12 +10,13 @@
1010

1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.support.ActionFilters;
13-
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
14-
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction;
14+
import org.elasticsearch.cluster.ProjectState;
1515
import org.elasticsearch.cluster.block.ClusterBlockException;
1616
import org.elasticsearch.cluster.block.ClusterBlockLevel;
1717
import org.elasticsearch.cluster.metadata.DataStream;
18-
import org.elasticsearch.cluster.metadata.Metadata;
18+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
19+
import org.elasticsearch.cluster.project.ProjectResolver;
1920
import org.elasticsearch.cluster.service.ClusterService;
2021
import org.elasticsearch.common.util.concurrent.EsExecutors;
2122
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
@@ -33,7 +34,7 @@
3334
/**
3435
* Exposes stats about the latest lifecycle run and the error store.
3536
*/
36-
public class TransportGetDataStreamLifecycleStatsAction extends TransportMasterNodeReadAction<
37+
public class TransportGetDataStreamLifecycleStatsAction extends TransportMasterNodeReadProjectAction<
3738
GetDataStreamLifecycleStatsAction.Request,
3839
GetDataStreamLifecycleStatsAction.Response> {
3940

@@ -45,7 +46,8 @@ public TransportGetDataStreamLifecycleStatsAction(
4546
ClusterService clusterService,
4647
ThreadPool threadPool,
4748
ActionFilters actionFilters,
48-
DataStreamLifecycleService lifecycleService
49+
DataStreamLifecycleService lifecycleService,
50+
ProjectResolver projectResolver
4951
) {
5052
super(
5153
GetDataStreamLifecycleStatsAction.NAME,
@@ -54,6 +56,7 @@ public TransportGetDataStreamLifecycleStatsAction(
5456
threadPool,
5557
actionFilters,
5658
GetDataStreamLifecycleStatsAction.Request::new,
59+
projectResolver,
5760
GetDataStreamLifecycleStatsAction.Response::new,
5861
EsExecutors.DIRECT_EXECUTOR_SERVICE
5962
);
@@ -64,23 +67,22 @@ public TransportGetDataStreamLifecycleStatsAction(
6467
protected void masterOperation(
6568
Task task,
6669
GetDataStreamLifecycleStatsAction.Request request,
67-
ClusterState state,
70+
ProjectState state,
6871
ActionListener<GetDataStreamLifecycleStatsAction.Response> listener
6972
) throws Exception {
70-
listener.onResponse(collectStats(state));
73+
listener.onResponse(collectStats(state.metadata()));
7174
}
7275

7376
// Visible for testing
74-
GetDataStreamLifecycleStatsAction.Response collectStats(ClusterState state) {
75-
Metadata metadata = state.metadata();
76-
Set<String> indicesInErrorStore = lifecycleService.getErrorStore().getAllIndices();
77+
GetDataStreamLifecycleStatsAction.Response collectStats(ProjectMetadata project) {
78+
Set<String> indicesInErrorStore = lifecycleService.getErrorStore().getAllIndices(project.id());
7779
List<GetDataStreamLifecycleStatsAction.Response.DataStreamStats> dataStreamStats = new ArrayList<>();
78-
for (DataStream dataStream : state.metadata().getProject().dataStreams().values()) {
80+
for (DataStream dataStream : project.dataStreams().values()) {
7981
if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) {
8082
int total = 0;
8183
int inError = 0;
8284
for (Index index : dataStream.getIndices()) {
83-
if (dataStream.isIndexManagedByDataStreamLifecycle(index, metadata.getProject()::index)) {
85+
if (dataStream.isIndexManagedByDataStreamLifecycle(index, project::index)) {
8486
total++;
8587
if (indicesInErrorStore.contains(index.getName())) {
8688
inError++;
@@ -102,7 +104,7 @@ GetDataStreamLifecycleStatsAction.Response collectStats(ClusterState state) {
102104
}
103105

104106
@Override
105-
protected ClusterBlockException checkBlock(GetDataStreamLifecycleStatsAction.Request request, ClusterState state) {
107+
protected ClusterBlockException checkBlock(GetDataStreamLifecycleStatsAction.Request request, ProjectState state) {
106108
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
107109
}
108110
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisher.java

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,10 +14,12 @@
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1616
import org.elasticsearch.client.internal.Client;
17+
import org.elasticsearch.cluster.metadata.Metadata;
1718
import org.elasticsearch.cluster.node.DiscoveryNode;
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.common.settings.Setting;
2021
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.core.FixForMultiProject;
2123
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
2224
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
2325
import org.elasticsearch.health.node.DslErrorInfo;
@@ -83,9 +85,12 @@ private void updateNumberOfErrorsToPublish(int newValue) {
8385
* {@link org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService#DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING}
8486
*/
8587
public void publishDslErrorEntries(ActionListener<AcknowledgedResponse> actionListener) {
88+
@FixForMultiProject(description = "Once the health API becomes project-aware, we shouldn't use the default project ID")
89+
final var projectId = Metadata.DEFAULT_PROJECT_ID;
8690
// fetching the entries that persist in the error store for more than the signalling retry interval
8791
// note that we're reporting this view into the error store on every publishing iteration
8892
List<DslErrorInfo> errorEntriesToSignal = errorStore.getErrorsInfo(
93+
projectId,
8994
entry -> entry.retryCount() >= signallingErrorRetryInterval,
9095
maxNumberOfErrorsToPublish
9196
);
@@ -97,7 +102,7 @@ public void publishDslErrorEntries(ActionListener<AcknowledgedResponse> actionLi
97102
UpdateHealthInfoCacheAction.INSTANCE,
98103
new UpdateHealthInfoCacheAction.Request(
99104
healthNodeId,
100-
new DataStreamLifecycleHealthInfo(errorEntriesToSignal, errorStore.getAllIndices().size())
105+
new DataStreamLifecycleHealthInfo(errorEntriesToSignal, errorStore.getAllIndices(projectId).size())
101106
),
102107
actionListener
103108
);

0 commit comments

Comments
 (0)