Skip to content

Commit 3abdf41

Browse files
committed
Make use of new method in DataStreamLifecycleService
1 parent 2f6d7ca commit 3abdf41

File tree

3 files changed

+10
-18
lines changed

3 files changed

+10
-18
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,7 @@ public Collection<?> createComponents(PluginServices services) {
214214
errorStoreInitialisationService.get(),
215215
services.allocationService(),
216216
dataStreamLifecycleErrorsPublisher.get(),
217-
services.dataStreamGlobalRetentionSettings(),
218-
services.projectResolver()
217+
services.dataStreamGlobalRetentionSettings()
219218
)
220219
);
221220
dataLifecycleInitialisationService.get().init();

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SelectorResolver;
5555
import org.elasticsearch.cluster.metadata.ProjectId;
5656
import org.elasticsearch.cluster.metadata.ProjectMetadata;
57-
import org.elasticsearch.cluster.project.ProjectResolver;
5857
import org.elasticsearch.cluster.routing.allocation.AllocationService;
5958
import org.elasticsearch.cluster.service.ClusterService;
6059
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
@@ -169,7 +168,6 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
169168
final ResultDeduplicator<Tuple<ProjectId, String>, Void> clusterStateChangesDeduplicator;
170169
private final DataStreamLifecycleHealthInfoPublisher dslHealthInfoPublisher;
171170
private final DataStreamGlobalRetentionSettings globalRetentionSettings;
172-
private final ProjectResolver projectResolver;
173171
private LongSupplier nowSupplier;
174172
private final Clock clock;
175173
private final DataStreamLifecycleErrorStore errorStore;
@@ -218,8 +216,7 @@ public DataStreamLifecycleService(
218216
DataStreamLifecycleErrorStore errorStore,
219217
AllocationService allocationService,
220218
DataStreamLifecycleHealthInfoPublisher dataStreamLifecycleHealthInfoPublisher,
221-
DataStreamGlobalRetentionSettings globalRetentionSettings,
222-
ProjectResolver projectResolver
219+
DataStreamGlobalRetentionSettings globalRetentionSettings
223220
) {
224221
this.settings = settings;
225222
this.client = client;
@@ -231,7 +228,6 @@ public DataStreamLifecycleService(
231228
this.nowSupplier = nowSupplier;
232229
this.errorStore = errorStore;
233230
this.globalRetentionSettings = globalRetentionSettings;
234-
this.projectResolver = projectResolver;
235231
this.scheduledJob = null;
236232
this.pollInterval = DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
237233
this.targetMergePolicyFloorSegment = DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING.get(settings);
@@ -1074,7 +1070,7 @@ private void rolloverDataStream(
10741070
rolloverRequest.indicesOptions()
10751071
);
10761072
logger.trace("Data stream lifecycle issues rollover request for data stream [{}]", rolloverRequest.getRolloverTarget());
1077-
projectResolver.projectClient(client, projectId).admin().indices().rolloverIndex(rolloverRequest, new ActionListener<>() {
1073+
client.projectClient(projectId).admin().indices().rolloverIndex(rolloverRequest, new ActionListener<>() {
10781074
@Override
10791075
public void onResponse(RolloverResponse rolloverResponse) {
10801076
// Log only when the conditions were met and the index was rolled over.
@@ -1137,7 +1133,7 @@ private void updateIndexSetting(ProjectId projectId, UpdateSettingsRequest updat
11371133
updateSettingsRequest.settings().keySet(),
11381134
targetIndex
11391135
);
1140-
projectResolver.projectClient(client, projectId).admin().indices().updateSettings(updateSettingsRequest, new ActionListener<>() {
1136+
client.projectClient(projectId).admin().indices().updateSettings(updateSettingsRequest, new ActionListener<>() {
11411137
@Override
11421138
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
11431139
logger.info(
@@ -1173,7 +1169,7 @@ private void addIndexBlock(ProjectId projectId, AddIndexBlockRequest addIndexBlo
11731169
addIndexBlockRequest.getBlock(),
11741170
targetIndex
11751171
);
1176-
projectResolver.projectClient(client, projectId).admin().indices().addBlock(addIndexBlockRequest, new ActionListener<>() {
1172+
client.projectClient(projectId).admin().indices().addBlock(addIndexBlockRequest, new ActionListener<>() {
11771173
@Override
11781174
public void onResponse(AddIndexBlockResponse addIndexBlockResponse) {
11791175
if (addIndexBlockResponse.isAcknowledged()) {
@@ -1252,7 +1248,7 @@ private void deleteIndex(ProjectId projectId, DeleteIndexRequest deleteIndexRequ
12521248
// "saving" the index name here so we don't capture the entire request
12531249
String targetIndex = deleteIndexRequest.indices()[0];
12541250
logger.trace("Data stream lifecycle issues request to delete index [{}]", targetIndex);
1255-
projectResolver.projectClient(client, projectId).admin().indices().delete(deleteIndexRequest, new ActionListener<>() {
1251+
client.projectClient(projectId).admin().indices().delete(deleteIndexRequest, new ActionListener<>() {
12561252
@Override
12571253
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
12581254
if (acknowledgedResponse.isAcknowledged()) {
@@ -1293,7 +1289,7 @@ private void downsampleIndex(ProjectId projectId, DownsampleAction.Request reque
12931289
String sourceIndex = request.getSourceIndex();
12941290
String downsampleIndex = request.getTargetIndex();
12951291
logger.info("Data stream lifecycle issuing request to downsample index [{}] to index [{}]", sourceIndex, downsampleIndex);
1296-
projectResolver.projectClient(client, projectId).execute(DownsampleAction.INSTANCE, request, new ActionListener<>() {
1292+
client.projectClient(projectId).execute(DownsampleAction.INSTANCE, request, new ActionListener<>() {
12971293
@Override
12981294
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
12991295
assert acknowledgedResponse.isAcknowledged() : "the downsample response is always acknowledged";
@@ -1318,7 +1314,7 @@ private void forceMergeIndex(ProjectId projectId, ForceMergeRequest forceMergeRe
13181314
: "Data stream lifecycle force merges one index at a time";
13191315
final String targetIndex = forceMergeRequest.indices()[0];
13201316
logger.info("Data stream lifecycle is issuing a request to force merge index [{}]", targetIndex);
1321-
projectResolver.projectClient(client, projectId).admin().indices().forceMerge(forceMergeRequest, new ActionListener<>() {
1317+
client.projectClient(projectId).admin().indices().forceMerge(forceMergeRequest, new ActionListener<>() {
13221318
@Override
13231319
public void onResponse(BroadcastResponse forceMergeResponse) {
13241320
if (forceMergeResponse.getFailedShards() > 0) {

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
5555
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
5656
import org.elasticsearch.cluster.node.DiscoveryNodes;
57-
import org.elasticsearch.cluster.project.TestProjectResolvers;
5857
import org.elasticsearch.cluster.routing.allocation.AllocationService;
5958
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
6059
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
@@ -182,8 +181,7 @@ public void setupServices() {
182181
errorStore,
183182
allocationService,
184183
new DataStreamLifecycleHealthInfoPublisher(Settings.EMPTY, client, clusterService, errorStore),
185-
globalRetentionSettings,
186-
TestProjectResolvers.mustExecuteFirst()
184+
globalRetentionSettings
187185
);
188186
clientDelegate = null;
189187
dataStreamLifecycleService.init();
@@ -1456,8 +1454,7 @@ public void testTrackingTimeStats() {
14561454
errorStore,
14571455
mock(AllocationService.class),
14581456
new DataStreamLifecycleHealthInfoPublisher(Settings.EMPTY, getTransportRequestsRecordingClient(), clusterService, errorStore),
1459-
globalRetentionSettings,
1460-
TestProjectResolvers.mustExecuteFirst()
1457+
globalRetentionSettings
14611458
);
14621459
assertThat(service.getLastRunDuration(), is(nullValue()));
14631460
assertThat(service.getTimeBetweenStarts(), is(nullValue()));

0 commit comments

Comments
 (0)