-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Update TransportVersion
to support a new model
#131488
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
340136c
46cf098
a4ec0d6
dc9cfe9
a20dfd7
b772a3a
3182209
8aa55ca
f13e723
ead340b
be9d70b
01511ff
cba38b1
f9bebe8
b02cb1a
a19caac
c56ec3a
75fbbe4
c42bfc2
1b272b6
da5491c
c80f8fb
19b1923
ac02157
71d2cc4
6f4dd0d
38d5710
eb0a94f
518ed4b
14e2539
ec39480
18d2206
aa7a4bc
5da94f9
e097c7a
8c8ecfc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,16 +14,28 @@ | |
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.internal.VersionExtension; | ||
import org.elasticsearch.plugins.ExtensionLoader; | ||
import org.elasticsearch.xcontent.ConstructingObjectParser; | ||
import org.elasticsearch.xcontent.ParseField; | ||
import org.elasticsearch.xcontent.XContentParser; | ||
import org.elasticsearch.xcontent.XContentParserConfiguration; | ||
import org.elasticsearch.xcontent.json.JsonXContent; | ||
|
||
import java.io.BufferedReader; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.InputStreamReader; | ||
import java.io.UncheckedIOException; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.ServiceLoader; | ||
import java.util.function.Function; | ||
import java.util.function.IntFunction; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
/** | ||
* Represents the version of the wire protocol used to communicate between a pair of ES nodes. | ||
|
@@ -57,7 +69,68 @@ | |
* different version value. If you need to know whether the cluster as a whole speaks a new enough {@link TransportVersion} to understand a | ||
* newly-added feature, use {@link org.elasticsearch.cluster.ClusterState#getMinTransportVersion}. | ||
*/ | ||
public record TransportVersion(int id) implements VersionId<TransportVersion> { | ||
public class TransportVersion implements VersionId<TransportVersion> { | ||
|
||
private final String name; | ||
private final int id; | ||
private final TransportVersion nextPatchVersion; | ||
|
||
public TransportVersion(int id) { | ||
this(null, id, null); | ||
} | ||
|
||
public TransportVersion(String name, int id, TransportVersion patchVersion) { | ||
this.name = name; | ||
this.id = id; | ||
this.nextPatchVersion = patchVersion; | ||
} | ||
|
||
public String name() { | ||
return name; | ||
} | ||
|
||
public int id() { | ||
return id; | ||
} | ||
|
||
public TransportVersion nextPatchVersion() { | ||
return nextPatchVersion; | ||
} | ||
|
||
private static final ParseField NAME = new ParseField("name"); | ||
private static final ParseField IDS = new ParseField("ids"); | ||
|
||
private static final ConstructingObjectParser<TransportVersion, Integer> PARSER = new ConstructingObjectParser<>( | ||
TransportVersion.class.getCanonicalName(), | ||
false, | ||
(args, latestTransportId) -> { | ||
String name = (String) args[0]; | ||
@SuppressWarnings("unchecked") | ||
List<Integer> ids = (List<Integer>) args[1]; | ||
ids.sort(Integer::compareTo); | ||
TransportVersion transportVersion = null; | ||
for (int idIndex = 0; idIndex < ids.size(); ++idIndex) { | ||
if (ids.get(idIndex) > latestTransportId) { | ||
break; | ||
} | ||
transportVersion = new TransportVersion(name, ids.get(idIndex), transportVersion); | ||
} | ||
return transportVersion; | ||
} | ||
); | ||
|
||
static { | ||
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME); | ||
PARSER.declareIntArray(ConstructingObjectParser.constructorArg(), IDS); | ||
} | ||
|
||
public static TransportVersion fromXContent(InputStream inputStream, Integer latest) { | ||
try (XContentParser parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, inputStream)) { | ||
return PARSER.apply(parser, latest); | ||
} catch (IOException ioe) { | ||
throw new UncheckedIOException(ioe); | ||
} | ||
} | ||
|
||
public static TransportVersion readVersion(StreamInput in) throws IOException { | ||
return fromId(in.readVInt()); | ||
|
@@ -70,12 +143,20 @@ public static TransportVersion readVersion(StreamInput in) throws IOException { | |
* The new instance is not registered in {@code TransportVersion.getAllVersions}. | ||
*/ | ||
public static TransportVersion fromId(int id) { | ||
JVerwolf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
TransportVersion known = VersionsHolder.ALL_VERSIONS_MAP.get(id); | ||
TransportVersion known = VersionsHolder.ALL_VERSIONS_BY_ID.get(id); | ||
if (known != null) { | ||
return known; | ||
} | ||
// this is a version we don't otherwise know about - just create a placeholder | ||
return new TransportVersion(id); | ||
return new TransportVersion(null, id, null); | ||
} | ||
|
||
public static TransportVersion fromName(String name) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Javadocs please, referencing how these are are in resource files and looked up There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Javadoc should probably also explain usage/generation, or perhaps leave a TODO untill that part is finalized. Thinking out loud: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have added JavaDocs for the methods that would be used by consumers. I consider loading to be generally internal. |
||
TransportVersion known = VersionsHolder.ALL_VERSIONS_BY_NAME.get(name); | ||
if (known == null) { | ||
throw new IllegalStateException("unknown transport version [" + name + "]"); | ||
} | ||
return known; | ||
} | ||
|
||
public static void writeVersion(TransportVersion version, StreamOutput out) throws IOException { | ||
|
@@ -123,7 +204,7 @@ public static List<TransportVersion> getAllVersions() { | |
* in the wild (they're sent over the wire by numeric ID) but we don't know how to communicate using such versions. | ||
*/ | ||
public boolean isKnown() { | ||
return VersionsHolder.ALL_VERSIONS_MAP.containsKey(id); | ||
return VersionsHolder.ALL_VERSIONS_BY_ID.containsKey(id); | ||
} | ||
|
||
/** | ||
|
@@ -135,7 +216,7 @@ public TransportVersion bestKnownVersion() { | |
return this; | ||
} | ||
TransportVersion bestSoFar = TransportVersions.ZERO; | ||
for (final var knownVersion : VersionsHolder.ALL_VERSIONS_MAP.values()) { | ||
for (final var knownVersion : VersionsHolder.ALL_VERSIONS_BY_ID.values()) { | ||
if (knownVersion.after(bestSoFar) && knownVersion.before(this)) { | ||
bestSoFar = knownVersion; | ||
} | ||
|
@@ -171,38 +252,134 @@ public boolean isPatchFrom(TransportVersion version) { | |
return onOrAfter(version) && id < version.id + 100 - (version.id % 100); | ||
} | ||
|
||
public boolean supports(TransportVersion version) { | ||
if (onOrAfter(version)) { | ||
return true; | ||
} | ||
TransportVersion nextPatchVersion = version.nextPatchVersion; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This linked structure makes the logic a little harder to follow. I'm still not sure it's better than an array of patch ids. But if we are to use it, can we at least document it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: I can see how this is simpler on one hand, and also creates less objects in memory. However, I'm uncertain about perf implications, if that even matters? This is effectively the same as boxing/wrapping, as our comparison now requires loading objects from the heap, IIUC. I'm not sure that matters, however, so I'll leave it as your call Jack. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer to leave it this way for now as I think having duplicate ids floating around even if an internal representation may be confusing. Right now it should have similar performance to checking each patch version since those ids are "unboxed" as well. |
||
while (nextPatchVersion != null) { | ||
if (isPatchFrom(version)) { | ||
return true; | ||
} | ||
nextPatchVersion = nextPatchVersion.nextPatchVersion; | ||
} | ||
return false; | ||
} | ||
|
||
/** | ||
* Returns a string representing the Elasticsearch release version of this transport version, | ||
* if applicable for this deployment, otherwise the raw version number. | ||
*/ | ||
public String toReleaseVersion() { | ||
return TransportVersions.VERSION_LOOKUP.apply(id); | ||
return VersionsHolder.VERSION_LOOKUP_BY_RELEASE.apply(id); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (o == null || getClass() != o.getClass()) return false; | ||
TransportVersion that = (TransportVersion) o; | ||
return id == that.id; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return id; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return Integer.toString(id); | ||
return "" + id; | ||
|
||
} | ||
|
||
private static class VersionsHolder { | ||
|
||
private static final List<TransportVersion> ALL_VERSIONS; | ||
private static final Map<Integer, TransportVersion> ALL_VERSIONS_MAP; | ||
private static final Map<Integer, TransportVersion> ALL_VERSIONS_BY_ID; | ||
private static final Map<String, TransportVersion> ALL_VERSIONS_BY_NAME; | ||
private static final IntFunction<String> VERSION_LOOKUP_BY_RELEASE; | ||
private static final TransportVersion CURRENT; | ||
|
||
static { | ||
// collect all the transport versions from server and es modules/plugins (defined in server) | ||
List<TransportVersion> allVersions = new ArrayList<>(TransportVersions.DEFINED_VERSIONS); | ||
Map<String, TransportVersion> allVersionsByName = loadTransportVersionsByName(); | ||
addTransportVersions(allVersionsByName.values(), allVersions).sort(TransportVersion::compareTo); | ||
|
||
// set version lookup by release before adding serverless versions | ||
VERSION_LOOKUP_BY_RELEASE = ReleaseVersions.generateVersionsLookup( | ||
TransportVersions.class, | ||
allVersions.get(allVersions.size() - 1).id() | ||
); | ||
|
||
// collect all the transport versions from serverless | ||
Collection<TransportVersion> extendedVersions = ExtensionLoader.loadSingleton(ServiceLoader.load(VersionExtension.class)) | ||
.map(VersionExtension::getTransportVersions) | ||
.orElse(Collections.emptyList()); | ||
addTransportVersions(extendedVersions, allVersions).sort(TransportVersion::compareTo); | ||
for (TransportVersion version : extendedVersions) { | ||
if (version.name() != null) { | ||
allVersionsByName.put(version.name(), version); | ||
} | ||
} | ||
|
||
// set the transport version lookups | ||
ALL_VERSIONS = Collections.unmodifiableList(allVersions); | ||
ALL_VERSIONS_BY_ID = ALL_VERSIONS.stream().collect(Collectors.toUnmodifiableMap(TransportVersion::id, Function.identity())); | ||
ALL_VERSIONS_BY_NAME = Collections.unmodifiableMap(allVersionsByName); | ||
CURRENT = ALL_VERSIONS.getLast(); | ||
} | ||
|
||
private static Map<String, TransportVersion> loadTransportVersionsByName() { | ||
Map<String, TransportVersion> transportVersions = new HashMap<>(); | ||
|
||
String latestLocation = "/transport/latest/" + Version.CURRENT.major + "." + Version.CURRENT.minor + "-LATEST.json"; | ||
|
||
int latestId; | ||
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(latestLocation)) { | ||
TransportVersion latest = fromXContent(inputStream, Integer.MAX_VALUE); | ||
if (latest == null) { | ||
throw new IllegalStateException( | ||
"invalid latest transport version for release version [" + Version.CURRENT.major + "." + Version.CURRENT.minor + "]" | ||
|
||
); | ||
} | ||
latestId = latest.id(); | ||
} catch (IOException ioe) { | ||
throw new UncheckedIOException("latest transport version file not found at [" + latestLocation + "]", ioe); | ||
} | ||
|
||
if (extendedVersions.isEmpty()) { | ||
ALL_VERSIONS = TransportVersions.DEFINED_VERSIONS; | ||
} else { | ||
ALL_VERSIONS = Stream.concat(TransportVersions.DEFINED_VERSIONS.stream(), extendedVersions.stream()).sorted().toList(); | ||
String manifestLocation = "/transport/generated/generated-transport-versions-files-manifest.txt"; | ||
|
||
List<String> versionFileNames; | ||
try (InputStream transportVersionsManifest = TransportVersion.class.getResourceAsStream(manifestLocation)) { | ||
BufferedReader reader = new BufferedReader(new InputStreamReader(transportVersionsManifest, StandardCharsets.UTF_8)); | ||
versionFileNames = reader.lines().filter(line -> line.isBlank() == false).toList(); | ||
} catch (IOException ioe) { | ||
throw new UncheckedIOException("transport version manifest file not found at [" + manifestLocation + "]", ioe); | ||
} | ||
|
||
ALL_VERSIONS_MAP = ALL_VERSIONS.stream().collect(Collectors.toUnmodifiableMap(TransportVersion::id, Function.identity())); | ||
for (String name : versionFileNames) { | ||
String versionLocation = "/transport/generated/" + name; | ||
try (InputStream inputStream = TransportVersion.class.getResourceAsStream(versionLocation)) { | ||
TransportVersion transportVersion = TransportVersion.fromXContent(inputStream, latestId); | ||
if (transportVersion != null) { | ||
transportVersions.put(transportVersion.name(), transportVersion); | ||
} | ||
} catch (IOException ioe) { | ||
throw new UncheckedIOException("transport version set file not found at [ " + versionLocation + "]", ioe); | ||
} | ||
} | ||
|
||
CURRENT = ALL_VERSIONS.getLast(); | ||
return transportVersions; | ||
} | ||
|
||
private static List<TransportVersion> addTransportVersions(Collection<TransportVersion> addFrom, List<TransportVersion> addTo) { | ||
for (TransportVersion transportVersion : addFrom) { | ||
addTo.add(transportVersion); | ||
TransportVersion patchVersion = transportVersion.nextPatchVersion(); | ||
while (patchVersion != null) { | ||
addTo.add(patchVersion); | ||
patchVersion = patchVersion.nextPatchVersion(); | ||
} | ||
} | ||
return addTo; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
{ | ||
"name": "esql-split-on-big-values", | ||
"ids": [ | ||
9116000, | ||
9112001, | ||
8841063 | ||
] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
{ | ||
"name": "esql-topn-timings", | ||
"ids": [ | ||
9128000 | ||
] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
esql-split-on-big-values.json | ||
|
||
ml-inference-azure-ai-studio-rerank-added.json | ||
esql-topn-timings.json |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
{ | ||
"name": "ml-inference-azure-ai-studio-rerank-added", | ||
"ids": [ | ||
9123000 | ||
] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there something about these changes that precludes using a record?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had gone down making the members private without getters in an initial version, but I don't think that makes sense anymore. I switched this back to a record. One thing I wasn't completely sure about is the implications of having name included as part of equals/hashcode. I would hope nothing is using these for direct comparisons for any type of serialization branching.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can customize equals and hashCode for the record to only consider the id field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, but is it correct to only use id for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed this to only use
id
to maintain exactly what was already there.