Commit b804cc4
feat: Integrate DuckDB/Polars analytics stack and OpenTelemetry
Integrates a new analytics pipeline using DuckDB and Polars, leveraging Parquet files as an intermediate data store. This allows for faster analytics queries compared to hitting the primary transactional database (Postgres) directly for complex analytical workloads.

Key changes include:

- **ETL Process**: A new script (`backend/app/scripts/export_to_parquet.py`) extracts data from the transactional User and Item tables in Postgres and exports it to Parquet files (`users_analytics.parquet`, `items_analytics.parquet`). This script is intended for periodic execution (e.g., nightly).
- **Analytics Core**: A new module (`backend/app/core/analytics.py`) manages DuckDB connections and uses Polars to query data from the Parquet files. DuckDB is configured to read these files as external tables.
- **New Analytics API Endpoints**:
  - `GET /api/v1/analytics/items_by_user`: Returns item counts grouped by user.
  - `GET /api/v1/analytics/active_users`: Returns the most active users by item creation.
  These endpoints utilize the DuckDB/Polars stack for data retrieval.
- **OpenTelemetry Integration**:
  - Added OpenTelemetry for distributed tracing.
  - Includes auto-instrumentation for FastAPI and SQLAlchemy.
  - Custom tracing implemented for the ETL script, the core analytics module, and the new analytics API routes.
  - Traces are currently exported to the console.
- **Configuration**: New configuration options (`PARQUET_DATA_PATH`, `SERVICE_NAME`) added to `backend/app/core/config.py`.
- **Testing**: Comprehensive unit tests added for the ETL script, analytics module, and API endpoints.
- **Documentation**: Updated `README.md` and `backend/README.md` with details on the new architecture, setup, and usage.

This change keeps Postgres and SQLModel for existing transactional operations (users, auth, items) while offloading analytics to a more specialized stack.
1 parent 6c9b1fa commit b804cc4

12 files changed: +1384 −1 lines changed


README.md

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@
- 📞 [Traefik](https://traefik.io) as a reverse proxy / load balancer.
- 🚢 Deployment instructions using Docker Compose, including how to set up a frontend Traefik proxy to handle automatic HTTPS certificates.
- 🏭 CI (continuous integration) and CD (continuous deployment) based on GitHub Actions.
- 📊 **New Analytics Module**: Integrated analytics capabilities using DuckDB and Polars for efficient querying of exported data. Features an ETL process for data extraction and OpenTelemetry for performance tracing. For more details, see the [Backend README](./backend/README.md#analytics-module).

### Dashboard Login

backend/README.md

Lines changed: 97 additions & 0 deletions
@@ -1,5 +1,17 @@
# FastAPI Project - Backend

## Contents

- [Requirements](#requirements)
- [Docker Compose](#docker-compose)
- [General Workflow](#general-workflow)
- [VS Code](#vs-code)
- [Docker Compose Override](#docker-compose-override)
- [Backend tests](#backend-tests)
- [Migrations](#migrations)
- [Email Templates](#email-templates)
- [Analytics Module](#analytics-module)

## Requirements

* [Docker](https://www.docker.com/).
@@ -170,3 +182,88 @@ The email templates are in `./backend/app/email-templates/`. Here, there are two
Before continuing, ensure you have the [MJML extension](https://marketplace.visualstudio.com/items?itemName=attilabuti.vscode-mjml) installed in your VS Code.

Once you have the MJML extension installed, you can create a new email template in the `src` directory. After creating the new email template and with the `.mjml` file open in your editor, open the command palette with `Ctrl+Shift+P` and search for `MJML: Export to HTML`. This will convert the `.mjml` file to a `.html` file and now you can save it in the build directory.
## Analytics Module

This section details the integrated analytics capabilities, designed to provide insights into application data without impacting the performance of the primary transactional database.

### Architecture Overview

The analytics architecture employs a dual-database approach:

- **PostgreSQL**: Serves as the primary transactional database, handling real-time data for Users, Items, and other core application entities.
- **DuckDB with Polars**: Used as the analytical processing engine. Data is periodically moved from PostgreSQL to Parquet files, which are then queried efficiently by DuckDB. Polars is utilized for high-performance DataFrame manipulations where needed.

This separation ensures that complex analytical queries do not overload the operational database.
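As a minimal sketch of this pattern (illustrative only; the project's actual wiring lives in `backend/app/core/analytics.py`), DuckDB can expose the exported Parquet files as views ("external tables") and run analytical SQL against them in place:

```python
import duckdb

# Illustrative sketch: paths assume the default PARQUET_DATA_PATH described below.
con = duckdb.connect()  # in-memory analytical database

# Register the exported Parquet files as views.
con.execute(
    "CREATE VIEW users AS "
    "SELECT * FROM read_parquet('backend/data/parquet/users_analytics.parquet')"
)
con.execute(
    "CREATE VIEW items AS "
    "SELECT * FROM read_parquet('backend/data/parquet/items_analytics.parquet')"
)

# Analytical SQL now runs against the Parquet files, not PostgreSQL.
print(con.execute("SELECT COUNT(*) AS n FROM items").fetchone())

# Results can also be fetched directly as a Polars DataFrame.
df = con.execute("SELECT * FROM users LIMIT 5").pl()
```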
### ETL Process

An ETL (Extract, Transform, Load) process is responsible for populating the analytical data store; a minimal sketch of the flow follows the list below.

- **Script**: `backend/app/scripts/export_to_parquet.py`
- **Purpose**: This script extracts data from the main PostgreSQL database (specifically the `User` and `Item` tables) and saves it into Parquet files (`users_analytics.parquet`, `items_analytics.parquet`). The Parquet format is chosen for its efficiency in analytical workloads.
- **Usage**: The script is designed to be run periodically (e.g., as a nightly batch job or via a scheduler like cron) to update the data available for analytics. To run the script manually (ensure your Python environment with backend dependencies is active, or run within the Docker container):

  ```bash
  python backend/app/scripts/export_to_parquet.py
  ```

- **Output Location**: The Parquet files are stored in a directory specified by the `PARQUET_DATA_PATH` environment variable. The default location is `backend/data/parquet/`.
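For orientation, here is a minimal, hypothetical sketch of such an export step. The real script also adds OpenTelemetry spans; the `DATABASE_URI` variable and the source table names here are assumptions, and `pl.read_database_uri` requires the `connectorx` extra:

```python
import os

import polars as pl

# Assumed connection string; the real script builds this from app.core.config.
DATABASE_URI = os.environ["DATABASE_URI"]  # e.g. postgresql://user:pass@db:5432/app
PARQUET_DATA_PATH = os.environ.get("PARQUET_DATA_PATH", "backend/data/parquet/")


def export_table(query: str, filename: str) -> None:
    """Extract rows from PostgreSQL and write them to a Parquet file."""
    df = pl.read_database_uri(query=query, uri=DATABASE_URI)
    os.makedirs(PARQUET_DATA_PATH, exist_ok=True)
    df.write_parquet(os.path.join(PARQUET_DATA_PATH, filename))


if __name__ == "__main__":
    # Table names are assumptions; adjust to the actual transactional schema.
    export_table("SELECT * FROM users", "users_analytics.parquet")
    export_table("SELECT * FROM items", "items_analytics.parquet")
```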
### Analytics API Endpoints

New API endpoints provide access to analytical insights. These are available under the `/api/v1/analytics` prefix:

- **`GET /api/v1/analytics/items_by_user`**:
  - **Provides**: A list of users and the total count of items they own.
  - **Details**: Only includes users who own at least one item. Results are ordered by the number of items in descending order.
  - **Response Model**: `List[UserItemCount]`, where `UserItemCount` includes `email: str` and `item_count: int`.

- **`GET /api/v1/analytics/active_users`**:
  - **Provides**: The top 10 most active users, based on the number of items they own.
  - **Details**: Users are ordered by item count in descending order. Because the query uses a LEFT JOIN, users who own no items are included with an `item_count` of 0, although they only surface in the top 10 when fewer than ten users own items.
  - **Response Model**: `List[ActiveUser]`, where `ActiveUser` includes `user_id: int`, `email: str`, `full_name: str | None`, and `item_count: int`.

These endpoints query the DuckDB instance, which reads from the Parquet files generated by the ETL script.
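Once the ETL script has produced the Parquet files, a quick smoke test of the endpoints might look like this (hypothetical; the base URL assumes the default local development setup):

```python
import httpx

BASE_URL = "http://localhost:8000/api/v1/analytics"  # assumed local dev address

resp = httpx.get(f"{BASE_URL}/items_by_user")
resp.raise_for_status()
for row in resp.json():
    print(f"{row['email']}: {row['item_count']} items")
```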
### OpenTelemetry Tracing

OpenTelemetry has been integrated into the backend for enhanced observability (a minimal setup sketch follows this list):

- **Purpose**: To trace application performance and behavior, helping to identify bottlenecks and understand request flows.
- **Export**: Currently, traces are configured to be exported to the console. This is useful for development and debugging. For production, an appropriate OpenTelemetry collector and backend (e.g., Jaeger, Zipkin, Datadog) should be configured.
- **Coverage**:
  - **Auto-instrumentation**: FastAPI and SQLAlchemy interactions are automatically instrumented, providing traces for API requests and database calls to the PostgreSQL database.
  - **Custom Tracing**:
    - The analytics module (`backend/app/core/analytics.py`) includes custom spans for DuckDB connection setup and query execution.
    - The analytics API routes (`backend/app/api/routes/analytics.py`) have custom spans for their request handlers.
    - The ETL script (`backend/app/scripts/export_to_parquet.py`) is instrumented with custom spans for its key operations (database extraction, Parquet file writing).
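A minimal sketch of a console-exporting tracer setup of this kind (assumed wiring; the project's actual initialization code may differ):

```python
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Tag all spans with the service name (see SERVICE_NAME below).
resource = Resource.create({"service.name": "fastapi-analytics-app"})
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

# Custom spans can then be created anywhere in the application:
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("example_operation") as span:
    span.set_attribute("example.attribute", "value")
```

Auto-instrumentation is typically attached separately, e.g. `FastAPIInstrumentor.instrument_app(app)` and `SQLAlchemyInstrumentor().instrument(engine=engine)` from the respective instrumentation packages.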
### Key New Dependencies

The following main dependencies were added to support the analytics features:

- `duckdb`: An in-process analytical data management system.
- `polars`: A fast DataFrame library.
- `opentelemetry-api`: Core OpenTelemetry API.
- `opentelemetry-sdk`: OpenTelemetry SDK for configuring telemetry.
- `opentelemetry-exporter-otlp-proto-http`: OTLP exporter (though the console exporter is used by default in the current setup).
- `opentelemetry-instrumentation-fastapi`: Auto-instrumentation for FastAPI.
- `opentelemetry-instrumentation-sqlalchemy`: Auto-instrumentation for SQLAlchemy.
- `opentelemetry-instrumentation-psycopg2`: Auto-instrumentation for Psycopg2 (the PostgreSQL driver).

Refer to `backend/pyproject.toml` for specific versions.
### New Configuration Options

The following environment variables can be set (e.g., in your `.env` file) to configure the analytics and OpenTelemetry features:

- **`PARQUET_DATA_PATH`**:
  - **Description**: Specifies the directory where the ETL script saves Parquet files and where DuckDB reads them from.
  - **Default**: `backend/data/parquet/`
- **`SERVICE_NAME`**:
  - **Description**: Sets the service name attribute for OpenTelemetry traces. This helps in identifying and filtering traces in a distributed tracing system.
  - **Default**: `fastapi-analytics-app` (note: the ETL script appends `-etl-script` to this name for its traces).
- **`OTEL_EXPORTER_OTLP_ENDPOINT`** (optional, for future use):
  - **Description**: If you configure an OTLP exporter, this variable would specify the endpoint URL of the receiving backend (e.g., an OpenTelemetry Collector or Jaeger).
  - **Default**: Not set (the console exporter is used by default).

These settings are defined in `backend/app/core/config.py`.
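A sketch of how these two options might be declared with pydantic-settings (the template's actual `Settings` class has many more fields, and the exact definitions may differ):

```python
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Directory where the ETL script writes and DuckDB reads Parquet files.
    PARQUET_DATA_PATH: str = "backend/data/parquet/"
    # service.name resource attribute attached to OpenTelemetry traces.
    SERVICE_NAME: str = "fastapi-analytics-app"


settings = Settings()
```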

backend/app/api/main.py

Lines changed: 2 additions & 1 deletion
@@ -1,13 +1,14 @@
 from fastapi import APIRouter

-from app.api.routes import items, login, private, users, utils
+from app.api.routes import items, login, private, users, utils, analytics
 from app.core.config import settings

 api_router = APIRouter()
 api_router.include_router(login.router)
 api_router.include_router(users.router)
 api_router.include_router(utils.router)
 api_router.include_router(items.router)
+api_router.include_router(analytics.router)


 if settings.ENVIRONMENT == "local":
backend/app/api/routes/analytics.py

Lines changed: 115 additions & 0 deletions
@@ -0,0 +1,115 @@
from fastapi import APIRouter, HTTPException
import polars as pl
from typing import List
from pydantic import BaseModel

# OpenTelemetry Imports
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode  # For setting span status

# Import duckdb for its specific error type
import duckdb

from app.core.analytics import query_duckdb, PARQUET_DATA_PATH  # For logging the path, if needed
import logging  # For logging

logger = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)  # Initialize OpenTelemetry Tracer

router = APIRouter(prefix="/analytics", tags=["analytics"])

# Pydantic models for response structures
class UserItemCount(BaseModel):
    email: str
    item_count: int

class ActiveUser(BaseModel):
    user_id: int  # Added user_id based on the query
    email: str
    full_name: str | None = None  # Made full_name optional as it can be NULL
    item_count: int

@router.get("/items_by_user", response_model=List[UserItemCount])
def get_items_by_user():
    """
    Retrieves a list of users and the count of items they own,
    ordered by the number of items in descending order.
    Only users who own at least one item are included.
    """
    with tracer.start_as_current_span("analytics_items_by_user_handler") as span:
        query = """
            SELECT u.email, COUNT(i.item_id) AS item_count
            FROM users u
            JOIN items i ON u.user_id = i.owner_id
            GROUP BY u.email
            ORDER BY item_count DESC;
        """
        span.set_attribute("analytics.query", query)
        try:
            logger.info("Executing query for items_by_user")
            df: pl.DataFrame = query_duckdb(query)  # query_duckdb is already traced
            result = df.to_dicts()
            span.set_attribute("response.items_count", len(result))
            logger.info(f"Successfully retrieved {len(result)} records for items_by_user")
            span.set_status(Status(StatusCode.OK))
            return result
        except ConnectionError as e:  # Specific error from get_duckdb_connection if it fails
            logger.error(f"ConnectionError in /items_by_user: {e}. Ensure Parquet files exist at {PARQUET_DATA_PATH} and are readable.", exc_info=True)
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, f"Analytics service connection error: {e}"))
            raise HTTPException(status_code=503, detail=f"Analytics service unavailable: Database connection failed. {e}")
        except duckdb.Error as e:  # Catch DuckDB-specific errors
            logger.error(f"DuckDB query error in /items_by_user: {e}", exc_info=True)
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, f"Analytics query error: {e}"))
            raise HTTPException(status_code=500, detail=f"Analytics query failed: {e}")
        except Exception as e:
            logger.error(f"Unexpected error in /items_by_user: {e}", exc_info=True)
            # Log the PARQUET_DATA_PATH to help diagnose if it's a file-not-found issue from the underlying module
            logger.info(f"Current PARQUET_DATA_PATH for analytics module: {PARQUET_DATA_PATH}")
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, f"Unexpected error: {e}"))
            raise HTTPException(status_code=500, detail=f"An unexpected error occurred while fetching items by user. {e}")

@router.get("/active_users", response_model=List[ActiveUser])
def get_active_users():
    """
    Retrieves the top 10 most active users based on the number of items they own.
    Users are ordered by item count in descending order.
    Includes users who may not own any items (LEFT JOIN).
    """
    with tracer.start_as_current_span("analytics_active_users_handler") as span:
        # Query matches the ActiveUser model: user_id, email, full_name, item_count
        query = """
            SELECT u.user_id, u.email, u.full_name, COUNT(i.item_id) AS item_count
            FROM users u
            LEFT JOIN items i ON u.user_id = i.owner_id
            GROUP BY u.user_id, u.email, u.full_name -- Group by all selected non-aggregated columns
            ORDER BY item_count DESC
            LIMIT 10;
        """
        span.set_attribute("analytics.query", query)
        try:
            logger.info("Executing query for active_users")
            df: pl.DataFrame = query_duckdb(query)  # query_duckdb is already traced
            result = df.to_dicts()
            span.set_attribute("response.users_count", len(result))
            logger.info(f"Successfully retrieved {len(result)} records for active_users")
            span.set_status(Status(StatusCode.OK))
            return result
        except ConnectionError as e:
            logger.error(f"ConnectionError in /active_users: {e}. Ensure Parquet files exist at {PARQUET_DATA_PATH} and are readable.", exc_info=True)
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, f"Analytics service connection error: {e}"))
            raise HTTPException(status_code=503, detail=f"Analytics service unavailable: Database connection failed. {e}")
        except duckdb.Error as e:  # Catch DuckDB-specific errors
            logger.error(f"DuckDB query error in /active_users: {e}", exc_info=True)
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, f"Analytics query error: {e}"))
            raise HTTPException(status_code=500, detail=f"Analytics query failed: {e}")
        except Exception as e:
            logger.error(f"Unexpected error in /active_users: {e}", exc_info=True)
            logger.info(f"Current PARQUET_DATA_PATH for analytics module: {PARQUET_DATA_PATH}")
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, f"Unexpected error: {e}"))
            raise HTTPException(status_code=500, detail=f"An unexpected error occurred while fetching active users. {e}")
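The commit message mentions unit tests for these endpoints; a hedged sketch of one such test, stubbing out `query_duckdb` so no Parquet files are needed (the test name, fixture usage, and `app.main` import path are assumptions, not the actual test file):

```python
import polars as pl
from fastapi.testclient import TestClient

from app.main import app  # assumed application entry point


def test_items_by_user(monkeypatch):
    fake = pl.DataFrame({"email": ["alice@example.com"], "item_count": [3]})
    # Patch the DuckDB query function where the route module imported it.
    monkeypatch.setattr("app.api.routes.analytics.query_duckdb", lambda q: fake)

    client = TestClient(app)
    resp = client.get("/api/v1/analytics/items_by_user")
    assert resp.status_code == 200
    assert resp.json() == [{"email": "alice@example.com", "item_count": 3}]
```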
