Skip to content

Commit eae2648

Browse files
ES|QL: Add support for JOINs on aliases
1 parent e352d2c commit eae2648

File tree

9 files changed

+42
-12
lines changed

9 files changed

+42
-12
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ static TransportVersion def(int id) {
264264
public static final TransportVersion NODES_STATS_SUPPORTS_MULTI_PROJECT = def(9_079_0_00);
265265
public static final TransportVersion ML_INFERENCE_HUGGING_FACE_RERANK_ADDED = def(9_080_0_00);
266266
public static final TransportVersion SETTINGS_IN_DATA_STREAMS_DRY_RUN = def(9_081_0_00);
267+
public static final TransportVersion JOIN_ON_ALIASES = def(9_082_0_00);
267268
/*
268269
* STOP! READ THIS FIRST! No, really,
269270
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws
227227
ctx -> internalCluster().getInstance(TransportEsqlQueryAction.class, finalNodeWithShard).getLookupFromIndexService(),
228228
keyType,
229229
"lookup",
230+
"lookup",
230231
"key",
231232
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))),
232233
Source.EMPTY

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) {
464464
abstract static class Request {
465465
final String sessionId;
466466
final String index;
467+
final String indexPattern;
467468
final DataType inputDataType;
468469
final Page inputPage;
469470
final List<NamedExpression> extractFields;
@@ -472,13 +473,15 @@ abstract static class Request {
472473
Request(
473474
String sessionId,
474475
String index,
476+
String indexPattern,
475477
DataType inputDataType,
476478
Page inputPage,
477479
List<NamedExpression> extractFields,
478480
Source source
479481
) {
480482
this.sessionId = sessionId;
481483
this.index = index;
484+
this.indexPattern = indexPattern;
482485
this.inputDataType = inputDataType;
483486
this.inputPage = inputPage;
484487
this.extractFields = extractFields;
@@ -489,6 +492,7 @@ abstract static class Request {
489492
abstract static class TransportRequest extends AbstractTransportRequest implements IndicesRequest {
490493
final String sessionId;
491494
final ShardId shardId;
495+
final String indexPattern;
492496
/**
493497
* For mixed clusters with nodes &lt;8.14, this will be null.
494498
*/
@@ -504,6 +508,7 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen
504508
TransportRequest(
505509
String sessionId,
506510
ShardId shardId,
511+
String indexPattern,
507512
DataType inputDataType,
508513
Page inputPage,
509514
Page toRelease,
@@ -512,6 +517,7 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen
512517
) {
513518
this.sessionId = sessionId;
514519
this.shardId = shardId;
520+
this.indexPattern = indexPattern;
515521
this.inputDataType = inputDataType;
516522
this.inputPage = inputPage;
517523
this.toRelease = toRelease;
@@ -521,7 +527,7 @@ abstract static class TransportRequest extends AbstractTransportRequest implemen
521527

522528
@Override
523529
public final String[] indices() {
524-
return new String[] { shardId.getIndexName() };
530+
return new String[] { indexPattern };
525531
}
526532

527533
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public static class Request extends AbstractLookupService.Request {
170170
List<NamedExpression> extractFields,
171171
Source source
172172
) {
173-
super(sessionId, index, inputDataType, inputPage, extractFields, source);
173+
super(sessionId, index, index, inputDataType, inputPage, extractFields, source);
174174
this.matchType = matchType;
175175
this.matchField = matchField;
176176
}
@@ -191,7 +191,7 @@ protected static class TransportRequest extends AbstractLookupService.TransportR
191191
List<NamedExpression> extractFields,
192192
Source source
193193
) {
194-
super(sessionId, shardId, inputDataType, inputPage, toRelease, extractFields, source);
194+
super(sessionId, shardId, shardId.getIndexName(), inputDataType, inputPage, toRelease, extractFields, source);
195195
this.matchType = matchType;
196196
this.matchField = matchField;
197197
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public record Factory(
4343
int inputChannel,
4444
Function<DriverContext, LookupFromIndexService> lookupService,
4545
DataType inputDataType,
46+
String lookupIndexPattern,
4647
String lookupIndex,
4748
String matchField,
4849
List<NamedExpression> loadFields,
@@ -73,6 +74,7 @@ public Operator get(DriverContext driverContext) {
7374
inputChannel,
7475
lookupService.apply(driverContext),
7576
inputDataType,
77+
lookupIndexPattern,
7678
lookupIndex,
7779
matchField,
7880
loadFields,
@@ -86,6 +88,7 @@ public Operator get(DriverContext driverContext) {
8688
private final CancellableTask parentTask;
8789
private final int inputChannel;
8890
private final DataType inputDataType;
91+
private final String lookupIndexPattern;
8992
private final String lookupIndex;
9093
private final String matchField;
9194
private final List<NamedExpression> loadFields;
@@ -108,6 +111,7 @@ public LookupFromIndexOperator(
108111
int inputChannel,
109112
LookupFromIndexService lookupService,
110113
DataType inputDataType,
114+
String lookupIndexPattern,
111115
String lookupIndex,
112116
String matchField,
113117
List<NamedExpression> loadFields,
@@ -119,6 +123,7 @@ public LookupFromIndexOperator(
119123
this.inputChannel = inputChannel;
120124
this.lookupService = lookupService;
121125
this.inputDataType = inputDataType;
126+
this.lookupIndexPattern = lookupIndexPattern;
122127
this.lookupIndex = lookupIndex;
123128
this.matchField = matchField;
124129
this.loadFields = loadFields;
@@ -132,6 +137,7 @@ protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener
132137
LookupFromIndexService.Request request = new LookupFromIndexService.Request(
133138
sessionId,
134139
lookupIndex,
140+
lookupIndexPattern,
135141
inputDataType,
136142
matchField,
137143
new Page(inputBlock),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.index.shard.ShardId;
2626
import org.elasticsearch.tasks.TaskId;
2727
import org.elasticsearch.transport.TransportService;
28+
import org.elasticsearch.xpack.esql.VerificationException;
2829
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
2930
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
3031
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -36,8 +37,6 @@
3637
import java.util.List;
3738
import java.util.Objects;
3839

39-
import static java.lang.System.in;
40-
4140
/**
4241
* {@link LookupFromIndexService} performs lookup against a Lookup index for
4342
* a given input page. See {@link AbstractLookupService} for how it works
@@ -70,6 +69,7 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque
7069
return new TransportRequest(
7170
request.sessionId,
7271
shardId,
72+
request.indexPattern,
7373
request.inputDataType,
7474
request.inputPage,
7575
null,
@@ -109,13 +109,14 @@ public static class Request extends AbstractLookupService.Request {
109109
Request(
110110
String sessionId,
111111
String index,
112+
String indexPattern,
112113
DataType inputDataType,
113114
String matchField,
114115
Page inputPage,
115116
List<NamedExpression> extractFields,
116117
Source source
117118
) {
118-
super(sessionId, index, inputDataType, inputPage, extractFields, source);
119+
super(sessionId, index, indexPattern, inputDataType, inputPage, extractFields, source);
119120
this.matchField = matchField;
120121
}
121122
}
@@ -126,21 +127,30 @@ protected static class TransportRequest extends AbstractLookupService.TransportR
126127
TransportRequest(
127128
String sessionId,
128129
ShardId shardId,
130+
String indexPattern,
129131
DataType inputDataType,
130132
Page inputPage,
131133
Page toRelease,
132134
List<NamedExpression> extractFields,
133135
String matchField,
134136
Source source
135137
) {
136-
super(sessionId, shardId, inputDataType, inputPage, toRelease, extractFields, source);
138+
super(sessionId, shardId, indexPattern, inputDataType, inputPage, toRelease, extractFields, source);
137139
this.matchField = matchField;
138140
}
139141

140142
static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException {
141143
TaskId parentTaskId = TaskId.readFromStream(in);
142144
String sessionId = in.readString();
143145
ShardId shardId = new ShardId(in);
146+
147+
String indexPattern;
148+
if (in.getTransportVersion().onOrAfter(TransportVersions.JOIN_ON_ALIASES)) {
149+
indexPattern = in.readString();
150+
} else {
151+
indexPattern = shardId.getIndexName();
152+
}
153+
144154
DataType inputDataType = DataType.fromTypeName(in.readString());
145155
Page inputPage;
146156
try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
@@ -162,6 +172,7 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
162172
TransportRequest result = new TransportRequest(
163173
sessionId,
164174
shardId,
175+
indexPattern,
165176
inputDataType,
166177
inputPage,
167178
inputPage,
@@ -178,6 +189,14 @@ public void writeTo(StreamOutput out) throws IOException {
178189
super.writeTo(out);
179190
out.writeString(sessionId);
180191
out.writeWriteable(shardId);
192+
193+
if (indexPattern.equals(shardId.getIndexName()) == false
194+
&& out.getTransportVersion().before(TransportVersions.JOIN_ON_ALIASES)) {
195+
// TODO can we throw exceptions here?
196+
throw new VerificationException("Aliases and index patterns are not allowed for LOOKUP JOIN []", indexPattern);
197+
}
198+
out.writeString(indexPattern);
199+
181200
out.writeString(inputDataType.typeName());
182201
out.writeWriteable(inputPage);
183202
PlanStreamOutput planOut = new PlanStreamOutput(out, null);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,6 @@ public void postAnalysisVerification(Failures failures) {
102102
)
103103
);
104104
}
105-
106-
// this check is crucial for security: ES|QL would use the concrete indices, so it would bypass the security on the alias
107-
if (esr.concreteIndices().contains(esr.indexPattern()) == false) {
108-
failures.add(fail(this, "Aliases and index patterns are not allowed for LOOKUP JOIN [{}]", esr.indexPattern()));
109-
}
110105
});
111106
}
112107
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,7 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
739739
matchConfig.channel(),
740740
ctx -> lookupFromIndexService,
741741
matchConfig.type(),
742+
localSourceExec.indexPattern(),
742743
indexName,
743744
matchConfig.fieldName(),
744745
join.addedFields().stream().map(f -> (NamedExpression) f).toList(),

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ protected Operator.OperatorFactory simple(SimpleOptions options) {
143143
this::lookupService,
144144
inputDataType,
145145
lookupIndex,
146+
lookupIndex,
146147
matchField,
147148
loadFields,
148149
Source.EMPTY

0 commit comments

Comments
 (0)