Skip to content

Commit f7b7faf

Browse files
authored
Introduce a dedicated ProjectClient interface (#132237)
A dedicated `ProjectClient` interface will make it easier to guarantee a project scope on a type/compilation basis.
1 parent 444c5f2 commit f7b7faf

27 files changed

+166
-115
lines changed

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1717
import org.elasticsearch.action.index.IndexRequest;
1818
import org.elasticsearch.action.support.PlainActionFuture;
19-
import org.elasticsearch.client.internal.Client;
19+
import org.elasticsearch.client.internal.ProjectClient;
2020
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2121
import org.elasticsearch.cluster.metadata.ProjectId;
2222
import org.elasticsearch.cluster.service.ClusterService;
@@ -77,7 +77,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
7777
static final String DATABASES_INDEX_PATTERN = DATABASES_INDEX + "*";
7878
static final int MAX_CHUNK_SIZE = 1024 * 1024;
7979

80-
private final Client client;
80+
private final ProjectClient client;
8181
private final HttpClient httpClient;
8282
private final ClusterService clusterService;
8383
private final ThreadPool threadPool;
@@ -99,7 +99,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
9999
private final ProjectId projectId;
100100

101101
GeoIpDownloader(
102-
Client client,
102+
ProjectClient client,
103103
HttpClient httpClient,
104104
ClusterService clusterService,
105105
ThreadPool threadPool,
@@ -116,7 +116,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
116116
ProjectId projectId
117117
) {
118118
super(id, type, action, description, parentTask, headers);
119-
this.client = client.projectClient(projectId);
119+
this.client = client;
120120
this.httpClient = httpClient;
121121
this.clusterService = clusterService;
122122
this.threadPool = threadPool;

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ protected GeoIpDownloader createTask(
191191
) {
192192
ProjectId projectId = projectResolver.getProjectId();
193193
return new GeoIpDownloader(
194-
client,
194+
client.projectClient(projectId),
195195
httpClient,
196196
clusterService,
197197
threadPool,

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseNodeServiceTests.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.search.SearchRequest;
1818
import org.elasticsearch.action.search.SearchResponse;
1919
import org.elasticsearch.client.internal.Client;
20+
import org.elasticsearch.client.internal.ProjectClient;
2021
import org.elasticsearch.cluster.ClusterName;
2122
import org.elasticsearch.cluster.ClusterState;
2223
import org.elasticsearch.cluster.metadata.AliasMetadata;
@@ -63,6 +64,7 @@
6364
import org.junit.After;
6465
import org.junit.Before;
6566
import org.mockito.ArgumentCaptor;
67+
import org.mockito.Mockito;
6668
import org.mockito.stubbing.Answer;
6769

6870
import java.io.ByteArrayInputStream;
@@ -112,6 +114,7 @@
112114
public class DatabaseNodeServiceTests extends ESTestCase {
113115

114116
private Client client;
117+
private ProjectClient projectClient;
115118
private Path geoIpTmpDir;
116119
private ThreadPool threadPool;
117120
private DatabaseNodeService databaseNodeService;
@@ -138,7 +141,9 @@ public void setup() throws IOException {
138141
Settings settings = Settings.builder().put("resource.reload.interval.high", TimeValue.timeValueMillis(100)).build();
139142
resourceWatcherService = new ResourceWatcherService(settings, threadPool);
140143

144+
projectClient = mock(ProjectClient.class);
141145
client = mock(Client.class);
146+
when(client.projectClient(any())).thenReturn(projectClient);
142147
ingestService = mock(IngestService.class);
143148
clusterService = mock(ClusterService.class);
144149
geoIpTmpDir = createTempDir();
@@ -161,6 +166,8 @@ public void cleanup() {
161166
threadPool.shutdownNow();
162167
Releasables.close(toRelease);
163168
toRelease.clear();
169+
verify(client, Mockito.atLeast(0)).projectClient(any());
170+
verifyNoMoreInteractions(client);
164171
}
165172

166173
public void testCheckDatabases() throws Exception {
@@ -181,7 +188,7 @@ public void testCheckDatabases() throws Exception {
181188
databaseNodeService.checkDatabases(state);
182189
DatabaseReaderLazyLoader database = databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoIP2-City.mmdb");
183190
assertThat(database, nullValue());
184-
verify(client, times(0)).search(any());
191+
verify(projectClient, times(0)).search(any());
185192
verify(ingestService, times(0)).reloadPipeline(any(), anyString());
186193
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
187194
assertEquals(0, files.count());
@@ -199,7 +206,7 @@ public void testCheckDatabases() throws Exception {
199206
databaseNodeService.checkDatabases(state);
200207
database = databaseNodeService.getDatabaseReaderLazyLoader(projectId, "GeoIP2-City.mmdb");
201208
assertThat(database, notNullValue());
202-
verify(client, times(10)).search(any());
209+
verify(projectClient, times(10)).search(any());
203210
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
204211
assertThat(files.count(), greaterThanOrEqualTo(1L));
205212
}
@@ -226,7 +233,7 @@ public void testCheckDatabases_dontCheckDatabaseOnNonIngestNode() throws Excepti
226233

227234
databaseNodeService.checkDatabases(state);
228235
assertThat(databaseNodeService.getDatabase(projectId, "GeoIP2-City.mmdb"), nullValue());
229-
verify(client, never()).search(any());
236+
verify(projectClient, never()).search(any());
230237
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
231238
assertThat(files.toList(), empty());
232239
}
@@ -246,7 +253,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenNoDatabasesIndex() throws Ex
246253

247254
databaseNodeService.checkDatabases(state);
248255
assertThat(databaseNodeService.getDatabase(projectId, "GeoIP2-City.mmdb"), nullValue());
249-
verify(client, never()).search(any());
256+
verify(projectClient, never()).search(any());
250257
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
251258
assertThat(files.toList(), empty());
252259
}
@@ -261,7 +268,7 @@ public void testCheckDatabases_dontCheckDatabaseWhenGeoIpDownloadTask() throws E
261268

262269
databaseNodeService.checkDatabases(state);
263270
assertThat(databaseNodeService.getDatabase(projectId, "GeoIP2-City.mmdb"), nullValue());
264-
verify(client, never()).search(any());
271+
verify(projectClient, never()).search(any());
265272
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
266273
assertThat(files.toList(), empty());
267274
}
@@ -281,7 +288,7 @@ public void testRetrieveDatabase() throws Exception {
281288
verify(failureHandler, never()).accept(any());
282289
verify(chunkConsumer, times(30)).accept(any());
283290
verify(completedHandler, times(1)).run();
284-
verify(client, times(30)).search(any());
291+
verify(projectClient, times(30)).search(any());
285292
}
286293

287294
public void testRetrieveDatabaseCorruption() throws Exception {
@@ -305,7 +312,7 @@ public void testRetrieveDatabaseCorruption() throws Exception {
305312
);
306313
verify(chunkConsumer, times(10)).accept(any());
307314
verify(completedHandler, times(0)).run();
308-
verify(client, times(10)).search(any());
315+
verify(projectClient, times(10)).search(any());
309316
}
310317

311318
public void testUpdateDatabase() throws Exception {
@@ -371,8 +378,7 @@ private String mockSearches(String databaseName, int firstChunk, int lastChunk)
371378
});
372379
requestMap.put(databaseName + "_" + i, actionFuture);
373380
}
374-
when(client.projectClient(any())).thenReturn(client);
375-
when(client.search(any())).thenAnswer(invocationOnMock -> {
381+
when(projectClient.search(any())).thenAnswer(invocationOnMock -> {
376382
SearchRequest req = (SearchRequest) invocationOnMock.getArguments()[0];
377383
TermQueryBuilder term = (TermQueryBuilder) req.source().query();
378384
String id = (String) term.value();

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.index.IndexResponse;
2424
import org.elasticsearch.action.index.TransportIndexAction;
2525
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
26+
import org.elasticsearch.client.internal.ProjectClient;
2627
import org.elasticsearch.cluster.ClusterState;
2728
import org.elasticsearch.cluster.block.ClusterBlocks;
2829
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -702,12 +703,14 @@ private GeoIpTaskState.Metadata newGeoIpTaskStateMetadata(boolean expired) {
702703
return new GeoIpTaskState.Metadata(0, 0, 0, randomAlphaOfLength(20), lastChecked.toEpochMilli());
703704
}
704705

705-
private static class MockClient extends NoOpClient {
706+
private static class MockClient extends NoOpClient implements ProjectClient {
706707

707708
private final Map<ActionType<?>, BiConsumer<? extends ActionRequest, ? extends ActionListener<?>>> handlers = new HashMap<>();
709+
private final ProjectId projectId;
708710

709711
private MockClient(ThreadPool threadPool, ProjectId projectId) {
710712
super(threadPool, TestProjectResolvers.singleProject(projectId));
713+
this.projectId = projectId;
711714
}
712715

713716
public <Response extends ActionResponse, Request extends ActionRequest> void addHandler(
@@ -717,6 +720,11 @@ public <Response extends ActionResponse, Request extends ActionRequest> void add
717720
handlers.put(action, listener);
718721
}
719722

723+
@Override
724+
public ProjectId projectId() {
725+
return projectId;
726+
}
727+
720728
@SuppressWarnings("unchecked")
721729
@Override
722730
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(

server/src/main/java/org/elasticsearch/client/internal/Client.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ public interface Client extends ElasticsearchClient {
404404
/**
405405
* Returns a client that executes every request in the context of the given project.
406406
*/
407-
Client projectClient(ProjectId projectId);
407+
ProjectClient projectClient(ProjectId projectId);
408408

409409
/**
410410
* Returns this client's project resolver.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.client.internal;
11+
12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
14+
/**
15+
* A {@link Client} that is scoped to a specific project. It should execute any request in the scope of that project. This scope is usually
16+
* defined by the thread context.
17+
*/
18+
public interface ProjectClient extends Client {
19+
20+
ProjectId projectId();
21+
}

server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,10 @@
7676
import org.elasticsearch.client.internal.AdminClient;
7777
import org.elasticsearch.client.internal.Client;
7878
import org.elasticsearch.client.internal.FilterClient;
79+
import org.elasticsearch.client.internal.ProjectClient;
7980
import org.elasticsearch.cluster.metadata.ProjectId;
8081
import org.elasticsearch.cluster.project.ProjectResolver;
82+
import org.elasticsearch.common.Strings;
8183
import org.elasticsearch.common.settings.Settings;
8284
import org.elasticsearch.common.util.concurrent.ThreadContext;
8385
import org.elasticsearch.core.Nullable;
@@ -96,6 +98,7 @@ public abstract class AbstractClient implements Client {
9698
private final ThreadPool threadPool;
9799
private final ProjectResolver projectResolver;
98100
private final AdminClient admin;
101+
private final ProjectClient defaultProjectClient;
99102

100103
@SuppressWarnings("this-escape")
101104
public AbstractClient(Settings settings, ThreadPool threadPool, ProjectResolver projectResolver) {
@@ -104,6 +107,14 @@ public AbstractClient(Settings settings, ThreadPool threadPool, ProjectResolver
104107
this.projectResolver = projectResolver;
105108
this.admin = new AdminClient(this);
106109
this.logger = LogManager.getLogger(this.getClass());
110+
// We create a dedicated project client for the default project to avoid having to reconstruct it on every invocation.
111+
// This aims to reduce the overhead of creating a project client when the client is used in a single-project context.
112+
// TODO: only create the default project client if the project resolver does not support multiple projects.
113+
if (this instanceof ProjectClient == false) {
114+
this.defaultProjectClient = new ProjectClientImpl(this, ProjectId.DEFAULT);
115+
} else {
116+
this.defaultProjectClient = null;
117+
}
107118
}
108119

109120
@Override
@@ -417,29 +428,13 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
417428
}
418429

419430
@Override
420-
public Client projectClient(ProjectId projectId) {
431+
public ProjectClient projectClient(ProjectId projectId) {
421432
// We only take the shortcut when the given project ID matches the "current" project ID. If it doesn't, we'll let #executeOnProject
422433
// take care of error handling.
423434
if (projectResolver.supportsMultipleProjects() == false && projectId.equals(projectResolver.getProjectId())) {
424-
return this;
435+
return defaultProjectClient;
425436
}
426-
return new FilterClient(this) {
427-
@Override
428-
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
429-
ActionType<Response> action,
430-
Request request,
431-
ActionListener<Response> listener
432-
) {
433-
projectResolver.executeOnProject(projectId, () -> super.doExecute(action, request, listener));
434-
}
435-
436-
@Override
437-
public Client projectClient(ProjectId projectId) {
438-
throw new IllegalStateException(
439-
"Unable to create a project client for project [" + projectId + "], nested project client creation is not supported"
440-
);
441-
}
442-
};
437+
return new ProjectClientImpl(this, projectId);
443438
}
444439

445440
/**
@@ -477,4 +472,35 @@ public R get() throws InterruptedException, ExecutionException {
477472
return super.get();
478473
}
479474
}
475+
476+
private static class ProjectClientImpl extends FilterClient implements ProjectClient {
477+
478+
private final ProjectId projectId;
479+
480+
ProjectClientImpl(Client in, ProjectId projectId) {
481+
super(in);
482+
this.projectId = projectId;
483+
}
484+
485+
@Override
486+
public ProjectId projectId() {
487+
return projectId;
488+
}
489+
490+
@Override
491+
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
492+
ActionType<Response> action,
493+
Request request,
494+
ActionListener<Response> listener
495+
) {
496+
projectResolver().executeOnProject(projectId, () -> super.doExecute(action, request, listener));
497+
}
498+
499+
@Override
500+
public ProjectClient projectClient(ProjectId projectId) {
501+
throw new IllegalStateException(Strings.format("""
502+
Unable to create a project client for project [%s] from project client with project ID [%s],\
503+
nested project client creation is not supported""", projectId, this.projectId));
504+
}
505+
}
480506
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AbstractStepTestCase.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.client.internal.AdminClient;
1111
import org.elasticsearch.client.internal.Client;
1212
import org.elasticsearch.client.internal.IndicesAdminClient;
13+
import org.elasticsearch.client.internal.ProjectClient;
1314
import org.elasticsearch.cluster.ClusterStateObserver;
1415
import org.elasticsearch.cluster.ProjectState;
1516
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -26,6 +27,7 @@
2627
public abstract class AbstractStepTestCase<T extends Step> extends ESTestCase {
2728

2829
protected Client client;
30+
protected ProjectClient projectClient;
2931
protected AdminClient adminClient;
3032
protected IndicesAdminClient indicesClient;
3133

@@ -34,9 +36,10 @@ public void setupClient() {
3436
client = Mockito.mock(Client.class);
3537
adminClient = Mockito.mock(AdminClient.class);
3638
indicesClient = Mockito.mock(IndicesAdminClient.class);
39+
projectClient = Mockito.mock(ProjectClient.class);
3740

38-
Mockito.when(client.projectClient(Mockito.any())).thenReturn(client);
39-
Mockito.when(client.admin()).thenReturn(adminClient);
41+
Mockito.when(client.projectClient(Mockito.any())).thenReturn(projectClient);
42+
Mockito.when(projectClient.admin()).thenReturn(adminClient);
4043
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
4144
}
4245

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CleanupSnapshotStepTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.metadata.IndexMetadata;
1717
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
1818
import org.elasticsearch.cluster.metadata.ProjectMetadata;
19+
import org.elasticsearch.cluster.project.TestProjectResolvers;
1920
import org.elasticsearch.index.IndexVersion;
2021
import org.elasticsearch.test.client.NoOpClient;
2122
import org.elasticsearch.threadpool.ThreadPool;
@@ -105,7 +106,7 @@ public void testPerformAction() {
105106
}
106107

107108
private NoOpClient getDeleteSnapshotRequestAssertingClient(ThreadPool threadPool, String expectedSnapshotName) {
108-
return new NoOpClient(threadPool) {
109+
return new NoOpClient(threadPool, TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext())) {
109110
@Override
110111
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
111112
ActionType<Response> action,

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CloseIndexStepTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void onFailure(Exception e) {
8282

8383
assertEquals(true, actionCompleted.get());
8484
Mockito.verify(client).projectClient(state.projectId());
85-
Mockito.verify(client).admin();
85+
Mockito.verify(projectClient).admin();
8686
Mockito.verifyNoMoreInteractions(client);
8787
Mockito.verify(adminClient, Mockito.only()).indices();
8888
Mockito.verify(indicesClient, Mockito.only()).close(Mockito.any(), Mockito.any());
@@ -110,7 +110,7 @@ public void testPerformActionFailure() {
110110

111111
assertSame(exception, expectThrows(Exception.class, () -> performActionAndWait(step, indexMetadata, state, null)));
112112
Mockito.verify(client).projectClient(state.projectId());
113-
Mockito.verify(client).admin();
113+
Mockito.verify(projectClient).admin();
114114
Mockito.verifyNoMoreInteractions(client);
115115
Mockito.verify(adminClient, Mockito.only()).indices();
116116
Mockito.verify(indicesClient, Mockito.only()).close(Mockito.any(), Mockito.any());

0 commit comments

Comments
 (0)