Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.support.AbstractClient;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
Expand Down Expand Up @@ -96,7 +97,7 @@ public <FactoryType> FactoryType compile(Script script, ScriptContext<FactoryTyp

private static class MockClient extends AbstractClient {
MockClient(Settings settings, ThreadPool threadPool) {
super(settings, threadPool);
super(settings, threadPool, TestProjectResolvers.alwaysThrow());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.support.AbstractClient;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
Expand Down Expand Up @@ -96,7 +97,7 @@ public <FactoryType> FactoryType compile(Script script, ScriptContext<FactoryTyp

private class MockClient extends AbstractClient {
MockClient(Settings settings, ThreadPool threadPool) {
super(settings, threadPool);
super(settings, threadPool, TestProjectResolvers.alwaysThrow());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
Expand Down Expand Up @@ -68,7 +69,7 @@ public void testTransferRequestParameters() throws Exception {
);
rankEvalRequest.indicesOptions(expectedIndicesOptions);

NodeClient client = new NodeClient(settings, null) {
NodeClient client = new NodeClient(settings, null, TestProjectResolvers.alwaysThrow()) {
@Override
public void multiSearch(MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) {
assertEquals(1, request.requests().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.client.internal.support.AbstractClient;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -214,7 +215,7 @@ private static class MockClient extends AbstractClient {
private ExecuteRequest<?, ?> executeRequest;

MockClient(ThreadPool threadPool) {
super(Settings.EMPTY, threadPool);
super(Settings.EMPTY, threadPool, TestProjectResolvers.alwaysThrow());
}

@Override
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/elasticsearch/client/internal/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.transport.RemoteClusterService;
Expand Down Expand Up @@ -399,6 +401,16 @@ public interface Client extends ElasticsearchClient {
*/
Client filterWithHeader(Map<String, String> headers);

/**
* Returns a client that executes every request in the context of the given project.
*/
Client projectClient(ProjectId projectId);

/**
* Returns this client's project resolver.
*/
ProjectResolver projectResolver();

/**
* Returns a client to a remote cluster with the given cluster alias.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.client.internal.support.AbstractClient;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
Expand All @@ -35,15 +36,15 @@ public abstract class FilterClient extends AbstractClient {
* @see #in()
*/
public FilterClient(Client in) {
this(in.settings(), in.threadPool(), in);
this(in.settings(), in.threadPool(), in.projectResolver(), in);
}

/**
* A Constructor that allows to pass settings and threadpool separately. This is useful if the
* client is a proxy and not yet fully constructed ie. both dependencies are not available yet.
*/
protected FilterClient(Settings settings, ThreadPool threadPool, Client in) {
super(settings, threadPool);
protected FilterClient(Settings settings, ThreadPool threadPool, ProjectResolver projectResolver, Client in) {
super(settings, threadPool, projectResolver);
this.in = in;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.client.internal.RemoteClusterClient;
import org.elasticsearch.client.internal.support.AbstractClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
Expand Down Expand Up @@ -48,8 +49,8 @@ public class NodeClient extends AbstractClient {
private Transport.Connection localConnection;
private RemoteClusterService remoteClusterService;

public NodeClient(Settings settings, ThreadPool threadPool) {
super(settings, threadPool);
public NodeClient(Settings settings, ThreadPool threadPool, ProjectResolver projectResolver) {
super(settings, threadPool, projectResolver);
}

public void initialize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@
import org.elasticsearch.client.internal.AdminClient;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.FilterClient;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
Expand All @@ -92,12 +94,14 @@ public abstract class AbstractClient implements Client {

protected final Settings settings;
private final ThreadPool threadPool;
private final ProjectResolver projectResolver;
private final AdminClient admin;

@SuppressWarnings("this-escape")
public AbstractClient(Settings settings, ThreadPool threadPool) {
public AbstractClient(Settings settings, ThreadPool threadPool, ProjectResolver projectResolver) {
this.settings = settings;
this.threadPool = threadPool;
this.projectResolver = projectResolver;
this.admin = new AdminClient(this);
this.logger = LogManager.getLogger(this.getClass());
}
Expand All @@ -112,6 +116,11 @@ public final ThreadPool threadPool() {
return this.threadPool;
}

@Override
public ProjectResolver projectResolver() {
return projectResolver;
}

@Override
public final AdminClient admin() {
return admin;
Expand Down Expand Up @@ -407,6 +416,32 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
};
}

@Override
public Client projectClient(ProjectId projectId) {
// We only take the shortcut when the given project ID matches the "current" project ID. If it doesn't, we'll let #executeOnProject
// take care of error handling.
if (projectResolver.supportsMultipleProjects() == false && projectId.equals(projectResolver.getProjectId())) {
return this;
}
return new FilterClient(this) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
projectResolver.executeOnProject(projectId, () -> super.doExecute(action, request, listener));
}

@Override
public Client projectClient(ProjectId projectId) {
throw new IllegalStateException(
"Unable to create a project client for project [" + projectId + "], nested project client creation is not supported"
);
}
};
}

/**
* Same as {@link PlainActionFuture} but for use with {@link RefCounted} result types. Unlike {@code PlainActionFuture} this future
* acquires a reference to its result. This means that the result reference must be released by a call to {@link RefCounted#decRef()}
Expand Down
29 changes: 18 additions & 11 deletions server/src/main/java/org/elasticsearch/node/NodeConstruction.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,15 @@ static NodeConstruction prepareConstruction(
// places they shouldn't. Best to explicitly drop them now to protect against such leakage.
settingsModule = constructor.validateSettings(initialEnvironment.settings(), settings, threadPool);
}
// serverless deployments plug-in the multi-project resolver factory
ProjectResolver projectResolver = constructor.pluginsService.loadSingletonServiceProvider(
ProjectResolverFactory.class,
() -> ProjectResolverFactory.DEFAULT
).create();
constructor.modules.bindToInstance(ProjectResolver.class, projectResolver);

SearchModule searchModule = constructor.createSearchModule(settingsModule.getSettings(), threadPool, telemetryProvider);
constructor.createClientAndRegistries(settingsModule.getSettings(), threadPool, searchModule);
constructor.createClientAndRegistries(settingsModule.getSettings(), threadPool, searchModule, projectResolver);
DocumentParsingProvider documentParsingProvider = constructor.getDocumentParsingProvider();

ScriptService scriptService = constructor.createScriptService(settingsModule, threadPool, serviceProvider);
Expand All @@ -305,7 +311,8 @@ static NodeConstruction prepareConstruction(
serviceProvider,
forbidPrivateIndexSettings,
telemetryProvider,
documentParsingProvider
documentParsingProvider,
projectResolver
);

return constructor;
Expand Down Expand Up @@ -562,8 +569,13 @@ private SearchModule createSearchModule(Settings settings, ThreadPool threadPool
/**
* Create various objects that are stored as member variables. This is so they are accessible as soon as possible.
*/
private void createClientAndRegistries(Settings settings, ThreadPool threadPool, SearchModule searchModule) {
client = new NodeClient(settings, threadPool);
private void createClientAndRegistries(
Settings settings,
ThreadPool threadPool,
SearchModule searchModule,
ProjectResolver projectResolver
) {
client = new NodeClient(settings, threadPool, projectResolver);
modules.add(b -> {
b.bind(Client.class).toInstance(client);
b.bind(NodeClient.class).toInstance(client);
Expand Down Expand Up @@ -664,7 +676,8 @@ private void construct(
NodeServiceProvider serviceProvider,
boolean forbidPrivateIndexSettings,
TelemetryProvider telemetryProvider,
DocumentParsingProvider documentParsingProvider
DocumentParsingProvider documentParsingProvider,
ProjectResolver projectResolver
) throws IOException {

Settings settings = settingsModule.getSettings();
Expand All @@ -681,12 +694,6 @@ private void construct(
telemetryProvider.getTracer()
);

// serverless deployments plug-in the multi-project resolver factory
ProjectResolver projectResolver = pluginsService.loadSingletonServiceProvider(
ProjectResolverFactory.class,
() -> ProjectResolverFactory.DEFAULT
).create();
modules.bindToInstance(ProjectResolver.class, projectResolver);
ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager);
clusterService.addStateApplier(scriptService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testGetTaskActionWithMultiProjectEnabled() {
var transportService = mock(TransportService.class);
var clusterService = mock(ClusterService.class);
var nodeId = "node1";
NodeClient client = new NodeClient(Settings.EMPTY, threadPool) {
NodeClient client = new NodeClient(Settings.EMPTY, threadPool, TestProjectResolvers.alwaysThrow()) {
@Override
@SuppressWarnings("unchecked")
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
Expand Down Expand Up @@ -109,7 +110,7 @@ public void testDoExecuteForRemoteServerNodes() {
final RemoteClusterNodesAction.TransportAction action = new RemoteClusterNodesAction.TransportAction(
mock(TransportService.class),
new ActionFilters(Set.of()),
new AbstractClient(Settings.EMPTY, threadPool) {
new AbstractClient(Settings.EMPTY, threadPool, TestProjectResolvers.alwaysThrow()) {
@SuppressWarnings("unchecked")
@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
Expand Down Expand Up @@ -187,7 +188,7 @@ public void testDoExecuteForRemoteNodes() {
final RemoteClusterNodesAction.TransportAction action = new RemoteClusterNodesAction.TransportAction(
mock(TransportService.class),
new ActionFilters(Set.of()),
new AbstractClient(Settings.EMPTY, threadPool) {
new AbstractClient(Settings.EMPTY, threadPool, TestProjectResolvers.alwaysThrow()) {
@SuppressWarnings("unchecked")
@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -65,7 +66,7 @@ public void initializeComponents() throws Exception {
clusterService.getClusterSettings(),
Set.of()
);
final var nodeClient = new NodeClient(clusterService.getSettings(), threadPool);
final var nodeClient = new NodeClient(clusterService.getSettings(), threadPool, TestProjectResolvers.alwaysThrow());
repositoriesService = new RepositoriesService(
clusterService.getSettings(),
clusterService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class TestTransportBulkAction extends TransportBulkAction {
transportService,
TransportBulkActionIngestTests.this.clusterService,
ingestService,
new NodeClient(Settings.EMPTY, TransportBulkActionIngestTests.this.threadPool),
new NodeClient(Settings.EMPTY, TransportBulkActionIngestTests.this.threadPool, TestProjectResolvers.alwaysThrow()),
new ActionFilters(Collections.emptySet()),
TestIndexNameExpressionResolver.newInstance(),
new IndexingPressure(SETTINGS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.routing.GlobalRoutingTableTestHelper;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -121,7 +122,7 @@ class TestTransportBulkAction extends TransportBulkAction {
transportService,
TransportBulkActionTests.this.clusterService,
null,
new NodeClient(Settings.EMPTY, TransportBulkActionTests.this.threadPool),
new NodeClient(Settings.EMPTY, TransportBulkActionTests.this.threadPool, TestProjectResolvers.alwaysThrow()),
new ActionFilters(Collections.emptySet()),
new Resolver(),
new IndexingPressure(Settings.EMPTY),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private TransportBulkAction createAction(boolean controlled, AtomicLong expected
IndexNameExpressionResolver resolver = new Resolver();
ActionFilters actionFilters = new ActionFilters(new HashSet<>());

NodeClient client = new NodeClient(Settings.EMPTY, threadPool) {
NodeClient client = new NodeClient(Settings.EMPTY, threadPool, TestProjectResolvers.alwaysThrow()) {
@Override
@SuppressWarnings("unchecked")
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public static void afterClass() {

public void testTransportMultiGetAction() {
final Task task = createTask();
final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);
final NodeClient client = new NodeClient(Settings.EMPTY, threadPool, TestProjectResolvers.alwaysThrow());
final MultiGetRequestBuilder request = new MultiGetRequestBuilder(client);
request.add(new MultiGetRequest.Item("index1", "1"));
request.add(new MultiGetRequest.Item("index1", "2"));
Expand Down Expand Up @@ -219,7 +219,7 @@ protected void executeShardAction(

public void testTransportMultiGetAction_withMissingRouting() {
final Task task = createTask();
final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);
final NodeClient client = new NodeClient(Settings.EMPTY, threadPool, TestProjectResolvers.alwaysThrow());
final MultiGetRequestBuilder request = new MultiGetRequestBuilder(client);
request.add(new MultiGetRequest.Item("index2", "1").routing("1"));
request.add(new MultiGetRequest.Item("index2", "2"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -137,7 +138,7 @@ private TransportMultiSearchAction createTransportMultiSearchAction(boolean cont
final Executor commonExecutor = randomExecutor(threadPool);
final Set<SearchRequest> requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>()));

NodeClient client = new NodeClient(settings, threadPool) {
NodeClient client = new NodeClient(settings, threadPool, TestProjectResolvers.alwaysThrow()) {
@Override
public void search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
requests.add(request);
Expand Down
Loading