99
1010import logging
1111import shutil
12+ import time
1213from datetime import date , datetime
1314from pathlib import Path
1415from typing import List , Tuple
@@ -136,8 +137,13 @@ def clean_old_date_directories(self, max_age_before_deletion: int) -> None:
136137 # Remove if older than max_age_before_deletion
137138 if age_days > max_age_before_deletion :
138139 logger .info (f"Removing old data directory: { item } ({ age_days } days old)" )
139- shutil .rmtree (item )
140- directories_removed += 1
140+ try :
141+ shutil .rmtree (item )
142+ directories_removed += 1
143+ except (FileNotFoundError , OSError ) as e :
144+ # Directory already deleted by another process or became inaccessible
145+ logger .debug (f"Directory { item } already removed or inaccessible: { e } " )
146+ continue
141147
142148 except ValueError :
143149 # Skip directories that don't match date format
@@ -163,6 +169,160 @@ def get_date_output_directory(self, current_date: date) -> Path:
163169 return self .output_dir / current_date .strftime ("%Y-%m-%d" )
164170
165171
172+ def has_existing_processed_data (self , current_date : date ) -> bool :
173+ """
174+ Check if processed data already exists for the given date.
175+
176+ Args:
177+ current_date: The date to check for existing data
178+
179+ Returns:
180+ bool: True if all required CSV files exist and are not empty
181+ """
182+ output_date_dir = self .get_date_output_directory (current_date )
183+
184+ # Check if the date directory exists
185+ if not output_date_dir .exists ():
186+ return False
187+
188+ # Define required files
189+ required_files = [
190+ "eligible_indexers.csv" ,
191+ "indexer_issuance_eligibility_data.csv" ,
192+ "ineligible_indexers.csv" ,
193+ ]
194+
195+ # Check that all required files exist and are not empty
196+ for filename in required_files :
197+ file_path = output_date_dir / filename
198+ try :
199+ if not file_path .exists () or file_path .stat ().st_size == 0 :
200+ return False
201+ except (FileNotFoundError , OSError ):
202+ # File disappeared between exists() check and stat() call
203+ logger .debug (f"File { file_path } disappeared during existence check" )
204+ return False
205+
206+ return True
207+
208+
209+ def get_data_age_minutes (self , current_date : date ) -> float :
210+ """
211+ Calculate the age of existing processed data in minutes.
212+
213+ Args:
214+ current_date: The date for which to check data age
215+
216+ Returns:
217+ float: Age of the data in minutes (based on oldest file)
218+
219+ Raises:
220+ FileNotFoundError: If no CSV files exist for the given date
221+ """
222+ output_date_dir = self .get_date_output_directory (current_date )
223+
224+ if not output_date_dir .exists ():
225+ raise FileNotFoundError (f"No data directory found for date: { current_date } " )
226+
227+ csv_files = list (output_date_dir .glob ("*.csv" ))
228+ if not csv_files :
229+ raise FileNotFoundError (f"No CSV files found in directory: { output_date_dir } " )
230+
231+ # Get the oldest file's modification time to be conservative
232+ # Handle race condition where files could disappear between glob() and stat()
233+ file_mtimes = []
234+ for file in csv_files :
235+ try :
236+ file_mtimes .append (file .stat ().st_mtime )
237+ except (FileNotFoundError , OSError ):
238+ # File disappeared between glob() and stat(), skip it
239+ logger .debug (f"File { file } disappeared during age calculation" )
240+ continue
241+
242+ if not file_mtimes :
243+ raise FileNotFoundError (f"All CSV files disappeared during age calculation in: { output_date_dir } " )
244+
245+ oldest_mtime = min (file_mtimes )
246+ age_seconds = time .time () - oldest_mtime
247+ return age_seconds / 60.0
248+
249+
250+ def has_fresh_processed_data (self , current_date : date , max_age_minutes : int = 30 ) -> bool :
251+ """
252+ Check if processed data exists and is fresh (within the specified age limit).
253+
254+ Args:
255+ current_date: The date to check for existing data
256+ max_age_minutes: Maximum age in minutes for data to be considered fresh
257+
258+ Returns:
259+ bool: True if all required CSV files exist, are complete, and are fresh
260+ """
261+ # First check if data exists and is complete
262+ if not self .has_existing_processed_data (current_date ):
263+ return False
264+
265+ try :
266+ # Check if data is fresh enough
267+ data_age_minutes = self .get_data_age_minutes (current_date )
268+ is_fresh = data_age_minutes <= max_age_minutes
269+
270+ if is_fresh :
271+ logger .info (f"Found fresh cached data for { current_date } (age: { data_age_minutes :.1f} minutes)" )
272+ else :
273+ logger .info (
274+ f"Cached data for { current_date } is stale "
275+ f"(age: { data_age_minutes :.1f} minutes, max: { max_age_minutes } )"
276+ )
277+
278+ return is_fresh
279+
280+ except FileNotFoundError :
281+ return False
282+
283+
284+ def load_eligible_indexers_from_csv (self , current_date : date ) -> List [str ]:
285+ """
286+ Load the list of eligible indexers from existing CSV file.
287+
288+ Args:
289+ current_date: The date for which to load existing data
290+
291+ Returns:
292+ List[str]: List of eligible indexer addresses
293+
294+ Raises:
295+ FileNotFoundError: If the required CSV file doesn't exist
296+ ValueError: If the CSV file is malformed or empty
297+ """
298+ output_date_dir = self .get_date_output_directory (current_date )
299+ eligible_file = output_date_dir / "eligible_indexers.csv"
300+
301+ if not eligible_file .exists ():
302+ raise FileNotFoundError (f"Eligible indexers CSV not found: { eligible_file } " )
303+
304+ try :
305+ # Read the CSV file - it should have a header row with 'indexer' column
306+ df = pd .read_csv (eligible_file )
307+
308+ if df .empty :
309+ logger .warning (f"Eligible indexers CSV is empty: { eligible_file } " )
310+ return []
311+
312+ if "indexer" not in df .columns :
313+ raise ValueError (
314+ f"CSV file { eligible_file } missing 'indexer' column. Found columns: { list (df .columns )} "
315+ )
316+
317+ indexer_list = df ["indexer" ].tolist ()
318+ logger .info (f"Loaded { len (indexer_list )} eligible indexers from cached CSV for { current_date } " )
319+
320+ return indexer_list
321+
322+ except Exception as e :
323+ raise ValueError (f"Error reading CSV file { eligible_file } : { e } " )
324+
325+
166326 def validate_dataframe_structure (self , df : pd .DataFrame , required_columns : List [str ]) -> bool :
167327 """
168328 Validate that a DataFrame has the required columns.
0 commit comments