Skip to content

Commit 2a7eb6e

Browse files
authored
Make downsampling project-aware (#124000)
Allows downsampling to work on multiple projects.
1 parent f360d6b commit 2a7eb6e

File tree

9 files changed

+123
-29
lines changed

9 files changed

+123
-29
lines changed

server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetadata.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,14 @@ public static PersistentTasksCustomMetadata fromXContent(XContentParser parser)
122122
return PERSISTENT_TASKS_PARSER.apply(parser, null).build();
123123
}
124124

125-
@SuppressWarnings("unchecked")
125+
@Deprecated(forRemoval = true)
126126
public static <Params extends PersistentTaskParams> PersistentTask<Params> getTaskWithId(ClusterState clusterState, String taskId) {
127-
PersistentTasksCustomMetadata tasks = get(clusterState.metadata().getProject());
127+
return getTaskWithId(clusterState.metadata().getProject(), taskId);
128+
}
129+
130+
@SuppressWarnings("unchecked")
131+
public static <Params extends PersistentTaskParams> PersistentTask<Params> getTaskWithId(ProjectMetadata project, String taskId) {
132+
PersistentTasksCustomMetadata tasks = get(project);
128133
if (tasks != null) {
129134
return (PersistentTask<Params>) tasks.getTask(taskId);
130135
}

server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.client.internal.OriginSettingClient;
2020
import org.elasticsearch.cluster.ClusterState;
2121
import org.elasticsearch.cluster.ClusterStateObserver;
22+
import org.elasticsearch.cluster.metadata.ProjectId;
2223
import org.elasticsearch.cluster.service.ClusterService;
2324
import org.elasticsearch.core.Nullable;
2425
import org.elasticsearch.core.TimeValue;
@@ -171,16 +172,39 @@ private <Req extends ActionRequest, Resp extends PersistentTaskResponse> void ex
171172
* @param timeout a timeout for waiting
172173
* @param listener the callback listener
173174
*/
175+
@Deprecated(forRemoval = true)
174176
public void waitForPersistentTaskCondition(
175177
final String taskId,
176178
final Predicate<PersistentTask<?>> predicate,
177179
final @Nullable TimeValue timeout,
178180
final WaitForPersistentTaskListener<?> listener
181+
) {
182+
final var projectId = clusterService.state().metadata().getProject().id();
183+
waitForPersistentTaskCondition(projectId, taskId, predicate, timeout, listener);
184+
}
185+
186+
/**
187+
* Waits for a given persistent task to comply with a given predicate, then call back the listener accordingly.
188+
*
189+
* @param projectId the project ID
190+
* @param taskId the persistent task id
191+
* @param predicate the persistent task predicate to evaluate, must be able to handle {@code null} input which means either the project
192+
* does not exist or persistent tasks for the project do not exist
193+
* @param timeout a timeout for waiting
194+
* @param listener the callback listener
195+
*/
196+
public void waitForPersistentTaskCondition(
197+
final ProjectId projectId,
198+
final String taskId,
199+
final Predicate<PersistentTask<?>> predicate,
200+
final @Nullable TimeValue timeout,
201+
final WaitForPersistentTaskListener<?> listener
179202
) {
180203
ClusterStateObserver.waitForState(clusterService, threadPool.getThreadContext(), new ClusterStateObserver.Listener() {
181204
@Override
182205
public void onNewClusterState(ClusterState state) {
183-
listener.onResponse(PersistentTasksCustomMetadata.getTaskWithId(state, taskId));
206+
final var project = state.metadata().projects().get(projectId);
207+
listener.onResponse(project == null ? null : PersistentTasksCustomMetadata.getTaskWithId(project, taskId));
184208
}
185209

186210
@Override
@@ -192,7 +216,15 @@ public void onClusterServiceClose() {
192216
public void onTimeout(TimeValue timeout) {
193217
listener.onTimeout(timeout);
194218
}
195-
}, clusterState -> predicate.test(PersistentTasksCustomMetadata.getTaskWithId(clusterState, taskId)), timeout, logger);
219+
}, clusterState -> {
220+
final var project = clusterState.metadata().projects().get(projectId);
221+
if (project == null) {
222+
logger.debug("project [{}] not found while waiting for persistent task [{}] to pass predicate", projectId, taskId);
223+
return predicate.test(null);
224+
} else {
225+
return predicate.test(PersistentTasksCustomMetadata.getTaskWithId(project, taskId));
226+
}
227+
}, timeout, logger);
196228
}
197229

198230
// visible for testing

test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,16 @@ public static ClusterState getClusterStateWithDataStream(String dataStream, List
554554
.build();
555555
}
556556

557+
public static ClusterState getClusterStateWithDataStream(
558+
ProjectId projectId,
559+
String dataStream,
560+
List<Tuple<Instant, Instant>> timeSlices
561+
) {
562+
return ClusterState.builder(ClusterName.DEFAULT)
563+
.putProjectMetadata(getProjectWithDataStream(projectId, dataStream, timeSlices))
564+
.build();
565+
}
566+
557567
public static ProjectMetadata getProjectWithDataStream(
558568
ProjectId projectId,
559569
String dataStream,

x-pack/plugin/downsample/qa/rest/build.gradle

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,15 @@ if (buildParams.inFipsJvm){
3434
// This test cluster is using a BASIC license and FIPS 140 mode is not supported in BASIC
3535
tasks.named("yamlRestTest").configure{enabled = false }
3636
}
37+
38+
configurations {
39+
basicRestSpecs {
40+
attributes {
41+
attribute(ArtifactTypeDefinition.ARTIFACT_TYPE_ATTRIBUTE, ArtifactTypeDefinition.DIRECTORY_TYPE)
42+
}
43+
}
44+
}
45+
46+
artifacts {
47+
basicRestSpecs(new File(projectDir, "src/yamlRestTest/resources/rest-api-spec/test"))
48+
}

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ protected AllocatedPersistentTask createTask(
116116
public void validate(DownsampleShardTaskParams params, ClusterState clusterState) {
117117
// This is just a pre-check, but doesn't prevent from avoiding from aborting the task when source index disappeared
118118
// after initial creation of the persistent task.
119-
var indexShardRouting = clusterState.routingTable().shardRoutingTable(params.shardId().getIndexName(), params.shardId().id());
119+
var indexShardRouting = findShardRoutingTable(params.shardId(), clusterState);
120120
if (indexShardRouting == null) {
121121
throw new ShardNotFoundException(params.shardId());
122122
}
@@ -178,11 +178,8 @@ private void delegate(final AllocatedPersistentTask task, final DownsampleShardT
178178
}
179179

180180
private static IndexShardRoutingTable findShardRoutingTable(ShardId shardId, ClusterState clusterState) {
181-
var indexRoutingTable = clusterState.routingTable().index(shardId.getIndexName());
182-
if (indexRoutingTable != null) {
183-
return indexRoutingTable.shard(shardId.getId());
184-
}
185-
return null;
181+
var indexRoutingTable = clusterState.globalRoutingTable().indexRouting(clusterState.metadata(), shardId.getIndex());
182+
return indexRoutingTable.map(routingTable -> routingTable.shard(shardId.getId())).orElse(null);
186183
}
187184

188185
static void realNodeOperation(
@@ -327,6 +324,7 @@ protected void doExecute(Task t, Request request, ActionListener<ActionResponse.
327324
realNodeOperation(client, indicesService, downsampleMetrics, request.task, request.params, request.lastDownsampleTsid);
328325
listener.onResponse(ActionResponse.Empty.INSTANCE);
329326
}
327+
330328
}
331329
}
332330
}

0 commit comments

Comments
 (0)