Skip to content

Commit 353f3d9

Browse files
committed
Only transport the Source text, instead of the full Configuration
1 parent 2ec796d commit 353f3d9

File tree

3 files changed

+11
-27
lines changed

3 files changed

+11
-27
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ static TransportVersion def(int id) {
175175
public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_X = def(8_840_0_01);
176176
public static final TransportVersion ELASTICSEARCH_9_0 = def(9_000_0_00);
177177
public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED = def(9_001_0_00);
178-
public static final TransportVersion ESQL_LOOKUP_JOIN_CONFIGURATION_IN_REQUEST = def(9_002_0_00);
178+
public static final TransportVersion ESQL_LOOKUP_JOIN_SOURCE_TEXT = def(9_002_0_00);
179179
/*
180180
* STOP! READ THIS FIRST! No, really,
181181
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ public Operator get(DriverContext driverContext) {
8686

8787
private final LookupFromIndexService lookupService;
8888
private final String sessionId;
89-
private final Configuration configuration;
9089
private final CancellableTask parentTask;
9190
private final int inputChannel;
9291
private final DataType inputDataType;
@@ -106,7 +105,6 @@ public Operator get(DriverContext driverContext) {
106105

107106
public LookupFromIndexOperator(
108107
String sessionId,
109-
Configuration configuration,
110108
DriverContext driverContext,
111109
CancellableTask parentTask,
112110
int maxOutstandingRequests,
@@ -120,7 +118,6 @@ public LookupFromIndexOperator(
120118
) {
121119
super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests);
122120
this.sessionId = sessionId;
123-
this.configuration = configuration;
124121
this.parentTask = parentTask;
125122
this.inputChannel = inputChannel;
126123
this.lookupService = lookupService;
@@ -137,7 +134,6 @@ protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener
137134
totalTerms += inputBlock.getTotalValueCount();
138135
LookupFromIndexService.Request request = new LookupFromIndexService.Request(
139136
sessionId,
140-
configuration,
141137
lookupIndex,
142138
inputDataType,
143139
matchField,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ public LookupFromIndexService(
7171
protected TransportRequest transportRequest(LookupFromIndexService.Request request, ShardId shardId) {
7272
return new TransportRequest(
7373
request.sessionId,
74-
request.configuration,
7574
shardId,
7675
request.inputDataType,
7776
request.inputPage,
@@ -107,12 +106,10 @@ protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in
107106
}
108107

109108
public static class Request extends AbstractLookupService.Request {
110-
private final Configuration configuration;
111109
private final String matchField;
112110

113111
Request(
114112
String sessionId,
115-
Configuration configuration,
116113
String index,
117114
DataType inputDataType,
118115
String matchField,
@@ -121,18 +118,15 @@ public static class Request extends AbstractLookupService.Request {
121118
Source source
122119
) {
123120
super(sessionId, index, inputDataType, inputPage, extractFields, source);
124-
this.configuration = configuration;
125121
this.matchField = matchField;
126122
}
127123
}
128124

129125
protected static class TransportRequest extends AbstractLookupService.TransportRequest {
130-
private final Configuration configuration;
131126
private final String matchField;
132127

133128
TransportRequest(
134129
String sessionId,
135-
Configuration configuration,
136130
ShardId shardId,
137131
DataType inputDataType,
138132
Page inputPage,
@@ -142,39 +136,33 @@ protected static class TransportRequest extends AbstractLookupService.TransportR
142136
Source source
143137
) {
144138
super(sessionId, shardId, inputDataType, inputPage, toRelease, extractFields, source);
145-
this.configuration = configuration;
146139
this.matchField = matchField;
147140
}
148141

149142
static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException {
150143
TaskId parentTaskId = TaskId.readFromStream(in);
151144
String sessionId = in.readString();
152-
Configuration configuration = null;
153-
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_CONFIGURATION_IN_REQUEST)) {
154-
configuration = new Configuration(
155-
// TODO make EsqlConfiguration Releasable
156-
new BlockStreamInput(
157-
in,
158-
new BlockFactory(new NoopCircuitBreaker(CircuitBreaker.REQUEST), BigArrays.NON_RECYCLING_INSTANCE)
159-
)
160-
);
161-
}
162145
ShardId shardId = new ShardId(in);
163146
DataType inputDataType = DataType.fromTypeName(in.readString());
164147
Page inputPage;
165148
try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
166149
inputPage = new Page(bsi);
167150
}
168-
PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), configuration);
151+
PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null);
169152
List<NamedExpression> extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class);
170153
String matchField = in.readString();
171154
var source = Source.EMPTY;
172155
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_ENRICH_RUNTIME_WARNINGS)) {
173156
source = Source.readFrom(planIn);
174157
}
158+
// Source.readFrom() requires the query from the Configuration passed to PlanStreamInput.
159+
// As we don't have the Configuration here, and it may be heavy to serialize, we directly pass the Source text.
160+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_SOURCE_TEXT)) {
161+
String sourceText = in.readString();
162+
source = new Source(source.source(), sourceText);
163+
}
175164
TransportRequest result = new TransportRequest(
176165
sessionId,
177-
configuration,
178166
shardId,
179167
inputDataType,
180168
inputPage,
@@ -191,9 +179,6 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
191179
public void writeTo(StreamOutput out) throws IOException {
192180
super.writeTo(out);
193181
out.writeString(sessionId);
194-
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_CONFIGURATION_IN_REQUEST)) {
195-
configuration.writeTo(out);
196-
}
197182
out.writeWriteable(shardId);
198183
out.writeString(inputDataType.typeName());
199184
out.writeWriteable(inputPage);
@@ -203,6 +188,9 @@ public void writeTo(StreamOutput out) throws IOException {
203188
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_ENRICH_RUNTIME_WARNINGS)) {
204189
source.writeTo(planOut);
205190
}
191+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_SOURCE_TEXT)) {
192+
out.writeString(source.text());
193+
}
206194
}
207195

208196
@Override

0 commit comments

Comments
 (0)