1616
1717package org .springframework .cloud .dataflow .registry .service ;
1818
19+ import java .io .BufferedReader ;
1920import java .io .IOException ;
21+ import java .io .InputStreamReader ;
2022import java .net .URI ;
21- import java .net .URISyntaxException ;
22- import java .util .AbstractMap ;
23+ import java .util .HashMap ;
2324import java .util .List ;
24- import java .util .Map ;
2525import java .util .Properties ;
26- import java .util .function .Function ;
26+ import java .util .function .BiFunction ;
27+ import java .util .function .Predicate ;
2728import java .util .stream .Collectors ;
2829import java .util .stream .Stream ;
2930
@@ -288,16 +289,6 @@ public String getResourceVersion(String uriString) {
288289 return this .getResourceVersion (this .appResourceCommon .getResource (uriString ));
289290 }
290291
291- private String getVersionOrBroken (String uri ) {
292- try {
293- return this .getResourceVersion (uri );
294- }
295- catch (IllegalStateException ise ) {
296- logger .warn ("" , ise );
297- return "broken" ;
298- }
299- }
300-
301292 protected Properties loadProperties (Resource resource ) {
302293 try {
303294 return PropertiesLoaderUtils .loadProperties (resource );
@@ -307,60 +298,6 @@ protected Properties loadProperties(Resource resource) {
307298 }
308299 }
309300
310- @ Override
311- public List <AppRegistration > importAll (boolean overwrite , Resource ... resources ) {
312- return Stream .of (resources )
313- .map (this ::loadProperties )
314- .flatMap (prop -> prop .entrySet ().stream ()
315- .map (toStringAndUriFunc )
316- .flatMap (kv -> toValidAppRegistration (kv , metadataUriFromProperties (kv .getKey (), prop )))
317- .filter (a -> isOverwrite (a , overwrite ))
318- .map (ar -> save (ar )))
319- .collect (Collectors .toList ());
320- }
321-
322- /**
323- * Builds a {@link Stream} from key/value mapping.
324- * @return
325- * <ul>
326- * <li>valid AppRegistration as single element Stream</li>
327- * <li>silently ignores well malformed metadata entries (0 element Stream) or</li>
328- * <li>fails otherwise.</li>
329- * </ul>
330- *
331- * @param kv key/value representing app key (key) and app URI (value)
332- * @param metadataURI metadataUri computed from a given app key
333- */
334- protected Stream <AppRegistration > toValidAppRegistration (Map .Entry <String , URI > kv , URI metadataURI ) {
335- String key = kv .getKey ();
336- String [] tokens = key .split ("\\ ." );
337- if (tokens .length == 2 ) {
338- String name = tokens [1 ];
339- ApplicationType type = ApplicationType .valueOf (tokens [0 ]);
340- URI appURI = warnOnMalformedURI (key , kv .getValue ());
341-
342- String version = getVersionOrBroken (appURI .toString ());
343-
344- return Stream .of (new AppRegistration (name , type , version , appURI , metadataURI ));
345- }
346- else {
347- Assert .isTrue (tokens .length == 3 && METADATA_KEY_SUFFIX .equals (tokens [2 ]),
348- "Invalid format for app key '" + key + "'in file. Must be <type>.<name> or <type>.<name>"
349- + ".metadata" );
350- return Stream .empty ();
351- }
352- }
353-
354- protected URI metadataUriFromProperties (String key , Properties properties ) {
355- String metadataValue = properties .getProperty (key + "." + METADATA_KEY_SUFFIX );
356- try {
357- return metadataValue != null ? warnOnMalformedURI (key , new URI (metadataValue )) : null ;
358- }
359- catch (URISyntaxException e ) {
360- throw new IllegalArgumentException (e );
361- }
362- }
363-
364301 protected URI warnOnMalformedURI (String key , URI uri ) {
365302 if (StringUtils .isEmpty (uri )) {
366303 logger .warn (String .format ("Error when registering '%s': URI is required" , key ));
@@ -376,12 +313,97 @@ else if (!StringUtils.hasText(uri.getSchemeSpecificPart())) {
376313 return uri ;
377314 }
378315
379- protected static final Function <Map .Entry <Object , Object >, AbstractMap .SimpleImmutableEntry <String , URI >> toStringAndUriFunc = kv -> {
316+ @ Override
317+ public List <AppRegistration > importAll (boolean overwrite , Resource ... resources ) {
318+ return Stream .of (resources )
319+ // parallel takes effect if multiple resources
320+ .parallel ()
321+ // take lines
322+ .flatMap (this ::resourceAsLines )
323+ // take valid splitted lines
324+ .flatMap (this ::splitValidLines )
325+ // reduce to AppRegistration map key'd by <type><name><version>
326+ .reduce (new HashMap <String , AppRegistration >(), reduceToAppRegistrations (), (left , right ) -> {
327+ // combiner is used if multiple resources caused parallel stream,
328+ // then just let last processed resource to override.
329+ left .putAll (right );
330+ return left ;
331+ })
332+ // don't care about keys anymore
333+ .values ()
334+ // back to stream
335+ .stream ()
336+ // drop registration if it doesn't have main uri as user only had metadata
337+ .filter (ar -> ar .getUri () != null )
338+ // filter by overriding, save to repo and collect updated registrations
339+ .filter (ar -> isOverwrite (ar , overwrite ))
340+ .map (ar -> save (ar ))
341+ .collect (Collectors .toList ());
342+ }
343+
344+ private BiFunction <HashMap <String , AppRegistration >,
345+ ? super String [],
346+ HashMap <String , AppRegistration >> reduceToAppRegistrations () {
347+ return (map , lineSplit ) -> {
348+ String [] typeName = lineSplit [0 ].split ("\\ ." );
349+ if (typeName .length < 2 || typeName .length > 3 ) {
350+ throw new IllegalArgumentException ("Invalid format for app key '" + lineSplit [0 ]
351+ + "'in file. Must be <type>.<name> or <type>.<name>.metadata" );
352+ }
353+ String type = typeName [0 ].trim ();
354+ String name = typeName [1 ].trim ();
355+ String version = getResourceVersion (lineSplit [1 ]);
356+ // This is now versioned key
357+ String key = type + name + version ;
358+ AppRegistration ar = map .getOrDefault (key , new AppRegistration ());
359+ ar .setName (name );
360+ ar .setType (ApplicationType .valueOf (type ));
361+ ar .setVersion (version );
362+ if (typeName .length == 2 ) {
363+ // normal app uri
364+ try {
365+ ar .setUri (new URI (lineSplit [1 ]));
366+ warnOnMalformedURI (lineSplit [0 ], ar .getUri ());
367+ } catch (Exception e ) {
368+ throw new IllegalArgumentException (e );
369+ }
370+ }
371+ else if (typeName .length == 3 ) {
372+ // metadata app uri
373+ try {
374+ ar .setMetadataUri (new URI (lineSplit [1 ]));
375+ warnOnMalformedURI (lineSplit [0 ], ar .getMetadataUri ());
376+ } catch (Exception e ) {
377+ throw new IllegalArgumentException (e );
378+ }
379+ }
380+ map .put (key , ar );
381+ return map ;
382+ };
383+ }
384+
385+ private Stream <String > resourceAsLines (Resource resource ) {
380386 try {
381- return new AbstractMap . SimpleImmutableEntry <>(( String ) kv . getKey (), new URI (( String ) kv . getValue ()));
382- }
383- catch (URISyntaxException e ) {
384- throw new IllegalArgumentException ( e );
387+ BufferedReader bufferedReader = new BufferedReader ( new InputStreamReader ( resource . getInputStream ()));
388+ return bufferedReader . lines ();
389+ } catch (Exception e ) {
390+ throw new RuntimeException ( "Error reading from " + resource . getDescription (), e );
385391 }
386- };
392+ }
393+
394+ private Stream <String []> splitValidLines (String line ) {
395+ // split to key/value, filter out non valid lines and trim key and value.
396+ return Stream .of (line )
397+ .filter (skipCommentLines ())
398+ .map (l -> l .split ("=" ))
399+ .filter (split -> split .length == 2 )
400+ .map (split -> new String [] { split [0 ].trim (), split [1 ].trim () });
401+ }
402+
403+ private Predicate <String > skipCommentLines () {
404+ // skipping obvious lines which we don't even try to parse
405+ return line -> line != null &&
406+ StringUtils .hasText (line ) &&
407+ (!line .startsWith ("#" ) || !line .startsWith ("/" ));
408+ }
387409}
0 commit comments