4646import java .util .Map ;
4747import java .util .Set ;
4848import java .util .concurrent .Semaphore ;
49+ import java .util .regex .Matcher ;
50+ import java .util .regex .Pattern ;
4951import java .util .stream .Collectors ;
5052import java .util .stream .Stream ;
5153
@@ -637,6 +639,8 @@ public static String readTextFile(URL resource) throws IOException {
637639 }
638640 }
639641
642+ record ColumnHeader (String name , String type ) {}
643+
640644 @ SuppressWarnings ("unchecked" )
641645 /**
642646 * Loads a classic csv file in an ES cluster using a RestClient.
@@ -654,12 +658,13 @@ public static String readTextFile(URL resource) throws IOException {
654658 */
655659 private static void loadCsvData (RestClient client , String indexName , URL resource , boolean allowSubFields , Logger logger )
656660 throws IOException {
661+
657662 ArrayList <String > failures = new ArrayList <>();
658663 StringBuilder builder = new StringBuilder ();
659664 try (BufferedReader reader = reader (resource )) {
660665 String line ;
661666 int lineNumber = 1 ;
662- String [] columns = null ; // list of column names . If one column name contains dot, it is a subfield and its value will be null
667+ ColumnHeader [] columns = null ; // Column info . If one column name contains dot, it is a subfield and its value will be null
663668 List <Integer > subFieldsIndices = new ArrayList <>(); // list containing the index of a subfield in "columns" String[]
664669
665670 while ((line = reader .readLine ()) != null ) {
@@ -669,15 +674,16 @@ private static void loadCsvData(RestClient client, String indexName, URL resourc
669674 String [] entries = multiValuesAwareCsvToStringArray (line , lineNumber );
670675 // the schema row
671676 if (columns == null ) {
672- columns = new String [entries .length ];
677+ columns = new ColumnHeader [entries .length ];
673678 for (int i = 0 ; i < entries .length ; i ++) {
674679 int split = entries [i ].indexOf (':' );
675680 if (split < 0 ) {
676- columns [i ] = entries [i ].trim ();
681+ columns [i ] = new ColumnHeader ( entries [i ].trim (), null );
677682 } else {
678683 String name = entries [i ].substring (0 , split ).trim ();
684+ String type = entries [i ].substring (split + 1 ).trim ();
679685 if (allowSubFields || name .contains ("." ) == false ) {
680- columns [i ] = name ;
686+ columns [i ] = new ColumnHeader ( name , type ) ;
681687 } else {// if it's a subfield, ignore it in the _bulk request
682688 columns [i ] = null ;
683689 subFieldsIndices .add (i );
@@ -707,7 +713,7 @@ private static void loadCsvData(RestClient client, String indexName, URL resourc
707713 // Value is null, skip
708714 continue ;
709715 }
710- if ("_id" .equals (columns [i ])) {
716+ if (columns [ i ] != null && "_id" .equals (columns [i ]. name )) {
711717 // Value is an _id
712718 idField = entries [i ];
713719 continue ;
@@ -722,17 +728,17 @@ private static void loadCsvData(RestClient client, String indexName, URL resourc
722728 if (multiValues .length > 1 ) {
723729 StringBuilder rowStringValue = new StringBuilder ("[" );
724730 for (String s : multiValues ) {
725- rowStringValue .append (quoteIfNecessary ( s )).append ("," );
731+ rowStringValue .append (toJson ( columns [ i ]. type , s )).append ("," );
726732 }
727733 // remove the last comma and put a closing bracket instead
728734 rowStringValue .replace (rowStringValue .length () - 1 , rowStringValue .length (), "]" );
729735 entries [i ] = rowStringValue .toString ();
730736 } else {
731- entries [i ] = quoteIfNecessary ( entries [i ]);
737+ entries [i ] = toJson ( columns [ i ]. type , entries [i ]);
732738 }
733739 // replace any escaped commas with single comma
734740 entries [i ] = entries [i ].replace (ESCAPED_COMMA_SEQUENCE , "," );
735- row .append ("\" " ).append (columns [i ]).append ("\" :" ).append (entries [i ]);
741+ row .append ("\" " ).append (columns [i ]. name ).append ("\" :" ).append (entries [i ]);
736742 } catch (Exception e ) {
737743 throw new IllegalArgumentException (
738744 format (
@@ -770,10 +776,23 @@ private static void loadCsvData(RestClient client, String indexName, URL resourc
770776 }
771777 }
772778
773- private static String quoteIfNecessary (String value ) {
774- boolean isQuoted = (value .startsWith ("\" " ) && value .endsWith ("\" " )) || (value .startsWith ("{" ) && value .endsWith ("}" ));
775- boolean isNumeric = value .matches (NUMERIC_REGEX );
776- return isQuoted || isNumeric ? value : "\" " + value + "\" " ;
779+ private static final Pattern RANGE_PATTERN = Pattern .compile ("([0-9\\ -.Z:]+)\\ .\\ .([0-9\\ -.Z:]+)" );
780+
781+ private static String toJson (String type , String value ) {
782+ return switch (type == null ? "" : type ) {
783+ case "date_range" , "double_range" , "integer_range" -> {
784+ Matcher m = RANGE_PATTERN .matcher (value );
785+ if (m .matches () == false ) {
786+ throw new IllegalArgumentException ("can't parse range: " + value );
787+ }
788+ yield "{\" gte\" : \" " + m .group (1 ) + "\" , \" lt\" : \" " + m .group (2 ) + "\" }" ;
789+ }
790+ default -> {
791+ boolean isQuoted = (value .startsWith ("\" " ) && value .endsWith ("\" " )) || (value .startsWith ("{" ) && value .endsWith ("}" ));
792+ boolean isNumeric = value .matches (NUMERIC_REGEX );
793+ yield isQuoted || isNumeric ? value : "\" " + value + "\" " ;
794+ }
795+ };
777796 }
778797
779798 private static void sendBulkRequest (String indexName , StringBuilder builder , RestClient client , Logger logger , List <String > failures )
0 commit comments