[Phase 0] Data Foundation - Loading and Validation #1
Summary
Build the data loading infrastructure and validation layer to ensure data quality before it enters the forecasting pipeline. This is the foundation that all subsequent phases depend on.
System Context
```
demand-forecasting-engine/
├── src/
│   └── data/
│       ├── __init__.py
│       ├── loader.py            # <- CREATE
│       ├── validator.py         # <- CREATE
│       └── schema.py            # <- CREATE
├── tests/
│   └── test_data_loader.py      # <- CREATE
└── scripts/
    └── validate_data.py         # <- CREATE
```
Current State
The `src/data/` module exists with only an empty `__init__.py`. No data loading or validation logic exists yet.
Proposed Solution
1. Data Loader (src/data/loader.py)

```python
import pandas as pd
from pathlib import Path
from typing import Optional


def load_sales_data(
    filepath: str,
    date_column: str = "date",
    required_columns: Optional[list[str]] = None,
) -> pd.DataFrame:
    """
    Load sales data from CSV with validation.

    Args:
        filepath: Path to the CSV file
        date_column: Name of the date column to parse as datetime
        required_columns: Columns that must be present

    Returns:
        DataFrame with parsed dates and validated columns

    Raises:
        FileNotFoundError: If filepath doesn't exist
        ValueError: If required columns are missing
    """
    path = Path(filepath)
    if not path.exists():
        raise FileNotFoundError(f"Data file not found: {filepath}")
    df = pd.read_csv(filepath, parse_dates=[date_column])
    if required_columns:
        missing = set(required_columns) - set(df.columns)
        if missing:
            raise ValueError(f"Missing required columns: {missing}")
    return df
```
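For reference, a minimal usage sketch (the `data/raw/sales.csv` path mirrors the one used in the Definition of Done below; the column subset is illustrative):

```python
from src.data.loader import load_sales_data

df = load_sales_data(
    "data/raw/sales.csv",
    date_column="date",
    required_columns=["date", "sku_id", "sales"],
)
print(df.dtypes)  # "date" should come back as datetime64[ns]
```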
2. Schema Definition (src/data/schema.py)

```python
from dataclasses import dataclass


@dataclass
class SalesDataSchema:
    """Expected schema for sales data."""

    REQUIRED_COLUMNS = [
        'date',
        'sku_id',
        'category',
        'subcategory',
        'sales',
    ]
    OPTIONAL_COLUMNS = [
        'price',
        'promotion_type',
        'store_id',
    ]
    COLUMN_TYPES = {
        'date': 'datetime64[ns]',
        'sku_id': 'object',
        'category': 'object',
        'subcategory': 'object',
        'sales': 'int64',
        'price': 'float64',
    }
```
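For illustration, downstream code could use `COLUMN_TYPES` to coerce a freshly loaded frame into shape. The `coerce_types` helper below is a hypothetical sketch, not part of this issue's scope:

```python
import pandas as pd

from src.data.schema import SalesDataSchema


def coerce_types(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: cast known columns to their expected dtypes."""
    for col, dtype in SalesDataSchema.COLUMN_TYPES.items():
        if col not in df.columns:
            continue
        if dtype == "datetime64[ns]":
            # Dates go through to_datetime rather than astype for robust parsing.
            df[col] = pd.to_datetime(df[col])
        else:
            df[col] = df[col].astype(dtype)
    return df
```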
3. Validator (src/data/validator.py)

```python
import pandas as pd
from dataclasses import dataclass
from typing import List, Dict

from .schema import SalesDataSchema


@dataclass
class ValidationReport:
    """Results of data validation."""

    is_valid: bool
    row_count: int
    column_count: int
    missing_columns: List[str]
    type_mismatches: Dict[str, str]
    missing_value_pct: Dict[str, float]
    date_range: tuple
    sku_count: int
    warnings: List[str]

    def __str__(self) -> str:
        status = "VALID" if self.is_valid else "INVALID"
        return f"""
Data Validation Report
======================
Status: {status}
Rows: {self.row_count:,}
Columns: {self.column_count}
Date Range: {self.date_range[0]} to {self.date_range[1]}
Unique SKUs: {self.sku_count:,}
Missing Columns: {self.missing_columns or 'None'}
Type Mismatches: {self.type_mismatches or 'None'}
Warnings: {len(self.warnings)}
"""


def validate_schema(df: pd.DataFrame) -> ValidationReport:
    """Validate DataFrame against expected schema."""
    schema = SalesDataSchema()

    missing_cols = [c for c in schema.REQUIRED_COLUMNS if c not in df.columns]

    type_mismatches = {}
    for col, expected_type in schema.COLUMN_TYPES.items():
        if col in df.columns and str(df[col].dtype) != expected_type:
            type_mismatches[col] = f"expected {expected_type}, got {df[col].dtype}"

    missing_pct = {col: df[col].isna().mean() * 100 for col in df.columns}
    warnings = []
    for col, pct in missing_pct.items():
        if pct > 5:
            warnings.append(f"{col} has {pct:.1f}% missing values")

    date_range = (df['date'].min(), df['date'].max()) if 'date' in df.columns else (None, None)
    sku_count = df['sku_id'].nunique() if 'sku_id' in df.columns else 0

    return ValidationReport(
        is_valid=len(missing_cols) == 0 and len(type_mismatches) == 0,
        row_count=len(df),
        column_count=len(df.columns),
        missing_columns=missing_cols,
        type_mismatches=type_mismatches,
        missing_value_pct=missing_pct,
        date_range=date_range,
        sku_count=sku_count,
        warnings=warnings,
    )
```
4. Validation Script (scripts/validate_data.py)

```python
#!/usr/bin/env python
"""Validate sales data file."""
import argparse
import sys

from src.data.loader import load_sales_data
from src.data.validator import validate_schema
from src.data.schema import SalesDataSchema


def main():
    parser = argparse.ArgumentParser(description="Validate sales data")
    parser.add_argument("filepath", help="Path to sales data CSV")
    args = parser.parse_args()

    print(f"Loading data from {args.filepath}...")
    df = load_sales_data(
        args.filepath,
        required_columns=SalesDataSchema.REQUIRED_COLUMNS,
    )

    print("Validating schema...")
    report = validate_schema(df)
    print(report)

    if not report.is_valid:
        sys.exit(1)


if __name__ == "__main__":
    main()
```

Implementation Checklist
- Create `src/data/loader.py` with `load_sales_data()` function
- Create `src/data/schema.py` with `SalesDataSchema` dataclass
- Create `src/data/validator.py` with `validate_schema()` function
- Create `scripts/validate_data.py` CLI script
- Create `tests/test_data_loader.py` with unit tests (see the sketch after this list)
- Generate synthetic test data for development
- Verify validation report outputs correctly
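A minimal sketch of what `tests/test_data_loader.py` could look like, using pytest's `tmp_path` fixture to generate a small synthetic CSV (the column values below are illustrative):

```python
import pandas as pd
import pytest

from src.data.loader import load_sales_data


@pytest.fixture
def sales_csv(tmp_path):
    """Write a tiny synthetic sales file and return its path."""
    df = pd.DataFrame({
        "date": ["2024-01-01", "2024-01-02"],
        "sku_id": ["SKU-1", "SKU-2"],
        "category": ["A", "A"],
        "subcategory": ["A1", "A2"],
        "sales": [10, 7],
    })
    path = tmp_path / "sales.csv"
    df.to_csv(path, index=False)
    return str(path)


def test_load_parses_dates(sales_csv):
    df = load_sales_data(sales_csv)
    assert pd.api.types.is_datetime64_any_dtype(df["date"])


def test_missing_file_raises():
    with pytest.raises(FileNotFoundError):
        load_sales_data("does/not/exist.csv")


def test_missing_column_raises(sales_csv):
    with pytest.raises(ValueError):
        load_sales_data(sales_csv, required_columns=["date", "price"])
```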
Files to Modify

| File | Lines | Action | Description |
|---|---|---|---|
| `src/data/loader.py` | 0-40 | Create | Data loading with validation |
| `src/data/schema.py` | 0-30 | Create | Schema definition dataclass |
| `src/data/validator.py` | 0-80 | Create | Validation logic and report |
| `scripts/validate_data.py` | 0-30 | Create | CLI validation script |
| `tests/test_data_loader.py` | 0-60 | Create | Unit tests for data loading |
Technical Challenges
Data Type Parsing: Dates may come in various formats (YYYY-MM-DD, MM/DD/YYYY). Use `pd.to_datetime()` with an explicit format where possible; note that the `infer_datetime_format` argument is deprecated as of pandas 2.0, where `format="mixed"` is the supported way to handle heterogeneous date columns.
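A tolerant fallback might look like this (a sketch; the `parse_dates_flexibly` name is illustrative, and `format="mixed"` requires pandas >= 2.0):

```python
import pandas as pd


def parse_dates_flexibly(series: pd.Series) -> pd.Series:
    """Try fast ISO parsing first, then fall back to per-element inference."""
    try:
        return pd.to_datetime(series, format="%Y-%m-%d")
    except ValueError:
        # Mixed or non-ISO formats, e.g. "03/14/2024"; slower but tolerant.
        return pd.to_datetime(series, format="mixed")
```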
Large File Handling: For files >1GB, consider chunked reading:

```python
chunks = pd.read_csv(filepath, chunksize=100_000)
df = pd.concat(chunks, ignore_index=True)
```
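Note that concatenating every chunk still materializes the full frame in memory, so chunking pays off mainly when each chunk is filtered or aggregated before being combined. A sketch of the per-chunk pattern (the groupby target is illustrative):

```python
import pandas as pd

# Aggregate per chunk so the full file never needs to fit in memory at once.
partials = []
for chunk in pd.read_csv("data/raw/sales.csv", chunksize=100_000,
                         parse_dates=["date"]):
    partials.append(chunk.groupby("sku_id")["sales"].sum())

# Combine the partial sums into one per-SKU total.
sku_totals = pd.concat(partials).groupby(level=0).sum()
```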
Definition of Done

- `load_sales_data()` returns DataFrame with correct dtypes
- `validate_schema()` catches missing required columns
- Date column parsed as datetime64
- Unit tests pass: `pytest tests/test_data_loader.py -v`
- Script runs: `python scripts/validate_data.py data/raw/sales.csv`
Risk Assessment
| Risk | Impact | Mitigation |
|---|---|---|
| Date format variations | 🟡 Medium | Use flexible parsing with format inference |
| Memory issues with large files | 🟡 Medium | Add chunked reading option |
| Missing required columns | 🟢 Low | Clear error messages |
Time Estimate
Estimated Hours: 8-10
Priority: Critical (blocks all other phases)