Binary file added app/__pycache__/main.cpython-314.pyc
Binary file added app/etl/__pycache__/extract.cpython-314.pyc
Binary file added app/etl/__pycache__/load.cpython-314.pyc
Binary file added app/etl/__pycache__/transform.cpython-314.pyc
15 changes: 11 additions & 4 deletions app/etl/extract.py
@@ -1,8 +1,13 @@
 import pandas as pd
 import os
 import logging
-# TODO (Find & Fix)
+from typing import Optional
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 def extract(path: str = "xyz.csv") -> pd.DataFrame:
     """
     Extracts data from CSV file.
@@ -27,14 +32,16 @@ def extract(path: str = "xyz.csv") -> pd.DataFrame:
     try:
         # Try different encodings
         encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
-        df = None
+        df: Optional[pd.DataFrame] = None
 
         for encoding in encodings:
             try:
                 df = pd.read_csv(path, encoding=encoding)
-                # TODO (Find & Fix)
-                pass
+                break
             except UnicodeDecodeError:
-                print(f"Failed to read with encoding '{encoding}'")
+                logging.warning(f"Failed to read with encoding '{encoding}'")  # Log the encoding that failed
 
         if df is None:
             raise ValueError(f"Could not read CSV with tried encodings: {encodings}")
@@ -43,7 +50,7 @@ def extract(path: str = "xyz.csv") -> pd.DataFrame:
         if df.empty:
             raise ValueError("File contains no data")
 
-        print(f"✅ Extracted {len(df)} rows and {len(df.columns)} columns")  # TODO: Use logging instead of print
+        logging.info(f"✅ Extracted {len(df)} rows and {len(df.columns)} columns")
         return df
 
     except pd.errors.EmptyDataError:
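Reviewer note on the fallback loop: `'latin-1'` accepts every possible byte value, so `pd.read_csv` can never raise `UnicodeDecodeError` once it is tried, which makes the `'cp1252'` and `'iso-8859-1'` entries after it unreachable. A minimal standalone sketch of the intended pattern with `'latin-1'` moved last (the helper name and `sample.csv` path are illustrative, not part of this PR):

```python
import logging
from typing import Sequence

import pandas as pd

logging.basicConfig(level=logging.INFO)


def read_csv_with_fallback(
    path: str,
    encodings: Sequence[str] = ("utf-8", "cp1252", "latin-1"),
) -> pd.DataFrame:
    """Try each encoding in order; return the first successful parse."""
    for encoding in encodings:
        try:
            df = pd.read_csv(path, encoding=encoding)
            logging.info("Read %s using encoding %r", path, encoding)
            return df
        except UnicodeDecodeError:
            logging.warning("Failed to read %s using encoding %r", path, encoding)
    raise ValueError(f"Could not read {path} with any of {list(encodings)}")


# df = read_csv_with_fallback("sample.csv")  # hypothetical input file
```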
23 changes: 18 additions & 5 deletions app/etl/load.py
@@ -1,9 +1,14 @@
 import pandas as pd
 import sqlite3
 import os
 import logging
-# TODO (Find & Fix)
+from typing import Optional
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "processed_data"):
     """
     Loads data into SQLite database with proper error handling and upsert logic.
@@ -14,10 +19,10 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "processed_data"):
         table_name: Name of the table to create/update
     """
     if df.empty:
-        print("⚠️ Warning: Empty DataFrame received, nothing to load")
+        logging.warning("⚠️ Empty DataFrame received, nothing to load")
         return
 
-    print(f"🔄 Loading {len(df)} rows into database '{db_path}'")
+    logging.info(f"🔄 Loading {len(df)} rows into database '{db_path}'")
 
     # Ensure directory exists
     db_dir = os.path.dirname(db_path)
@@ -50,13 +55,21 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "processed_data"):
             updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
         )
     """)
+    columns = ", ".join(df.columns)
+    placeholders = ", ".join(["?"] * len(df.columns))
+    update_clause = ", ".join([f"{col}=excluded.{col}" for col in df.columns if col != "employee_id"])
+
+    sql_query = f"""
+        INSERT INTO {table_name} ({columns})
+        VALUES ({placeholders})
+        ON CONFLICT(employee_id) DO UPDATE SET {update_clause};
+    """
 
     data_to_insert = [tuple(row) for row in df.itertuples(index=False, name=None)]
-    placeholders = ", ".join(["?"] * len(df.columns))
-    column_names = ", ".join(df.columns)
-    sql_query = f"INSERT OR IGNORE INTO {table_name} ({column_names}) VALUES ({placeholders})"
     cursor.executemany(sql_query, data_to_insert)
     conn.commit()
 
-    # TODO (Find & Fix): Bulk insert without checking for duplicates
+    logging.info(f"Successfully loaded {len(df)} records into '{table_name}'.")
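The move from `INSERT OR IGNORE` to an `ON CONFLICT(employee_id) DO UPDATE` upsert assumes `employee_id` carries a PRIMARY KEY or UNIQUE constraint; SQLite rejects an `ON CONFLICT` target with no uniqueness constraint behind it. A self-contained sketch of the same statement shape on a toy schema (table and columns are illustrative, not this PR's):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE processed_data (employee_id INTEGER PRIMARY KEY, name TEXT)")

# The third row reuses employee_id 1, so it updates instead of inserting.
rows = [(1, "ada"), (2, "grace"), (1, "ada lovelace")]
cur.executemany(
    """
    INSERT INTO processed_data (employee_id, name)
    VALUES (?, ?)
    ON CONFLICT(employee_id) DO UPDATE SET name = excluded.name
    """,
    rows,
)
conn.commit()
print(cur.execute("SELECT * FROM processed_data").fetchall())
# [(1, 'ada lovelace'), (2, 'grace')]
```

`executemany` runs the whole batch on one connection transaction here, so the single `commit()` at the end is enough.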
32 changes: 27 additions & 5 deletions app/etl/transform.py
@@ -1,8 +1,16 @@
 import pandas as pd
 import logging
 from datetime import datetime
-# TODO (Find & Fix)
+from typing import Optional
 
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
 
 def transform(df: pd.DataFrame) -> pd.DataFrame:
     """
     Transform data by cleaning and standardizing it.
@@ -14,31 +22,38 @@ def transform(df: pd.DataFrame) -> pd.DataFrame:
         Transformed DataFrame
     """
     if df.empty:
-        # TODO (Find & Fix): Should raise a ValueError if DataFrame is empty
-        pass
+        raise ValueError("DataFrame is empty.")
 
     # Create a copy to avoid modifying original
     df_transformed = df.copy()
 
-    print(f"🔄 Starting transformation of {len(df_transformed)} rows")
+    logger.info(f"🔄 Starting transformation of {len(df_transformed)} rows")
 
     # Handle duplicates
     initial_rows = len(df_transformed)
-    # TODO (Find & Fix): Duplicates are not removed
+    df_transformed.drop_duplicates(inplace=True)
     duplicates_removed = initial_rows - len(df_transformed)
     if duplicates_removed > 0:
-        # TODO (Find & Fix): Should log how many duplicates were removed
-        pass
+        logger.info(f"Removed {duplicates_removed} duplicate rows.")
 
     # Handle null values in numeric columns
     numeric_columns = df_transformed.select_dtypes(include=['number']).columns
     for col in numeric_columns:
         if df_transformed[col].isnull().any():
-            # TODO (Find & Fix): Nulls in numeric columns are not handled
-            pass
+            mean_value = df_transformed[col].mean()
+            df_transformed[col].fillna(mean_value, inplace=True)
 
     # Handle null values in text columns
     text_columns = df_transformed.select_dtypes(include=['object']).columns
     for col in text_columns:
         if df_transformed[col].isnull().any():
-            # TODO (Find & Fix): Nulls in text columns are not handled
-            pass
+            df_transformed[col].fillna("Unknown", inplace=True)
 
@@ -47,8 +62,15 @@ def transform(df: pd.DataFrame) -> pd.DataFrame:
                     if any(keyword in col.lower() for keyword in ['date', 'time', 'created', 'updated'])]
 
     for col in date_columns:
-        # TODO (Find & Fix): Date columns are not standardized
-        pass
+        df_transformed[col] = pd.to_datetime(df_transformed[col], errors='coerce')
+        if df_transformed[col].isnull().any():
+            median_date = df_transformed[col].median()
+            df_transformed[col].fillna(median_date, inplace=True)
 
+    for col in text_columns:
+        df_transformed[col] = df_transformed[col].astype(str).str.strip().str.lower()
+
+    logger.info("Transformation completed successfully.")
 
-    # TODO (Find & Fix): Text columns are not cleaned (strip, lowercase)
     return df_transformed
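A quick way to exercise the filled-in transform on a toy frame (this sketch assumes the PR's `app.etl.transform` module path, and pandas 2.x with copy-on-write disabled, since the column-level `inplace` fillna calls rely on chained assignment):

```python
import pandas as pd

from app.etl.transform import transform  # module path as in this PR

df = pd.DataFrame({
    "employee_id": [1, 2, 2, 3],
    "name": ["  Ada ", "Grace", "Grace", None],
    "salary": [100.0, None, None, 120.0],
    "hire_date": ["2021-01-05", "not a date", "not a date", "2021-03-01"],
})

out = transform(df)
print(out)
# Expected: the duplicate employee-2 row is dropped, the salary NaN becomes
# the column mean (110.0), the missing name becomes "Unknown" and is then
# lower-cased to "unknown", and hire_date is parsed with errors='coerce',
# the unparseable value replaced by the median date.
```

Two things worth a second look: `text_columns` is captured before the date pass, so the final strip/lowercase loop turns the freshly parsed `hire_date` back into strings, and `df_transformed[col].fillna(..., inplace=True)` is deprecated chained assignment in recent pandas; `df_transformed[col] = df_transformed[col].fillna(...)` is the durable spelling.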
46 changes: 28 additions & 18 deletions app/main.py
@@ -1,7 +1,12 @@
+import logging
 from app.etl.extract import extract
 from app.etl.transform import transform
 from app.etl.load import load
 
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s"
+)
 def run_pipeline(csv_path: str = "data.csv", db_path: str = "etl_data.db"):
     """
     Run the complete ETL pipeline.
@@ -11,41 +16,46 @@ def run_pipeline(csv_path: str = "data.csv", db_path: str = "etl_data.db"):
         db_path: Path to the output SQLite database
     """
     try:
-        print("🚀 Starting ETL Pipeline")
-        print(f"📁 Input file: {csv_path}")
-        print(f"🗄️ Output database: {db_path}")
-        print("-" * 50)
+        logging.info("🚀 Starting ETL Pipeline")
+        logging.info(f"📁 Input file: {csv_path}")
+        logging.info(f"🗄️ Output database: {db_path}")
+        logging.info("-" * 50)
 
         # Extract
-        print("📥 STEP 1: EXTRACT")
+        logging.info("📥 STEP 1: EXTRACT")
         df = extract(csv_path)
-        print(f"✅ Extracted {len(df)} rows")
-        print(f"📊 Columns: {list(df.columns)}")
-        print()
+        logging.info(f"✅ Extracted {len(df)} rows")
+        logging.info(f"📊 Columns: {list(df.columns)}")
+        logging.info("-" * 50)
 
         # Transform
-        print("🔄 STEP 2: TRANSFORM")
+        logging.info("🔄 STEP 2: TRANSFORM")
         df_transformed = transform(df)
-        print(f"✅ Transformed data ready")
-        print()
+        logging.info("✅ Transformed data ready")
+        logging.info("-" * 50)
 
         # Load
-        print("📤 STEP 3: LOAD")
+        logging.info("📤 STEP 3: LOAD")
         load(df_transformed, db_path)
-        print()
+        logging.info("-" * 50)
 
-        print("🎉 ETL Pipeline completed successfully!")
-        print(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")
+        logging.info("🎉 ETL Pipeline completed successfully!")
+        logging.info(f"📈 Final dataset: {len(df_transformed)} rows, {len(df_transformed.columns)} columns")
 
     except FileNotFoundError as e:
-        print(f"❌ File Error: {e}")
+        logging.error(f"❌ File Error: {e}")
 
     except ValueError as e:
-        # TODO (Find & Fix): Error handling missing
-        pass
+        logging.error(f"⚠️ Value Error: {e}")
+        raise
 
     except Exception as e:
-        # TODO (Find & Fix): Error handling missing
-        pass
+        logging.exception(f"🔥 Unexpected error: {e}")
 
 if __name__ == "__main__":
     # Run the pipeline
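One cross-cutting note: extract.py, transform.py, load.py, and main.py now each call `logging.basicConfig()` at import time. Only the first call in a process configures the root logger, so the duplicates are harmless but misleading. A common arrangement (a sketch of a possible follow-up, not something this PR does) is to configure logging once in the entry point and give each module a named logger:

```python
# app/etl/extract.py (likewise transform.py and load.py): no basicConfig here
import logging

logger = logging.getLogger(__name__)  # configured by whoever runs the app


# app/main.py: the single place that configures logging
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
```

With the package layout shown in this diff, the pipeline would typically be started as `python -m app.main` from the repository root (assuming `app/` and `app/etl/` are packages with `__init__.py` files), which also keeps the `from app.etl...` imports resolvable.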