Skip to content
Merged
159 changes: 89 additions & 70 deletions server/src/main/java/org/elasticsearch/TransportVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,47 +82,121 @@ public TransportVersion(int id) {
this(null, id, null);
}

interface BufferedReaderParser<T> {
T parse(String component, String path, BufferedReader bufferedReader);
}

static <T> T parseFromBufferedReader(
String component,
String path,
Function<String, InputStream> nameToStream,
BufferedReaderParser<T> parser
) {
try (InputStream inputStream = nameToStream.apply(path)) {
if (inputStream == null) {
return null;
}
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
return parser.parse(component, path, bufferedReader);
}
} catch (IOException ioe) {
throw new UncheckedIOException("parsing error [" + component + ":" + path + "]", ioe);
}
}

/**
* Constructs a named transport version along with its set of compatible patch versions from x-content.
* This method takes in the parameter {@code latest} which is the highest valid transport version id
* supported by this node. Versions newer than the current transport version id for this node are discarded.
*/
public static TransportVersion fromInputStream(String path, boolean nameInFile, InputStream stream, Integer latest) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
String line = reader.readLine();
public static TransportVersion fromBufferedReader(
String component,
String path,
boolean nameInFile,
BufferedReader bufferedReader,
Integer latest
) {
try {
String line = bufferedReader.readLine();
String[] parts = line.replaceAll("\\s+", "").split(",");
String check;
while ((check = reader.readLine()) != null) {
while ((check = bufferedReader.readLine()) != null) {
if (check.replaceAll("\\s+", "").isEmpty() == false) {
throw new IllegalArgumentException("invalid transport version file format [" + path + "]");
throw new IllegalArgumentException("invalid transport version file format [" + toComponentPath(component, path) + "]");
}
}
if (parts.length < (nameInFile ? 2 : 1)) {
throw new IllegalStateException("invalid transport version file format [" + path + "]");
throw new IllegalStateException("invalid transport version file format [" + toComponentPath(component, path) + "]");
}
String name = nameInFile ? parts[0] : path.substring(path.lastIndexOf('/') + 1, path.length() - 4);
List<Integer> ids = new ArrayList<>();
for (int i = nameInFile ? 1 : 0; i < parts.length; ++i) {
try {
ids.add(Integer.parseInt(parts[i]));
} catch (NumberFormatException nfe) {
throw new IllegalStateException("invalid transport version file format [" + path + "]", nfe);
throw new IllegalStateException(
"invalid transport version file format [" + toComponentPath(component, path) + "]",
nfe
);
}
}
ids.sort(Integer::compareTo);
TransportVersion transportVersion = null;
for (int idIndex = 0; idIndex < ids.size(); ++idIndex) {
for (int idIndex = ids.size() - 1; idIndex >= 0; --idIndex) {
if (idIndex > 0 && ids.get(idIndex - 1) <= ids.get(idIndex)) {
throw new IllegalStateException("invalid transport version file format [" + toComponentPath(component, path) + "]");
}
if (ids.get(idIndex) > latest) {
break;
}
transportVersion = new TransportVersion(name, ids.get(idIndex), transportVersion);
}
return transportVersion;
} catch (IOException ioe) {
throw new UncheckedIOException("cannot parse transport version [" + path + "]", ioe);
throw new UncheckedIOException("invalid transport version file format [" + toComponentPath(component, path) + "]", ioe);
}
}

public static Map<String, TransportVersion> collectFromInputStreams(
String component,
Function<String, InputStream> nameToStream,
String latestFileName
) {
TransportVersion latest = parseFromBufferedReader(
component,
"/transport/latest/" + latestFileName,
nameToStream,
(c, p, br) -> fromBufferedReader(c, p, true, br, Integer.MAX_VALUE)
);
if (latest != null) {
List<String> versionFilesNames = parseFromBufferedReader(
component,
"/transport/defined/manifest.txt",
nameToStream,
(c, p, br) -> br.lines().filter(line -> line.isBlank() == false).toList()
);
if (versionFilesNames != null) {
Map<String, TransportVersion> transportVersions = new HashMap<>();
for (String versionFileName : versionFilesNames) {
TransportVersion transportVersion = parseFromBufferedReader(
component,
"/transport/defined/" + versionFileName,
nameToStream,
(c, p, br) -> fromBufferedReader(c, p, false, br, latest.id())
);
if (transportVersion != null) {
transportVersions.put(versionFileName.substring(0, versionFileName.length() - 4), transportVersion);
}
}
return transportVersions;
}
}
return Map.of();
}

private static String toComponentPath(String component, String path) {
return component + ":" + path;
}

public static TransportVersion readVersion(StreamInput in) throws IOException {
return fromId(in.readVInt());
}
Expand Down Expand Up @@ -345,7 +419,11 @@ private static class VersionsHolder {
static {
// collect all the transport versions from server and es modules/plugins (defined in server)
List<TransportVersion> allVersions = new ArrayList<>(TransportVersions.DEFINED_VERSIONS);
Map<String, TransportVersion> allVersionsByName = loadTransportVersionsByName();
Map<String, TransportVersion> allVersionsByName = collectFromInputStreams(
"<server>",
TransportVersion.class::getResourceAsStream,
Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv"
);
addTransportVersions(allVersionsByName.values(), allVersions).sort(TransportVersion::compareTo);

// set version lookup by release before adding serverless versions
Expand Down Expand Up @@ -373,65 +451,6 @@ private static class VersionsHolder {
CURRENT = ALL_VERSIONS.getLast();
}

private static Map<String, TransportVersion> loadTransportVersionsByName() {
Map<String, TransportVersion> transportVersions = new HashMap<>();

String latestLocation = "/transport/latest/" + Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv";
int latestId = -1;
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(latestLocation)) {
// this check is required until bootstrapping for the new transport versions format is completed;
// when load is false, we will only use the transport versions in the legacy format;
// load becomes false if we don't find the latest or manifest files required for the new format
if (inputStream != null) {
TransportVersion latest = fromInputStream(latestLocation, true, inputStream, Integer.MAX_VALUE);
if (latest == null) {
throw new IllegalStateException(
"invalid latest transport version for minor version ["
+ Version.CURRENT.major
+ "."
+ Version.CURRENT.minor
+ "]"
);
}
latestId = latest.id();
}
} catch (IOException ioe) {
throw new UncheckedIOException("latest transport version file not found at [" + latestLocation + "]", ioe);
}

String manifestLocation = "/transport/defined/manifest.txt";
List<String> versionFileNames = null;
if (latestId > -1) {
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(manifestLocation)) {
if (inputStream != null) {
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
versionFileNames = reader.lines().filter(line -> line.isBlank() == false).toList();
}
} catch (IOException ioe) {
throw new UncheckedIOException("transport version manifest file not found at [" + manifestLocation + "]", ioe);
}
}

if (versionFileNames != null) {
for (String name : versionFileNames) {
String versionLocation = "/transport/defined/" + name;
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(versionLocation)) {
if (inputStream == null) {
throw new IllegalStateException("transport version file not found at [" + versionLocation + "]");
}
TransportVersion transportVersion = TransportVersion.fromInputStream(versionLocation, false, inputStream, latestId);
if (transportVersion != null) {
transportVersions.put(transportVersion.name(), transportVersion);
}
} catch (IOException ioe) {
throw new UncheckedIOException("transport version file not found at [ " + versionLocation + "]", ioe);
}
}
}

return transportVersions;
}

private static List<TransportVersion> addTransportVersions(Collection<TransportVersion> addFrom, List<TransportVersion> addTo) {
for (TransportVersion transportVersion : addFrom) {
addTo.add(transportVersion);
Expand Down
83 changes: 52 additions & 31 deletions server/src/test/java/org/elasticsearch/TransportVersionTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TransportVersionUtils;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Modifier;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -221,50 +225,53 @@ public void testDuplicateConstants() {
}
}

public void testFromName() {
assertThat(TransportVersion.fromName("test_0"), is(new TransportVersion("test_0", 3001000, null)));
assertThat(TransportVersion.fromName("test_1"), is(new TransportVersion("test_1", 3002000, null)));
assertThat(
TransportVersion.fromName("test_2"),
is(
new TransportVersion(
"test_2",
3003000,
new TransportVersion("test_2", 2001001, new TransportVersion("test_2", 1001001, null))
)
)
);
assertThat(
TransportVersion.fromName("test_3"),
is(new TransportVersion("test_3", 3003001, new TransportVersion("test_3", 2001002, null)))
);
assertThat(
TransportVersion.fromName("test_4"),
is(
new TransportVersion(
"test_4",
3003002,
new TransportVersion("test_4", 2001003, new TransportVersion("test_4", 1001002, null))
)
)
public void testLatest() {
TransportVersion latest = TransportVersion.parseFromBufferedReader(
"<test>",
"/transport/defined/" + Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv",
TransportVersion.class::getResourceAsStream,
(c, p, br) -> TransportVersion.fromBufferedReader(c, p, true, br, Integer.MAX_VALUE)
);
// TODO: once placeholder is removed, test the latest known version can be found fromName
// assertThat(latest, is(TransportVersion.fromName(latest.name())));
}

public void testSupports() {
TransportVersion test0 = TransportVersion.fromName("test_0");
byte[] data0 = "100001000,3001000".getBytes(StandardCharsets.UTF_8);
TransportVersion test0 = TransportVersion.fromBufferedReader(
"<test>",
"testSupports0",
false,
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data0), StandardCharsets.UTF_8)),
5000000
);
assertThat(new TransportVersion(null, 2003000, null).supports(test0), is(false));
assertThat(new TransportVersion(null, 3001000, null).supports(test0), is(true));
assertThat(new TransportVersion(null, 100001001, null).supports(test0), is(true));

TransportVersion test1 = TransportVersion.fromName("test_1");
byte[] data1 = "3002000".getBytes(StandardCharsets.UTF_8);
TransportVersion test1 = TransportVersion.fromBufferedReader(
"<test>",
"testSupports1",
false,
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data1), StandardCharsets.UTF_8)),
5000000
);
assertThat(new TransportVersion(null, 2003000, null).supports(test1), is(false));
assertThat(new TransportVersion(null, 3001000, null).supports(test1), is(false));
assertThat(new TransportVersion(null, 3001001, null).supports(test1), is(false));
assertThat(new TransportVersion(null, 3002000, null).supports(test1), is(true));
assertThat(new TransportVersion(null, 100001000, null).supports(test1), is(true));
assertThat(new TransportVersion(null, 100001001, null).supports(test1), is(true));

TransportVersion test2 = TransportVersion.fromName("test_2");
byte[] data2 = "3003000,2001001,1001001".getBytes(StandardCharsets.UTF_8);
TransportVersion test2 = TransportVersion.fromBufferedReader(
"<test>",
"testSupports2",
false,
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data2), StandardCharsets.UTF_8)),
5000000
);
assertThat(new TransportVersion(null, 1001000, null).supports(test2), is(false));
assertThat(new TransportVersion(null, 1001001, null).supports(test2), is(true));
assertThat(new TransportVersion(null, 1001002, null).supports(test2), is(true));
Expand All @@ -284,7 +291,14 @@ public void testSupports() {
assertThat(new TransportVersion(null, 100001000, null).supports(test2), is(true));
assertThat(new TransportVersion(null, 100001001, null).supports(test2), is(true));

TransportVersion test3 = TransportVersion.fromName("test_3");
byte[] data3 = "100002000,3003001,2001002".getBytes(StandardCharsets.UTF_8);
TransportVersion test3 = TransportVersion.fromBufferedReader(
"<test>",
"testSupports3",
false,
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data3), StandardCharsets.UTF_8)),
5000000
);
assertThat(new TransportVersion(null, 1001001, null).supports(test3), is(false));
assertThat(new TransportVersion(null, 1001002, null).supports(test3), is(false));
assertThat(new TransportVersion(null, 1001003, null).supports(test3), is(false));
Expand All @@ -305,7 +319,14 @@ public void testSupports() {
assertThat(new TransportVersion(null, 100001000, null).supports(test3), is(true));
assertThat(new TransportVersion(null, 100001001, null).supports(test3), is(true));

TransportVersion test4 = TransportVersion.fromName("test_4");
byte[] data4 = "100002000,3003002,2001003,1001002".getBytes(StandardCharsets.UTF_8);
TransportVersion test4 = TransportVersion.fromBufferedReader(
"<test>",
"testSupports3",
false,
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data4), StandardCharsets.UTF_8)),
5000000
);
assertThat(new TransportVersion(null, 1001001, null).supports(test4), is(false));
assertThat(new TransportVersion(null, 1001002, null).supports(test4), is(true));
assertThat(new TransportVersion(null, 1001003, null).supports(test4), is(true));
Expand Down
5 changes: 0 additions & 5 deletions server/src/test/resources/transport/defined/manifest.txt

This file was deleted.

1 change: 0 additions & 1 deletion server/src/test/resources/transport/defined/test_0.csv

This file was deleted.

2 changes: 0 additions & 2 deletions server/src/test/resources/transport/defined/test_1.csv

This file was deleted.

1 change: 0 additions & 1 deletion server/src/test/resources/transport/defined/test_2.csv

This file was deleted.

1 change: 0 additions & 1 deletion server/src/test/resources/transport/defined/test_3.csv

This file was deleted.

1 change: 0 additions & 1 deletion server/src/test/resources/transport/defined/test_4.csv

This file was deleted.