diff --git a/server/src/main/java/org/elasticsearch/TransportVersion.java b/server/src/main/java/org/elasticsearch/TransportVersion.java index d31bb2a5e5495..a90274496aca6 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersion.java +++ b/server/src/main/java/org/elasticsearch/TransportVersion.java @@ -82,23 +82,51 @@ public TransportVersion(int id) { this(null, id, null); } + interface BufferedReaderParser { + T parse(String component, String path, BufferedReader bufferedReader); + } + + static T parseFromBufferedReader( + String component, + String path, + Function nameToStream, + BufferedReaderParser parser + ) { + try (InputStream inputStream = nameToStream.apply(path)) { + if (inputStream == null) { + return null; + } + try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + return parser.parse(component, path, bufferedReader); + } + } catch (IOException ioe) { + throw new UncheckedIOException("parsing error [" + component + ":" + path + "]", ioe); + } + } + /** * Constructs a named transport version along with its set of compatible patch versions from x-content. * This method takes in the parameter {@code latest} which is the highest valid transport version id * supported by this node. Versions newer than the current transport version id for this node are discarded. */ - public static TransportVersion fromInputStream(String path, boolean nameInFile, InputStream stream, Integer latest) { - try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) { - String line = reader.readLine(); + public static TransportVersion fromBufferedReader( + String component, + String path, + boolean nameInFile, + BufferedReader bufferedReader, + Integer latest + ) { + try { + String line = bufferedReader.readLine(); String[] parts = line.replaceAll("\\s+", "").split(","); String check; - while ((check = reader.readLine()) != null) { + while ((check = bufferedReader.readLine()) != null) { if (check.replaceAll("\\s+", "").isEmpty() == false) { - throw new IllegalArgumentException("invalid transport version file format [" + path + "]"); + throw new IllegalArgumentException("invalid transport version file format [" + toComponentPath(component, path) + "]"); } } if (parts.length < (nameInFile ? 2 : 1)) { - throw new IllegalStateException("invalid transport version file format [" + path + "]"); + throw new IllegalStateException("invalid transport version file format [" + toComponentPath(component, path) + "]"); } String name = nameInFile ? parts[0] : path.substring(path.lastIndexOf('/') + 1, path.length() - 4); List ids = new ArrayList<>(); @@ -106,12 +134,17 @@ public static TransportVersion fromInputStream(String path, boolean nameInFile, try { ids.add(Integer.parseInt(parts[i])); } catch (NumberFormatException nfe) { - throw new IllegalStateException("invalid transport version file format [" + path + "]", nfe); + throw new IllegalStateException( + "invalid transport version file format [" + toComponentPath(component, path) + "]", + nfe + ); } } - ids.sort(Integer::compareTo); TransportVersion transportVersion = null; - for (int idIndex = 0; idIndex < ids.size(); ++idIndex) { + for (int idIndex = ids.size() - 1; idIndex >= 0; --idIndex) { + if (idIndex > 0 && ids.get(idIndex - 1) <= ids.get(idIndex)) { + throw new IllegalStateException("invalid transport version file format [" + toComponentPath(component, path) + "]"); + } if (ids.get(idIndex) > latest) { break; } @@ -119,10 +152,51 @@ public static TransportVersion fromInputStream(String path, boolean nameInFile, } return transportVersion; } catch (IOException ioe) { - throw new UncheckedIOException("cannot parse transport version [" + path + "]", ioe); + throw new UncheckedIOException("invalid transport version file format [" + toComponentPath(component, path) + "]", ioe); } } + public static Map collectFromInputStreams( + String component, + Function nameToStream, + String latestFileName + ) { + TransportVersion latest = parseFromBufferedReader( + component, + "/transport/latest/" + latestFileName, + nameToStream, + (c, p, br) -> fromBufferedReader(c, p, true, br, Integer.MAX_VALUE) + ); + if (latest != null) { + List versionFilesNames = parseFromBufferedReader( + component, + "/transport/defined/manifest.txt", + nameToStream, + (c, p, br) -> br.lines().filter(line -> line.isBlank() == false).toList() + ); + if (versionFilesNames != null) { + Map transportVersions = new HashMap<>(); + for (String versionFileName : versionFilesNames) { + TransportVersion transportVersion = parseFromBufferedReader( + component, + "/transport/defined/" + versionFileName, + nameToStream, + (c, p, br) -> fromBufferedReader(c, p, false, br, latest.id()) + ); + if (transportVersion != null) { + transportVersions.put(versionFileName.substring(0, versionFileName.length() - 4), transportVersion); + } + } + return transportVersions; + } + } + return Map.of(); + } + + private static String toComponentPath(String component, String path) { + return component + ":" + path; + } + public static TransportVersion readVersion(StreamInput in) throws IOException { return fromId(in.readVInt()); } @@ -345,7 +419,11 @@ private static class VersionsHolder { static { // collect all the transport versions from server and es modules/plugins (defined in server) List allVersions = new ArrayList<>(TransportVersions.DEFINED_VERSIONS); - Map allVersionsByName = loadTransportVersionsByName(); + Map allVersionsByName = collectFromInputStreams( + "", + TransportVersion.class::getResourceAsStream, + Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv" + ); addTransportVersions(allVersionsByName.values(), allVersions).sort(TransportVersion::compareTo); // set version lookup by release before adding serverless versions @@ -373,65 +451,6 @@ private static class VersionsHolder { CURRENT = ALL_VERSIONS.getLast(); } - private static Map loadTransportVersionsByName() { - Map transportVersions = new HashMap<>(); - - String latestLocation = "/transport/latest/" + Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv"; - int latestId = -1; - try (InputStream inputStream = TransportVersion.class.getResourceAsStream(latestLocation)) { - // this check is required until bootstrapping for the new transport versions format is completed; - // when load is false, we will only use the transport versions in the legacy format; - // load becomes false if we don't find the latest or manifest files required for the new format - if (inputStream != null) { - TransportVersion latest = fromInputStream(latestLocation, true, inputStream, Integer.MAX_VALUE); - if (latest == null) { - throw new IllegalStateException( - "invalid latest transport version for minor version [" - + Version.CURRENT.major - + "." - + Version.CURRENT.minor - + "]" - ); - } - latestId = latest.id(); - } - } catch (IOException ioe) { - throw new UncheckedIOException("latest transport version file not found at [" + latestLocation + "]", ioe); - } - - String manifestLocation = "/transport/defined/manifest.txt"; - List versionFileNames = null; - if (latestId > -1) { - try (InputStream inputStream = TransportVersion.class.getResourceAsStream(manifestLocation)) { - if (inputStream != null) { - BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); - versionFileNames = reader.lines().filter(line -> line.isBlank() == false).toList(); - } - } catch (IOException ioe) { - throw new UncheckedIOException("transport version manifest file not found at [" + manifestLocation + "]", ioe); - } - } - - if (versionFileNames != null) { - for (String name : versionFileNames) { - String versionLocation = "/transport/defined/" + name; - try (InputStream inputStream = TransportVersion.class.getResourceAsStream(versionLocation)) { - if (inputStream == null) { - throw new IllegalStateException("transport version file not found at [" + versionLocation + "]"); - } - TransportVersion transportVersion = TransportVersion.fromInputStream(versionLocation, false, inputStream, latestId); - if (transportVersion != null) { - transportVersions.put(transportVersion.name(), transportVersion); - } - } catch (IOException ioe) { - throw new UncheckedIOException("transport version file not found at [ " + versionLocation + "]", ioe); - } - } - } - - return transportVersions; - } - private static List addTransportVersions(Collection addFrom, List addTo) { for (TransportVersion transportVersion : addFrom) { addTo.add(transportVersion); diff --git a/server/src/test/java/org/elasticsearch/TransportVersionTests.java b/server/src/test/java/org/elasticsearch/TransportVersionTests.java index be747ea10e8a6..1291496d7eaad 100644 --- a/server/src/test/java/org/elasticsearch/TransportVersionTests.java +++ b/server/src/test/java/org/elasticsearch/TransportVersionTests.java @@ -12,7 +12,11 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.InputStreamReader; import java.lang.reflect.Modifier; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Set; import java.util.regex.Matcher; @@ -230,42 +234,38 @@ public void testDuplicateConstants() { } } - public void testFromName() { - assertThat(TransportVersion.fromName("test_0"), is(new TransportVersion("test_0", 3001000, null))); - assertThat(TransportVersion.fromName("test_1"), is(new TransportVersion("test_1", 3002000, null))); - assertThat( - TransportVersion.fromName("test_2"), - is( - new TransportVersion( - "test_2", - 3003000, - new TransportVersion("test_2", 2001001, new TransportVersion("test_2", 1001001, null)) - ) - ) - ); - assertThat( - TransportVersion.fromName("test_3"), - is(new TransportVersion("test_3", 3003001, new TransportVersion("test_3", 2001002, null))) - ); - assertThat( - TransportVersion.fromName("test_4"), - is( - new TransportVersion( - "test_4", - 3003002, - new TransportVersion("test_4", 2001003, new TransportVersion("test_4", 1001002, null)) - ) - ) + public void testLatest() { + TransportVersion latest = TransportVersion.parseFromBufferedReader( + "", + "/transport/defined/" + Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv", + TransportVersion.class::getResourceAsStream, + (c, p, br) -> TransportVersion.fromBufferedReader(c, p, true, br, Integer.MAX_VALUE) ); + // TODO: once placeholder is removed, test the latest known version can be found fromName + // assertThat(latest, is(TransportVersion.fromName(latest.name()))); } public void testSupports() { - TransportVersion test0 = TransportVersion.fromName("test_0"); + byte[] data0 = "100001000,3001000".getBytes(StandardCharsets.UTF_8); + TransportVersion test0 = TransportVersion.fromBufferedReader( + "", + "testSupports0", + false, + new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data0), StandardCharsets.UTF_8)), + 5000000 + ); assertThat(new TransportVersion(null, 2003000, null).supports(test0), is(false)); assertThat(new TransportVersion(null, 3001000, null).supports(test0), is(true)); assertThat(new TransportVersion(null, 100001001, null).supports(test0), is(true)); - TransportVersion test1 = TransportVersion.fromName("test_1"); + byte[] data1 = "3002000".getBytes(StandardCharsets.UTF_8); + TransportVersion test1 = TransportVersion.fromBufferedReader( + "", + "testSupports1", + false, + new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data1), StandardCharsets.UTF_8)), + 5000000 + ); assertThat(new TransportVersion(null, 2003000, null).supports(test1), is(false)); assertThat(new TransportVersion(null, 3001000, null).supports(test1), is(false)); assertThat(new TransportVersion(null, 3001001, null).supports(test1), is(false)); @@ -273,7 +273,14 @@ public void testSupports() { assertThat(new TransportVersion(null, 100001000, null).supports(test1), is(true)); assertThat(new TransportVersion(null, 100001001, null).supports(test1), is(true)); - TransportVersion test2 = TransportVersion.fromName("test_2"); + byte[] data2 = "3003000,2001001,1001001".getBytes(StandardCharsets.UTF_8); + TransportVersion test2 = TransportVersion.fromBufferedReader( + "", + "testSupports2", + false, + new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data2), StandardCharsets.UTF_8)), + 5000000 + ); assertThat(new TransportVersion(null, 1001000, null).supports(test2), is(false)); assertThat(new TransportVersion(null, 1001001, null).supports(test2), is(true)); assertThat(new TransportVersion(null, 1001002, null).supports(test2), is(true)); @@ -293,7 +300,14 @@ public void testSupports() { assertThat(new TransportVersion(null, 100001000, null).supports(test2), is(true)); assertThat(new TransportVersion(null, 100001001, null).supports(test2), is(true)); - TransportVersion test3 = TransportVersion.fromName("test_3"); + byte[] data3 = "100002000,3003001,2001002".getBytes(StandardCharsets.UTF_8); + TransportVersion test3 = TransportVersion.fromBufferedReader( + "", + "testSupports3", + false, + new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data3), StandardCharsets.UTF_8)), + 5000000 + ); assertThat(new TransportVersion(null, 1001001, null).supports(test3), is(false)); assertThat(new TransportVersion(null, 1001002, null).supports(test3), is(false)); assertThat(new TransportVersion(null, 1001003, null).supports(test3), is(false)); @@ -314,7 +328,14 @@ public void testSupports() { assertThat(new TransportVersion(null, 100001000, null).supports(test3), is(true)); assertThat(new TransportVersion(null, 100001001, null).supports(test3), is(true)); - TransportVersion test4 = TransportVersion.fromName("test_4"); + byte[] data4 = "100002000,3003002,2001003,1001002".getBytes(StandardCharsets.UTF_8); + TransportVersion test4 = TransportVersion.fromBufferedReader( + "", + "testSupports3", + false, + new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data4), StandardCharsets.UTF_8)), + 5000000 + ); assertThat(new TransportVersion(null, 1001001, null).supports(test4), is(false)); assertThat(new TransportVersion(null, 1001002, null).supports(test4), is(true)); assertThat(new TransportVersion(null, 1001003, null).supports(test4), is(true)); diff --git a/server/src/test/resources/transport/defined/manifest.txt b/server/src/test/resources/transport/defined/manifest.txt deleted file mode 100644 index b158f9910edf4..0000000000000 --- a/server/src/test/resources/transport/defined/manifest.txt +++ /dev/null @@ -1,5 +0,0 @@ -test_0.csv -test_1.csv -test_2.csv -test_3.csv -test_4.csv diff --git a/server/src/test/resources/transport/defined/test_0.csv b/server/src/test/resources/transport/defined/test_0.csv deleted file mode 100644 index 46b80e0a7f735..0000000000000 --- a/server/src/test/resources/transport/defined/test_0.csv +++ /dev/null @@ -1 +0,0 @@ -100001000,3001000 diff --git a/server/src/test/resources/transport/defined/test_1.csv b/server/src/test/resources/transport/defined/test_1.csv deleted file mode 100644 index 68f67c2ab7884..0000000000000 --- a/server/src/test/resources/transport/defined/test_1.csv +++ /dev/null @@ -1,2 +0,0 @@ -3002000 - diff --git a/server/src/test/resources/transport/defined/test_2.csv b/server/src/test/resources/transport/defined/test_2.csv deleted file mode 100644 index 5db5b13038410..0000000000000 --- a/server/src/test/resources/transport/defined/test_2.csv +++ /dev/null @@ -1 +0,0 @@ -3003000,2001001,1001001 diff --git a/server/src/test/resources/transport/defined/test_3.csv b/server/src/test/resources/transport/defined/test_3.csv deleted file mode 100644 index b9dd0509e1364..0000000000000 --- a/server/src/test/resources/transport/defined/test_3.csv +++ /dev/null @@ -1 +0,0 @@ -100002000,3003001,2001002 diff --git a/server/src/test/resources/transport/defined/test_4.csv b/server/src/test/resources/transport/defined/test_4.csv deleted file mode 100644 index 55c482a68ee7f..0000000000000 --- a/server/src/test/resources/transport/defined/test_4.csv +++ /dev/null @@ -1 +0,0 @@ -100002000,3003002,2001003,1001002