Skip to content

Commit e627316

Browse files
committed
Serialize Configuration to get the source in the LOOKUP request, + TransportVersion
1 parent f40de01 commit e627316

File tree

6 files changed

+40
-1
lines changed

6 files changed

+40
-1
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ static TransportVersion def(int id) {
173173
public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING = def(8_839_00_0);
174174
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_00_0);
175175
public static final TransportVersion ELASTICSEARCH_9_0 = def(9_000_00_0);
176+
public static final TransportVersion ESQL_LOOKUP_JOIN_CONFIGURATION_IN_REQUEST = def(9_001_00_0);
176177

177178
/*
178179
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.tasks.TaskId;
4747
import org.elasticsearch.threadpool.ThreadPool;
4848
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
49+
import org.elasticsearch.xpack.esql.ConfigurationTestUtils;
4950
import org.elasticsearch.xpack.esql.core.expression.Alias;
5051
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
5152
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -214,6 +215,7 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws
214215
final String finalNodeWithShard = nodeWithShard;
215216
LookupFromIndexOperator.Factory lookup = new LookupFromIndexOperator.Factory(
216217
"test",
218+
ConfigurationTestUtils.randomConfiguration(),
217219
parentTask,
218220
QueryPragmas.ENRICH_MAX_WORKERS.get(Settings.EMPTY),
219221
1,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
2727
import org.elasticsearch.xpack.esql.core.tree.Source;
2828
import org.elasticsearch.xpack.esql.core.type.DataType;
29+
import org.elasticsearch.xpack.esql.session.Configuration;
2930

3031
import java.io.IOException;
3132
import java.util.Iterator;
@@ -38,6 +39,7 @@
3839
public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndexOperator.OngoingJoin> {
3940
public record Factory(
4041
String sessionId,
42+
Configuration configuration,
4143
CancellableTask parentTask,
4244
int maxOutstandingRequests,
4345
int inputChannel,
@@ -67,6 +69,7 @@ public String describe() {
6769
public Operator get(DriverContext driverContext) {
6870
return new LookupFromIndexOperator(
6971
sessionId,
72+
configuration,
7073
driverContext,
7174
parentTask,
7275
maxOutstandingRequests,
@@ -83,6 +86,7 @@ public Operator get(DriverContext driverContext) {
8386

8487
private final LookupFromIndexService lookupService;
8588
private final String sessionId;
89+
private final Configuration configuration;
8690
private final CancellableTask parentTask;
8791
private final int inputChannel;
8892
private final DataType inputDataType;
@@ -102,6 +106,7 @@ public Operator get(DriverContext driverContext) {
102106

103107
public LookupFromIndexOperator(
104108
String sessionId,
109+
Configuration configuration,
105110
DriverContext driverContext,
106111
CancellableTask parentTask,
107112
int maxOutstandingRequests,
@@ -115,6 +120,7 @@ public LookupFromIndexOperator(
115120
) {
116121
super(driverContext, maxOutstandingRequests);
117122
this.sessionId = sessionId;
123+
this.configuration = configuration;
118124
this.parentTask = parentTask;
119125
this.inputChannel = inputChannel;
120126
this.lookupService = lookupService;
@@ -131,6 +137,7 @@ protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener
131137
totalTerms += inputBlock.getTotalValueCount();
132138
LookupFromIndexService.Request request = new LookupFromIndexService.Request(
133139
sessionId,
140+
configuration,
134141
lookupIndex,
135142
inputDataType,
136143
matchField,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
import org.elasticsearch.TransportVersions;
1111
import org.elasticsearch.cluster.service.ClusterService;
12+
import org.elasticsearch.common.breaker.CircuitBreaker;
13+
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
1214
import org.elasticsearch.common.collect.Iterators;
1315
import org.elasticsearch.common.io.stream.StreamInput;
1416
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -30,11 +32,14 @@
3032
import org.elasticsearch.xpack.esql.core.type.DataType;
3133
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
3234
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
35+
import org.elasticsearch.xpack.esql.session.Configuration;
3336

3437
import java.io.IOException;
3538
import java.util.List;
3639
import java.util.Objects;
3740

41+
import static java.lang.System.in;
42+
3843
/**
3944
* {@link LookupFromIndexService} performs lookup against a Lookup index for
4045
* a given input page. See {@link AbstractLookupService} for how it works
@@ -66,6 +71,7 @@ public LookupFromIndexService(
6671
protected TransportRequest transportRequest(LookupFromIndexService.Request request, ShardId shardId) {
6772
return new TransportRequest(
6873
request.sessionId,
74+
request.configuration,
6975
shardId,
7076
request.inputDataType,
7177
request.inputPage,
@@ -101,10 +107,12 @@ protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in
101107
}
102108

103109
public static class Request extends AbstractLookupService.Request {
110+
private final Configuration configuration;
104111
private final String matchField;
105112

106113
Request(
107114
String sessionId,
115+
Configuration configuration,
108116
String index,
109117
DataType inputDataType,
110118
String matchField,
@@ -113,15 +121,18 @@ public static class Request extends AbstractLookupService.Request {
113121
Source source
114122
) {
115123
super(sessionId, index, inputDataType, inputPage, extractFields, source);
124+
this.configuration = configuration;
116125
this.matchField = matchField;
117126
}
118127
}
119128

120129
protected static class TransportRequest extends AbstractLookupService.TransportRequest {
130+
private final Configuration configuration;
121131
private final String matchField;
122132

123133
TransportRequest(
124134
String sessionId,
135+
Configuration configuration,
125136
ShardId shardId,
126137
DataType inputDataType,
127138
Page inputPage,
@@ -131,19 +142,30 @@ protected static class TransportRequest extends AbstractLookupService.TransportR
131142
Source source
132143
) {
133144
super(sessionId, shardId, inputDataType, inputPage, toRelease, extractFields, source);
145+
this.configuration = configuration;
134146
this.matchField = matchField;
135147
}
136148

137149
static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException {
138150
TaskId parentTaskId = TaskId.readFromStream(in);
139151
String sessionId = in.readString();
152+
Configuration configuration = null;
153+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_CONFIGURATION_IN_REQUEST)) {
154+
configuration = new Configuration(
155+
// TODO make EsqlConfiguration Releasable
156+
new BlockStreamInput(
157+
in,
158+
new BlockFactory(new NoopCircuitBreaker(CircuitBreaker.REQUEST), BigArrays.NON_RECYCLING_INSTANCE)
159+
)
160+
);
161+
}
140162
ShardId shardId = new ShardId(in);
141163
DataType inputDataType = DataType.fromTypeName(in.readString());
142164
Page inputPage;
143165
try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
144166
inputPage = new Page(bsi);
145167
}
146-
PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null);
168+
PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), configuration);
147169
List<NamedExpression> extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class);
148170
String matchField = in.readString();
149171
var source = Source.EMPTY;
@@ -152,6 +174,7 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
152174
}
153175
TransportRequest result = new TransportRequest(
154176
sessionId,
177+
configuration,
155178
shardId,
156179
inputDataType,
157180
inputPage,
@@ -168,6 +191,9 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
168191
public void writeTo(StreamOutput out) throws IOException {
169192
super.writeTo(out);
170193
out.writeString(sessionId);
194+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_CONFIGURATION_IN_REQUEST)) {
195+
configuration.writeTo(out);
196+
}
171197
out.writeWriteable(shardId);
172198
out.writeString(inputDataType.typeName());
173199
out.writeWriteable(inputPage);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,7 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
601601
return source.with(
602602
new LookupFromIndexOperator.Factory(
603603
sessionId,
604+
configuration,
604605
parentTask,
605606
context.queryPragmas().enrichMaxWorkers(),
606607
matchConfig.channel(),

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.elasticsearch.threadpool.TestThreadPool;
5454
import org.elasticsearch.threadpool.ThreadPool;
5555
import org.elasticsearch.transport.TransportService;
56+
import org.elasticsearch.xpack.esql.ConfigurationTestUtils;
5657
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
5758
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
5859
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -137,6 +138,7 @@ protected Operator.OperatorFactory simple() {
137138
);
138139
return new LookupFromIndexOperator.Factory(
139140
sessionId,
141+
ConfigurationTestUtils.randomConfiguration(),
140142
parentTask,
141143
maxOutstandingRequests,
142144
inputChannel,

0 commit comments

Comments
 (0)