Skip to content

Commit 54709c5

Browse files
authored
Refactor TransportVersion loading to support external consumers (elastic#132694) (elastic#132862)
This change moves transport version loading out of TransportVersion.VersionsHolder, so that is can be consumed elsewhere by projects using the same resource file structure. Jira: ES-12401
1 parent f131654 commit 54709c5

File tree

8 files changed

+141
-112
lines changed

8 files changed

+141
-112
lines changed

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 89 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -79,47 +79,121 @@ public TransportVersion(int id) {
7979
this(null, id, null);
8080
}
8181

82+
interface BufferedReaderParser<T> {
83+
T parse(String component, String path, BufferedReader bufferedReader);
84+
}
85+
86+
static <T> T parseFromBufferedReader(
87+
String component,
88+
String path,
89+
Function<String, InputStream> nameToStream,
90+
BufferedReaderParser<T> parser
91+
) {
92+
try (InputStream inputStream = nameToStream.apply(path)) {
93+
if (inputStream == null) {
94+
return null;
95+
}
96+
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
97+
return parser.parse(component, path, bufferedReader);
98+
}
99+
} catch (IOException ioe) {
100+
throw new UncheckedIOException("parsing error [" + component + ":" + path + "]", ioe);
101+
}
102+
}
103+
82104
/**
83105
* Constructs a named transport version along with its set of compatible patch versions from x-content.
84106
* This method takes in the parameter {@code latest} which is the highest valid transport version id
85107
* supported by this node. Versions newer than the current transport version id for this node are discarded.
86108
*/
87-
public static TransportVersion fromInputStream(String path, boolean nameInFile, InputStream stream, Integer latest) {
88-
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
89-
String line = reader.readLine();
109+
public static TransportVersion fromBufferedReader(
110+
String component,
111+
String path,
112+
boolean nameInFile,
113+
BufferedReader bufferedReader,
114+
Integer latest
115+
) {
116+
try {
117+
String line = bufferedReader.readLine();
90118
String[] parts = line.replaceAll("\\s+", "").split(",");
91119
String check;
92-
while ((check = reader.readLine()) != null) {
120+
while ((check = bufferedReader.readLine()) != null) {
93121
if (check.replaceAll("\\s+", "").isEmpty() == false) {
94-
throw new IllegalArgumentException("invalid transport version file format [" + path + "]");
122+
throw new IllegalArgumentException("invalid transport version file format [" + toComponentPath(component, path) + "]");
95123
}
96124
}
97125
if (parts.length < (nameInFile ? 2 : 1)) {
98-
throw new IllegalStateException("invalid transport version file format [" + path + "]");
126+
throw new IllegalStateException("invalid transport version file format [" + toComponentPath(component, path) + "]");
99127
}
100128
String name = nameInFile ? parts[0] : path.substring(path.lastIndexOf('/') + 1, path.length() - 4);
101129
List<Integer> ids = new ArrayList<>();
102130
for (int i = nameInFile ? 1 : 0; i < parts.length; ++i) {
103131
try {
104132
ids.add(Integer.parseInt(parts[i]));
105133
} catch (NumberFormatException nfe) {
106-
throw new IllegalStateException("invalid transport version file format [" + path + "]", nfe);
134+
throw new IllegalStateException(
135+
"invalid transport version file format [" + toComponentPath(component, path) + "]",
136+
nfe
137+
);
107138
}
108139
}
109-
ids.sort(Integer::compareTo);
110140
TransportVersion transportVersion = null;
111-
for (int idIndex = 0; idIndex < ids.size(); ++idIndex) {
141+
for (int idIndex = ids.size() - 1; idIndex >= 0; --idIndex) {
142+
if (idIndex > 0 && ids.get(idIndex - 1) <= ids.get(idIndex)) {
143+
throw new IllegalStateException("invalid transport version file format [" + toComponentPath(component, path) + "]");
144+
}
112145
if (ids.get(idIndex) > latest) {
113146
break;
114147
}
115148
transportVersion = new TransportVersion(name, ids.get(idIndex), transportVersion);
116149
}
117150
return transportVersion;
118151
} catch (IOException ioe) {
119-
throw new UncheckedIOException("cannot parse transport version [" + path + "]", ioe);
152+
throw new UncheckedIOException("invalid transport version file format [" + toComponentPath(component, path) + "]", ioe);
120153
}
121154
}
122155

156+
public static Map<String, TransportVersion> collectFromInputStreams(
157+
String component,
158+
Function<String, InputStream> nameToStream,
159+
String latestFileName
160+
) {
161+
TransportVersion latest = parseFromBufferedReader(
162+
component,
163+
"/transport/latest/" + latestFileName,
164+
nameToStream,
165+
(c, p, br) -> fromBufferedReader(c, p, true, br, Integer.MAX_VALUE)
166+
);
167+
if (latest != null) {
168+
List<String> versionFilesNames = parseFromBufferedReader(
169+
component,
170+
"/transport/defined/manifest.txt",
171+
nameToStream,
172+
(c, p, br) -> br.lines().filter(line -> line.isBlank() == false).toList()
173+
);
174+
if (versionFilesNames != null) {
175+
Map<String, TransportVersion> transportVersions = new HashMap<>();
176+
for (String versionFileName : versionFilesNames) {
177+
TransportVersion transportVersion = parseFromBufferedReader(
178+
component,
179+
"/transport/defined/" + versionFileName,
180+
nameToStream,
181+
(c, p, br) -> fromBufferedReader(c, p, false, br, latest.id())
182+
);
183+
if (transportVersion != null) {
184+
transportVersions.put(versionFileName.substring(0, versionFileName.length() - 4), transportVersion);
185+
}
186+
}
187+
return transportVersions;
188+
}
189+
}
190+
return Map.of();
191+
}
192+
193+
private static String toComponentPath(String component, String path) {
194+
return component + ":" + path;
195+
}
196+
123197
public static TransportVersion readVersion(StreamInput in) throws IOException {
124198
return fromId(in.readVInt());
125199
}
@@ -337,7 +411,11 @@ private static class VersionsHolder {
337411
static {
338412
// collect all the transport versions from server and es modules/plugins (defined in server)
339413
List<TransportVersion> allVersions = new ArrayList<>(TransportVersions.DEFINED_VERSIONS);
340-
Map<String, TransportVersion> allVersionsByName = loadTransportVersionsByName();
414+
Map<String, TransportVersion> allVersionsByName = collectFromInputStreams(
415+
"<server>",
416+
TransportVersion.class::getResourceAsStream,
417+
Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv"
418+
);
341419
addTransportVersions(allVersionsByName.values(), allVersions).sort(TransportVersion::compareTo);
342420

343421
// set the transport version lookups
@@ -351,65 +429,6 @@ private static class VersionsHolder {
351429
CURRENT = ALL_VERSIONS.get(ALL_VERSIONS.size() - 1);
352430
}
353431

354-
private static Map<String, TransportVersion> loadTransportVersionsByName() {
355-
Map<String, TransportVersion> transportVersions = new HashMap<>();
356-
357-
String latestLocation = "/transport/latest/" + Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv";
358-
int latestId = -1;
359-
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(latestLocation)) {
360-
// this check is required until bootstrapping for the new transport versions format is completed;
361-
// when load is false, we will only use the transport versions in the legacy format;
362-
// load becomes false if we don't find the latest or manifest files required for the new format
363-
if (inputStream != null) {
364-
TransportVersion latest = fromInputStream(latestLocation, true, inputStream, Integer.MAX_VALUE);
365-
if (latest == null) {
366-
throw new IllegalStateException(
367-
"invalid latest transport version for minor version ["
368-
+ Version.CURRENT.major
369-
+ "."
370-
+ Version.CURRENT.minor
371-
+ "]"
372-
);
373-
}
374-
latestId = latest.id();
375-
}
376-
} catch (IOException ioe) {
377-
throw new UncheckedIOException("latest transport version file not found at [" + latestLocation + "]", ioe);
378-
}
379-
380-
String manifestLocation = "/transport/defined/manifest.txt";
381-
List<String> versionFileNames = null;
382-
if (latestId > -1) {
383-
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(manifestLocation)) {
384-
if (inputStream != null) {
385-
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
386-
versionFileNames = reader.lines().filter(line -> line.isBlank() == false).toList();
387-
}
388-
} catch (IOException ioe) {
389-
throw new UncheckedIOException("transport version manifest file not found at [" + manifestLocation + "]", ioe);
390-
}
391-
}
392-
393-
if (versionFileNames != null) {
394-
for (String name : versionFileNames) {
395-
String versionLocation = "/transport/defined/" + name;
396-
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(versionLocation)) {
397-
if (inputStream == null) {
398-
throw new IllegalStateException("transport version file not found at [" + versionLocation + "]");
399-
}
400-
TransportVersion transportVersion = TransportVersion.fromInputStream(versionLocation, false, inputStream, latestId);
401-
if (transportVersion != null) {
402-
transportVersions.put(transportVersion.name(), transportVersion);
403-
}
404-
} catch (IOException ioe) {
405-
throw new UncheckedIOException("transport version file not found at [ " + versionLocation + "]", ioe);
406-
}
407-
}
408-
}
409-
410-
return transportVersions;
411-
}
412-
413432
private static List<TransportVersion> addTransportVersions(Collection<TransportVersion> addFrom, List<TransportVersion> addTo) {
414433
for (TransportVersion transportVersion : addFrom) {
415434
addTo.add(transportVersion);

server/src/test/java/org/elasticsearch/TransportVersionTests.java

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@
1212
import org.elasticsearch.test.ESTestCase;
1313
import org.elasticsearch.test.TransportVersionUtils;
1414

15+
import java.io.BufferedReader;
16+
import java.io.ByteArrayInputStream;
17+
import java.io.InputStreamReader;
1518
import java.lang.reflect.Modifier;
19+
import java.nio.charset.StandardCharsets;
1620
import java.util.Collections;
1721
import java.util.List;
1822
import java.util.Set;
@@ -268,50 +272,53 @@ public void testDuplicateConstants() {
268272
}
269273
}
270274

271-
public void testFromName() {
272-
assertThat(TransportVersion.fromName("test_0"), is(new TransportVersion("test_0", 3001000, null)));
273-
assertThat(TransportVersion.fromName("test_1"), is(new TransportVersion("test_1", 3002000, null)));
274-
assertThat(
275-
TransportVersion.fromName("test_2"),
276-
is(
277-
new TransportVersion(
278-
"test_2",
279-
3003000,
280-
new TransportVersion("test_2", 2001001, new TransportVersion("test_2", 1001001, null))
281-
)
282-
)
283-
);
284-
assertThat(
285-
TransportVersion.fromName("test_3"),
286-
is(new TransportVersion("test_3", 3003001, new TransportVersion("test_3", 2001002, null)))
287-
);
288-
assertThat(
289-
TransportVersion.fromName("test_4"),
290-
is(
291-
new TransportVersion(
292-
"test_4",
293-
3003002,
294-
new TransportVersion("test_4", 2001003, new TransportVersion("test_4", 1001002, null))
295-
)
296-
)
275+
public void testLatest() {
276+
TransportVersion latest = TransportVersion.parseFromBufferedReader(
277+
"<test>",
278+
"/transport/defined/" + Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv",
279+
TransportVersion.class::getResourceAsStream,
280+
(c, p, br) -> TransportVersion.fromBufferedReader(c, p, true, br, Integer.MAX_VALUE)
297281
);
282+
// TODO: once placeholder is removed, test the latest known version can be found fromName
283+
// assertThat(latest, is(TransportVersion.fromName(latest.name())));
298284
}
299285

300286
public void testSupports() {
301-
TransportVersion test0 = TransportVersion.fromName("test_0");
287+
byte[] data0 = "100001000,3001000".getBytes(StandardCharsets.UTF_8);
288+
TransportVersion test0 = TransportVersion.fromBufferedReader(
289+
"<test>",
290+
"testSupports0",
291+
false,
292+
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data0), StandardCharsets.UTF_8)),
293+
5000000
294+
);
302295
assertThat(new TransportVersion(null, 2003000, null).supports(test0), is(false));
303296
assertThat(new TransportVersion(null, 3001000, null).supports(test0), is(true));
304297
assertThat(new TransportVersion(null, 100001001, null).supports(test0), is(true));
305298

306-
TransportVersion test1 = TransportVersion.fromName("test_1");
299+
byte[] data1 = "3002000".getBytes(StandardCharsets.UTF_8);
300+
TransportVersion test1 = TransportVersion.fromBufferedReader(
301+
"<test>",
302+
"testSupports1",
303+
false,
304+
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data1), StandardCharsets.UTF_8)),
305+
5000000
306+
);
307307
assertThat(new TransportVersion(null, 2003000, null).supports(test1), is(false));
308308
assertThat(new TransportVersion(null, 3001000, null).supports(test1), is(false));
309309
assertThat(new TransportVersion(null, 3001001, null).supports(test1), is(false));
310310
assertThat(new TransportVersion(null, 3002000, null).supports(test1), is(true));
311311
assertThat(new TransportVersion(null, 100001000, null).supports(test1), is(true));
312312
assertThat(new TransportVersion(null, 100001001, null).supports(test1), is(true));
313313

314-
TransportVersion test2 = TransportVersion.fromName("test_2");
314+
byte[] data2 = "3003000,2001001,1001001".getBytes(StandardCharsets.UTF_8);
315+
TransportVersion test2 = TransportVersion.fromBufferedReader(
316+
"<test>",
317+
"testSupports2",
318+
false,
319+
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data2), StandardCharsets.UTF_8)),
320+
5000000
321+
);
315322
assertThat(new TransportVersion(null, 1001000, null).supports(test2), is(false));
316323
assertThat(new TransportVersion(null, 1001001, null).supports(test2), is(true));
317324
assertThat(new TransportVersion(null, 1001002, null).supports(test2), is(true));
@@ -331,7 +338,14 @@ public void testSupports() {
331338
assertThat(new TransportVersion(null, 100001000, null).supports(test2), is(true));
332339
assertThat(new TransportVersion(null, 100001001, null).supports(test2), is(true));
333340

334-
TransportVersion test3 = TransportVersion.fromName("test_3");
341+
byte[] data3 = "100002000,3003001,2001002".getBytes(StandardCharsets.UTF_8);
342+
TransportVersion test3 = TransportVersion.fromBufferedReader(
343+
"<test>",
344+
"testSupports3",
345+
false,
346+
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data3), StandardCharsets.UTF_8)),
347+
5000000
348+
);
335349
assertThat(new TransportVersion(null, 1001001, null).supports(test3), is(false));
336350
assertThat(new TransportVersion(null, 1001002, null).supports(test3), is(false));
337351
assertThat(new TransportVersion(null, 1001003, null).supports(test3), is(false));
@@ -352,7 +366,14 @@ public void testSupports() {
352366
assertThat(new TransportVersion(null, 100001000, null).supports(test3), is(true));
353367
assertThat(new TransportVersion(null, 100001001, null).supports(test3), is(true));
354368

355-
TransportVersion test4 = TransportVersion.fromName("test_4");
369+
byte[] data4 = "100002000,3003002,2001003,1001002".getBytes(StandardCharsets.UTF_8);
370+
TransportVersion test4 = TransportVersion.fromBufferedReader(
371+
"<test>",
372+
"testSupports3",
373+
false,
374+
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data4), StandardCharsets.UTF_8)),
375+
5000000
376+
);
356377
assertThat(new TransportVersion(null, 1001001, null).supports(test4), is(false));
357378
assertThat(new TransportVersion(null, 1001002, null).supports(test4), is(true));
358379
assertThat(new TransportVersion(null, 1001003, null).supports(test4), is(true));

server/src/test/resources/transport/defined/manifest.txt

Lines changed: 0 additions & 5 deletions
This file was deleted.

server/src/test/resources/transport/defined/test_0.csv

Lines changed: 0 additions & 1 deletion
This file was deleted.

server/src/test/resources/transport/defined/test_1.csv

Lines changed: 0 additions & 2 deletions
This file was deleted.

server/src/test/resources/transport/defined/test_2.csv

Lines changed: 0 additions & 1 deletion
This file was deleted.

server/src/test/resources/transport/defined/test_3.csv

Lines changed: 0 additions & 1 deletion
This file was deleted.

server/src/test/resources/transport/defined/test_4.csv

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)