Skip to content

Commit bbadb72

Browse files
Merge pull request #36 from Satvik-Singh192/test/tableplus
chore: verified data load in tableplus
2 parents f858c55 + e61ad0f commit bbadb72

File tree

9 files changed: 19 additions (+19) and 13 deletions (-13)

9 files changed: 19 additions (+19) and 13 deletions (-13)

.gitignore

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,4 +6,12 @@ venv/
66
.venv
77
ENV/
88
env.bak/
9-
venv.bak/
9+
venv.bak/
10+
11+
etl_data.db
12+
13+
# Ignore Python cache files
14+
__pycache__/
15+
*.pyc
16+
*.pyo
17+
*.pyd
-129 Bytes
Binary file not shown.
-133 Bytes
Binary file not shown.
-1.53 KB
Binary file not shown.
-2.35 KB
Binary file not shown.
-3.75 KB
Binary file not shown.
-3.75 KB
Binary file not shown.

app/etl/load.py

Lines changed: 4 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -21,20 +21,17 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "proc
2121
# Ensure directory exists
2222
db_dir = os.path.dirname(db_path)
2323
if db_dir and not os.path.exists(db_dir):
24-
os.makedirs(db_dir)
25-
24+
os.makedirs(db_dir)
2625
conn = None
2726
try:
2827
# Connect to database
2928
conn = sqlite3.connect(db_path)
3029
cursor = conn.cursor()
3130

3231
# TODO (Find & Fix): Table creation and schema logic missing
33-
34-
# Idempotency check (should avoid duplicate inserts)
3532
cursor.execute(f"""
3633
CREATE TABLE IF NOT EXISTS {table_name} (
37-
employee_id INTEGER PRIMARY KEY,
34+
employee_id TEXT PRIMARY KEY,
3835
name TEXT,
3936
email TEXT,
4037
age INTEGER,
@@ -50,15 +47,11 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "proc
5047
)
5148
""")
5249

53-
data_to_insert = [tuple(row) for row in df.itertuples(index=False, name=None)]
5450
placeholders = ", ".join(["?"] * len(df.columns))
5551
column_names = ", ".join(df.columns)
56-
sql_query = f"INSERT OR IGNORE INTO {table_name} ({column_names}) VALUES ({placeholders})"
57-
cursor.executemany(sql_query, data_to_insert)
52+
sql_query = f"INSERT OR REPLACE INTO {table_name} ({column_names}) VALUES ({placeholders})"
53+
cursor.executemany(sql_query, df.itertuples(index=False, name=None))
5854
conn.commit()
59-
# TODO (Find & Fix): Bulk insert without checking for duplicates
60-
61-
6255
except sqlite3.Error as e:
6356
if conn:
6457
conn.rollback()

app/main.py

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,13 @@
1+
import os
2+
13
from app.etl.extract import extract
24
from app.etl.transform import transform
35
from app.etl.load import load
46

5-
def run_pipeline(csv_path: str = "data.csv", db_path: str = "etl_data.db"):
7+
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
8+
data_path = os.path.join(BASE_DIR, "data.csv")
9+
10+
def run_pipeline(csv_path: str =data_path, db_path: str = "etl_data.db"):
611
"""
712
Run the complete ETL pipeline.
813

0 commit comments

Comments
 (0)