@@ -79,47 +79,121 @@ public TransportVersion(int id) {
7979 this (null , id , null );
8080 }
8181
82+ interface BufferedReaderParser <T > {
83+ T parse (String component , String path , BufferedReader bufferedReader );
84+ }
85+
86+ static <T > T parseFromBufferedReader (
87+ String component ,
88+ String path ,
89+ Function <String , InputStream > nameToStream ,
90+ BufferedReaderParser <T > parser
91+ ) {
92+ try (InputStream inputStream = nameToStream .apply (path )) {
93+ if (inputStream == null ) {
94+ return null ;
95+ }
96+ try (BufferedReader bufferedReader = new BufferedReader (new InputStreamReader (inputStream , StandardCharsets .UTF_8 ))) {
97+ return parser .parse (component , path , bufferedReader );
98+ }
99+ } catch (IOException ioe ) {
100+ throw new UncheckedIOException ("parsing error [" + component + ":" + path + "]" , ioe );
101+ }
102+ }
103+
82104 /**
83105 * Constructs a named transport version along with its set of compatible patch versions from x-content.
84106 * This method takes in the parameter {@code latest} which is the highest valid transport version id
85107 * supported by this node. Versions newer than the current transport version id for this node are discarded.
86108 */
87- public static TransportVersion fromInputStream (String path , boolean nameInFile , InputStream stream , Integer latest ) {
88- try (BufferedReader reader = new BufferedReader (new InputStreamReader (stream , StandardCharsets .UTF_8 ))) {
89- String line = reader .readLine ();
109+ public static TransportVersion fromBufferedReader (
110+ String component ,
111+ String path ,
112+ boolean nameInFile ,
113+ BufferedReader bufferedReader ,
114+ Integer latest
115+ ) {
116+ try {
117+ String line = bufferedReader .readLine ();
90118 String [] parts = line .replaceAll ("\\ s+" , "" ).split ("," );
91119 String check ;
92- while ((check = reader .readLine ()) != null ) {
120+ while ((check = bufferedReader .readLine ()) != null ) {
93121 if (check .replaceAll ("\\ s+" , "" ).isEmpty () == false ) {
94- throw new IllegalArgumentException ("invalid transport version file format [" + path + "]" );
122+ throw new IllegalArgumentException ("invalid transport version file format [" + toComponentPath ( component , path ) + "]" );
95123 }
96124 }
97125 if (parts .length < (nameInFile ? 2 : 1 )) {
98- throw new IllegalStateException ("invalid transport version file format [" + path + "]" );
126+ throw new IllegalStateException ("invalid transport version file format [" + toComponentPath ( component , path ) + "]" );
99127 }
100128 String name = nameInFile ? parts [0 ] : path .substring (path .lastIndexOf ('/' ) + 1 , path .length () - 4 );
101129 List <Integer > ids = new ArrayList <>();
102130 for (int i = nameInFile ? 1 : 0 ; i < parts .length ; ++i ) {
103131 try {
104132 ids .add (Integer .parseInt (parts [i ]));
105133 } catch (NumberFormatException nfe ) {
106- throw new IllegalStateException ("invalid transport version file format [" + path + "]" , nfe );
134+ throw new IllegalStateException (
135+ "invalid transport version file format [" + toComponentPath (component , path ) + "]" ,
136+ nfe
137+ );
107138 }
108139 }
109- ids .sort (Integer ::compareTo );
110140 TransportVersion transportVersion = null ;
111- for (int idIndex = 0 ; idIndex < ids .size (); ++idIndex ) {
141+ for (int idIndex = ids .size () - 1 ; idIndex >= 0 ; --idIndex ) {
142+ if (idIndex > 0 && ids .get (idIndex - 1 ) <= ids .get (idIndex )) {
143+ throw new IllegalStateException ("invalid transport version file format [" + toComponentPath (component , path ) + "]" );
144+ }
112145 if (ids .get (idIndex ) > latest ) {
113146 break ;
114147 }
115148 transportVersion = new TransportVersion (name , ids .get (idIndex ), transportVersion );
116149 }
117150 return transportVersion ;
118151 } catch (IOException ioe ) {
119- throw new UncheckedIOException ("cannot parse transport version [" + path + "]" , ioe );
152+ throw new UncheckedIOException ("invalid transport version file format [" + toComponentPath ( component , path ) + "]" , ioe );
120153 }
121154 }
122155
156+ public static Map <String , TransportVersion > collectFromInputStreams (
157+ String component ,
158+ Function <String , InputStream > nameToStream ,
159+ String latestFileName
160+ ) {
161+ TransportVersion latest = parseFromBufferedReader (
162+ component ,
163+ "/transport/latest/" + latestFileName ,
164+ nameToStream ,
165+ (c , p , br ) -> fromBufferedReader (c , p , true , br , Integer .MAX_VALUE )
166+ );
167+ if (latest != null ) {
168+ List <String > versionFilesNames = parseFromBufferedReader (
169+ component ,
170+ "/transport/defined/manifest.txt" ,
171+ nameToStream ,
172+ (c , p , br ) -> br .lines ().filter (line -> line .isBlank () == false ).toList ()
173+ );
174+ if (versionFilesNames != null ) {
175+ Map <String , TransportVersion > transportVersions = new HashMap <>();
176+ for (String versionFileName : versionFilesNames ) {
177+ TransportVersion transportVersion = parseFromBufferedReader (
178+ component ,
179+ "/transport/defined/" + versionFileName ,
180+ nameToStream ,
181+ (c , p , br ) -> fromBufferedReader (c , p , false , br , latest .id ())
182+ );
183+ if (transportVersion != null ) {
184+ transportVersions .put (versionFileName .substring (0 , versionFileName .length () - 4 ), transportVersion );
185+ }
186+ }
187+ return transportVersions ;
188+ }
189+ }
190+ return Map .of ();
191+ }
192+
193+ private static String toComponentPath (String component , String path ) {
194+ return component + ":" + path ;
195+ }
196+
123197 public static TransportVersion readVersion (StreamInput in ) throws IOException {
124198 return fromId (in .readVInt ());
125199 }
@@ -337,7 +411,11 @@ private static class VersionsHolder {
337411 static {
338412 // collect all the transport versions from server and es modules/plugins (defined in server)
339413 List <TransportVersion > allVersions = new ArrayList <>(TransportVersions .DEFINED_VERSIONS );
340- Map <String , TransportVersion > allVersionsByName = loadTransportVersionsByName ();
414+ Map <String , TransportVersion > allVersionsByName = collectFromInputStreams (
415+ "<server>" ,
416+ TransportVersion .class ::getResourceAsStream ,
417+ Version .CURRENT .major + "." + Version .CURRENT .minor + ".csv"
418+ );
341419 addTransportVersions (allVersionsByName .values (), allVersions ).sort (TransportVersion ::compareTo );
342420
343421 // set the transport version lookups
@@ -351,65 +429,6 @@ private static class VersionsHolder {
351429 CURRENT = ALL_VERSIONS .get (ALL_VERSIONS .size () - 1 );
352430 }
353431
354- private static Map <String , TransportVersion > loadTransportVersionsByName () {
355- Map <String , TransportVersion > transportVersions = new HashMap <>();
356-
357- String latestLocation = "/transport/latest/" + Version .CURRENT .major + "." + Version .CURRENT .minor + ".csv" ;
358- int latestId = -1 ;
359- try (InputStream inputStream = TransportVersion .class .getResourceAsStream (latestLocation )) {
360- // this check is required until bootstrapping for the new transport versions format is completed;
361- // when load is false, we will only use the transport versions in the legacy format;
362- // load becomes false if we don't find the latest or manifest files required for the new format
363- if (inputStream != null ) {
364- TransportVersion latest = fromInputStream (latestLocation , true , inputStream , Integer .MAX_VALUE );
365- if (latest == null ) {
366- throw new IllegalStateException (
367- "invalid latest transport version for minor version ["
368- + Version .CURRENT .major
369- + "."
370- + Version .CURRENT .minor
371- + "]"
372- );
373- }
374- latestId = latest .id ();
375- }
376- } catch (IOException ioe ) {
377- throw new UncheckedIOException ("latest transport version file not found at [" + latestLocation + "]" , ioe );
378- }
379-
380- String manifestLocation = "/transport/defined/manifest.txt" ;
381- List <String > versionFileNames = null ;
382- if (latestId > -1 ) {
383- try (InputStream inputStream = TransportVersion .class .getResourceAsStream (manifestLocation )) {
384- if (inputStream != null ) {
385- BufferedReader reader = new BufferedReader (new InputStreamReader (inputStream , StandardCharsets .UTF_8 ));
386- versionFileNames = reader .lines ().filter (line -> line .isBlank () == false ).toList ();
387- }
388- } catch (IOException ioe ) {
389- throw new UncheckedIOException ("transport version manifest file not found at [" + manifestLocation + "]" , ioe );
390- }
391- }
392-
393- if (versionFileNames != null ) {
394- for (String name : versionFileNames ) {
395- String versionLocation = "/transport/defined/" + name ;
396- try (InputStream inputStream = TransportVersion .class .getResourceAsStream (versionLocation )) {
397- if (inputStream == null ) {
398- throw new IllegalStateException ("transport version file not found at [" + versionLocation + "]" );
399- }
400- TransportVersion transportVersion = TransportVersion .fromInputStream (versionLocation , false , inputStream , latestId );
401- if (transportVersion != null ) {
402- transportVersions .put (transportVersion .name (), transportVersion );
403- }
404- } catch (IOException ioe ) {
405- throw new UncheckedIOException ("transport version file not found at [ " + versionLocation + "]" , ioe );
406- }
407- }
408- }
409-
410- return transportVersions ;
411- }
412-
413432 private static List <TransportVersion > addTransportVersions (Collection <TransportVersion > addFrom , List <TransportVersion > addTo ) {
414433 for (TransportVersion transportVersion : addFrom ) {
415434 addTo .add (transportVersion );
0 commit comments