Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 32 additions & 24 deletions app/etl/extract.py
Original file line number Diff line number Diff line change
@@ -1,71 +1,79 @@
# Standard library first, then third-party, per PEP 8 import grouping.
import logging as lg
import os

import pandas as pd

# app/etl/extract.py lives two levels below the app/ package root, so walk
# up twice from this file to locate app/ and the bundled default dataset.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DEFAULT_DATA_PATH = os.path.join(BASE_DIR, "data.csv")
# TODO (Find & Fix)

def extract(path: str = DEFAULT_DATA_PATH) -> pd.DataFrame :

logger = lg.getLogger(__name__)
logger.setLevel(lg.DEBUG)


def extract(path: str = "xyz.csv") -> pd.DataFrame:
"""
Extracts data from CSV, Excel, or JSON file.

Args:
path: Path to the data file (supports .csv, .xlsx, .json)

Returns:
pd.DataFrame: DataFrame containing the extracted data

Raises:
FileNotFoundError: If the file doesn't exist
ValueError: If the file is empty or invalid
"""
# Validate file path
if not os.path.exists(path):
raise FileNotFoundError(f"❌ File not found: {path}")

# Get file extension
ext = os.path.splitext(path)[-1].lower()

# Check if file format is supported
if ext not in ['.csv', '.xlsx', '.xls', '.json']:
raise ValueError(f"Unsupported file format: {ext}")

try:
if ext == '.csv':
# Try different encodings for CSV files
encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
df = None

for encoding in encodings:
try:
df = pd.read_csv(path, encoding=encoding)
print(f"Successfully read CSV with encoding: {encoding}")
logger.info(f"Successfully read CSV with encoding: {encoding}")
break
except UnicodeDecodeError:
print(f"Failed to read with encoding '{encoding}'")
logger.error(f"Failed to read with encoding '{encoding}'")
continue
except Exception as e:
print(f"Error reading with encoding '{encoding}': {e}")
logger.error(
f"Error reading with encoding '{encoding}': {e}")
continue

if df is None:
raise ValueError(f"Could not read CSV with tried encodings: {encodings}")

raise ValueError(
f"Could not read CSV with tried encodings: {encodings}")

elif ext in ['.xls', '.xlsx']:
df = pd.read_excel(path)
print(f"Successfully read Excel file: {path}")
logger.info(f"Successfully read Excel file: {path}")

elif ext == '.json':
df = pd.read_json(path)
print(f"Successfully read JSON file: {path}")
logger.info(f"Successfully read JSON file: {path}")

# Validate data
if df.empty:
raise ValueError("File contains no data")

print(f"✅ Extracted {len(df)} rows and {len(df.columns)} columns") # TODO: Use logging instead of print

# TODO: Use logging instead of print
logger.info(
f"✅ Extracted {len(df)} rows and {len(df.columns)} columns")
return df

except pd.errors.EmptyDataError:
raise ValueError("❌ File contains no data")
except pd.errors.ParserError as e:
Expand Down
8 changes: 6 additions & 2 deletions app/etl/load.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# Standard library imports first, third-party second (PEP 8 grouping).
import logging as lg
import os
import sqlite3

import pandas as pd

# Module-level logger for the load step; DEBUG so all pipeline messages
# emitted here are visible once the root handler is configured.
logger = lg.getLogger(__name__)
logger.setLevel(lg.DEBUG)


def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "processed_data"):
"""
Expand All @@ -13,10 +17,10 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "proc
table_name: Name of the table to create/update
"""
if df.empty:
print("⚠️ Warning: Empty DataFrame received, nothing to load") # TODO (Find & Fix)
logger.warning("⚠️ Warning: Empty DataFrame received, nothing to load") # TODO (Find & Fix)
return

print(f"🔄 Loading {len(df)} rows into database '{db_path}'") # TODO (Find & Fix)
logger.info(f"🔄 Loading {len(df)} rows into database '{db_path}'") # TODO (Find & Fix)

# Ensure directory exists
db_dir = os.path.dirname(db_path)
Expand Down
1 change: 1 addition & 0 deletions app/etl/transform.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pandas as pd
import logging as lg
# TODO (Find & Fix)

def _remove_duplicates(df: pd.DataFrame) -> pd.DataFrame:
Expand Down
37 changes: 20 additions & 17 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import os

from app.etl.extract import extract
from app.etl.transform import transform
from app.etl.load import load
import logging as lg

# BUG FIX: the original called lg.debug() — a logging *function* that
# requires a message argument and raises TypeError when called bare.
# basicConfig expects the numeric level constant lg.DEBUG.
lg.basicConfig(level=lg.DEBUG)

logger = lg.getLogger(__name__)

# Default input CSV resolved relative to this file (app/data.csv), so the
# pipeline works regardless of the current working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
data_path = os.path.join(BASE_DIR, "data.csv")
Expand All @@ -16,34 +19,34 @@ def run_pipeline(csv_path: str =data_path, db_path: str = "etl_data.db"):
db_path: Path to the output SQLite database
"""
try:
print("🚀 Starting ETL Pipeline") # TODO (Find & Fix): Use logging instead of print
print(f"📁 Input file: {csv_path}")
print(f"🗄️ Output database: {db_path}")
print("-" * 50)
lg.info("🚀 Starting ETL Pipeline") # TODO (Find & Fix): Use logging instead of print
lg.info(f"📁 Input file: {csv_path}")
lg.info(f"🗄️ Output database: {db_path}")
lg.info("-" * 50)

# Extract
print("📥 STEP 1: EXTRACT")
lg.info("📥 STEP 1: EXTRACT")
df = extract(csv_path)
print(f"✅ Extracted {len(df)} rows")
print(f"📊 Columns: {list(df.columns)}")
print()
lg.info(f"✅ Extracted {len(df)} rows")
lg.info(f"📊 Columns: {list(df.columns)}")
lg.info()

# Transform
print("🔄 STEP 2: TRANSFORM")
lg.info("🔄 STEP 2: TRANSFORM")
df_transformed = transform(df)
print(f"✅ Transformed data ready")
print()
lg.info(f"✅ Transformed data ready")
lg.info()

# Load
print("📤 STEP 3: LOAD")
lg.info("📤 STEP 3: LOAD")
load(df_transformed, db_path)
print()
lg.info()

print("🎉 ETL Pipeline completed successfully!")
print(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")
lg.info("🎉 ETL Pipeline completed successfully!")
lg.info(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")

except FileNotFoundError as e:
print(f"❌ File Error: {e}")
lg.error(f"❌ File Error: {e}")

except ValueError as e:
# TODO (Find & Fix): Error handling missing
Expand Down
Loading