@@ -160,6 +160,7 @@ class IngestionManagerPandas:
160160 data_frame : DataFrame = attr .ib ()
161161 max_workers : int = attr .ib (default = 1 )
162162 _futures : Dict [Any , Any ] = attr .ib (init = False , factory = dict )
163+ _failed_indices : List [int ] = attr .ib (factory = list )
163164
164165 @staticmethod
165166 def _ingest_single_batch (
@@ -168,7 +169,7 @@ def _ingest_single_batch(
168169 sagemaker_session : Session ,
169170 start_index : int ,
170171 end_index : int ,
171- ):
172+ ) -> List [ int ] :
172173 """Ingest a single batch of DataFrame rows into FeatureStore.
173174
174175 Args:
@@ -177,19 +178,38 @@ def _ingest_single_batch(
177178 sagemaker_session (Session): session instance to perform boto calls.
178179 start_index (int): starting position to ingest in this batch.
179180 end_index (int): ending position to ingest in this batch.
181+
182+ Returns:
183+ List of row indices that failed to be ingested.
180184 """
181185 logger .info ("Started ingesting index %d to %d" , start_index , end_index )
182- for row in data_frame [start_index :end_index ].itertuples (index = False ):
186+ failed_rows = list ()
187+ for row in data_frame [start_index :end_index ].itertuples ():
183188 record = [
184189 FeatureValue (
185- feature_name = data_frame .columns [index ], value_as_string = str (row [index ])
190+ feature_name = data_frame .columns [index - 1 ], value_as_string = str (row [index ])
186191 )
187- for index in range (len (row ))
192+ for index in range (1 , len (row ))
188193 if pd .notna (row [index ])
189194 ]
190- sagemaker_session .put_record (
191- feature_group_name = feature_group_name , record = [value .to_dict () for value in record ]
192- )
195+ try :
196+ sagemaker_session .put_record (
197+ feature_group_name = feature_group_name ,
198+ record = [value .to_dict () for value in record ],
199+ )
200+ except Exception as e : # pylint: disable=broad-except
201+ logger .error ("Failed to ingest row %d: %s" , row [0 ], e )
202+ failed_rows .append (row [0 ])
203+ return failed_rows
204+
@property
def failed_rows(self) -> List[int]:
    """Get rows that failed to ingest.

    Returns:
        The list held in ``_failed_indices``; the stored list object itself
        is handed back, not a copy.
    """
    recorded_failures = self._failed_indices
    return recorded_failures
193213
def wait(self, timeout=None):
    """Wait for the ingestion process to finish.

    Iterates over every future in ``self._futures`` as it completes, gathers
    the failed row indices each batch reports into ``self._failed_indices``
    and raises once all batches have been inspected if anything failed.

    Args:
        timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
            if timeout is reached.

    Raises:
        RuntimeError: if any batch reported failed rows or raised an exception.
    """
    collected = []
    # Bind the list up front so indices gathered so far stay reachable on the
    # instance even if ``as_completed`` times out part-way through.
    self._failed_indices = collected
    for future in as_completed(self._futures, timeout=timeout):
        start, end = self._futures[future]
        try:
            batch_failures = future.result()
        except Exception as e:  # pylint: disable=broad-except
            # The batch itself blew up, so which of its rows were written is
            # unknown. Report the whole range and keep draining the remaining
            # futures instead of aborting the wait.
            # NOTE(review): assumes ``end`` is exclusive and that start/end are
            # comparable to the indices the batches report -- confirm.
            logger.error("Failed to ingest row %d to %d: %s", start, end, e)
            collected.extend(range(start, end))
            continue

        if batch_failures:
            logger.error("Failed to ingest row %d to %d", start, end)
            collected.extend(batch_failures)
        else:
            logger.info("Successfully ingested row %d to %d", start, end)

    if collected:
        raise RuntimeError(
            f"Failed to ingest some data into FeatureGroup {self.feature_group_name}"
        )
0 commit comments