@@ -82,47 +82,121 @@ public TransportVersion(int id) {
82
82
this (null , id , null );
83
83
}
84
84
85
+ interface BufferedReaderParser <T > {
86
+ T parse (String component , String path , BufferedReader bufferedReader );
87
+ }
88
+
89
+ static <T > T parseFromBufferedReader (
90
+ String component ,
91
+ String path ,
92
+ Function <String , InputStream > nameToStream ,
93
+ BufferedReaderParser <T > parser
94
+ ) {
95
+ try (InputStream inputStream = nameToStream .apply (path )) {
96
+ if (inputStream == null ) {
97
+ return null ;
98
+ }
99
+ try (BufferedReader bufferedReader = new BufferedReader (new InputStreamReader (inputStream , StandardCharsets .UTF_8 ))) {
100
+ return parser .parse (component , path , bufferedReader );
101
+ }
102
+ } catch (IOException ioe ) {
103
+ throw new UncheckedIOException ("parsing error [" + component + ":" + path + "]" , ioe );
104
+ }
105
+ }
106
+
85
107
/**
86
108
* Constructs a named transport version along with its set of compatible patch versions from x-content.
87
109
* This method takes in the parameter {@code latest} which is the highest valid transport version id
88
110
* supported by this node. Versions newer than the current transport version id for this node are discarded.
89
111
*/
90
- public static TransportVersion fromInputStream (String path , boolean nameInFile , InputStream stream , Integer latest ) {
91
- try (BufferedReader reader = new BufferedReader (new InputStreamReader (stream , StandardCharsets .UTF_8 ))) {
92
- String line = reader .readLine ();
112
+ public static TransportVersion fromBufferedReader (
113
+ String component ,
114
+ String path ,
115
+ boolean nameInFile ,
116
+ BufferedReader bufferedReader ,
117
+ Integer latest
118
+ ) {
119
+ try {
120
+ String line = bufferedReader .readLine ();
93
121
String [] parts = line .replaceAll ("\\ s+" , "" ).split ("," );
94
122
String check ;
95
- while ((check = reader .readLine ()) != null ) {
123
+ while ((check = bufferedReader .readLine ()) != null ) {
96
124
if (check .replaceAll ("\\ s+" , "" ).isEmpty () == false ) {
97
- throw new IllegalArgumentException ("invalid transport version file format [" + path + "]" );
125
+ throw new IllegalArgumentException ("invalid transport version file format [" + toComponentPath ( component , path ) + "]" );
98
126
}
99
127
}
100
128
if (parts .length < (nameInFile ? 2 : 1 )) {
101
- throw new IllegalStateException ("invalid transport version file format [" + path + "]" );
129
+ throw new IllegalStateException ("invalid transport version file format [" + toComponentPath ( component , path ) + "]" );
102
130
}
103
131
String name = nameInFile ? parts [0 ] : path .substring (path .lastIndexOf ('/' ) + 1 , path .length () - 4 );
104
132
List <Integer > ids = new ArrayList <>();
105
133
for (int i = nameInFile ? 1 : 0 ; i < parts .length ; ++i ) {
106
134
try {
107
135
ids .add (Integer .parseInt (parts [i ]));
108
136
} catch (NumberFormatException nfe ) {
109
- throw new IllegalStateException ("invalid transport version file format [" + path + "]" , nfe );
137
+ throw new IllegalStateException (
138
+ "invalid transport version file format [" + toComponentPath (component , path ) + "]" ,
139
+ nfe
140
+ );
110
141
}
111
142
}
112
- ids .sort (Integer ::compareTo );
113
143
TransportVersion transportVersion = null ;
114
- for (int idIndex = 0 ; idIndex < ids .size (); ++idIndex ) {
144
+ for (int idIndex = ids .size () - 1 ; idIndex >= 0 ; --idIndex ) {
145
+ if (idIndex > 0 && ids .get (idIndex - 1 ) <= ids .get (idIndex )) {
146
+ throw new IllegalStateException ("invalid transport version file format [" + toComponentPath (component , path ) + "]" );
147
+ }
115
148
if (ids .get (idIndex ) > latest ) {
116
149
break ;
117
150
}
118
151
transportVersion = new TransportVersion (name , ids .get (idIndex ), transportVersion );
119
152
}
120
153
return transportVersion ;
121
154
} catch (IOException ioe ) {
122
- throw new UncheckedIOException ("cannot parse transport version [" + path + "]" , ioe );
155
+ throw new UncheckedIOException ("invalid transport version file format [" + toComponentPath ( component , path ) + "]" , ioe );
123
156
}
124
157
}
125
158
159
+ public static Map <String , TransportVersion > collectFromInputStreams (
160
+ String component ,
161
+ Function <String , InputStream > nameToStream ,
162
+ String latestFileName
163
+ ) {
164
+ TransportVersion latest = parseFromBufferedReader (
165
+ component ,
166
+ "/transport/latest/" + latestFileName ,
167
+ nameToStream ,
168
+ (c , p , br ) -> fromBufferedReader (c , p , true , br , Integer .MAX_VALUE )
169
+ );
170
+ if (latest != null ) {
171
+ List <String > versionFilesNames = parseFromBufferedReader (
172
+ component ,
173
+ "/transport/defined/manifest.txt" ,
174
+ nameToStream ,
175
+ (c , p , br ) -> br .lines ().filter (line -> line .isBlank () == false ).toList ()
176
+ );
177
+ if (versionFilesNames != null ) {
178
+ Map <String , TransportVersion > transportVersions = new HashMap <>();
179
+ for (String versionFileName : versionFilesNames ) {
180
+ TransportVersion transportVersion = parseFromBufferedReader (
181
+ component ,
182
+ "/transport/defined/" + versionFileName ,
183
+ nameToStream ,
184
+ (c , p , br ) -> fromBufferedReader (c , p , false , br , latest .id ())
185
+ );
186
+ if (transportVersion != null ) {
187
+ transportVersions .put (versionFileName .substring (0 , versionFileName .length () - 4 ), transportVersion );
188
+ }
189
+ }
190
+ return transportVersions ;
191
+ }
192
+ }
193
+ return Map .of ();
194
+ }
195
+
196
+ private static String toComponentPath (String component , String path ) {
197
+ return component + ":" + path ;
198
+ }
199
+
126
200
public static TransportVersion readVersion (StreamInput in ) throws IOException {
127
201
return fromId (in .readVInt ());
128
202
}
@@ -345,7 +419,11 @@ private static class VersionsHolder {
345
419
static {
346
420
// collect all the transport versions from server and es modules/plugins (defined in server)
347
421
List <TransportVersion > allVersions = new ArrayList <>(TransportVersions .DEFINED_VERSIONS );
348
- Map <String , TransportVersion > allVersionsByName = loadTransportVersionsByName ();
422
+ Map <String , TransportVersion > allVersionsByName = collectFromInputStreams (
423
+ "<server>" ,
424
+ TransportVersion .class ::getResourceAsStream ,
425
+ Version .CURRENT .major + "." + Version .CURRENT .minor + ".csv"
426
+ );
349
427
addTransportVersions (allVersionsByName .values (), allVersions ).sort (TransportVersion ::compareTo );
350
428
351
429
// set version lookup by release before adding serverless versions
@@ -373,65 +451,6 @@ private static class VersionsHolder {
373
451
CURRENT = ALL_VERSIONS .getLast ();
374
452
}
375
453
376
- private static Map <String , TransportVersion > loadTransportVersionsByName () {
377
- Map <String , TransportVersion > transportVersions = new HashMap <>();
378
-
379
- String latestLocation = "/transport/latest/" + Version .CURRENT .major + "." + Version .CURRENT .minor + ".csv" ;
380
- int latestId = -1 ;
381
- try (InputStream inputStream = TransportVersion .class .getResourceAsStream (latestLocation )) {
382
- // this check is required until bootstrapping for the new transport versions format is completed;
383
- // when load is false, we will only use the transport versions in the legacy format;
384
- // load becomes false if we don't find the latest or manifest files required for the new format
385
- if (inputStream != null ) {
386
- TransportVersion latest = fromInputStream (latestLocation , true , inputStream , Integer .MAX_VALUE );
387
- if (latest == null ) {
388
- throw new IllegalStateException (
389
- "invalid latest transport version for minor version ["
390
- + Version .CURRENT .major
391
- + "."
392
- + Version .CURRENT .minor
393
- + "]"
394
- );
395
- }
396
- latestId = latest .id ();
397
- }
398
- } catch (IOException ioe ) {
399
- throw new UncheckedIOException ("latest transport version file not found at [" + latestLocation + "]" , ioe );
400
- }
401
-
402
- String manifestLocation = "/transport/defined/manifest.txt" ;
403
- List <String > versionFileNames = null ;
404
- if (latestId > -1 ) {
405
- try (InputStream inputStream = TransportVersion .class .getResourceAsStream (manifestLocation )) {
406
- if (inputStream != null ) {
407
- BufferedReader reader = new BufferedReader (new InputStreamReader (inputStream , StandardCharsets .UTF_8 ));
408
- versionFileNames = reader .lines ().filter (line -> line .isBlank () == false ).toList ();
409
- }
410
- } catch (IOException ioe ) {
411
- throw new UncheckedIOException ("transport version manifest file not found at [" + manifestLocation + "]" , ioe );
412
- }
413
- }
414
-
415
- if (versionFileNames != null ) {
416
- for (String name : versionFileNames ) {
417
- String versionLocation = "/transport/defined/" + name ;
418
- try (InputStream inputStream = TransportVersion .class .getResourceAsStream (versionLocation )) {
419
- if (inputStream == null ) {
420
- throw new IllegalStateException ("transport version file not found at [" + versionLocation + "]" );
421
- }
422
- TransportVersion transportVersion = TransportVersion .fromInputStream (versionLocation , false , inputStream , latestId );
423
- if (transportVersion != null ) {
424
- transportVersions .put (transportVersion .name (), transportVersion );
425
- }
426
- } catch (IOException ioe ) {
427
- throw new UncheckedIOException ("transport version file not found at [ " + versionLocation + "]" , ioe );
428
- }
429
- }
430
- }
431
-
432
- return transportVersions ;
433
- }
434
-
435
454
private static List <TransportVersion > addTransportVersions (Collection <TransportVersion > addFrom , List <TransportVersion > addTo ) {
436
455
for (TransportVersion transportVersion : addFrom ) {
437
456
addTo .add (transportVersion );
0 commit comments