🔑 1. Project Mission
Build a local-first, cloud-portable data platform that uses the Medallion pattern (bronze → silver → gold) and can be lifted into Databricks (Delta Live Tables) or Snowflake (Snowpark) with minimal friction.
- Storage & compute: PostgreSQL 15 for both raw landing and curated marts.
- Schemas-as-code: SQLModel (SQLAlchemy + Pydantic).
- Transformations: PySpark ⇢ stays Databricks/Snowpark compatible.
- Orchestration: Apache Airflow DAGs (Python-native).
- Dev workflow: VS Code + PyEnv (.venv) + Poetry + pre-commit + pytest; no notebooks.
🔧 2. Repository Layout (summary)
```
.
├── .venv/                 # PyEnv-based Python virtual environment, built using Poetry + pyproject.toml
│   └── ...
├── airflow/               # DAG definitions
│   └── dags/**/*.py
├── src/
│   ├── _base/             # classes which can be inherited or used anywhere
│   │   ├── columns/       # SQLModel common column classes
│   │   │   ├── ts_tuple_range.py
│   │   │   └── …
│   │   ├── functions/     # SQLAlchemy common function definitions (DDL)
│   │   │   ├── calculate_systime_range.py
│   │   │   └── …
│   │   └── tables/        # SQLModel base table classes
│   │       ├── history_table.py
│   │       └── …
│   ├── <<database_1>>/    # folder named to correspond to a database or data lake (collection of schemas, tables, views, etc.)
│   │   ├── <<database_1>>.py  # function used to build the database if it does not exist
│   │   ├── data/          # SQLModel reference data classes
│   │   │   ├── common.py
│   │   │   ├── <<schema_1>>.py
│   │   │   └── …
│   │   ├── schemas/       # SQLModel schema classes
│   │   │   ├── common.py
│   │   │   ├── <<schema_1>>.py
│   │   │   └── …
│   │   ├── tables/        # SQLModel table classes
│   │   │   ├── common/
│   │   │   │   └── table_1.py
│   │   │   ├── <<schema_1>>/
│   │   │   │   └── table_2.py
│   │   │   └── …
│   │   └── transforms/    # PySpark jobs designed to populate or transform entities within this db
│   │       └── …
│   ├── utils/             # connection helpers, logging, etc.
│   └── config.py          # single source of env truth
├── docker-compose.yml     # local PG + Airflow + Spark
├── tests/
├── scripts/               # one-shot admin scripts
└── Claude.md              # ★ you are here
```
Convention: File/folder names == snake_case; SQLModel class names == CamelCase.
🧑💻 3. How to ask Claude for help (critical!)
- Always specify the layer (`bronze`, `silver`, `gold`) and the artefact type (`model`, `transform`, `dag`, `test`). Claude is taught to look in those folders.
- Include acceptance criteria (row count deltas, hash totals, KPI outputs).
- Stateless prompts: every prompt must re-state context; Claude does not inspect your local git diff.
- Prefer declarative instructions ("Produce a SQLModel for fact_payment with temporal system-time columns") over step-by-step chat.
- One file at a time: ask Claude to insert or append code blocks exactly as they should appear; no narrative inside the file.
Example prompt ⭢ see section 8.
🏗️ 4. Creating & Updating Schemas
- Derive from `SQLModel`, set `table=True`, add `__tablename__ = "br_<name>"`, etc.
- Use snake_case field names; let SQLModel generate type hints.
- Keep DDL-only metadata (indexes, partitioning) in the class-level `__table_args__`.
```python
from datetime import datetime

from sqlalchemy import Column, text
from sqlalchemy.dialects.postgresql import TIMESTAMP
from sqlmodel import Field, SQLModel

class DimCustomerHist(SQLModel, table=True):
    __tablename__ = "dim_customer_hist"
    __table_args__ = (
        {"postgresql_with": "system_versioning = true"},  # PG ≥17 extension
    )

    customer_id: int = Field(primary_key=True)
    first_name: str
    # … additional business columns
    sys_start: datetime = Field(sa_column=Column(TIMESTAMP(timezone=True),
                                                 primary_key=True,
                                                 default=text("clock_timestamp()")))
    sys_end: datetime = Field(sa_column=Column(TIMESTAMP(timezone=True),
                                               default=text("'infinity'")))
```
Claude should:
- Infer `__tablename__` from the class name if not supplied.
- Add `sys_start`/`sys_end` automatically on `*_hist` tables.
- Generate a paired current-state view in `models/views/` when asked.
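The `__tablename__` inference rule can be sketched as a small helper (the name `infer_tablename` is illustrative; the real project may implement this differently):

```python
import re

def infer_tablename(class_name: str) -> str:
    """Convert a CamelCase SQLModel class name to a snake_case table name."""
    # Insert an underscore before each capital that follows a lowercase letter
    # or digit, then lower-case: DimCustomerHist -> dim_customer_hist
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", class_name).lower()

print(infer_tablename("DimCustomerHist"))  # dim_customer_hist
print(infer_tablename("Trade"))            # trade
```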
---
🔄 5. Transformations with PySpark
- Each job lives under `src/transforms/<database>/<name>.py` and follows the signature `def run(spark: SparkSession, cfg: Config): …`.
- Input tables are loaded via `spark.read.format("jdbc")`, or Delta/Snowpark when ported.
- Output is written back with `.mode("append")` for bronze and `.mode("overwrite")` for deterministic silver/gold snapshots.
- Keep business logic in pure Spark SQL: easy to copy into Databricks notebooks later.
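A minimal sketch of one such job, under stated assumptions: the table names, the `Config` fields, and the SQL are illustrative, not the project's actual transform. The pyspark import is deferred so the module can be inspected without Spark installed:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # avoid a hard pyspark dependency at import time
    from pyspark.sql import SparkSession

@dataclass
class Config:
    jdbc_url: str  # e.g. jdbc:postgresql://localhost:5432/demo
    jdbc_driver: str = "org.postgresql.Driver"

# Business logic kept as plain Spark SQL so it ports to Databricks/Snowpark
RENTAL_SILVER_SQL = """
SELECT rental_id, customer_id, CAST(rental_ts AS DATE) AS rental_date
FROM rental_bronze
WHERE rental_id IS NOT NULL
"""

def run(spark: "SparkSession", cfg: Config) -> None:
    # Load the bronze input over JDBC and expose it to Spark SQL
    (spark.read.format("jdbc")
        .option("url", cfg.jdbc_url)
        .option("driver", cfg.jdbc_driver)
        .option("dbtable", "bronze.rental")
        .load()
        .createOrReplaceTempView("rental_bronze"))
    silver = spark.sql(RENTAL_SILVER_SQL)
    # Deterministic silver snapshot, hence overwrite
    (silver.write.format("jdbc")
        .option("url", cfg.jdbc_url)
        .option("driver", cfg.jdbc_driver)
        .option("dbtable", "silver.rental")
        .mode("overwrite")
        .save())
```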
---
🪁 6. Airflow DAG Conventions
- One DAG per layer, parameterised by `execution_date`.
- Use the helper factory in `src/utils/dag_factory.py`:

```python
dag = dag_factory(
    dag_id="silver_load",
    schedule="@daily",
    default_args=DEFAULT_ARGS,
    tasks=[
        ETLTask("rental_to_silver", transform="rental_silver.py"),
        QualityTask("dq_rental_nulls"),
    ],
)
```
- Task naming: `<source>_to_<layer>` for loads, `dq_<entity>_<rule>` for data-quality.
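The naming convention can be checked mechanically; the regexes below are one illustrative reading of it (the `dq_` rule is loose because the convention allows any `<entity>_<rule>` tail):

```python
import re

# Hypothetical patterns for the two task-id shapes described above
LOAD_TASK = re.compile(r"^[a-z][a-z0-9_]*_to_(bronze|silver|gold)$")
DQ_TASK = re.compile(r"^dq_[a-z][a-z0-9_]*$")

def is_valid_task_id(task_id: str) -> bool:
    """Return True if a task id follows the load or data-quality convention."""
    return bool(LOAD_TASK.match(task_id) or DQ_TASK.match(task_id))

print(is_valid_task_id("rental_to_silver"))  # True
print(is_valid_task_id("dq_rental_nulls"))   # True
print(is_valid_task_id("LoadRental"))        # False
```

A check like this could run as a unit test over every DAG's task list.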
---
⚡ 7. Python Virtual Environment & Code Execution
**CRITICAL**: This project uses Poetry + PyEnv with a pre-built `.venv/` directory containing all dependencies.
### 🔧 Determining the Correct Python Path
**ALWAYS** check the `.venv` directory structure first to determine the correct Python executable path:
```bash
# Check .venv directory structure
ls -la .venv/
# Look for either Scripts/ (Windows) or bin/ (Linux/Mac)
ls -la .venv/Scripts/   # Windows virtualenv
ls -la .venv/bin/       # Linux/Mac virtualenv
```

**Windows layout (`.venv/Scripts/`):**
- ✅ CORRECT: `.venv/Scripts/python.exe script_name.py`
- ✅ CORRECT: `.venv/Scripts/python.exe` (for interactive use)
- Available tools: `.venv/Scripts/pytest.exe`, `.venv/Scripts/ruff.exe`, etc.

**Linux/Mac layout (`.venv/bin/`):**
- ✅ CORRECT: `.venv/bin/python script_name.py`
- ✅ CORRECT: `.venv/bin/python` (for interactive use)
- Available tools: `.venv/bin/pytest`, `.venv/bin/ruff`, etc.

**Never use:**
- ❌ `python script_name.py` (system Python, missing dependencies)
- ❌ `python3 script_name.py` (system Python3, missing dependencies)
When working in WSL with a Windows-created `.venv`:
- The `.venv/Scripts/` directory will exist (Windows structure).
- Windows `.exe` files can be executed directly from WSL.
- Use: `.venv/Scripts/python.exe script_name.py`
- `python.exe` / `python`: Python 3.12 with all project dependencies
- `pytest.exe` / `pytest`: unit testing framework
- `ruff.exe` / `ruff`: fast Python linter and formatter
- `black.exe` / `black`: code formatter
- `pre-commit.exe` / `pre-commit`: Git hooks for code quality
All dependencies from pyproject.toml are pre-installed in .venv/, including:
- SQLModel, SQLAlchemy, psycopg2-binary (database)
- FastAPI, Pydantic (API framework)
- pytest (testing)
- ruff, black, pre-commit (code quality)
- "Module not found" errors: always indicate the wrong Python path.
- "No such file or directory": check whether you are using the Windows or Linux path structure.
- Encoding issues with emojis: fixed in the `configure_logging` function in `utils/utils.py`.
When helping with this project:
- Always check the `.venv/` structure first.
- Use the appropriate path based on the directory structure found.
- Never assume system Python has the required dependencies.
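The layout check above can be scripted; this is a hypothetical helper (not part of the repo) that picks the interpreter for whichever layout exists:

```shell
# Hypothetical helper: locate the venv interpreter regardless of platform.
# Prints the interpreter path on success, returns 1 if neither layout exists.
find_venv_python() {
    venv_dir="$1"
    if [ -x "$venv_dir/bin/python" ]; then
        echo "$venv_dir/bin/python"          # Linux/Mac layout
    elif [ -x "$venv_dir/Scripts/python.exe" ]; then
        echo "$venv_dir/Scripts/python.exe"  # Windows layout
    else
        return 1
    fi
}

# Usage: PY="$(find_venv_python .venv)" || echo "no venv interpreter found" >&2
```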
🏗️ 8. Database Orchestration & Table Type Classification
Use `scripts/orchestrate_build.py` for coordinated database management:

```bash
# Rebuild everything from scratch
.venv/Scripts/python.exe scripts/orchestrate_build.py
# Rebuild a specific database
.venv/Scripts/python.exe scripts/orchestrate_build.py postgres demo
# With debug logging
.venv/Scripts/python.exe scripts/orchestrate_build.py postgres demo DEBUG
```

Each script is parameterized and independently testable:
```bash
# Create databases only
.venv/Scripts/python.exe scripts/build_databases.py postgres demo INFO postgres localhost 5432
# Create schemas only
.venv/Scripts/python.exe scripts/build_schemas.py postgres demo INFO postgres localhost 5432
# Create tables only
.venv/Scripts/python.exe scripts/build_tables.py postgres demo INFO postgres localhost 5432
# Populate reference data only
.venv/Scripts/python.exe scripts/build_reference_data_improved.py postgres demo INFO postgres localhost 5432
```

Design philosophy: composition over duplication. The orchestrator coordinates existing scripts rather than duplicating their logic.
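CLI composition can be sketched as follows; this is an illustrative shape, not the actual `orchestrate_build.py` (only the script names above are taken from the repo):

```python
import subprocess
import sys

def run_step(argv: list[str]) -> None:
    """Run one build script as a subprocess; fail fast on a non-zero exit code."""
    result = subprocess.run(argv)
    if result.returncode != 0:
        print(f"Step failed ({result.returncode}): {' '.join(argv)}", file=sys.stderr)
        sys.exit(result.returncode)

def orchestrate(python: str, engine: str, db: str) -> None:
    # Dependency order: databases -> schemas -> tables -> reference data
    for script in ("build_databases.py", "build_schemas.py",
                   "build_tables.py", "build_reference_data_improved.py"):
        run_step([python, f"scripts/{script}", engine, db])
```

Because each step is a subprocess, the orchestrator inherits every script's defensive coding and exit-code contract for free.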
Each `build_xxx.py` script follows a dual interface pattern for maximum flexibility:

```python
# Library interface: importable functions
def create_tables_for_database(engine: Engine, src_path: str, database_name: str) -> None: ...

def create_tables_for_all_databases(db_engine_type: str, user_name: str,
                                    host: str, port: int, src_path: str) -> None: ...

# CLI interface
if __name__ == "__main__":
    try:
        # Parameter parsing with defaults
        # Logging configuration
        # Engine creation
        # Error handling with proper exit codes
        sys.exit(0)  # Success
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)  # Failure
```
- DRY (Don't Repeat Yourself): one way to do each operation.
  - ✅ Use CLI composition for orchestration.
  - ❌ Don't import library functions and duplicate defensive coding.
- Functional Decomposition: small, testable units.
  - Each script has a single responsibility.
  - Each script can be tested in isolation.
  - Each script has proper error handling with exit codes.
- Composable Architecture: scripts designed for orchestration.
  - Consistent parameter patterns: `engine_type db_name log_level user host port`
  - Proper exit codes: `0` = success, `1` = failure.
  - Both single-item and batch modes supported.
- Error Handling Strategy:
```python
# Enhanced scripts now properly signal failures
try:
    # Core operations
    logger.info("Process completed successfully")
    sys.exit(0)
except Exception as e:
    logger.error(f"Fatal error: {e}")
    sys.exit(1)
```
```bash
# Test each component independently
.venv/Scripts/python.exe scripts/build_databases.py postgres demo INFO postgres localhost 5432
echo $?  # Should be 0 for success, 1 for failure
.venv/Scripts/python.exe scripts/build_schemas.py postgres demo INFO postgres localhost 5432
echo $?  # Should be 0 for success, 1 for failure
```

- Orchestration: always use `orchestrate_build.py` (CLI composition).
- Testing: test individual `build_xxx.py` scripts directly.
- Advanced scripting: import library functions only when you need custom logic.
- Debugging: run individual scripts to isolate issues.
Tables are automatically classified into three types:

**Reference tables**
- Inherits from: `ReferenceTableBase` or `ReferenceTableMixin`
- Primary keys: SMALLINT for performance
- Required fields: `is_active: bool`, `systime: datetime`
- Naming: `*_type`, `*_status_type`
- Examples: `TradeStatusType`, `ActionType`

**Transactional tables**
- Inherits from: `TransactionalTableBase` or `TransactionalTableMixin`
- Primary keys: UUID for business meaning
- Required fields: `effective_time: datetime`
- Examples: `Trade`, `Portfolio`, `Instrument`

**Temporal (history) tables**
- Inherits from: `TemporalTableBase` or `TemporalTableMixin`
- Required fields: `sys_start: datetime`, `sys_end: datetime`
- Naming: `*_hist`
- Auto-triggers: automatically configured for history tracking
- 🔍 Discovery: auto-finds all database projects in `/src/`.
- 🏗️ Creation: creates databases, schemas, and tables in dependency order.
- ⚡ Triggers: auto-configures temporal triggers based on table type.
- 📊 Reference data: populates lookup tables with error recovery.
- 🔍 Validation: checks schema conventions and reports issues.
- 📋 Reporting: comprehensive success/failure tracking.
```python
# Reference table example
from sqlalchemy import SMALLINT
from sqlmodel import Field

from src._base.tables.table_type_mixins import ReferenceTableBase

class TradeStatusType(ReferenceTableBase):
    __tablename__ = "trade_status_type"
    __table_args__ = {"schema": "demo"}

    trade_status_type_id: int = Field(primary_key=True, sa_type=SMALLINT)
    trade_status_type_name: str = Field(nullable=False, max_length=50)
    # is_active and systime inherited from ReferenceTableBase
```

```python
# Transactional table example
import uuid
from uuid import UUID

from sqlmodel import Field, SQLModel

from src._base.tables.table_type_mixins import TransactionalTableMixin

class Trade(TransactionalTableMixin, SQLModel, table=True):
    __tablename__ = "trade"
    __table_args__ = {"schema": "demo"}

    trade_id: UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    # effective_time inherited from TransactionalTableMixin
```

🧬 9. Platform Evolution Tracking (CRITICAL - Part of Platform DNA)
Evolution tracking is MANDATORY for all platform work. Every development session must contribute to the evolution documentation.
📚 CENTRALIZED STANDARDS: All evolution tracking requirements, templates, and automation are defined in:
👉 docs/EVOLUTION_TRACKING_STANDARDS.md 👈
```bash
# 1. Update layer-specific CHANGELOG.md (ALWAYS REQUIRED)
vim src/<layer>/CHANGELOG.md
# 2. Update platform EVOLUTION.md (if major capability)
vim EVOLUTION.md
# 3. Create ADR (if architectural decision)
vim docs/adr/ADR-XXX-<topic>.md
# 4. Check compliance
./scripts/check_evolution_status.sh
```

- Layer-specific CHANGELOG.md updated
- Platform EVOLUTION.md updated (if major capability)
- ADR created (if architectural decision)
- Platform metrics updated (if significant change)
- Migration guide created (if breaking change)
📖 For complete requirements, templates, automation, and standards, see docs/EVOLUTION_TRACKING_STANDARDS.md
✅ 10. Testing & CI
- Unit tests live in `tests/` and use an ephemeral Postgres via pytest-docker.
- Data tests (row counts, hash totals) are in Airflow QualityTasks and fail the DAG.
- Pre-commit hooks enforce ruff, black, and sqlfluff.
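A hash-total comparison like the data tests mentioned above can be sketched as follows (illustrative only, not the project's actual QualityTask; XOR-combining per-row digests makes the total order-independent):

```python
import hashlib

def hash_total(rows: list[tuple]) -> str:
    """Order-independent hash total over a result set: hash each row, XOR the digests."""
    acc = 0
    for row in rows:
        digest = hashlib.sha256(repr(row).encode()).digest()
        acc ^= int.from_bytes(digest, "big")
    return f"{acc:064x}"

# Source and target hold the same rows in different order -> totals match
source = [(1, "alice"), (2, "bob")]
target = [(2, "bob"), (1, "alice")]
assert hash_total(source) == hash_total(target)
```

A QualityTask could compute this on both sides of a load and fail the DAG when the totals differ.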