88import static org .opensearch .ml .common .utils .StringUtils .getJsonPath ;
99import static org .opensearch .ml .common .utils .StringUtils .obtainFieldNameFromJsonPath ;
1010
11- import java .util .Collection ;
11+ import java .util .Arrays ;
1212import java .util .HashMap ;
1313import java .util .List ;
1414import java .util .Map ;
15- import java .util .Optional ;
15+ import java .util .Objects ;
1616import java .util .concurrent .CompletableFuture ;
1717import java .util .concurrent .atomic .AtomicInteger ;
1818import java .util .stream .Collectors ;
3434
3535@ Log4j2
3636public class AbstractIngestion implements Ingestable {
37- public static final String OUTPUT = "output" ;
38- public static final String INPUT = "input" ;
39- public static final String OUTPUT_FIELD_NAMES = "output_names" ;
40- public static final String INPUT_FIELD_NAMES = "input_names" ;
41- public static final String INGEST_FIELDS = "ingest_fields" ;
42- public static final String ID_FIELD = "id_field" ;
4337
4438 private final Client client ;
4539
@@ -85,12 +79,11 @@ protected double calculateSuccessRate(List<Double> successRates) {
8579 * Filters fields in the map where the value contains the specified source index as a prefix.
8680 *
8781 * @param mlBatchIngestionInput The MLBatchIngestionInput.
88- * @param index The source index to filter by.
89- * @return A new map with only the entries that match the specified source index.
82+ * @param indexInFieldMap The source index to filter by.
83+ * @return A new map containing only the entries that match the specified source index, with their values converted to JsonPath expressions.
9084 */
91- protected Map <String , Object > filterFieldMapping (MLBatchIngestionInput mlBatchIngestionInput , int index ) {
85+ protected Map <String , Object > filterFieldMapping (MLBatchIngestionInput mlBatchIngestionInput , int indexInFieldMap ) {
9286 Map <String , Object > fieldMap = mlBatchIngestionInput .getFieldMapping ();
93- int indexInFieldMap = index + 1 ;
9487 String prefix = "source[" + indexInFieldMap + "]" ;
9588
9689 Map <String , Object > filteredFieldMap = fieldMap .entrySet ().stream ().filter (entry -> {
@@ -104,19 +97,29 @@ protected Map<String, Object> filterFieldMapping(MLBatchIngestionInput mlBatchIn
10497 }).collect (Collectors .toMap (Map .Entry ::getKey , entry -> {
10598 Object value = entry .getValue ();
10699 if (value instanceof String ) {
107- return value ;
100+ return getJsonPath (( String ) value ) ;
108101 } else if (value instanceof List ) {
109- return ((List <String >) value ).stream ().filter (val -> val .contains (prefix )).collect (Collectors .toList ());
102+ return ((List <String >) value )
103+ .stream ()
104+ .filter (val -> val .contains (prefix ))
105+ .map (StringUtils ::getJsonPath )
106+ .collect (Collectors .toList ());
110107 }
111108 return null ;
112109 }));
113110
114- if (filteredFieldMap .containsKey (OUTPUT )) {
115- filteredFieldMap .put (OUTPUT_FIELD_NAMES , fieldMap .get (OUTPUT_FIELD_NAMES ));
116- }
117- if (filteredFieldMap .containsKey (INPUT )) {
118- filteredFieldMap .put (INPUT_FIELD_NAMES , fieldMap .get (INPUT_FIELD_NAMES ));
111+ String [] ingestFields = mlBatchIngestionInput .getIngestFields ();
112+ if (ingestFields != null ) {
113+ Arrays
114+ .stream (ingestFields )
115+ .filter (Objects ::nonNull )
116+ .filter (val -> val .contains (prefix ))
117+ .map (StringUtils ::getJsonPath )
118+ .forEach (jsonPath -> {
119+ filteredFieldMap .put (obtainFieldNameFromJsonPath (jsonPath ), jsonPath );
120+ });
119121 }
122+
120123 return filteredFieldMap ;
121124 }
122125
@@ -128,42 +131,21 @@ protected Map<String, Object> filterFieldMapping(MLBatchIngestionInput mlBatchIn
128131 * @return A new map that contains all the fields and data for ingestion.
129132 */
130133 protected Map <String , Object > processFieldMapping (String jsonStr , Map <String , Object > fieldMapping ) {
131- String inputJsonPath = fieldMapping .containsKey (INPUT ) ? getJsonPath ((String ) fieldMapping .get (INPUT )) : null ;
132- List <String > remoteModelInput = inputJsonPath != null ? (List <String >) JsonPath .read (jsonStr , inputJsonPath ) : null ;
133- List <String > inputFieldNames = inputJsonPath != null ? (List <String >) fieldMapping .get (INPUT_FIELD_NAMES ) : null ;
134-
135- String outputJsonPath = fieldMapping .containsKey (OUTPUT ) ? getJsonPath ((String ) fieldMapping .get (OUTPUT )) : null ;
136- List <List > remoteModelOutput = outputJsonPath != null ? (List <List >) JsonPath .read (jsonStr , outputJsonPath ) : null ;
137- List <String > outputFieldNames = outputJsonPath != null ? (List <String >) fieldMapping .get (OUTPUT_FIELD_NAMES ) : null ;
138-
139- List <String > ingestFieldsJsonPath = Optional
140- .ofNullable ((List <String >) fieldMapping .get (INGEST_FIELDS ))
141- .stream ()
142- .flatMap (Collection ::stream )
143- .map (StringUtils ::getJsonPath )
144- .collect (Collectors .toList ());
145-
146134 Map <String , Object > jsonMap = new HashMap <>();
147-
148- populateJsonMap (jsonMap , inputFieldNames , remoteModelInput );
149- populateJsonMap (jsonMap , outputFieldNames , remoteModelOutput );
150-
151- for (String fieldPath : ingestFieldsJsonPath ) {
152- jsonMap .put (obtainFieldNameFromJsonPath (fieldPath ), JsonPath .read (jsonStr , fieldPath ));
135+ if (fieldMapping == null || fieldMapping .isEmpty ()) {
136+ return jsonMap ;
153137 }
154138
155- if (fieldMapping .containsKey (ID_FIELD )) {
156- List <String > docIdJsonPath = Optional
157- .ofNullable ((List <String >) fieldMapping .get (ID_FIELD ))
158- .stream ()
159- .flatMap (Collection ::stream )
160- .map (StringUtils ::getJsonPath )
161- .collect (Collectors .toList ());
162- if (docIdJsonPath .size () != 1 ) {
163- throw new IllegalArgumentException ("The Id field must contains only 1 jsonPath for each source" );
139+ fieldMapping .entrySet ().stream ().forEach (entry -> {
140+ Object value = entry .getValue ();
141+ if (value instanceof String ) {
142+ String jsonPath = (String ) value ;
143+ jsonMap .put (entry .getKey (), JsonPath .read (jsonStr , jsonPath ));
144+ } else if (value instanceof List ) {
145+ ((List <String >) value ).stream ().forEach (jsonPath -> { jsonMap .put (entry .getKey (), JsonPath .read (jsonStr , jsonPath )); });
164146 }
165- jsonMap . put ( "_id" , JsonPath . read ( jsonStr , docIdJsonPath . get ( 0 )) );
166- }
147+ } );
148+
167149 return jsonMap ;
168150 }
169151
@@ -180,12 +162,11 @@ protected void batchIngest(
180162 ? mlBatchIngestionInput .getFieldMapping ()
181163 : filterFieldMapping (mlBatchIngestionInput , sourceIndex );
182164 Map <String , Object > jsonMap = processFieldMapping (jsonStr , filteredMapping );
183- if (isSoleSource || sourceIndex == 0 ) {
165+ if (jsonMap .isEmpty ()) {
166+ return ;
167+ }
168+ if (isSoleSource && !jsonMap .containsKey ("_id" )) {
184169 IndexRequest indexRequest = new IndexRequest (mlBatchIngestionInput .getIndexName ());
185- if (jsonMap .containsKey ("_id" )) {
186- String id = (String ) jsonMap .remove ("_id" );
187- indexRequest .id (id );
188- }
189170 indexRequest .source (jsonMap );
190171 bulkRequest .add (indexRequest );
191172 } else {
@@ -198,6 +179,13 @@ protected void batchIngest(
198179 bulkRequest .add (updateRequest );
199180 }
200181 });
182+ if (bulkRequest .numberOfActions () == 0 ) {
183+ bulkResponseListener
184+ .onFailure (
185+ new IllegalArgumentException ("the bulk ingestion is empty: please check your field mapping to match your sources" )
186+ );
187+ return ;
188+ }
201189 client .bulk (bulkRequest , bulkResponseListener );
202190 }
203191
0 commit comments