diff --git a/app/etl/extract.py b/app/etl/extract.py
index 92c8f39..ba189f7 100644
--- a/app/etl/extract.py
+++ b/app/etl/extract.py
@@ -1,20 +1,24 @@
 import pandas as pd
 import os
+import logging as lg
 
-# Get the base directory (app/) relative to this file (app/etl/extract.py)
-BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-DEFAULT_DATA_PATH = os.path.join(BASE_DIR, "data.csv")
+# TODO (Find & Fix)
 
-def extract(path: str = DEFAULT_DATA_PATH) -> pd.DataFrame :
+
+logger = lg.getLogger(__name__)
+logger.setLevel(lg.DEBUG)
+
+
+def extract(path: str = "xyz.csv") -> pd.DataFrame:
     """
     Extracts data from CSV, Excel, or JSON file.
-    
+
     Args:
         path: Path to the data file (supports .csv, .xlsx, .json)
-    
+
     Returns:
         pd.DataFrame: DataFrame containing the extracted data
-    
+
     Raises:
         FileNotFoundError: If the file doesn't exist
         ValueError: If the file is empty or invalid
@@ -22,50 +26,54 @@ def extract(path: str = DEFAULT_DATA_PATH) -> pd.DataFrame :
     # Validate file path
     if not os.path.exists(path):
         raise FileNotFoundError(f"❌ File not found: {path}")
-    
+
     # Get file extension
     ext = os.path.splitext(path)[-1].lower()
-    
+
     # Check if file format is supported
     if ext not in ['.csv', '.xlsx', '.xls', '.json']:
         raise ValueError(f"Unsupported file format: {ext}")
-    
+
     try:
         if ext == '.csv':
             # Try different encodings for CSV files
             encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
             df = None
-            
+
             for encoding in encodings:
                 try:
                     df = pd.read_csv(path, encoding=encoding)
-                    print(f"Successfully read CSV with encoding: {encoding}")
+                    logger.info(f"Successfully read CSV with encoding: {encoding}")
                     break
                 except UnicodeDecodeError:
-                    print(f"Failed to read with encoding '{encoding}'")
+                    logger.error(f"Failed to read with encoding '{encoding}'")
                     continue
                 except Exception as e:
-                    print(f"Error reading with encoding '{encoding}': {e}")
+                    logger.error(
+                        f"Error reading with encoding '{encoding}': {e}")
                     continue
-            
+
             if df is None:
-                raise ValueError(f"Could not read CSV with tried encodings: {encodings}")
-            
+                raise ValueError(
+                    f"Could not read CSV with tried encodings: {encodings}")
+
         elif ext in ['.xls', '.xlsx']:
             df = pd.read_excel(path)
-            print(f"Successfully read Excel file: {path}")
-            
+            logger.info(f"Successfully read Excel file: {path}")
+
         elif ext == '.json':
             df = pd.read_json(path)
-            print(f"Successfully read JSON file: {path}")
-            
+            logger.info(f"Successfully read JSON file: {path}")
+
         # Validate data
         if df.empty:
             raise ValueError("File contains no data")
-        
-        print(f"✅ Extracted {len(df)} rows and {len(df.columns)} columns")  # TODO: Use logging instead of print
+
+        # TODO: Use logging instead of print
+        logger.info(
+            f"✅ Extracted {len(df)} rows and {len(df.columns)} columns")
         return df
-    
+
     except pd.errors.EmptyDataError:
         raise ValueError("❌ File contains no data")
     except pd.errors.ParserError as e:
diff --git a/app/etl/load.py b/app/etl/load.py
index 4ae4589..e9d858d 100644
--- a/app/etl/load.py
+++ b/app/etl/load.py
@@ -1,7 +1,11 @@
 import pandas as pd
 import sqlite3
 import os
+import logging as lg  # TODO (Find & Fix)
+logger = lg.getLogger(__name__)
+logger.setLevel(lg.DEBUG)
+
 
 def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "processed_data"):
     """
@@ -13,10 +17,10 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "proc
         table_name: Name of the table to create/update
     """
     if df.empty:
-        print("⚠️ Warning: Empty DataFrame received, nothing to load")  # TODO (Find & Fix)
+        logger.warning("⚠️ Warning: Empty DataFrame received, nothing to load")  # TODO (Find & Fix)
         return
 
-    print(f"🔄 Loading {len(df)} rows into database '{db_path}'")  # TODO (Find & Fix)
+    logger.info(f"🔄 Loading {len(df)} rows into database '{db_path}'")  # TODO (Find & Fix)
 
     # Ensure directory exists
     db_dir = os.path.dirname(db_path)
diff --git a/app/etl/transform.py b/app/etl/transform.py
index f44915a..a750a5c 100644
--- a/app/etl/transform.py
+++ b/app/etl/transform.py
@@ -1,4 +1,5 @@
 import pandas as pd
+import logging as lg  # TODO (Find & Fix)
 
 
 def _remove_duplicates(df: pd.DataFrame) -> pd.DataFrame:
diff --git a/app/main.py b/app/main.py
index e881ef4..95ef56d 100644
--- a/app/main.py
+++ b/app/main.py
@@ -1,8 +1,11 @@
 import os
-
 from app.etl.extract import extract
 from app.etl.transform import transform
 from app.etl.load import load
+import logging as lg
+lg.basicConfig(level=lg.DEBUG)
+
+logger = lg.getLogger(__name__)
 
 BASE_DIR = os.path.dirname(os.path.abspath(__file__))
 data_path = os.path.join(BASE_DIR, "data.csv")
@@ -16,34 +19,34 @@ def run_pipeline(csv_path: str =data_path, db_path: str = "etl_data.db"):
         db_path: Path to the output SQLite database
     """
     try:
-        print("🚀 Starting ETL Pipeline")  # TODO (Find & Fix): Use logging instead of print
-        print(f"📁 Input file: {csv_path}")
-        print(f"🗄️ Output database: {db_path}")
-        print("-" * 50)
+        logger.info("🚀 Starting ETL Pipeline")  # TODO (Find & Fix): Use logging instead of print
+        logger.info(f"📁 Input file: {csv_path}")
+        logger.info(f"🗄️ Output database: {db_path}")
+        logger.info("-" * 50)
 
         # Extract
-        print("📥 STEP 1: EXTRACT")
+        logger.info("📥 STEP 1: EXTRACT")
         df = extract(csv_path)
-        print(f"✅ Extracted {len(df)} rows")
-        print(f"📊 Columns: {list(df.columns)}")
-        print()
+        logger.info(f"✅ Extracted {len(df)} rows")
+        logger.info(f"📊 Columns: {list(df.columns)}")
+        logger.info("")
 
         # Transform
-        print("🔄 STEP 2: TRANSFORM")
+        logger.info("🔄 STEP 2: TRANSFORM")
         df_transformed = transform(df)
-        print(f"✅ Transformed data ready")
-        print()
+        logger.info("✅ Transformed data ready")
+        logger.info("")
 
         # Load
-        print("📤 STEP 3: LOAD")
+        logger.info("📤 STEP 3: LOAD")
         load(df_transformed, db_path)
-        print()
+        logger.info("")
 
-        print("🎉 ETL Pipeline completed successfully!")
-        print(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")
+        logger.info("🎉 ETL Pipeline completed successfully!")
+        logger.info(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")
 
     except FileNotFoundError as e:
-        print(f"❌ File Error: {e}")
+        logger.error(f"❌ File Error: {e}")
     except ValueError as e:
         # TODO (Find & Fix): Error handling missing