Skip to content

Commit 72d2b72

Browse files
authored
[ES|QL] Inference Command : restore serialization / deserialization (elastic#139387)
1 parent 5f92c44 commit 72d2b72

File tree

13 files changed

+412
-13
lines changed

13 files changed

+412
-13
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9242000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
google_vertex_ai_configurable_max_batch_size,9241000
1+
esql_inference_row_limit,9242000

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.elasticsearch.xpack.esql.plan.logical.Subquery;
2626
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
2727
import org.elasticsearch.xpack.esql.plan.logical.TopN;
28+
import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;
29+
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
2830
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
2931
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
3032
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
@@ -54,6 +56,8 @@
5456
import org.elasticsearch.xpack.esql.plan.physical.SubqueryExec;
5557
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
5658
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
59+
import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec;
60+
import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec;
5761

5862
import java.util.ArrayList;
5963
import java.util.List;
@@ -71,6 +75,7 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
7175
public static List<NamedWriteableRegistry.Entry> logical() {
7276
return List.of(
7377
Aggregate.ENTRY,
78+
Completion.ENTRY,
7479
Dissect.ENTRY,
7580
Enrich.ENTRY,
7681
EsRelation.ENTRY,
@@ -87,6 +92,7 @@ public static List<NamedWriteableRegistry.Entry> logical() {
8792
MvExpand.ENTRY,
8893
OrderBy.ENTRY,
8994
Project.ENTRY,
95+
Rerank.ENTRY,
9096
Sample.ENTRY,
9197
Subquery.ENTRY,
9298
TimeSeriesAggregate.ENTRY,
@@ -97,6 +103,7 @@ public static List<NamedWriteableRegistry.Entry> logical() {
97103
public static List<NamedWriteableRegistry.Entry> physical() {
98104
return List.of(
99105
AggregateExec.ENTRY,
106+
CompletionExec.ENTRY,
100107
DissectExec.ENTRY,
101108
EnrichExec.ENTRY,
102109
EsSourceExec.ENTRY,
@@ -113,6 +120,7 @@ public static List<NamedWriteableRegistry.Entry> physical() {
113120
LocalSourceExec.ENTRY,
114121
MvExpandExec.ENTRY,
115122
ProjectExec.ENTRY,
123+
RerankExec.ENTRY,
116124
SampleExec.ENTRY,
117125
ShowExec.ENTRY,
118126
SubqueryExec.ENTRY,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Completion.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77

88
package org.elasticsearch.xpack.esql.plan.logical.inference;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.common.settings.Settings;
1014
import org.elasticsearch.inference.TaskType;
1115
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
1216
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
@@ -19,19 +23,30 @@
1923
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
2024
import org.elasticsearch.xpack.esql.core.tree.Source;
2125
import org.elasticsearch.xpack.esql.core.type.DataType;
26+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
2227
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2328

29+
import java.io.IOException;
2430
import java.util.List;
2531
import java.util.Objects;
2632

2733
import static org.elasticsearch.xpack.esql.common.Failure.fail;
2834
import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT;
2935
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
36+
import static org.elasticsearch.xpack.esql.inference.InferenceSettings.COMPLETION_ROW_LIMIT_SETTING;
3037

3138
public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware {
3239

3340
public static final String DEFAULT_OUTPUT_FIELD_NAME = "completion";
3441

42+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
43+
LogicalPlan.class,
44+
"Completion",
45+
Completion::new
46+
);
47+
48+
private static final Literal DEFAULT_ROW_LIMIT = Literal.integer(Source.EMPTY, COMPLETION_ROW_LIMIT_SETTING.getDefault(Settings.EMPTY));
49+
3550
private final Expression prompt;
3651
private final Attribute targetField;
3752
private List<Attribute> lazyOutput;
@@ -53,6 +68,29 @@ public Completion(
5368
this.targetField = targetField;
5469
}
5570

71+
public Completion(StreamInput in) throws IOException {
72+
this(
73+
Source.readFrom((PlanStreamInput) in),
74+
in.readNamedWriteable(LogicalPlan.class),
75+
in.readNamedWriteable(Expression.class),
76+
in.getTransportVersion().supports(ESQL_INFERENCE_ROW_LIMIT) ? in.readNamedWriteable(Expression.class) : DEFAULT_ROW_LIMIT,
77+
in.readNamedWriteable(Expression.class),
78+
in.readNamedWriteable(Attribute.class)
79+
);
80+
}
81+
82+
@Override
83+
public void writeTo(StreamOutput out) throws IOException {
84+
super.writeTo(out);
85+
out.writeNamedWriteable(prompt);
86+
out.writeNamedWriteable(targetField);
87+
}
88+
89+
@Override
90+
public String getWriteableName() {
91+
return ENTRY.name;
92+
}
93+
5694
public Expression prompt() {
5795
return prompt;
5896
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.plan.logical.inference;
99

10+
import org.elasticsearch.TransportVersion;
1011
import org.elasticsearch.common.io.stream.StreamOutput;
1112
import org.elasticsearch.inference.TaskType;
1213
import org.elasticsearch.xpack.esql.core.expression.Expression;
@@ -35,6 +36,8 @@ public abstract class InferencePlan<PlanType extends InferencePlan<PlanType>> ex
3536
ExecutesOn.Coordinator,
3637
SurrogateLogicalPlan {
3738

39+
protected static final TransportVersion ESQL_INFERENCE_ROW_LIMIT = TransportVersion.fromName("esql_inference_row_limit");
40+
3841
public static final String INFERENCE_ID_OPTION_NAME = "inference_id";
3942
public static final List<String> VALID_INFERENCE_OPTION_NAMES = List.of(INFERENCE_ID_OPTION_NAME);
4043

@@ -49,12 +52,12 @@ protected InferencePlan(Source source, LogicalPlan child, Expression inferenceId
4952

5053
@Override
5154
public void writeTo(StreamOutput out) throws IOException {
52-
throw new UnsupportedOperationException("doesn't escape the coordinator node");
53-
}
54-
55-
@Override
56-
public String getWriteableName() {
57-
throw new UnsupportedOperationException("doesn't escape the coordinator node");
55+
source().writeTo(out);
56+
out.writeNamedWriteable(child());
57+
out.writeNamedWriteable(inferenceId());
58+
if (out.getTransportVersion().supports(ESQL_INFERENCE_ROW_LIMIT)) {
59+
out.writeNamedWriteable(rowLimit());
60+
}
5861
}
5962

6063
public Expression inferenceId() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Rerank.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77

88
package org.elasticsearch.xpack.esql.plan.logical.inference;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.common.settings.Settings;
1014
import org.elasticsearch.inference.TaskType;
1115
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
1216
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
@@ -21,19 +25,25 @@
2125
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
2226
import org.elasticsearch.xpack.esql.core.tree.Source;
2327
import org.elasticsearch.xpack.esql.core.type.DataType;
28+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
2429
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2530
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2631
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
2732

33+
import java.io.IOException;
2834
import java.util.List;
2935
import java.util.Objects;
3036
import java.util.function.Predicate;
3137

3238
import static org.elasticsearch.xpack.esql.common.Failure.fail;
3339
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
40+
import static org.elasticsearch.xpack.esql.inference.InferenceSettings.RERANK_ROW_LIMIT_SETTING;
3441

3542
public class Rerank extends InferencePlan<Rerank> implements PostAnalysisVerificationAware, TelemetryAware {
3643

44+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Rerank", Rerank::new);
45+
46+
private static final Literal DEFAULT_ROW_LIMIT = Literal.integer(Source.EMPTY, RERANK_ROW_LIMIT_SETTING.getDefault(Settings.EMPTY));
3747
private static final String DEFAULT_INFERENCE_ID = ".rerank-v1-elasticsearch";
3848

3949
private final Attribute scoreAttribute;
@@ -67,6 +77,31 @@ public Rerank(
6777
this.scoreAttribute = scoreAttribute;
6878
}
6979

80+
public Rerank(StreamInput in) throws IOException {
81+
this(
82+
Source.readFrom((PlanStreamInput) in),
83+
in.readNamedWriteable(LogicalPlan.class),
84+
in.readNamedWriteable(Expression.class),
85+
in.getTransportVersion().supports(ESQL_INFERENCE_ROW_LIMIT) ? in.readNamedWriteable(Expression.class) : DEFAULT_ROW_LIMIT,
86+
in.readNamedWriteable(Expression.class),
87+
in.readCollectionAsList(Alias::new),
88+
in.readNamedWriteable(Attribute.class)
89+
);
90+
}
91+
92+
@Override
93+
public void writeTo(StreamOutput out) throws IOException {
94+
super.writeTo(out);
95+
out.writeNamedWriteable(queryText);
96+
out.writeCollection(rerankFields());
97+
out.writeNamedWriteable(scoreAttribute);
98+
}
99+
100+
@Override
101+
public String getWriteableName() {
102+
return ENTRY.name;
103+
}
104+
70105
public Expression queryText() {
71106
return queryText;
72107
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/inference/CompletionExec.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,32 @@
77

88
package org.elasticsearch.xpack.esql.plan.physical.inference;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
1013
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1114
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1215
import org.elasticsearch.xpack.esql.core.expression.Expression;
1316
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1417
import org.elasticsearch.xpack.esql.core.tree.Source;
18+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
1519
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1620
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
1721

22+
import java.io.IOException;
1823
import java.util.List;
1924
import java.util.Objects;
2025

2126
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
2227

2328
public class CompletionExec extends InferenceExec {
2429

30+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
31+
PhysicalPlan.class,
32+
"CompletionExec",
33+
CompletionExec::new
34+
);
35+
2536
private final Expression prompt;
2637
private final Attribute targetField;
2738
private List<Attribute> lazyOutput;
@@ -32,6 +43,28 @@ public CompletionExec(Source source, PhysicalPlan child, Expression inferenceId,
3243
this.targetField = targetField;
3344
}
3445

46+
public CompletionExec(StreamInput in) throws IOException {
47+
this(
48+
Source.readFrom((PlanStreamInput) in),
49+
in.readNamedWriteable(PhysicalPlan.class),
50+
in.readNamedWriteable(Expression.class),
51+
in.readNamedWriteable(Expression.class),
52+
in.readNamedWriteable(Attribute.class)
53+
);
54+
}
55+
56+
@Override
57+
public String getWriteableName() {
58+
return ENTRY.name;
59+
}
60+
61+
@Override
62+
public void writeTo(StreamOutput out) throws IOException {
63+
super.writeTo(out);
64+
out.writeNamedWriteable(prompt);
65+
out.writeNamedWriteable(targetField);
66+
}
67+
3568
public Expression prompt() {
3669
return prompt;
3770
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/inference/InferenceExec.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,9 @@ public Expression inferenceId() {
3030

3131
@Override
3232
public void writeTo(StreamOutput out) throws IOException {
33-
throw new UnsupportedOperationException("doesn't escape the coordinator node");
34-
}
35-
36-
@Override
37-
public String getWriteableName() {
38-
throw new UnsupportedOperationException("doesn't escape the coordinator node");
33+
source().writeTo(out);
34+
out.writeNamedWriteable(child());
35+
out.writeNamedWriteable(inferenceId());
3936
}
4037

4138
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/inference/RerankExec.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,34 @@
77

88
package org.elasticsearch.xpack.esql.plan.physical.inference;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
1013
import org.elasticsearch.xpack.esql.core.expression.Alias;
1114
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1215
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1316
import org.elasticsearch.xpack.esql.core.expression.Expression;
1417
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1518
import org.elasticsearch.xpack.esql.core.tree.Source;
19+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
1620
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
1721
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1822
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
1923

24+
import java.io.IOException;
2025
import java.util.List;
2126
import java.util.Objects;
2227

2328
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
2429

2530
public class RerankExec extends InferenceExec {
2631

32+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
33+
PhysicalPlan.class,
34+
"RerankExec",
35+
RerankExec::new
36+
);
37+
2738
private final Expression queryText;
2839
private final List<Alias> rerankFields;
2940
private final Attribute scoreAttribute;
@@ -43,6 +54,17 @@ public RerankExec(
4354
this.scoreAttribute = scoreAttribute;
4455
}
4556

57+
public RerankExec(StreamInput in) throws IOException {
58+
this(
59+
Source.readFrom((PlanStreamInput) in),
60+
in.readNamedWriteable(PhysicalPlan.class),
61+
in.readNamedWriteable(Expression.class),
62+
in.readNamedWriteable(Expression.class),
63+
in.readCollectionAsList(Alias::new),
64+
in.readNamedWriteable(Attribute.class)
65+
);
66+
}
67+
4668
public Expression queryText() {
4769
return queryText;
4870
}
@@ -55,6 +77,19 @@ public Attribute scoreAttribute() {
5577
return scoreAttribute;
5678
}
5779

80+
@Override
81+
public String getWriteableName() {
82+
return ENTRY.name;
83+
}
84+
85+
@Override
86+
public void writeTo(StreamOutput out) throws IOException {
87+
super.writeTo(out);
88+
out.writeNamedWriteable(queryText());
89+
out.writeCollection(rerankFields());
90+
out.writeNamedWriteable(scoreAttribute);
91+
}
92+
5893
@Override
5994
protected NodeInfo<? extends PhysicalPlan> info() {
6095
return NodeInfo.create(this, RerankExec::new, child(), inferenceId(), queryText, rerankFields, scoreAttribute);

0 commit comments

Comments
 (0)