@@ -82,47 +82,121 @@ public TransportVersion(int id) {
8282 this (null , id , null );
8383 }
8484
85+ interface BufferedReaderParser <T > {
86+ T parse (String component , String path , BufferedReader bufferedReader );
87+ }
88+
89+ static <T > T parseFromBufferedReader (
90+ String component ,
91+ String path ,
92+ Function <String , InputStream > nameToStream ,
93+ BufferedReaderParser <T > parser
94+ ) {
95+ try (InputStream inputStream = nameToStream .apply (path )) {
96+ if (inputStream == null ) {
97+ return null ;
98+ }
99+ try (BufferedReader bufferedReader = new BufferedReader (new InputStreamReader (inputStream , StandardCharsets .UTF_8 ))) {
100+ return parser .parse (component , path , bufferedReader );
101+ }
102+ } catch (IOException ioe ) {
103+ throw new UncheckedIOException ("parsing error [" + component + ":" + path + "]" , ioe );
104+ }
105+ }
106+
85107 /**
86108 * Constructs a named transport version along with its set of compatible patch versions from x-content.
87109 * This method takes in the parameter {@code latest} which is the highest valid transport version id
88110 * supported by this node. Versions newer than the current transport version id for this node are discarded.
89111 */
90- public static TransportVersion fromInputStream (String path , boolean nameInFile , InputStream stream , Integer latest ) {
91- try (BufferedReader reader = new BufferedReader (new InputStreamReader (stream , StandardCharsets .UTF_8 ))) {
92- String line = reader .readLine ();
112+ public static TransportVersion fromBufferedReader (
113+ String component ,
114+ String path ,
115+ boolean nameInFile ,
116+ BufferedReader bufferedReader ,
117+ Integer latest
118+ ) {
119+ try {
120+ String line = bufferedReader .readLine ();
93121 String [] parts = line .replaceAll ("\\ s+" , "" ).split ("," );
94122 String check ;
95- while ((check = reader .readLine ()) != null ) {
123+ while ((check = bufferedReader .readLine ()) != null ) {
96124 if (check .replaceAll ("\\ s+" , "" ).isEmpty () == false ) {
97- throw new IllegalArgumentException ("invalid transport version file format [" + path + "]" );
125+ throw new IllegalArgumentException ("invalid transport version file format [" + toComponentPath ( component , path ) + "]" );
98126 }
99127 }
100128 if (parts .length < (nameInFile ? 2 : 1 )) {
101- throw new IllegalStateException ("invalid transport version file format [" + path + "]" );
129+ throw new IllegalStateException ("invalid transport version file format [" + toComponentPath ( component , path ) + "]" );
102130 }
103131 String name = nameInFile ? parts [0 ] : path .substring (path .lastIndexOf ('/' ) + 1 , path .length () - 4 );
104132 List <Integer > ids = new ArrayList <>();
105133 for (int i = nameInFile ? 1 : 0 ; i < parts .length ; ++i ) {
106134 try {
107135 ids .add (Integer .parseInt (parts [i ]));
108136 } catch (NumberFormatException nfe ) {
109- throw new IllegalStateException ("invalid transport version file format [" + path + "]" , nfe );
137+ throw new IllegalStateException (
138+ "invalid transport version file format [" + toComponentPath (component , path ) + "]" ,
139+ nfe
140+ );
110141 }
111142 }
112- ids .sort (Integer ::compareTo );
113143 TransportVersion transportVersion = null ;
114- for (int idIndex = 0 ; idIndex < ids .size (); ++idIndex ) {
144+ for (int idIndex = ids .size () - 1 ; idIndex >= 0 ; --idIndex ) {
145+ if (idIndex > 0 && ids .get (idIndex - 1 ) <= ids .get (idIndex )) {
146+ throw new IllegalStateException ("invalid transport version file format [" + toComponentPath (component , path ) + "]" );
147+ }
115148 if (ids .get (idIndex ) > latest ) {
116149 break ;
117150 }
118151 transportVersion = new TransportVersion (name , ids .get (idIndex ), transportVersion );
119152 }
120153 return transportVersion ;
121154 } catch (IOException ioe ) {
122- throw new UncheckedIOException ("cannot parse transport version [" + path + "]" , ioe );
155+ throw new UncheckedIOException ("invalid transport version file format [" + toComponentPath ( component , path ) + "]" , ioe );
123156 }
124157 }
125158
159+ public static Map <String , TransportVersion > collectFromInputStreams (
160+ String component ,
161+ Function <String , InputStream > nameToStream ,
162+ String latestFileName
163+ ) {
164+ TransportVersion latest = parseFromBufferedReader (
165+ component ,
166+ "/transport/latest/" + latestFileName ,
167+ nameToStream ,
168+ (c , p , br ) -> fromBufferedReader (c , p , true , br , Integer .MAX_VALUE )
169+ );
170+ if (latest != null ) {
171+ List <String > versionFilesNames = parseFromBufferedReader (
172+ component ,
173+ "/transport/defined/manifest.txt" ,
174+ nameToStream ,
175+ (c , p , br ) -> br .lines ().filter (line -> line .isBlank () == false ).toList ()
176+ );
177+ if (versionFilesNames != null ) {
178+ Map <String , TransportVersion > transportVersions = new HashMap <>();
179+ for (String versionFileName : versionFilesNames ) {
180+ TransportVersion transportVersion = parseFromBufferedReader (
181+ component ,
182+ "/transport/defined/" + versionFileName ,
183+ nameToStream ,
184+ (c , p , br ) -> fromBufferedReader (c , p , false , br , latest .id ())
185+ );
186+ if (transportVersion != null ) {
187+ transportVersions .put (versionFileName .substring (0 , versionFileName .length () - 4 ), transportVersion );
188+ }
189+ }
190+ return transportVersions ;
191+ }
192+ }
193+ return Map .of ();
194+ }
195+
196+ private static String toComponentPath (String component , String path ) {
197+ return component + ":" + path ;
198+ }
199+
126200 public static TransportVersion readVersion (StreamInput in ) throws IOException {
127201 return fromId (in .readVInt ());
128202 }
@@ -345,7 +419,11 @@ private static class VersionsHolder {
345419 static {
346420 // collect all the transport versions from server and es modules/plugins (defined in server)
347421 List <TransportVersion > allVersions = new ArrayList <>(TransportVersions .DEFINED_VERSIONS );
348- Map <String , TransportVersion > allVersionsByName = loadTransportVersionsByName ();
422+ Map <String , TransportVersion > allVersionsByName = collectFromInputStreams (
423+ "<server>" ,
424+ TransportVersion .class ::getResourceAsStream ,
425+ Version .CURRENT .major + "." + Version .CURRENT .minor + ".csv"
426+ );
349427 addTransportVersions (allVersionsByName .values (), allVersions ).sort (TransportVersion ::compareTo );
350428
351429 // set version lookup by release before adding serverless versions
@@ -373,65 +451,6 @@ private static class VersionsHolder {
373451 CURRENT = ALL_VERSIONS .getLast ();
374452 }
375453
376- private static Map <String , TransportVersion > loadTransportVersionsByName () {
377- Map <String , TransportVersion > transportVersions = new HashMap <>();
378-
379- String latestLocation = "/transport/latest/" + Version .CURRENT .major + "." + Version .CURRENT .minor + ".csv" ;
380- int latestId = -1 ;
381- try (InputStream inputStream = TransportVersion .class .getResourceAsStream (latestLocation )) {
382- // this check is required until bootstrapping for the new transport versions format is completed;
383- // when load is false, we will only use the transport versions in the legacy format;
384- // load becomes false if we don't find the latest or manifest files required for the new format
385- if (inputStream != null ) {
386- TransportVersion latest = fromInputStream (latestLocation , true , inputStream , Integer .MAX_VALUE );
387- if (latest == null ) {
388- throw new IllegalStateException (
389- "invalid latest transport version for minor version ["
390- + Version .CURRENT .major
391- + "."
392- + Version .CURRENT .minor
393- + "]"
394- );
395- }
396- latestId = latest .id ();
397- }
398- } catch (IOException ioe ) {
399- throw new UncheckedIOException ("latest transport version file not found at [" + latestLocation + "]" , ioe );
400- }
401-
402- String manifestLocation = "/transport/defined/manifest.txt" ;
403- List <String > versionFileNames = null ;
404- if (latestId > -1 ) {
405- try (InputStream inputStream = TransportVersion .class .getResourceAsStream (manifestLocation )) {
406- if (inputStream != null ) {
407- BufferedReader reader = new BufferedReader (new InputStreamReader (inputStream , StandardCharsets .UTF_8 ));
408- versionFileNames = reader .lines ().filter (line -> line .isBlank () == false ).toList ();
409- }
410- } catch (IOException ioe ) {
411- throw new UncheckedIOException ("transport version manifest file not found at [" + manifestLocation + "]" , ioe );
412- }
413- }
414-
415- if (versionFileNames != null ) {
416- for (String name : versionFileNames ) {
417- String versionLocation = "/transport/defined/" + name ;
418- try (InputStream inputStream = TransportVersion .class .getResourceAsStream (versionLocation )) {
419- if (inputStream == null ) {
420- throw new IllegalStateException ("transport version file not found at [" + versionLocation + "]" );
421- }
422- TransportVersion transportVersion = TransportVersion .fromInputStream (versionLocation , false , inputStream , latestId );
423- if (transportVersion != null ) {
424- transportVersions .put (transportVersion .name (), transportVersion );
425- }
426- } catch (IOException ioe ) {
427- throw new UncheckedIOException ("transport version file not found at [ " + versionLocation + "]" , ioe );
428- }
429- }
430- }
431-
432- return transportVersions ;
433- }
434-
435454 private static List <TransportVersion > addTransportVersions (Collection <TransportVersion > addFrom , List <TransportVersion > addTo ) {
436455 for (TransportVersion transportVersion : addFrom ) {
437456 addTo .add (transportVersion );
0 commit comments