Skip to content

Commit 10758c2

Browse files
committed
Add ProjectResolver#projectClient
1 parent e6abe6a commit 10758c2

File tree

2 files changed

+31
-9
lines changed

2 files changed

+31
-9
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ void run(ClusterState state) {
352352
for (var projectId : state.metadata().projects().keySet()) {
353353
// We catch inside the loop to avoid one broken project preventing DLM to run on other projects.
354354
try {
355-
projectResolver.executeOnProject(projectId, () -> run(state.projectState(projectId)));
355+
run(state.projectState(projectId));
356356
} catch (Exception e) {
357357
logger.error(Strings.format("Data stream lifecycle failed to run on project [%s]", projectId), e);
358358
}
@@ -608,7 +608,7 @@ private void downsampleIndexOnce(
608608
),
609609
signallingErrorRetryInterval
610610
),
611-
(req, reqListener) -> downsampleIndex(request, reqListener)
611+
(req, reqListener) -> downsampleIndex(projectId, request, reqListener)
612612
);
613613
}
614614

@@ -1042,7 +1042,7 @@ private void rolloverDataStream(
10421042
rolloverRequest.indicesOptions()
10431043
);
10441044
logger.trace("Data stream lifecycle issues rollover request for data stream [{}]", rolloverRequest.getRolloverTarget());
1045-
client.admin().indices().rolloverIndex(rolloverRequest, new ActionListener<>() {
1045+
projectResolver.projectClient(client, projectId).admin().indices().rolloverIndex(rolloverRequest, new ActionListener<>() {
10461046
@Override
10471047
public void onResponse(RolloverResponse rolloverResponse) {
10481048
// Log only when the conditions were met and the index was rolled over.
@@ -1096,7 +1096,7 @@ private void updateIndexSetting(ProjectId projectId, UpdateSettingsRequest updat
10961096
updateSettingsRequest.settings().keySet(),
10971097
targetIndex
10981098
);
1099-
client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener<>() {
1099+
projectResolver.projectClient(client, projectId).admin().indices().updateSettings(updateSettingsRequest, new ActionListener<>() {
11001100
@Override
11011101
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
11021102
logger.info(
@@ -1132,7 +1132,7 @@ private void addIndexBlock(ProjectId projectId, AddIndexBlockRequest addIndexBlo
11321132
addIndexBlockRequest.getBlock(),
11331133
targetIndex
11341134
);
1135-
client.admin().indices().addBlock(addIndexBlockRequest, new ActionListener<>() {
1135+
projectResolver.projectClient(client, projectId).admin().indices().addBlock(addIndexBlockRequest, new ActionListener<>() {
11361136
@Override
11371137
public void onResponse(AddIndexBlockResponse addIndexBlockResponse) {
11381138
if (addIndexBlockResponse.isAcknowledged()) {
@@ -1211,7 +1211,7 @@ private void deleteIndex(ProjectId projectId, DeleteIndexRequest deleteIndexRequ
12111211
// "saving" the index name here so we don't capture the entire request
12121212
String targetIndex = deleteIndexRequest.indices()[0];
12131213
logger.trace("Data stream lifecycle issues request to delete index [{}]", targetIndex);
1214-
client.admin().indices().delete(deleteIndexRequest, new ActionListener<>() {
1214+
projectResolver.projectClient(client, projectId).admin().indices().delete(deleteIndexRequest, new ActionListener<>() {
12151215
@Override
12161216
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
12171217
if (acknowledgedResponse.isAcknowledged()) {
@@ -1248,11 +1248,11 @@ public void onFailure(Exception e) {
12481248
});
12491249
}
12501250

1251-
private void downsampleIndex(DownsampleAction.Request request, ActionListener<Void> listener) {
1251+
private void downsampleIndex(ProjectId projectId, DownsampleAction.Request request, ActionListener<Void> listener) {
12521252
String sourceIndex = request.getSourceIndex();
12531253
String downsampleIndex = request.getTargetIndex();
12541254
logger.info("Data stream lifecycle issuing request to downsample index [{}] to index [{}]", sourceIndex, downsampleIndex);
1255-
client.execute(DownsampleAction.INSTANCE, request, new ActionListener<>() {
1255+
projectResolver.projectClient(client, projectId).execute(DownsampleAction.INSTANCE, request, new ActionListener<>() {
12561256
@Override
12571257
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
12581258
assert acknowledgedResponse.isAcknowledged() : "the downsample response is always acknowledged";
@@ -1277,7 +1277,7 @@ private void forceMergeIndex(ProjectId projectId, ForceMergeRequest forceMergeRe
12771277
: "Data stream lifecycle force merges one index at a time";
12781278
final String targetIndex = forceMergeRequest.indices()[0];
12791279
logger.info("Data stream lifecycle is issuing a request to force merge index [{}]", targetIndex);
1280-
client.admin().indices().forceMerge(forceMergeRequest, new ActionListener<>() {
1280+
projectResolver.projectClient(client, projectId).admin().indices().forceMerge(forceMergeRequest, new ActionListener<>() {
12811281
@Override
12821282
public void onResponse(BroadcastResponse forceMergeResponse) {
12831283
if (forceMergeResponse.getFailedShards() > 0) {

server/src/main/java/org/elasticsearch/cluster/project/ProjectResolver.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99

1010
package org.elasticsearch.cluster.project;
1111

12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.ActionRequest;
14+
import org.elasticsearch.action.ActionResponse;
15+
import org.elasticsearch.action.ActionType;
16+
import org.elasticsearch.client.internal.Client;
17+
import org.elasticsearch.client.internal.FilterClient;
1218
import org.elasticsearch.cluster.ClusterState;
1319
import org.elasticsearch.cluster.ProjectState;
1420
import org.elasticsearch.cluster.metadata.Metadata;
@@ -77,6 +83,22 @@ default Collection<ProjectId> getProjectIds(ClusterState clusterState) {
7783
*/
7884
<E extends Exception> void executeOnProject(ProjectId projectId, CheckedRunnable<E> body) throws E;
7985

86+
/**
87+
* Returns a client that executes every request in the context of the given project.
88+
*/
89+
default Client projectClient(Client baseClient, ProjectId projectId) {
90+
return new FilterClient(baseClient) {
91+
@Override
92+
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
93+
ActionType<Response> action,
94+
Request request,
95+
ActionListener<Response> listener
96+
) {
97+
executeOnProject(projectId, () -> super.doExecute(action, request, listener));
98+
}
99+
};
100+
}
101+
80102
/**
81103
* Returns {@code false} if the cluster runs in a setup that always expects only a single default project (see also
82104
* {@link Metadata#DEFAULT_PROJECT_ID}).

0 commit comments

Comments
 (0)