Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ public Collection<?> createComponents(PluginServices services) {
errorStoreInitialisationService.get(),
services.allocationService(),
dataStreamLifecycleErrorsPublisher.get(),
services.dataStreamGlobalRetentionSettings(),
services.projectResolver()
services.dataStreamGlobalRetentionSettings()
)
);
dataLifecycleInitialisationService.get().init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SelectorResolver;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
Expand Down Expand Up @@ -169,7 +168,6 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
final ResultDeduplicator<Tuple<ProjectId, String>, Void> clusterStateChangesDeduplicator;
private final DataStreamLifecycleHealthInfoPublisher dslHealthInfoPublisher;
private final DataStreamGlobalRetentionSettings globalRetentionSettings;
private final ProjectResolver projectResolver;
private LongSupplier nowSupplier;
private final Clock clock;
private final DataStreamLifecycleErrorStore errorStore;
Expand Down Expand Up @@ -218,8 +216,7 @@ public DataStreamLifecycleService(
DataStreamLifecycleErrorStore errorStore,
AllocationService allocationService,
DataStreamLifecycleHealthInfoPublisher dataStreamLifecycleHealthInfoPublisher,
DataStreamGlobalRetentionSettings globalRetentionSettings,
ProjectResolver projectResolver
DataStreamGlobalRetentionSettings globalRetentionSettings
) {
this.settings = settings;
this.client = client;
Expand All @@ -231,7 +228,6 @@ public DataStreamLifecycleService(
this.nowSupplier = nowSupplier;
this.errorStore = errorStore;
this.globalRetentionSettings = globalRetentionSettings;
this.projectResolver = projectResolver;
this.scheduledJob = null;
this.pollInterval = DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
this.targetMergePolicyFloorSegment = DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING.get(settings);
Expand Down Expand Up @@ -1074,7 +1070,7 @@ private void rolloverDataStream(
rolloverRequest.indicesOptions()
);
logger.trace("Data stream lifecycle issues rollover request for data stream [{}]", rolloverRequest.getRolloverTarget());
projectResolver.projectClient(client, projectId).admin().indices().rolloverIndex(rolloverRequest, new ActionListener<>() {
client.projectClient(projectId).admin().indices().rolloverIndex(rolloverRequest, new ActionListener<>() {
@Override
public void onResponse(RolloverResponse rolloverResponse) {
// Log only when the conditions were met and the index was rolled over.
Expand Down Expand Up @@ -1137,7 +1133,7 @@ private void updateIndexSetting(ProjectId projectId, UpdateSettingsRequest updat
updateSettingsRequest.settings().keySet(),
targetIndex
);
projectResolver.projectClient(client, projectId).admin().indices().updateSettings(updateSettingsRequest, new ActionListener<>() {
client.projectClient(projectId).admin().indices().updateSettings(updateSettingsRequest, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
logger.info(
Expand Down Expand Up @@ -1173,7 +1169,7 @@ private void addIndexBlock(ProjectId projectId, AddIndexBlockRequest addIndexBlo
addIndexBlockRequest.getBlock(),
targetIndex
);
projectResolver.projectClient(client, projectId).admin().indices().addBlock(addIndexBlockRequest, new ActionListener<>() {
client.projectClient(projectId).admin().indices().addBlock(addIndexBlockRequest, new ActionListener<>() {
@Override
public void onResponse(AddIndexBlockResponse addIndexBlockResponse) {
if (addIndexBlockResponse.isAcknowledged()) {
Expand Down Expand Up @@ -1252,7 +1248,7 @@ private void deleteIndex(ProjectId projectId, DeleteIndexRequest deleteIndexRequ
// "saving" the index name here so we don't capture the entire request
String targetIndex = deleteIndexRequest.indices()[0];
logger.trace("Data stream lifecycle issues request to delete index [{}]", targetIndex);
projectResolver.projectClient(client, projectId).admin().indices().delete(deleteIndexRequest, new ActionListener<>() {
client.projectClient(projectId).admin().indices().delete(deleteIndexRequest, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (acknowledgedResponse.isAcknowledged()) {
Expand Down Expand Up @@ -1293,7 +1289,7 @@ private void downsampleIndex(ProjectId projectId, DownsampleAction.Request reque
String sourceIndex = request.getSourceIndex();
String downsampleIndex = request.getTargetIndex();
logger.info("Data stream lifecycle issuing request to downsample index [{}] to index [{}]", sourceIndex, downsampleIndex);
projectResolver.projectClient(client, projectId).execute(DownsampleAction.INSTANCE, request, new ActionListener<>() {
client.projectClient(projectId).execute(DownsampleAction.INSTANCE, request, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
assert acknowledgedResponse.isAcknowledged() : "the downsample response is always acknowledged";
Expand All @@ -1318,7 +1314,7 @@ private void forceMergeIndex(ProjectId projectId, ForceMergeRequest forceMergeRe
: "Data stream lifecycle force merges one index at a time";
final String targetIndex = forceMergeRequest.indices()[0];
logger.info("Data stream lifecycle is issuing a request to force merge index [{}]", targetIndex);
projectResolver.projectClient(client, projectId).admin().indices().forceMerge(forceMergeRequest, new ActionListener<>() {
client.projectClient(projectId).admin().indices().forceMerge(forceMergeRequest, new ActionListener<>() {
@Override
public void onResponse(BroadcastResponse forceMergeResponse) {
if (forceMergeResponse.getFailedShards() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ public void setupServices() {
errorStore,
allocationService,
new DataStreamLifecycleHealthInfoPublisher(Settings.EMPTY, client, clusterService, errorStore),
globalRetentionSettings,
TestProjectResolvers.mustExecuteFirst()
globalRetentionSettings
);
clientDelegate = null;
dataStreamLifecycleService.init();
Expand Down Expand Up @@ -1456,8 +1455,7 @@ public void testTrackingTimeStats() {
errorStore,
mock(AllocationService.class),
new DataStreamLifecycleHealthInfoPublisher(Settings.EMPTY, getTransportRequestsRecordingClient(), clusterService, errorStore),
globalRetentionSettings,
TestProjectResolvers.mustExecuteFirst()
globalRetentionSettings
);
assertThat(service.getLastRunDuration(), is(nullValue()));
assertThat(service.getTimeBetweenStarts(), is(nullValue()));
Expand Down Expand Up @@ -1758,7 +1756,7 @@ private static DiscoveryNode getNode(String nodeId) {
* (it does not even notify the listener), but tests can provide an implementation of clientDelegate to provide any needed behavior.
*/
private Client getTransportRequestsRecordingClient() {
return new NoOpClient(threadPool) {
return new NoOpClient(threadPool, TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext())) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,12 @@

package org.elasticsearch.cluster.project;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.FilterClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.FixForMultiProject;

import java.util.Collection;
import java.util.Objects;
Expand Down Expand Up @@ -91,26 +84,4 @@ default Collection<ProjectId> getProjectIds(ClusterState clusterState) {
* It is an error to attempt to override the active project-id
*/
<E extends Exception> void executeOnProject(ProjectId projectId, CheckedRunnable<E> body) throws E;

/**
* Returns a client that executes every request in the context of the given project.
*/
@FixForMultiProject(description = "This recreates a client on every invocation. We should optimize this to be less wasteful")
default Client projectClient(Client baseClient, ProjectId projectId) {
// We only take the shortcut when the given project ID matches the "current" project ID. If it doesn't, we'll let #executeOnProject
// take care of error handling.
if (supportsMultipleProjects() == false && projectId.equals(getProjectId())) {
return baseClient;
}
return new FilterClient(baseClient) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
executeOnProject(projectId, () -> super.doExecute(action, request, listener));
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.client.internal.support.AbstractClient;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -30,6 +31,10 @@ public NoOpClient(ThreadPool threadPool) {
super(Settings.EMPTY, threadPool, TestProjectResolvers.alwaysThrow());
}

public NoOpClient(ThreadPool threadPool, ProjectResolver projectResolver) {
super(Settings.EMPTY, threadPool, projectResolver);
}

@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.elasticsearch.xpack.apmdata;

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -33,17 +32,15 @@ public APMIndexTemplateRegistry(
ClusterService clusterService,
ThreadPool threadPool,
Client client,
NamedXContentRegistry xContentRegistry,
ProjectResolver projectResolver
NamedXContentRegistry xContentRegistry
) {
super(
nodeSettings,
clusterService,
threadPool,
client,
xContentRegistry,
templateFilter(isDataStreamsLifecycleOnlyMode(clusterService.getSettings())),
projectResolver
templateFilter(isDataStreamsLifecycleOnlyMode(clusterService.getSettings()))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,7 @@ public Collection<?> createComponents(PluginServices services) {
Settings settings = services.environment().settings();
ClusterService clusterService = services.clusterService();
registry.set(
new APMIndexTemplateRegistry(
settings,
clusterService,
services.threadPool(),
services.client(),
services.xContentRegistry(),
services.projectResolver()
)
new APMIndexTemplateRegistry(settings, clusterService, services.threadPool(), services.client(), services.xContentRegistry())
);
if (enabled) {
APMIndexTemplateRegistry registryInstance = registry.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -60,8 +59,7 @@ public void createRegistryAndClient() {
clusterService,
threadPool,
client,
NamedXContentRegistry.EMPTY,
TestProjectResolvers.mustExecuteFirst()
NamedXContentRegistry.EMPTY
);
apmIndexTemplateRegistry.setEnabled(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -91,18 +89,16 @@ public void createRegistryAndClient() {
threadPool = new TestThreadPool(this.getClass().getName());
client = new VerifyingClient(threadPool);
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, clusterSettings);
ProjectResolver projectResolver = TestProjectResolvers.mustExecuteFirst();
stackTemplateRegistryAccessor = new StackTemplateRegistryAccessor(
new StackTemplateRegistry(Settings.EMPTY, clusterService, threadPool, client, NamedXContentRegistry.EMPTY, projectResolver)
new StackTemplateRegistry(Settings.EMPTY, clusterService, threadPool, client, NamedXContentRegistry.EMPTY)
);

apmIndexTemplateRegistry = new APMIndexTemplateRegistry(
Settings.EMPTY,
clusterService,
threadPool,
client,
NamedXContentRegistry.EMPTY,
projectResolver
NamedXContentRegistry.EMPTY
);
apmIndexTemplateRegistry.setEnabled(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -27,7 +28,7 @@ public class VerifyingClient extends NoOpClient {
};

VerifyingClient(ThreadPool threadPool) {
super(threadPool);
super(threadPool, TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.index.mapper.DateFieldMapper;
Expand Down Expand Up @@ -59,8 +58,7 @@ public void setup() {
clusterService.threadPool(),
client,
xContentRegistry(),
3L,
TestProjectResolvers.mustExecuteFirst()
3L
);
registry.initialize();
ensureGreen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -29,10 +28,9 @@ public RolloverEnabledTestTemplateRegistry(
ThreadPool threadPool,
Client client,
NamedXContentRegistry xContentRegistry,
long version,
ProjectResolver projectResolver
long version
) {
super(nodeSettings, clusterService, threadPool, client, xContentRegistry, projectResolver);
super(nodeSettings, clusterService, threadPool, client, xContentRegistry);
this.version = version;
}

Expand Down
Loading