Skip to content

Commit 4447a52

Browse files
committed
making WritableIndexExpander a singleton
1 parent 400b69d commit 4447a52

18 files changed

+90
-94
lines changed

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ private void refreshStatsIndex() {
162162

163163
private void runUnusedStatsRemover() {
164164
PlainActionFuture<Boolean> deletionListener = new PlainActionFuture<>();
165-
new UnusedStatsRemover(client, new TaskId("test", 0L), writableIndexExpander).remove(10000.0f, deletionListener, () -> false);
165+
new UnusedStatsRemover(client, new TaskId("test", 0L)).remove(10000.0f, deletionListener, () -> false);
166166
deletionListener.actionGet();
167167
}
168168

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@
364364
import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory;
365365
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
366366
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
367+
import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
367368
import org.elasticsearch.xpack.ml.job.snapshot.upgrader.SnapshotUpgradeTaskExecutor;
368369
import org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutor;
369370
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
@@ -921,6 +922,9 @@ public Collection<?> createComponents(PluginServices services) {
921922
IndexNameExpressionResolver indexNameExpressionResolver = services.indexNameExpressionResolver();
922923
TelemetryProvider telemetryProvider = services.telemetryProvider();
923924

925+
// Initialize WritableIndexExpander
926+
WritableIndexExpander.initialize(clusterService, indexNameExpressionResolver);
927+
924928
if (enabled == false) {
925929
// Holders for @link(MachineLearningFeatureSetUsage) which needs access to job manager and ML extension,
926930
// both empty if ML is disabled

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

Lines changed: 8 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.action.support.ThreadedActionListener;
1616
import org.elasticsearch.client.internal.Client;
1717
import org.elasticsearch.client.internal.OriginSettingClient;
18-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1918
import org.elasticsearch.cluster.service.ClusterService;
2019
import org.elasticsearch.common.Strings;
2120
import org.elasticsearch.core.TimeValue;
@@ -41,7 +40,6 @@
4140
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
4241
import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover;
4342
import org.elasticsearch.xpack.ml.job.retention.UnusedStatsRemover;
44-
import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
4543
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
4644
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
4745
import org.elasticsearch.xpack.ml.utils.persistence.WrappedBatchedJobsIterator;
@@ -64,19 +62,16 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<
6462

6563
private final ThreadPool threadPool;
6664
private final Executor executor;
67-
private final IndexNameExpressionResolver indexNameExpressionResolver;
6865
private final OriginSettingClient client;
6966
private final ClusterService clusterService;
7067
private final Clock clock;
7168
private final JobConfigProvider jobConfigProvider;
7269
private final JobResultsProvider jobResultsProvider;
7370
private final AnomalyDetectionAuditor auditor;
74-
private final WritableIndexExpander writableIndexExpander;
7571

7672
@Inject
7773
public TransportDeleteExpiredDataAction(
7874
ThreadPool threadPool,
79-
IndexNameExpressionResolver indexNameExpressionResolver,
8075
TransportService transportService,
8176
ActionFilters actionFilters,
8277
Client client,
@@ -88,7 +83,6 @@ public TransportDeleteExpiredDataAction(
8883
this(
8984
threadPool,
9085
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME),
91-
indexNameExpressionResolver,
9286
transportService,
9387
actionFilters,
9488
client,
@@ -103,7 +97,6 @@ public TransportDeleteExpiredDataAction(
10397
TransportDeleteExpiredDataAction(
10498
ThreadPool threadPool,
10599
Executor executor,
106-
IndexNameExpressionResolver indexNameExpressionResolver,
107100
TransportService transportService,
108101
ActionFilters actionFilters,
109102
Client client,
@@ -116,14 +109,12 @@ public TransportDeleteExpiredDataAction(
116109
super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor);
117110
this.threadPool = threadPool;
118111
this.executor = executor;
119-
this.indexNameExpressionResolver = indexNameExpressionResolver;
120112
this.client = new OriginSettingClient(client, ClientHelper.ML_ORIGIN);
121113
this.clusterService = clusterService;
122114
this.clock = clock;
123115
this.jobConfigProvider = jobConfigProvider;
124116
this.jobResultsProvider = jobResultsProvider;
125117
this.auditor = auditor;
126-
this.writableIndexExpander = new WritableIndexExpander(clusterService, indexNameExpressionResolver);
127118
}
128119

129120
@Override
@@ -255,28 +246,25 @@ private List<MlDataRemover> createDataRemovers(
255246
originClient,
256247
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
257248
parentTaskId,
258-
writableIndexExpander,
259249
anomalyDetectionAuditor,
260250
threadPool
261251
),
262-
new ExpiredForecastsRemover(originClient, threadPool, parentTaskId, writableIndexExpander),
252+
new ExpiredForecastsRemover(originClient, threadPool, parentTaskId),
263253
new ExpiredModelSnapshotsRemover(
264254
originClient,
265255
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
266256
parentTaskId,
267-
writableIndexExpander,
268257
threadPool,
269258
jobResultsProvider,
270259
anomalyDetectionAuditor
271260
),
272-
new UnusedStateRemover(originClient, parentTaskId, writableIndexExpander),
261+
new UnusedStateRemover(originClient, parentTaskId),
273262
new EmptyStateIndexRemover(originClient, parentTaskId),
274-
new UnusedStatsRemover(originClient, parentTaskId, writableIndexExpander),
263+
new UnusedStatsRemover(originClient, parentTaskId),
275264
new ExpiredAnnotationsRemover(
276265
originClient,
277266
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
278267
parentTaskId,
279-
writableIndexExpander,
280268
anomalyDetectionAuditor,
281269
threadPool
282270
)
@@ -285,35 +273,20 @@ private List<MlDataRemover> createDataRemovers(
285273

286274
private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTaskId, AnomalyDetectionAuditor anomalyDetectionAuditor) {
287275
return Arrays.asList(
288-
new ExpiredResultsRemover(
289-
client,
290-
new VolatileCursorIterator<>(jobs),
291-
parentTaskId,
292-
writableIndexExpander,
293-
anomalyDetectionAuditor,
294-
threadPool
295-
),
296-
new ExpiredForecastsRemover(client, threadPool, parentTaskId, writableIndexExpander),
276+
new ExpiredResultsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, anomalyDetectionAuditor, threadPool),
277+
new ExpiredForecastsRemover(client, threadPool, parentTaskId),
297278
new ExpiredModelSnapshotsRemover(
298279
client,
299280
new VolatileCursorIterator<>(jobs),
300281
parentTaskId,
301-
writableIndexExpander,
302282
threadPool,
303283
jobResultsProvider,
304284
anomalyDetectionAuditor
305285
),
306-
new UnusedStateRemover(client, parentTaskId, writableIndexExpander),
286+
new UnusedStateRemover(client, parentTaskId),
307287
new EmptyStateIndexRemover(client, parentTaskId),
308-
new UnusedStatsRemover(client, parentTaskId, writableIndexExpander),
309-
new ExpiredAnnotationsRemover(
310-
client,
311-
new VolatileCursorIterator<>(jobs),
312-
parentTaskId,
313-
writableIndexExpander,
314-
anomalyDetectionAuditor,
315-
threadPool
316-
)
288+
new UnusedStatsRemover(client, parentTaskId),
289+
new ExpiredAnnotationsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, anomalyDetectionAuditor, threadPool)
317290
);
318291
}
319292

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
6868
import org.elasticsearch.xpack.core.security.user.InternalUsers;
6969
import org.elasticsearch.xpack.ml.MachineLearning;
70+
import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
7071
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
7172

7273
import java.util.ArrayList;
@@ -133,7 +134,21 @@ public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListe
133134
indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
134135
}
135136

136-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indices.toArray(new String[0])).setRefresh(true)
137+
// Remove read-only indices
138+
List<String> indicesToQuery;
139+
try {
140+
indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(indices);
141+
} catch (Exception e) {
142+
logger.error("Failed to get writable indices for [" + jobId + "].", e);
143+
listener.onFailure(e);
144+
return;
145+
}
146+
if (indicesToQuery.isEmpty()) {
147+
logger.info("No writable model snapshot indices found for [{}] job. No expired model snapshots to remove.", jobId);
148+
return;
149+
}
150+
151+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setRefresh(true)
137152
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
138153
.setQuery(QueryBuilders.idsQuery().addIds(idsToDelete.toArray(new String[0])));
139154

@@ -181,7 +196,23 @@ public void deleteAnnotations(
181196
boolQuery.filter(QueryBuilders.termsQuery(Annotation.EVENT.getPreferredName(), eventsToDelete));
182197
}
183198
QueryBuilder query = QueryBuilders.constantScoreQuery(boolQuery);
184-
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME).setQuery(query)
199+
200+
List<String> indicesToQuery = List.of(AnnotationIndex.READ_ALIAS_NAME);
201+
// Remove read-only indices
202+
try {
203+
indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(indicesToQuery);
204+
} catch (Exception e) {
205+
logger.error("Failed to get writable indices for [" + jobId + "]", e);
206+
listener.onFailure(e);
207+
return;
208+
}
209+
if (indicesToQuery.isEmpty()) {
210+
logger.info("No writable annotation indices found for [{}] job. No annotations to remove.", jobId);
211+
listener.onResponse(true);
212+
return;
213+
}
214+
215+
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setQuery(query)
185216
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
186217
.setAbortOnVersionConflict(false)
187218
.setRefresh(true)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.xpack.core.ml.job.config.Job;
1313

1414
import java.util.Iterator;
15-
import java.util.Objects;
1615
import java.util.function.BooleanSupplier;
1716

1817
/**
@@ -27,18 +26,11 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
2726
protected final OriginSettingClient client;
2827
private final Iterator<Job> jobIterator;
2928
private final TaskId parentTaskId;
30-
protected final WritableIndexExpander writableIndexExpander;
3129

32-
AbstractExpiredJobDataRemover(
33-
OriginSettingClient client,
34-
Iterator<Job> jobIterator,
35-
TaskId parentTaskId,
36-
WritableIndexExpander writableIndexExpander
37-
) {
30+
AbstractExpiredJobDataRemover(OriginSettingClient client, Iterator<Job> jobIterator, TaskId parentTaskId) {
3831
this.client = client;
3932
this.jobIterator = jobIterator;
4033
this.parentTaskId = parentTaskId;
41-
this.writableIndexExpander = Objects.requireNonNull(writableIndexExpander);
4234
}
4335

4436
protected TaskId getParentTaskId() {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemover.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,10 @@ public ExpiredAnnotationsRemover(
6060
OriginSettingClient client,
6161
Iterator<Job> jobIterator,
6262
TaskId parentTaskId,
63-
WritableIndexExpander writableIndexExpander,
6463
AnomalyDetectionAuditor auditor,
6564
ThreadPool threadPool
6665
) {
67-
super(client, jobIterator, parentTaskId, writableIndexExpander);
66+
super(client, jobIterator, parentTaskId);
6867
this.auditor = Objects.requireNonNull(auditor);
6968
this.threadPool = Objects.requireNonNull(threadPool);
7069
}
@@ -85,7 +84,7 @@ protected void removeDataBefore(
8584
long cutoffEpochMs,
8685
ActionListener<Boolean> listener
8786
) {
88-
var indicesToQuery = writableIndexExpander.getWritableIndices(AnnotationIndex.READ_ALIAS_NAME);
87+
var indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(AnnotationIndex.READ_ALIAS_NAME);
8988
if (indicesToQuery.isEmpty()) {
9089
LOGGER.info("No writable annotation indices found for [{}] job. No expired annotations to remove.", job.getId());
9190
listener.onResponse(true);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,12 @@ public class ExpiredForecastsRemover implements MlDataRemover {
6464
private final ThreadPool threadPool;
6565
private final long cutoffEpochMs;
6666
private final TaskId parentTaskId;
67-
private final WritableIndexExpander writableIndexExpander;
6867

69-
public ExpiredForecastsRemover(
70-
OriginSettingClient client,
71-
ThreadPool threadPool,
72-
TaskId parentTaskId,
73-
WritableIndexExpander writableIndexExpander
74-
) {
68+
public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool, TaskId parentTaskId) {
7569
this.client = Objects.requireNonNull(client);
7670
this.threadPool = Objects.requireNonNull(threadPool);
7771
this.cutoffEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
7872
this.parentTaskId = parentTaskId;
79-
this.writableIndexExpander = Objects.requireNonNull(writableIndexExpander);
8073
}
8174

8275
@Override
@@ -132,7 +125,7 @@ private void deleteForecasts(
132125
return;
133126
}
134127

135-
var indicesToQuery = writableIndexExpander.getWritableIndices(RESULTS_INDEX_PATTERN);
128+
var indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(RESULTS_INDEX_PATTERN);
136129
if (indicesToQuery.isEmpty()) {
137130
LOGGER.info("No writable indices found for expired forecasts. No expired forecasts to remove.");
138131
listener.onResponse(true);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,11 @@ public ExpiredModelSnapshotsRemover(
8484
OriginSettingClient client,
8585
Iterator<Job> jobIterator,
8686
TaskId parentTaskId,
87-
WritableIndexExpander writableIndexExpander,
8887
ThreadPool threadPool,
8988
JobResultsProvider jobResultsProvider,
9089
AnomalyDetectionAuditor auditor
9190
) {
92-
super(client, jobIterator, parentTaskId, writableIndexExpander);
91+
super(client, jobIterator, parentTaskId);
9392
this.threadPool = Objects.requireNonNull(threadPool);
9493
this.jobResultsProvider = jobResultsProvider;
9594
this.auditor = auditor;
@@ -271,11 +270,11 @@ private void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, String job
271270
}
272271

273272
// Remove read-only indices
274-
List<String> indicesToQuery = new ArrayList<>();
273+
List<String> indicesToQuery;
275274
try {
276-
indicesToQuery = writableIndexExpander.getWritableIndices(indices);
275+
indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(indices);
277276
} catch (Exception e) {
278-
LOGGER.error("Failed to get writable indices for [" + jobId + "] job: " + e.getMessage(), e);
277+
LOGGER.error("Failed to get writable indices for [" + jobId + "]", e);
279278
listener.onFailure(e);
280279
return;
281280
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,10 @@ public ExpiredResultsRemover(
7676
OriginSettingClient client,
7777
Iterator<Job> jobIterator,
7878
TaskId parentTaskId,
79-
WritableIndexExpander writableIndexExpander,
8079
AnomalyDetectionAuditor auditor,
8180
ThreadPool threadPool
8281
) {
83-
super(client, jobIterator, parentTaskId, writableIndexExpander);
82+
super(client, jobIterator, parentTaskId);
8483
this.auditor = Objects.requireNonNull(auditor);
8584
this.threadPool = Objects.requireNonNull(threadPool);
8685
}
@@ -100,7 +99,8 @@ protected void removeDataBefore(
10099
) {
101100
LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
102101

103-
var indicesToQuery = writableIndexExpander.getWritableIndices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
102+
var indicesToQuery = WritableIndexExpander.getInstance()
103+
.getWritableIndices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
104104
if (indicesToQuery.isEmpty()) {
105105
LOGGER.info("No writable indices found for job [{}]. No expired results removed.", job.getId());
106106
listener.onResponse(true);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,10 @@ public class UnusedStateRemover implements MlDataRemover {
5151

5252
private final OriginSettingClient client;
5353
private final TaskId parentTaskId;
54-
private final WritableIndexExpander writableIndexExpander;
5554

56-
public UnusedStateRemover(OriginSettingClient client, TaskId parentTaskId, WritableIndexExpander writableIndexExpander) {
55+
public UnusedStateRemover(OriginSettingClient client, TaskId parentTaskId) {
5756
this.client = Objects.requireNonNull(client);
5857
this.parentTaskId = Objects.requireNonNull(parentTaskId);
59-
this.writableIndexExpander = Objects.requireNonNull(writableIndexExpander);
6058
}
6159

6260
@Override
@@ -140,7 +138,7 @@ private Set<String> getDataFrameAnalyticsJobIds() {
140138
private void executeDeleteUnusedStateDocs(List<String> unusedDocIds, float requestsPerSec, ActionListener<Boolean> listener) {
141139
LOGGER.info("Found [{}] unused state documents; attempting to delete", unusedDocIds.size());
142140

143-
var indicesToQuery = writableIndexExpander.getWritableIndices(AnomalyDetectorsIndex.jobStateIndexPattern());
141+
var indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(AnomalyDetectorsIndex.jobStateIndexPattern());
144142

145143
if (indicesToQuery.isEmpty()) {
146144
LOGGER.info("No writable indices found for unused state documents");

0 commit comments

Comments
 (0)