Skip to content

Commit c547bd3

Browse files
authored
Fix the transport version of PlanStreamOutput (#103758) (#103767) (#103768)
The backward compatibility of ESQL is not handled correctly because PlanStreamOutput doesn't return the transport version from its delegate. The reason we didn't notice these failures is that the mixed cluster QA doesn't upgrade two nodes to the current version, leaving all nodes in the cluster with BWC versions.
1 parent d35aa81 commit c547bd3

File tree

3 files changed

+73
-6
lines changed

3 files changed

+73
-6
lines changed

docs/changelog/103758.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 103758
2+
summary: Fix the transport version of `PlanStreamOutput`
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.io.stream;
99

10-
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
10+
import org.elasticsearch.TransportVersion;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
1212
import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanWriter;
1313
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@@ -24,18 +24,19 @@
2424
* A customized stream output used to serialize ESQL physical plan fragments. Complements stream
2525
* output with methods that write plan nodes, Attributes, Expressions, etc.
2626
*/
27-
public final class PlanStreamOutput extends OutputStreamStreamOutput {
27+
public final class PlanStreamOutput extends StreamOutput {
2828

29+
private final StreamOutput delegate;
2930
private final PlanNameRegistry registry;
3031

3132
private final Function<Class<?>, String> nameSupplier;
3233

33-
public PlanStreamOutput(StreamOutput streamOutput, PlanNameRegistry registry) {
34-
this(streamOutput, registry, PlanNamedTypes::name);
34+
public PlanStreamOutput(StreamOutput delegate, PlanNameRegistry registry) {
35+
this(delegate, registry, PlanNamedTypes::name);
3536
}
3637

37-
public PlanStreamOutput(StreamOutput streamOutput, PlanNameRegistry registry, Function<Class<?>, String> nameSupplier) {
38-
super(streamOutput);
38+
public PlanStreamOutput(StreamOutput delegate, PlanNameRegistry registry, Function<Class<?>, String> nameSupplier) {
39+
this.delegate = delegate;
3940
this.registry = registry;
4041
this.nameSupplier = nameSupplier;
4142
}
@@ -89,4 +90,35 @@ public <T> void writeNamed(Class<T> type, T value) throws IOException {
8990
writeString(name);
9091
writer.write(this, value);
9192
}
93+
94+
@Override
95+
public void writeByte(byte b) throws IOException {
96+
delegate.writeByte(b);
97+
}
98+
99+
@Override
100+
public void writeBytes(byte[] b, int offset, int length) throws IOException {
101+
delegate.writeBytes(b, offset, length);
102+
}
103+
104+
@Override
105+
public void flush() throws IOException {
106+
delegate.flush();
107+
}
108+
109+
@Override
110+
public void close() throws IOException {
111+
delegate.close();
112+
}
113+
114+
@Override
115+
public TransportVersion getTransportVersion() {
116+
return delegate.getTransportVersion();
117+
}
118+
119+
@Override
120+
public void setTransportVersion(TransportVersion version) {
121+
delegate.setTransportVersion(version);
122+
super.setTransportVersion(version);
123+
}
92124
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.io.stream;
9+
10+
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
12+
import org.elasticsearch.test.ESTestCase;
13+
import org.elasticsearch.test.TransportVersionUtils;
14+
15+
import static org.hamcrest.Matchers.equalTo;
16+
17+
public class PlanStreamOutputTests extends ESTestCase {
18+
19+
public void testTransportVersion() {
20+
BytesStreamOutput out = new BytesStreamOutput();
21+
TransportVersion v1 = TransportVersionUtils.randomCompatibleVersion(random());
22+
out.setTransportVersion(v1);
23+
PlanStreamOutput planOut = new PlanStreamOutput(out, PlanNameRegistry.INSTANCE);
24+
assertThat(planOut.getTransportVersion(), equalTo(v1));
25+
TransportVersion v2 = TransportVersionUtils.randomCompatibleVersion(random());
26+
planOut.setTransportVersion(v2);
27+
assertThat(planOut.getTransportVersion(), equalTo(v2));
28+
assertThat(out.getTransportVersion(), equalTo(v2));
29+
}
30+
}

0 commit comments

Comments
 (0)