Skip to content

Commit 93bbe28

Browse files
nielsbaumanmridula-s109
authored andcommitted
Make ESQL join operators project-aware (elastic#130040)
Allows both the enrich and lookup join operators to work with multiple projects.
1 parent 75ba969 commit 93bbe28

File tree

9 files changed

+79
-41
lines changed

9 files changed

+79
-41
lines changed

test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.elasticsearch.cluster.NodeConnectionsService;
2525
import org.elasticsearch.cluster.block.ClusterBlocks;
2626
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
27+
import org.elasticsearch.cluster.metadata.ProjectId;
28+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2729
import org.elasticsearch.cluster.node.DiscoveryNode;
2830
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
2931
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -138,6 +140,16 @@ public static ClusterService createClusterService(
138140
DiscoveryNode localNode,
139141
Settings providedSettings,
140142
ClusterSettings clusterSettings
143+
) {
144+
return createClusterService(threadPool, localNode, providedSettings, clusterSettings, null);
145+
}
146+
147+
public static ClusterService createClusterService(
148+
ThreadPool threadPool,
149+
DiscoveryNode localNode,
150+
Settings providedSettings,
151+
ClusterSettings clusterSettings,
152+
ProjectId projectId
141153
) {
142154
Settings settings = Settings.builder()
143155
.put("node.name", "test")
@@ -151,12 +163,14 @@ public static ClusterService createClusterService(
151163
new TaskManager(settings, threadPool, Collections.emptySet(), Tracer.NOOP)
152164
);
153165
clusterService.setNodeConnectionsService(createNoOpNodeConnectionsService());
154-
ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
166+
ClusterState.Builder builder = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
155167
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
156168
.putCompatibilityVersions(localNode.getId(), CompatibilityVersionsUtils.staticCurrent())
157-
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
158-
.build();
159-
clusterService.getClusterApplierService().setInitialState(initialClusterState);
169+
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK);
170+
if (projectId != null) {
171+
builder.putProjectMetadata(ProjectMetadata.builder(projectId));
172+
}
173+
clusterService.getClusterApplierService().setInitialState(builder.build());
160174
clusterService.getMasterService().setClusterStatePublisher(createClusterStatePublisher(clusterService.getClusterApplierService()));
161175
clusterService.getMasterService().setClusterStateSupplier(clusterService.getClusterApplierService()::state);
162176
clusterService.start();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
import org.elasticsearch.action.support.IndicesOptions;
1616
import org.elasticsearch.cluster.ClusterState;
1717
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
18-
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1918
import org.elasticsearch.cluster.node.DiscoveryNode;
19+
import org.elasticsearch.cluster.project.ProjectResolver;
2020
import org.elasticsearch.cluster.routing.ShardIterator;
2121
import org.elasticsearch.cluster.routing.ShardRouting;
2222
import org.elasticsearch.cluster.service.ClusterService;
@@ -134,6 +134,7 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
134134
private final BigArrays bigArrays;
135135
private final BlockFactory blockFactory;
136136
private final LocalCircuitBreaker.SizeSettings localBreakerSettings;
137+
private final ProjectResolver projectResolver;
137138
/**
138139
* Should output {@link Page pages} be combined into a single resulting page?
139140
* If this is {@code true} we'll run a {@link MergePositionsOperator} to merge
@@ -154,7 +155,8 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
154155
BigArrays bigArrays,
155156
BlockFactory blockFactory,
156157
boolean mergePages,
157-
CheckedBiFunction<StreamInput, BlockFactory, T, IOException> readRequest
158+
CheckedBiFunction<StreamInput, BlockFactory, T, IOException> readRequest,
159+
ProjectResolver projectResolver
158160
) {
159161
this.actionName = actionName;
160162
this.clusterService = clusterService;
@@ -167,6 +169,7 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
167169
this.blockFactory = blockFactory;
168170
this.localBreakerSettings = new LocalCircuitBreaker.SizeSettings(clusterService.getSettings());
169171
this.mergePages = mergePages;
172+
this.projectResolver = projectResolver;
170173
transportService.registerRequestHandler(
171174
actionName,
172175
transportService.getThreadPool().executor(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME),
@@ -227,8 +230,9 @@ protected static QueryList termQueryList(
227230
*/
228231
public final void lookupAsync(R request, CancellableTask parentTask, ActionListener<List<Page>> outListener) {
229232
ClusterState clusterState = clusterService.state();
233+
var projectState = projectResolver.getProjectState(clusterState);
230234
List<ShardIterator> shardIterators = clusterService.operationRouting()
231-
.searchShards(clusterState.projectState(), new String[] { request.index }, Map.of(), "_local");
235+
.searchShards(projectState, new String[] { request.index }, Map.of(), "_local");
232236
if (shardIterators.size() != 1) {
233237
outListener.onFailure(new EsqlIllegalArgumentException("target index {} has more than one shard", request.index));
234238
return;
@@ -278,12 +282,11 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
278282
final List<Releasable> releasables = new ArrayList<>(6);
279283
boolean started = false;
280284
try {
281-
282-
ProjectMetadata projMeta = clusterService.state().metadata().getProject();
285+
var projectState = projectResolver.getProjectState(clusterService.state());
283286
AliasFilter aliasFilter = indicesService.buildAliasFilter(
284-
clusterService.state().projectState(),
287+
projectState,
285288
request.shardId.getIndex().getName(),
286-
indexNameExpressionResolver.resolveExpressions(projMeta, request.indexPattern)
289+
indexNameExpressionResolver.resolveExpressions(projectState.metadata(), request.indexPattern)
287290
);
288291

289292
LookupShardContext shardContext = lookupShardContextFactory.create(request.shardId);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.support.ContextPreservingActionListener;
1414
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1515
import org.elasticsearch.cluster.node.DiscoveryNode;
16+
import org.elasticsearch.cluster.project.ProjectResolver;
1617
import org.elasticsearch.cluster.service.ClusterService;
1718
import org.elasticsearch.common.io.stream.StreamInput;
1819
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -75,7 +76,8 @@ public EnrichLookupService(
7576
TransportService transportService,
7677
IndexNameExpressionResolver indexNameExpressionResolver,
7778
BigArrays bigArrays,
78-
BlockFactory blockFactory
79+
BlockFactory blockFactory,
80+
ProjectResolver projectResolver
7981
) {
8082
super(
8183
LOOKUP_ACTION_NAME,
@@ -87,7 +89,8 @@ public EnrichLookupService(
8789
bigArrays,
8890
blockFactory,
8991
true,
90-
TransportRequest::readFrom
92+
TransportRequest::readFrom,
93+
projectResolver
9194
);
9295
}
9396

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.support.ChannelActionListener;
1414
import org.elasticsearch.action.support.ContextPreservingActionListener;
1515
import org.elasticsearch.action.support.RefCountingListener;
16+
import org.elasticsearch.cluster.project.ProjectResolver;
1617
import org.elasticsearch.cluster.service.ClusterService;
1718
import org.elasticsearch.common.io.stream.StreamInput;
1819
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -77,13 +78,20 @@ public class EnrichPolicyResolver {
7778
private final TransportService transportService;
7879
private final ThreadPool threadPool;
7980
private final RemoteClusterService remoteClusterService;
81+
private final ProjectResolver projectResolver;
8082

81-
public EnrichPolicyResolver(ClusterService clusterService, TransportService transportService, IndexResolver indexResolver) {
83+
public EnrichPolicyResolver(
84+
ClusterService clusterService,
85+
TransportService transportService,
86+
IndexResolver indexResolver,
87+
ProjectResolver projectResolver
88+
) {
8289
this.clusterService = clusterService;
8390
this.transportService = transportService;
8491
this.indexResolver = indexResolver;
8592
this.threadPool = transportService.getThreadPool();
8693
this.remoteClusterService = transportService.getRemoteClusterService();
94+
this.projectResolver = projectResolver;
8795
transportService.registerRequestHandler(
8896
RESOLVE_ACTION_NAME,
8997
threadPool.executor(ThreadPool.Names.SEARCH),
@@ -445,7 +453,8 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
445453
}
446454

447455
protected Map<String, EnrichPolicy> availablePolicies() {
448-
final EnrichMetadata metadata = clusterService.state().metadata().getProject().custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY);
456+
final EnrichMetadata metadata = projectResolver.getProjectMetadata(clusterService.state())
457+
.custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY);
449458
return metadata.getPolicies();
450459
}
451460

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.TransportVersions;
1111
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
12+
import org.elasticsearch.cluster.project.ProjectResolver;
1213
import org.elasticsearch.cluster.service.ClusterService;
1314
import org.elasticsearch.common.collect.Iterators;
1415
import org.elasticsearch.common.io.stream.StreamInput;
@@ -55,7 +56,8 @@ public LookupFromIndexService(
5556
TransportService transportService,
5657
IndexNameExpressionResolver indexNameExpressionResolver,
5758
BigArrays bigArrays,
58-
BlockFactory blockFactory
59+
BlockFactory blockFactory,
60+
ProjectResolver projectResolver
5961
) {
6062
super(
6163
LOOKUP_ACTION_NAME,
@@ -67,7 +69,8 @@ public LookupFromIndexService(
6769
bigArrays,
6870
blockFactory,
6971
false,
70-
TransportRequest::readFrom
72+
TransportRequest::readFrom,
73+
projectResolver
7174
);
7275
}
7376

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,12 @@ public TransportEsqlQueryAction(
113113
this.requestExecutor = threadPool.executor(ThreadPool.Names.SEARCH);
114114
exchangeService.registerTransportHandler(transportService);
115115
this.exchangeService = exchangeService;
116-
this.enrichPolicyResolver = new EnrichPolicyResolver(clusterService, transportService, planExecutor.indexResolver());
116+
this.enrichPolicyResolver = new EnrichPolicyResolver(
117+
clusterService,
118+
transportService,
119+
planExecutor.indexResolver(),
120+
projectResolver
121+
);
117122
AbstractLookupService.LookupShardContextFactory lookupLookupShardContextFactory = AbstractLookupService.LookupShardContextFactory
118123
.fromSearchService(searchService);
119124
this.enrichLookupService = new EnrichLookupService(
@@ -123,7 +128,8 @@ public TransportEsqlQueryAction(
123128
transportService,
124129
indexNameExpressionResolver,
125130
bigArrays,
126-
blockFactoryProvider.blockFactory()
131+
blockFactoryProvider.blockFactory(),
132+
projectResolver
127133
);
128134
this.lookupFromIndexService = new LookupFromIndexService(
129135
clusterService,
@@ -132,7 +138,8 @@ public TransportEsqlQueryAction(
132138
transportService,
133139
indexNameExpressionResolver,
134140
bigArrays,
135-
blockFactoryProvider.blockFactory()
141+
blockFactoryProvider.blockFactory(),
142+
projectResolver
136143
);
137144

138145
this.asyncTaskManagementService = new AsyncTaskManagementService<>(

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import org.elasticsearch.client.internal.FilterClient;
2323
import org.elasticsearch.cluster.ClusterName;
2424
import org.elasticsearch.cluster.ClusterState;
25-
import org.elasticsearch.cluster.metadata.Metadata;
25+
import org.elasticsearch.cluster.metadata.ProjectId;
26+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2627
import org.elasticsearch.cluster.node.VersionInformation;
28+
import org.elasticsearch.cluster.project.TestProjectResolvers;
2729
import org.elasticsearch.cluster.service.ClusterService;
2830
import org.elasticsearch.common.settings.Settings;
2931
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -95,9 +97,10 @@ public void setUpClusters() {
9597
}
9698
AbstractSimpleTransportTestCase.connectToNode(transports.get(""), transports.get("cluster_a").getLocalNode());
9799
AbstractSimpleTransportTestCase.connectToNode(transports.get(""), transports.get("cluster_b").getLocalNode());
98-
localCluster = newEnrichPolicyResolver(LOCAL_CLUSTER_GROUP_KEY);
99-
clusterA = newEnrichPolicyResolver("cluster_a");
100-
clusterB = newEnrichPolicyResolver("cluster_b");
100+
final var projectId = randomProjectIdOrDefault();
101+
localCluster = newEnrichPolicyResolver(projectId, LOCAL_CLUSTER_GROUP_KEY);
102+
clusterA = newEnrichPolicyResolver(projectId, "cluster_a");
103+
clusterB = newEnrichPolicyResolver(projectId, "cluster_b");
101104

102105
// hosts policies are the same across clusters
103106
var hostsPolicy = new EnrichPolicy("match", null, List.of(), "ip", List.of("region", "cost"));
@@ -401,8 +404,8 @@ public void testMissingRemotePolicy() {
401404
}
402405
}
403406

404-
TestEnrichPolicyResolver newEnrichPolicyResolver(String cluster) {
405-
return new TestEnrichPolicyResolver(cluster, new HashMap<>(), new HashMap<>(), new HashMap<>());
407+
TestEnrichPolicyResolver newEnrichPolicyResolver(ProjectId projectId, String cluster) {
408+
return new TestEnrichPolicyResolver(projectId, cluster, new HashMap<>(), new HashMap<>(), new HashMap<>());
406409
}
407410

408411
class TestEnrichPolicyResolver extends EnrichPolicyResolver {
@@ -412,15 +415,17 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver {
412415
final Map<String, Map<String, String>> mappings;
413416

414417
TestEnrichPolicyResolver(
418+
ProjectId projectId,
415419
String cluster,
416420
Map<String, EnrichPolicy> policies,
417421
Map<String, String> aliases,
418422
Map<String, Map<String, String>> mappings
419423
) {
420424
super(
421-
mockClusterService(policies),
425+
mockClusterService(projectId, policies),
422426
transports.get(cluster),
423-
new IndexResolver(new FieldCapsClient(threadPool, aliases, mappings))
427+
new IndexResolver(new FieldCapsClient(threadPool, aliases, mappings)),
428+
TestProjectResolvers.singleProject(projectId)
424429
);
425430
this.policies = policies;
426431
this.cluster = cluster;
@@ -457,11 +462,11 @@ protected void getRemoteConnection(String remoteCluster, ActionListener<Transpor
457462
listener.onResponse(transports.get("").getConnection(transports.get(remoteCluster).getLocalNode()));
458463
}
459464

460-
static ClusterService mockClusterService(Map<String, EnrichPolicy> policies) {
465+
static ClusterService mockClusterService(ProjectId projectId, Map<String, EnrichPolicy> policies) {
461466
ClusterService clusterService = mock(ClusterService.class);
462467
EnrichMetadata enrichMetadata = new EnrichMetadata(policies);
463468
ClusterState state = ClusterState.builder(new ClusterName("test"))
464-
.metadata(Metadata.builder().projectCustoms(Map.of(EnrichMetadata.TYPE, enrichMetadata)))
469+
.putProjectMetadata(ProjectMetadata.builder(projectId).customs(Map.of(EnrichMetadata.TYPE, enrichMetadata)))
465470
.build();
466471
when(clusterService.state()).thenReturn(state);
467472
return clusterService;

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1919
import org.elasticsearch.cluster.node.DiscoveryNode;
2020
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
21+
import org.elasticsearch.cluster.project.TestProjectResolvers;
2122
import org.elasticsearch.cluster.service.ClusterService;
2223
import org.elasticsearch.common.settings.ClusterSettings;
2324
import org.elasticsearch.common.settings.Settings;
@@ -184,7 +185,8 @@ private LookupFromIndexService lookupService(DriverContext mainContext) {
184185
IndicesService indicesService = mock(IndicesService.class);
185186
IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();
186187
releasables.add(clusterService::stop);
187-
ClusterServiceUtils.setState(clusterService, ClusterStateCreationUtils.state("idx", 1, 1));
188+
final var projectId = randomProjectIdOrDefault();
189+
ClusterServiceUtils.setState(clusterService, ClusterStateCreationUtils.state(projectId, "idx", 1, 1));
188190
if (beCranky) {
189191
logger.info("building a cranky lookup");
190192
}
@@ -198,7 +200,8 @@ private LookupFromIndexService lookupService(DriverContext mainContext) {
198200
transportService(clusterService),
199201
indexNameExpressionResolver,
200202
bigArrays,
201-
blockFactory
203+
blockFactory,
204+
TestProjectResolvers.singleProject(projectId)
202205
);
203206
}
204207

x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,8 @@ tasks.named("yamlRestTest").configure {
3434
'^data_streams/10_data_stream_resolvability/*',
3535
'^deprecation/10_basic/*',
3636
'^dlm/10_usage/*',
37-
'^esql/60_enrich/*',
3837
'^esql/60_usage/*',
39-
'^esql/61_enrich_ip/*',
40-
'^esql/62_extra_enrich/*',
41-
'^esql/63_enrich_int_range/*',
42-
'^esql/64_enrich_int_match/*',
4338
'^esql/180_match_operator/*',
44-
'^esql/190_lookup_join/*',
45-
'^esql/191_lookup_join_on_datastreams/*',
46-
'^esql/191_lookup_join_text/*',
47-
'^esql/192_lookup_join_on_aliases/*',
4839
'^health/10_usage/*',
4940
'^ilm/80_health/*',
5041
'^logsdb/10_usage/*',

0 commit comments

Comments
 (0)