Skip to content

Commit 48ce548

Browse files
committed
load transport version sets
1 parent ce38dd4 commit 48ce548

File tree

3 files changed

+82
-122
lines changed

3 files changed

+82
-122
lines changed

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -196,17 +196,8 @@ private static class VersionsHolder {
196196
.orElse(Collections.emptyList());
197197

198198
if (extendedVersions.isEmpty()) {
199-
//ALL_VERSIONS = TransportVersions.DEFINED_VERSIONS;
200-
// TODO: remove for testing
201199
ALL_VERSIONS = new ArrayList<>(TransportVersions.DEFINED_VERSIONS);
202-
ALL_VERSIONS.add(new TransportVersion(9114000));
203-
ALL_VERSIONS.add(new TransportVersion(9115000));
204-
ALL_VERSIONS.add(new TransportVersion(8841063));
205-
ALL_VERSIONS.add(new TransportVersion(9112001));
206-
ALL_VERSIONS.add(new TransportVersion(9116000));
207-
ALL_VERSIONS.add(new TransportVersion(9117000));
208-
ALL_VERSIONS.add(new TransportVersion(9118000));
209-
// TODO: end testing
200+
ALL_VERSIONS.addAll(TransportVersionSet.TRANSPORT_VERSIONS);
210201
} else {
211202
ALL_VERSIONS = Stream.concat(TransportVersions.DEFINED_VERSIONS.stream(), extendedVersions.stream()).sorted().toList();
212203
}

server/src/main/java/org/elasticsearch/TransportVersionSet.java

Lines changed: 73 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,14 @@
1111

1212
import org.elasticsearch.xcontent.ConstructingObjectParser;
1313
import org.elasticsearch.xcontent.ParseField;
14+
import org.elasticsearch.xcontent.XContentParser;
15+
import org.elasticsearch.xcontent.XContentParserConfiguration;
16+
import org.elasticsearch.xcontent.json.JsonXContent;
1417

18+
import java.io.BufferedReader;
1519
import java.io.IOException;
1620
import java.io.InputStream;
21+
import java.io.InputStreamReader;
1722
import java.io.UncheckedIOException;
1823
import java.util.ArrayList;
1924
import java.util.Collections;
@@ -24,19 +29,19 @@
2429
public class TransportVersionSet {
2530

2631
private static final ParseField NAME = new ParseField("name");
27-
private static final ParseField VERSIONS = new ParseField("versions");
32+
private static final ParseField IDS = new ParseField("ids");
2833

2934
private static final ConstructingObjectParser<TransportVersionSet, Integer> PARSER = new ConstructingObjectParser<>(
3035
TransportVersionSet.class.getCanonicalName(),
3136
false,
3237
(args, latestTransportId) -> {
3338
String name = (String) args[0];
34-
int[] ids = (int[]) args[1];
35-
List<TransportVersion> versions = new ArrayList<>(ids.length);
36-
for (int id = 0; id < ids.length; ++id) {
37-
TransportVersion version = new TransportVersion(id);
39+
@SuppressWarnings("unchecked")
40+
List<Integer> ids = (List<Integer>) args[1];
41+
List<TransportVersion> versions = new ArrayList<>(ids.size());
42+
for (int id = 0; id < ids.size(); ++id) {
3843
if (id <= latestTransportId) {
39-
versions.add(version);
44+
versions.add(new TransportVersion(ids.get(id)));
4045
}
4146
}
4247
if (versions.isEmpty()) {
@@ -48,57 +53,81 @@ public class TransportVersionSet {
4853

4954
static {
5055
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME);
51-
PARSER.declareIntArray(ConstructingObjectParser.constructorArg(), VERSIONS);
56+
PARSER.declareIntArray(ConstructingObjectParser.constructorArg(), IDS);
5257
}
5358

5459
private static final Map<String, TransportVersionSet> TRANSPORT_VERSION_SETS = loadTransportVersionSets();
55-
private static final List<TransportVersion> TRANSPORT_VERSIONS = collectTranportVersions();
60+
public static final List<TransportVersion> TRANSPORT_VERSIONS = collectTransportVersions();
5661

57-
private static final String MANIFEST_LOCATION = "META-INF/transport-versions-files-manifest.txt";
58-
private static final String METADATA_LOCATION = "org/elasticsearch/transport/";
5962
private static final String LATEST_SUFFIX = "-LATEST.json";
6063

6164
private static Map<String, TransportVersionSet> loadTransportVersionSets() {
6265
Map<String, TransportVersionSet> transportVersionSets = new HashMap<>();
6366

64-
String latestResourceLocation = Version.CURRENT.major + "-" + Version.CURRENT.minor + LATEST_SUFFIX;
65-
try (InputStream inputStream = TransportVersionSet.class.getResourceAsStream(latestResourceLocation)) {
67+
String latestLocation = "transport/" + Version.CURRENT.major + "." + Version.CURRENT.minor + LATEST_SUFFIX;
68+
int latestId = 0;
69+
try (InputStream inputStream = TransportVersionSet.class.getResourceAsStream(latestLocation)) {
70+
TransportVersionSet latest = fromXContent(inputStream, Integer.MAX_VALUE);
71+
// TODO: validation of latest tranport version set
72+
latestId = latest.versions.get(0).id();
73+
} catch (IOException ioe) {
74+
throw new UncheckedIOException("latest transport version file not found at [" + latestLocation + "]", ioe);
75+
}
6676

77+
String manifestLocation = "META-INF/transport-versions-files-manifest.txt";
78+
List<String> versionNames;
79+
try (InputStream transportVersionManifest = TransportVersionSet.class.getClassLoader().getResourceAsStream(manifestLocation)) {
80+
BufferedReader reader = new BufferedReader(new InputStreamReader(transportVersionManifest));
81+
versionNames = reader.lines().filter(line -> line.isBlank() == false).toList();
6782
} catch (IOException ioe) {
68-
throw new UncheckedIOException(ioe);
83+
throw new UncheckedIOException("transport version metadata manifest file not found at [" + manifestLocation + "]", ioe);
84+
}
85+
86+
for (String name : versionNames) {
87+
String versionLocation = "transport/" + name;
88+
try (InputStream inputStream = TransportVersionSet.class.getResourceAsStream(versionLocation)) {
89+
if (inputStream != null) {
90+
TransportVersionSet transportVersionSet = TransportVersionSet.fromXContent(inputStream, latestId);
91+
transportVersionSets.put(name, transportVersionSet);
92+
} else {
93+
throw new RuntimeException("Input stream is null");
94+
}
95+
} catch (IOException ioe) {
96+
throw new UncheckedIOException("transport version set file not found at [ " + versionLocation + "]", ioe);
97+
}
6998
}
7099

71100
// TODO: load
72101
// TODO: TEST ONLY
73-
transportVersionSets.put("ml-inference-custom-service-embedding-type", new TransportVersionSet(
74-
"ml-inference-custom-service-embedding-type",
75-
List.of(new TransportVersion(9118000))
76-
));
77-
transportVersionSets.put("esql-local-relation-with-new-blocks", new TransportVersionSet(
78-
"esql-local-relation-with-new-blocks",
79-
List.of(new TransportVersion(9117000))
80-
));
81-
transportVersionSets.put("esql-split-on-big-values", new TransportVersionSet(
82-
"esql-split-on-big-values",
83-
List.of(
84-
new TransportVersion(9116000),
85-
new TransportVersion(9112001),
86-
new TransportVersion(8841063)
87-
)
88-
));
89-
transportVersionSets.put("ml-inference-ibm-watsonx-completion-added", new TransportVersionSet(
90-
"ml-inference-ibm-watsonx-completion-added",
91-
List.of(new TransportVersion(9115000))
92-
));
93-
transportVersionSets.put("esql-serialize-timeseries-field-type", new TransportVersionSet(
94-
"esql-serialize-timeseries-field-type",
95-
List.of(new TransportVersion(9114000))
96-
));
97-
// TODO: END TEST ONLY
102+
// transportVersionSets.put("ml-inference-custom-service-embedding-type", new TransportVersionSet(
103+
// "ml-inference-custom-service-embedding-type",
104+
// List.of(new TransportVersion(9118000))
105+
// ));
106+
// transportVersionSets.put("esql-local-relation-with-new-blocks", new TransportVersionSet(
107+
// "esql-local-relation-with-new-blocks",
108+
// List.of(new TransportVersion(9117000))
109+
// ));
110+
// transportVersionSets.put("esql-split-on-big-values", new TransportVersionSet(
111+
// "esql-split-on-big-values",
112+
// List.of(
113+
// new TransportVersion(9116000),
114+
// new TransportVersion(9112001),
115+
// new TransportVersion(8841063)
116+
// )
117+
// ));
118+
// transportVersionSets.put("ml-inference-ibm-watsonx-completion-added", new TransportVersionSet(
119+
// "ml-inference-ibm-watsonx-completion-added",
120+
// List.of(new TransportVersion(9115000))
121+
// ));
122+
// transportVersionSets.put("esql-serialize-timeseries-field-type", new TransportVersionSet(
123+
// "esql-serialize-timeseries-field-type",
124+
// List.of(new TransportVersion(9114000))
125+
// ));
126+
// // TODO: END TEST ONLY
98127
return Collections.unmodifiableMap(transportVersionSets);
99128
}
100129

101-
private static List<TransportVersion> collectTranportVersions() {
130+
private static List<TransportVersion> collectTransportVersions() {
102131
List<TransportVersion> tranportVersions = new ArrayList<>();
103132
for (TransportVersionSet transportVersionSet : TRANSPORT_VERSION_SETS.values()) {
104133
for (TransportVersion transportVersion : transportVersionSet.versions) {
@@ -109,6 +138,11 @@ private static List<TransportVersion> collectTranportVersions() {
109138
return tranportVersions;
110139
}
111140

141+
private static TransportVersionSet fromXContent(InputStream inputStream, int maxTransportId) throws IOException {
142+
XContentParser parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, inputStream);
143+
return PARSER.parse(parser, maxTransportId);
144+
}
145+
112146
public static TransportVersionSet get(String name) {
113147
TransportVersionSet transportVersionSet = TRANSPORT_VERSION_SETS.get(name);
114148
if (transportVersionSet == null) {
@@ -153,75 +187,4 @@ public boolean isCompatible(TransportVersion version) {
153187
public String toString() {
154188
return "TransportVersionSet{" + "name='" + name + '\'' + ", versions=" + versions + '}';
155189
}
156-
157-
// private static TransportVersionSet fromJSONInputStream(InputStream inputStream) throws IOException {
158-
// XContentParser parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, inputStream);
159-
// return TransportVersionSet.fromXContent(parser);
160-
// }
161-
//
162-
//
163-
//
164-
//
165-
// static {
166-
// var locallyDefinedVersionSets = loadTransportVersionSets();
167-
// var extendedVersionSets = new ArrayList<TransportVersionSet>(); // TODO add SPI for serverless
168-
//
169-
// var allVersionSets = Stream.concat(
170-
// locallyDefinedVersionSets.stream(),
171-
// extendedVersionSets.stream()
172-
// ).toList();
173-
//
174-
// ALL_VERSIONS_SORTED = allVersionSets.stream().flatMap(tvSet -> tvSet.versions.stream()).sorted().toList();
175-
//
176-
// ALL_VERSIONS_MAP = ALL_VERSIONS_SORTED.stream()
177-
// .collect(Collectors.toUnmodifiableMap(TransportVersion::id, Function.identity()));
178-
//
179-
// ALL_VERSIONS_SETS_MAP = allVersionSets.stream()
180-
// .collect(Collectors.toUnmodifiableMap(TransportVersionSet::name, Function.identity()));
181-
//
182-
// LATEST = getLatestTVSet();
183-
// }
184-
//
185-
// private static TransportVersionSet getLatestTVSet() {
186-
//
187-
// var major = Version.CURRENT.major;
188-
// var minor = Version.CURRENT.minor;
189-
// var fileName = major + "." + minor + "-" + LATEST_SUFFIX;
190-
//
191-
// var path = METADATA_LOCATION + fileName;
192-
// return loadTransportVersionSet(path); // todo, use a different format?
193-
// }
194-
//
195-
// private static List<TransportVersionSet> loadTransportVersionSets() {
196-
// var transportVersionSets = new ArrayList<TransportVersionSet>();
197-
// for (var fileName : loadTransportVersionSetFileNames()) {
198-
// var path = METADATA_LOCATION + fileName;
199-
// transportVersionSets.add(loadTransportVersionSet(path));
200-
// }
201-
// return transportVersionSets;
202-
// }
203-
//
204-
// // TODO will getResourceAsStream work for IntelliJ?
205-
// private static List<String> loadTransportVersionSetFileNames() {
206-
// try (InputStream transportVersionManifest = TransportVersionSet.class.getResourceAsStream(MANIFEST_LOCATION)) {
207-
// BufferedReader reader = new BufferedReader(new InputStreamReader(transportVersionManifest));
208-
// return reader.lines().filter(line -> line.isBlank() == false).toList();
209-
// } catch (IOException e) {
210-
// throw new RuntimeException("Transport version metadata manifest file not found at " + MANIFEST_LOCATION, e);
211-
// }
212-
// }
213-
//
214-
// private static TransportVersionSet loadTransportVersionSet(String path) {
215-
// try (var fileStream = VersionsHolder.class.getResourceAsStream(path)) {
216-
// if (fileStream != null) {
217-
// return TransportVersionSet.fromJSONInputStream(fileStream);
218-
// } else {
219-
// throw new RuntimeException("Input stream is null");
220-
// }
221-
// } catch (IOException e) {
222-
// throw new RuntimeException("Failed to load TransportVersionSet at path: " + path +
223-
// " specified in the manifest file: " + MANIFEST_LOCATION, e);
224-
// }
225-
// }
226-
// }
227190
}

server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.List;
5656
import java.util.Locale;
5757
import java.util.Map;
58+
import java.util.Objects;
5859
import java.util.Set;
5960
import java.util.concurrent.TimeUnit;
6061
import java.util.function.Function;
@@ -76,20 +77,25 @@
7677
*/
7778
public abstract class StreamInput extends InputStream {
7879

79-
private TransportVersion version = TransportVersion.current();
80+
// We set this to null to ensure TransportVersion.<clinit> is not
81+
// executed prior to logging initialization.
82+
private TransportVersion version = null;
8083

8184
/**
8285
* The transport version the data is serialized as.
8386
*/
8487
public TransportVersion getTransportVersion() {
88+
if (this.version == null) {
89+
version = TransportVersion.current();
90+
}
8591
return this.version;
8692
}
8793

8894
/**
8995
* Set the transport version of the data in this stream.
9096
*/
9197
public void setTransportVersion(TransportVersion version) {
92-
this.version = version;
98+
this.version = Objects.requireNonNull(version);
9399
}
94100

95101
/**

0 commit comments

Comments
 (0)