Skip to content

Commit 81f31f3

Browse files
authored
Refactor TransportVersion loading to support external consumers (#132694) (#132831)
This change moves transport version loading out of TransportVersion.VersionsHolder, so that is can be consumed elsewhere by projects using the same resource file structure. Jira: ES-12401
1 parent a6e4941 commit 81f31f3

File tree

8 files changed

+141
-112
lines changed

8 files changed

+141
-112
lines changed

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 89 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -82,47 +82,121 @@ public TransportVersion(int id) {
8282
this(null, id, null);
8383
}
8484

85+
interface BufferedReaderParser<T> {
86+
T parse(String component, String path, BufferedReader bufferedReader);
87+
}
88+
89+
static <T> T parseFromBufferedReader(
90+
String component,
91+
String path,
92+
Function<String, InputStream> nameToStream,
93+
BufferedReaderParser<T> parser
94+
) {
95+
try (InputStream inputStream = nameToStream.apply(path)) {
96+
if (inputStream == null) {
97+
return null;
98+
}
99+
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
100+
return parser.parse(component, path, bufferedReader);
101+
}
102+
} catch (IOException ioe) {
103+
throw new UncheckedIOException("parsing error [" + component + ":" + path + "]", ioe);
104+
}
105+
}
106+
85107
/**
86108
* Constructs a named transport version along with its set of compatible patch versions from x-content.
87109
* This method takes in the parameter {@code latest} which is the highest valid transport version id
88110
* supported by this node. Versions newer than the current transport version id for this node are discarded.
89111
*/
90-
public static TransportVersion fromInputStream(String path, boolean nameInFile, InputStream stream, Integer latest) {
91-
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
92-
String line = reader.readLine();
112+
public static TransportVersion fromBufferedReader(
113+
String component,
114+
String path,
115+
boolean nameInFile,
116+
BufferedReader bufferedReader,
117+
Integer latest
118+
) {
119+
try {
120+
String line = bufferedReader.readLine();
93121
String[] parts = line.replaceAll("\\s+", "").split(",");
94122
String check;
95-
while ((check = reader.readLine()) != null) {
123+
while ((check = bufferedReader.readLine()) != null) {
96124
if (check.replaceAll("\\s+", "").isEmpty() == false) {
97-
throw new IllegalArgumentException("invalid transport version file format [" + path + "]");
125+
throw new IllegalArgumentException("invalid transport version file format [" + toComponentPath(component, path) + "]");
98126
}
99127
}
100128
if (parts.length < (nameInFile ? 2 : 1)) {
101-
throw new IllegalStateException("invalid transport version file format [" + path + "]");
129+
throw new IllegalStateException("invalid transport version file format [" + toComponentPath(component, path) + "]");
102130
}
103131
String name = nameInFile ? parts[0] : path.substring(path.lastIndexOf('/') + 1, path.length() - 4);
104132
List<Integer> ids = new ArrayList<>();
105133
for (int i = nameInFile ? 1 : 0; i < parts.length; ++i) {
106134
try {
107135
ids.add(Integer.parseInt(parts[i]));
108136
} catch (NumberFormatException nfe) {
109-
throw new IllegalStateException("invalid transport version file format [" + path + "]", nfe);
137+
throw new IllegalStateException(
138+
"invalid transport version file format [" + toComponentPath(component, path) + "]",
139+
nfe
140+
);
110141
}
111142
}
112-
ids.sort(Integer::compareTo);
113143
TransportVersion transportVersion = null;
114-
for (int idIndex = 0; idIndex < ids.size(); ++idIndex) {
144+
for (int idIndex = ids.size() - 1; idIndex >= 0; --idIndex) {
145+
if (idIndex > 0 && ids.get(idIndex - 1) <= ids.get(idIndex)) {
146+
throw new IllegalStateException("invalid transport version file format [" + toComponentPath(component, path) + "]");
147+
}
115148
if (ids.get(idIndex) > latest) {
116149
break;
117150
}
118151
transportVersion = new TransportVersion(name, ids.get(idIndex), transportVersion);
119152
}
120153
return transportVersion;
121154
} catch (IOException ioe) {
122-
throw new UncheckedIOException("cannot parse transport version [" + path + "]", ioe);
155+
throw new UncheckedIOException("invalid transport version file format [" + toComponentPath(component, path) + "]", ioe);
123156
}
124157
}
125158

159+
public static Map<String, TransportVersion> collectFromInputStreams(
160+
String component,
161+
Function<String, InputStream> nameToStream,
162+
String latestFileName
163+
) {
164+
TransportVersion latest = parseFromBufferedReader(
165+
component,
166+
"/transport/latest/" + latestFileName,
167+
nameToStream,
168+
(c, p, br) -> fromBufferedReader(c, p, true, br, Integer.MAX_VALUE)
169+
);
170+
if (latest != null) {
171+
List<String> versionFilesNames = parseFromBufferedReader(
172+
component,
173+
"/transport/defined/manifest.txt",
174+
nameToStream,
175+
(c, p, br) -> br.lines().filter(line -> line.isBlank() == false).toList()
176+
);
177+
if (versionFilesNames != null) {
178+
Map<String, TransportVersion> transportVersions = new HashMap<>();
179+
for (String versionFileName : versionFilesNames) {
180+
TransportVersion transportVersion = parseFromBufferedReader(
181+
component,
182+
"/transport/defined/" + versionFileName,
183+
nameToStream,
184+
(c, p, br) -> fromBufferedReader(c, p, false, br, latest.id())
185+
);
186+
if (transportVersion != null) {
187+
transportVersions.put(versionFileName.substring(0, versionFileName.length() - 4), transportVersion);
188+
}
189+
}
190+
return transportVersions;
191+
}
192+
}
193+
return Map.of();
194+
}
195+
196+
private static String toComponentPath(String component, String path) {
197+
return component + ":" + path;
198+
}
199+
126200
public static TransportVersion readVersion(StreamInput in) throws IOException {
127201
return fromId(in.readVInt());
128202
}
@@ -345,7 +419,11 @@ private static class VersionsHolder {
345419
static {
346420
// collect all the transport versions from server and es modules/plugins (defined in server)
347421
List<TransportVersion> allVersions = new ArrayList<>(TransportVersions.DEFINED_VERSIONS);
348-
Map<String, TransportVersion> allVersionsByName = loadTransportVersionsByName();
422+
Map<String, TransportVersion> allVersionsByName = collectFromInputStreams(
423+
"<server>",
424+
TransportVersion.class::getResourceAsStream,
425+
Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv"
426+
);
349427
addTransportVersions(allVersionsByName.values(), allVersions).sort(TransportVersion::compareTo);
350428

351429
// set version lookup by release before adding serverless versions
@@ -373,65 +451,6 @@ private static class VersionsHolder {
373451
CURRENT = ALL_VERSIONS.getLast();
374452
}
375453

376-
private static Map<String, TransportVersion> loadTransportVersionsByName() {
377-
Map<String, TransportVersion> transportVersions = new HashMap<>();
378-
379-
String latestLocation = "/transport/latest/" + Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv";
380-
int latestId = -1;
381-
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(latestLocation)) {
382-
// this check is required until bootstrapping for the new transport versions format is completed;
383-
// when load is false, we will only use the transport versions in the legacy format;
384-
// load becomes false if we don't find the latest or manifest files required for the new format
385-
if (inputStream != null) {
386-
TransportVersion latest = fromInputStream(latestLocation, true, inputStream, Integer.MAX_VALUE);
387-
if (latest == null) {
388-
throw new IllegalStateException(
389-
"invalid latest transport version for minor version ["
390-
+ Version.CURRENT.major
391-
+ "."
392-
+ Version.CURRENT.minor
393-
+ "]"
394-
);
395-
}
396-
latestId = latest.id();
397-
}
398-
} catch (IOException ioe) {
399-
throw new UncheckedIOException("latest transport version file not found at [" + latestLocation + "]", ioe);
400-
}
401-
402-
String manifestLocation = "/transport/defined/manifest.txt";
403-
List<String> versionFileNames = null;
404-
if (latestId > -1) {
405-
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(manifestLocation)) {
406-
if (inputStream != null) {
407-
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
408-
versionFileNames = reader.lines().filter(line -> line.isBlank() == false).toList();
409-
}
410-
} catch (IOException ioe) {
411-
throw new UncheckedIOException("transport version manifest file not found at [" + manifestLocation + "]", ioe);
412-
}
413-
}
414-
415-
if (versionFileNames != null) {
416-
for (String name : versionFileNames) {
417-
String versionLocation = "/transport/defined/" + name;
418-
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(versionLocation)) {
419-
if (inputStream == null) {
420-
throw new IllegalStateException("transport version file not found at [" + versionLocation + "]");
421-
}
422-
TransportVersion transportVersion = TransportVersion.fromInputStream(versionLocation, false, inputStream, latestId);
423-
if (transportVersion != null) {
424-
transportVersions.put(transportVersion.name(), transportVersion);
425-
}
426-
} catch (IOException ioe) {
427-
throw new UncheckedIOException("transport version file not found at [ " + versionLocation + "]", ioe);
428-
}
429-
}
430-
}
431-
432-
return transportVersions;
433-
}
434-
435454
private static List<TransportVersion> addTransportVersions(Collection<TransportVersion> addFrom, List<TransportVersion> addTo) {
436455
for (TransportVersion transportVersion : addFrom) {
437456
addTo.add(transportVersion);

server/src/test/java/org/elasticsearch/TransportVersionTests.java

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@
1212
import org.elasticsearch.test.ESTestCase;
1313
import org.elasticsearch.test.TransportVersionUtils;
1414

15+
import java.io.BufferedReader;
16+
import java.io.ByteArrayInputStream;
17+
import java.io.InputStreamReader;
1518
import java.lang.reflect.Modifier;
19+
import java.nio.charset.StandardCharsets;
1620
import java.util.List;
1721
import java.util.Set;
1822
import java.util.regex.Matcher;
@@ -221,50 +225,53 @@ public void testDuplicateConstants() {
221225
}
222226
}
223227

224-
public void testFromName() {
225-
assertThat(TransportVersion.fromName("test_0"), is(new TransportVersion("test_0", 3001000, null)));
226-
assertThat(TransportVersion.fromName("test_1"), is(new TransportVersion("test_1", 3002000, null)));
227-
assertThat(
228-
TransportVersion.fromName("test_2"),
229-
is(
230-
new TransportVersion(
231-
"test_2",
232-
3003000,
233-
new TransportVersion("test_2", 2001001, new TransportVersion("test_2", 1001001, null))
234-
)
235-
)
236-
);
237-
assertThat(
238-
TransportVersion.fromName("test_3"),
239-
is(new TransportVersion("test_3", 3003001, new TransportVersion("test_3", 2001002, null)))
240-
);
241-
assertThat(
242-
TransportVersion.fromName("test_4"),
243-
is(
244-
new TransportVersion(
245-
"test_4",
246-
3003002,
247-
new TransportVersion("test_4", 2001003, new TransportVersion("test_4", 1001002, null))
248-
)
249-
)
228+
public void testLatest() {
229+
TransportVersion latest = TransportVersion.parseFromBufferedReader(
230+
"<test>",
231+
"/transport/defined/" + Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv",
232+
TransportVersion.class::getResourceAsStream,
233+
(c, p, br) -> TransportVersion.fromBufferedReader(c, p, true, br, Integer.MAX_VALUE)
250234
);
235+
// TODO: once placeholder is removed, test the latest known version can be found fromName
236+
// assertThat(latest, is(TransportVersion.fromName(latest.name())));
251237
}
252238

253239
public void testSupports() {
254-
TransportVersion test0 = TransportVersion.fromName("test_0");
240+
byte[] data0 = "100001000,3001000".getBytes(StandardCharsets.UTF_8);
241+
TransportVersion test0 = TransportVersion.fromBufferedReader(
242+
"<test>",
243+
"testSupports0",
244+
false,
245+
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data0), StandardCharsets.UTF_8)),
246+
5000000
247+
);
255248
assertThat(new TransportVersion(null, 2003000, null).supports(test0), is(false));
256249
assertThat(new TransportVersion(null, 3001000, null).supports(test0), is(true));
257250
assertThat(new TransportVersion(null, 100001001, null).supports(test0), is(true));
258251

259-
TransportVersion test1 = TransportVersion.fromName("test_1");
252+
byte[] data1 = "3002000".getBytes(StandardCharsets.UTF_8);
253+
TransportVersion test1 = TransportVersion.fromBufferedReader(
254+
"<test>",
255+
"testSupports1",
256+
false,
257+
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data1), StandardCharsets.UTF_8)),
258+
5000000
259+
);
260260
assertThat(new TransportVersion(null, 2003000, null).supports(test1), is(false));
261261
assertThat(new TransportVersion(null, 3001000, null).supports(test1), is(false));
262262
assertThat(new TransportVersion(null, 3001001, null).supports(test1), is(false));
263263
assertThat(new TransportVersion(null, 3002000, null).supports(test1), is(true));
264264
assertThat(new TransportVersion(null, 100001000, null).supports(test1), is(true));
265265
assertThat(new TransportVersion(null, 100001001, null).supports(test1), is(true));
266266

267-
TransportVersion test2 = TransportVersion.fromName("test_2");
267+
byte[] data2 = "3003000,2001001,1001001".getBytes(StandardCharsets.UTF_8);
268+
TransportVersion test2 = TransportVersion.fromBufferedReader(
269+
"<test>",
270+
"testSupports2",
271+
false,
272+
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data2), StandardCharsets.UTF_8)),
273+
5000000
274+
);
268275
assertThat(new TransportVersion(null, 1001000, null).supports(test2), is(false));
269276
assertThat(new TransportVersion(null, 1001001, null).supports(test2), is(true));
270277
assertThat(new TransportVersion(null, 1001002, null).supports(test2), is(true));
@@ -284,7 +291,14 @@ public void testSupports() {
284291
assertThat(new TransportVersion(null, 100001000, null).supports(test2), is(true));
285292
assertThat(new TransportVersion(null, 100001001, null).supports(test2), is(true));
286293

287-
TransportVersion test3 = TransportVersion.fromName("test_3");
294+
byte[] data3 = "100002000,3003001,2001002".getBytes(StandardCharsets.UTF_8);
295+
TransportVersion test3 = TransportVersion.fromBufferedReader(
296+
"<test>",
297+
"testSupports3",
298+
false,
299+
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data3), StandardCharsets.UTF_8)),
300+
5000000
301+
);
288302
assertThat(new TransportVersion(null, 1001001, null).supports(test3), is(false));
289303
assertThat(new TransportVersion(null, 1001002, null).supports(test3), is(false));
290304
assertThat(new TransportVersion(null, 1001003, null).supports(test3), is(false));
@@ -305,7 +319,14 @@ public void testSupports() {
305319
assertThat(new TransportVersion(null, 100001000, null).supports(test3), is(true));
306320
assertThat(new TransportVersion(null, 100001001, null).supports(test3), is(true));
307321

308-
TransportVersion test4 = TransportVersion.fromName("test_4");
322+
byte[] data4 = "100002000,3003002,2001003,1001002".getBytes(StandardCharsets.UTF_8);
323+
TransportVersion test4 = TransportVersion.fromBufferedReader(
324+
"<test>",
325+
"testSupports3",
326+
false,
327+
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data4), StandardCharsets.UTF_8)),
328+
5000000
329+
);
309330
assertThat(new TransportVersion(null, 1001001, null).supports(test4), is(false));
310331
assertThat(new TransportVersion(null, 1001002, null).supports(test4), is(true));
311332
assertThat(new TransportVersion(null, 1001003, null).supports(test4), is(true));

server/src/test/resources/transport/defined/manifest.txt

Lines changed: 0 additions & 5 deletions
This file was deleted.

server/src/test/resources/transport/defined/test_0.csv

Lines changed: 0 additions & 1 deletion
This file was deleted.

server/src/test/resources/transport/defined/test_1.csv

Lines changed: 0 additions & 2 deletions
This file was deleted.

server/src/test/resources/transport/defined/test_2.csv

Lines changed: 0 additions & 1 deletion
This file was deleted.

server/src/test/resources/transport/defined/test_3.csv

Lines changed: 0 additions & 1 deletion
This file was deleted.

server/src/test/resources/transport/defined/test_4.csv

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)