diff --git a/README.md b/README.md index afe124f3fb..2cf1a477c9 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ - 📞 [Traefik](https://traefik.io) as a reverse proxy / load balancer. - 🚢 Deployment instructions using Docker Compose, including how to set up a frontend Traefik proxy to handle automatic HTTPS certificates. - 🏭 CI (continuous integration) and CD (continuous deployment) based on GitHub Actions. +- 📊 **New Analytics Module**: Integrated analytics capabilities using DuckDB and Polars for efficient querying of exported data. Features an ETL process for data extraction and OpenTelemetry for performance tracing. For more details, see the [Backend README](./backend/README.md#analytics-module). ### Dashboard Login diff --git a/backend/README.md b/backend/README.md index 17210a2f2c..4a8c9fa0ba 100644 --- a/backend/README.md +++ b/backend/README.md @@ -1,5 +1,17 @@ # FastAPI Project - Backend +## Contents + +- [Requirements](#requirements) +- [Docker Compose](#docker-compose) +- [General Workflow](#general-workflow) +- [VS Code](#vs-code) +- [Docker Compose Override](#docker-compose-override) +- [Backend tests](#backend-tests) +- [Migrations](#migrations) +- [Email Templates](#email-templates) +- [Analytics Module](#analytics-module) + ## Requirements * [Docker](https://www.docker.com/). @@ -170,3 +182,88 @@ The email templates are in `./backend/app/email-templates/`. Here, there are two Before continuing, ensure you have the [MJML extension](https://marketplace.visualstudio.com/items?itemName=attilabuti.vscode-mjml) installed in your VS Code. Once you have the MJML extension installed, you can create a new email template in the `src` directory. After creating the new email template and with the `.mjml` file open in your editor, open the command palette with `Ctrl+Shift+P` and search for `MJML: Export to HTML`. This will convert the `.mjml` file to a `.html` file and now you can save it in the build directory. + +## Analytics Module + +This section details the integrated analytics capabilities, designed to provide insights into application data without impacting the performance of the primary transactional database. + +### Architecture Overview + +The analytics architecture employs a dual-database approach: + +- **PostgreSQL**: Serves as the primary transactional database, handling real-time data for Users, Items, and other core application entities. +- **DuckDB with Polars**: Used as the analytical processing engine. Data is periodically moved from PostgreSQL to Parquet files, which are then queried efficiently by DuckDB. Polars is utilized for high-performance DataFrame manipulations where needed. + +This separation ensures that complex analytical queries do not overload the operational database. + +### ETL Process + +An ETL (Extract, Transform, Load) process is responsible for populating the analytical data store. + +- **Script**: `backend/app/scripts/export_to_parquet.py` +- **Purpose**: This script extracts data from the main PostgreSQL database (specifically the `User` and `Item` tables) and saves it into Parquet files (`users_analytics.parquet`, `items_analytics.parquet`). Parquet format is chosen for its efficiency in analytical workloads. +- **Usage**: The script is designed to be run periodically (e.g., as a nightly batch job or via a scheduler like cron) to update the data available for analytics. 
To run the script manually (ensure your Python environment with backend dependencies is active, or run within the Docker container): + ```bash + python backend/app/scripts/export_to_parquet.py + ``` +- **Output Location**: The Parquet files are stored in a directory specified by the `PARQUET_DATA_PATH` environment variable. The default location is `backend/data/parquet/`. + +### Analytics API Endpoints + +New API endpoints provide access to analytical insights. These are available under the `/api/v1/analytics` prefix: + +- **`GET /api/v1/analytics/items_by_user`**: + - **Provides**: A list of users and the total count of items they own. + - **Details**: Only includes users who own at least one item. Results are ordered by the number of items in descending order. + - **Response Model**: `List[UserItemCount]` where `UserItemCount` includes `email: str` and `item_count: int`. + +- **`GET /api/v1/analytics/active_users`**: + - **Provides**: The top 10 most active users, based on the number of items they own. + - **Details**: Users are ordered by item count in descending order. This endpoint uses a left join, so users who may not own any items could theoretically be included if the query were adjusted (currently, it effectively shows top item owners). + - **Response Model**: `List[ActiveUser]` where `ActiveUser` includes `user_id: int`, `email: str`, `full_name: str | None`, and `item_count: int`. + +These endpoints query the DuckDB instance, which reads from the Parquet files generated by the ETL script. + +### OpenTelemetry Tracing + +OpenTelemetry has been integrated into the backend for enhanced observability: + +- **Purpose**: To trace application performance and behavior, helping to identify bottlenecks and understand request flows. +- **Export**: Currently, traces are configured to be exported to the console. This is useful for development and debugging. For production, an appropriate OpenTelemetry collector and backend (e.g., Jaeger, Zipkin, Datadog) should be configured. +- **Coverage**: + - **Auto-instrumentation**: FastAPI and SQLAlchemy interactions are automatically instrumented, providing traces for API requests and database calls to the PostgreSQL database. + - **Custom Tracing**: + - The analytics module (`backend/app/core/analytics.py`) includes custom spans for DuckDB connection setup and query execution. + - The analytics API routes (`backend/app/api/routes/analytics.py`) have custom spans for their request handlers. + - The ETL script (`backend/app/scripts/export_to_parquet.py`) is instrumented with custom spans for its key operations (database extraction, Parquet file writing). + +### Key New Dependencies + +The following main dependencies were added to support the analytics features: + +- `duckdb`: An in-process analytical data management system. +- `polars`: A fast DataFrame library. +- `opentelemetry-api`: Core OpenTelemetry API. +- `opentelemetry-sdk`: OpenTelemetry SDK for configuring telemetry. +- `opentelemetry-exporter-otlp-proto-http`: OTLP exporter (though console exporter is used by default in current setup). +- `opentelemetry-instrumentation-fastapi`: Auto-instrumentation for FastAPI. +- `opentelemetry-instrumentation-sqlalchemy`: Auto-instrumentation for SQLAlchemy. +- `opentelemetry-instrumentation-psycopg2`: Auto-instrumentation for Psycopg2 (PostgreSQL driver). + +Refer to `backend/pyproject.toml` for specific versions. 
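+
+As a quick illustration of how the pieces described above fit together — the ETL script writes Parquet files, DuckDB registers them as tables, and query results come back as Polars DataFrames — here is a minimal sketch. It roughly mirrors what `backend/app/core/analytics.py` does internally; the file paths assume the default `PARQUET_DATA_PATH` and the table/column names used elsewhere in this PR.
+
+```python
+import duckdb
+import polars as pl
+
+# In-memory DuckDB instance; tables are created from the exported Parquet files.
+con = duckdb.connect(database=":memory:")
+con.execute(
+    "CREATE OR REPLACE TABLE users AS "
+    "SELECT * FROM read_parquet('backend/data/parquet/users_analytics.parquet');"
+)
+con.execute(
+    "CREATE OR REPLACE TABLE items AS "
+    "SELECT * FROM read_parquet('backend/data/parquet/items_analytics.parquet');"
+)
+
+# The same aggregation the /items_by_user endpoint runs; .pl() returns a Polars DataFrame.
+df: pl.DataFrame = con.execute(
+    """
+    SELECT u.email, COUNT(i.item_id) AS item_count
+    FROM users u
+    JOIN items i ON u.user_id = i.owner_id
+    GROUP BY u.email
+    ORDER BY item_count DESC;
+    """
+).pl()
+print(df)
+```
+
+Once the backend is running and the Parquet files exist, the analytics endpoints can be exercised directly. The base URL below and the use of `httpx` are assumptions for a local setup, not part of this PR:
+
+```python
+import httpx
+
+# Hypothetical local base URL; adjust host/port to your deployment.
+resp = httpx.get("http://localhost:8000/api/v1/analytics/items_by_user")
+resp.raise_for_status()
+for row in resp.json():
+    print(row["email"], row["item_count"])
+```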
+ +### New Configuration Options + +The following environment variables can be set (e.g., in your `.env` file) to configure the analytics and OpenTelemetry features: + +- **`PARQUET_DATA_PATH`**: + - **Description**: Specifies the directory where the ETL script saves Parquet files and where DuckDB reads them from. + - **Default**: `backend/data/parquet/` +- **`SERVICE_NAME`**: + - **Description**: Sets the service name attribute for OpenTelemetry traces. This helps in identifying and filtering traces in a distributed tracing system. + - **Default**: `fastapi-analytics-app` (Note: The ETL script appends "-etl-script" to this name for its traces). +- **`OTEL_EXPORTER_OTLP_ENDPOINT`** (Optional, for future use): + - **Description**: If you configure an OTLP exporter (e.g., for Jaeger or Prometheus), this variable would specify its endpoint URL. + - **Default**: Not set (console exporter is used by default). + +These settings are defined in `backend/app/core/config.py`. diff --git a/backend/app/api/main.py b/backend/app/api/main.py index eac18c8e8f..fa8dc18100 100644 --- a/backend/app/api/main.py +++ b/backend/app/api/main.py @@ -1,6 +1,6 @@ from fastapi import APIRouter -from app.api.routes import items, login, private, users, utils +from app.api.routes import items, login, private, users, utils, analytics from app.core.config import settings api_router = APIRouter() @@ -8,6 +8,7 @@ api_router.include_router(users.router) api_router.include_router(utils.router) api_router.include_router(items.router) +api_router.include_router(analytics.router) if settings.ENVIRONMENT == "local": diff --git a/backend/app/api/routes/analytics.py b/backend/app/api/routes/analytics.py new file mode 100644 index 0000000000..39dbdb7138 --- /dev/null +++ b/backend/app/api/routes/analytics.py @@ -0,0 +1,115 @@ +from fastapi import APIRouter, HTTPException +import polars as pl +from typing import List +from pydantic import BaseModel + +# OpenTelemetry Imports +from opentelemetry import trace +from opentelemetry.trace import Status, StatusCode # For setting span status + +# Import duckdb for its specific error type +import duckdb + +from app.core.analytics import query_duckdb, PARQUET_DATA_PATH # For logging the path, if needed +import logging # For logging + +logger = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) # Initialize OpenTelemetry Tracer + +router = APIRouter(prefix="/analytics", tags=["analytics"]) + +# Pydantic models for response structures +class UserItemCount(BaseModel): + email: str + item_count: int + +class ActiveUser(BaseModel): + user_id: int # Added user_id based on the query + email: str + full_name: str | None = None # Made full_name optional as it can be NULL + item_count: int + +@router.get("/items_by_user", response_model=List[UserItemCount]) +def get_items_by_user(): + """ + Retrieves a list of users and the count of items they own, + ordered by the number of items in descending order. + Only users who own at least one item are included. 
+ """ + with tracer.start_as_current_span("analytics_items_by_user_handler") as span: + query = """ + SELECT u.email, COUNT(i.item_id) AS item_count + FROM users u + JOIN items i ON u.user_id = i.owner_id + GROUP BY u.email + ORDER BY item_count DESC; + """ + span.set_attribute("analytics.query", query) + try: + logger.info("Executing query for items_by_user") + df: pl.DataFrame = query_duckdb(query) # query_duckdb is already traced + result = df.to_dicts() + span.set_attribute("response.items_count", len(result)) + logger.info(f"Successfully retrieved {len(result)} records for items_by_user") + span.set_status(Status(StatusCode.OK)) + return result + except ConnectionError as e: # Specific error from get_duckdb_connection if it fails + logger.error(f"ConnectionError in /items_by_user: {e}. Ensure Parquet files exist at {PARQUET_DATA_PATH} and are readable.", exc_info=True) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, f"Analytics service connection error: {e}")) + raise HTTPException(status_code=503, detail=f"Analytics service unavailable: Database connection failed. {e}") + except duckdb.Error as e: # Catch DuckDB specific errors + logger.error(f"DuckDB query error in /items_by_user: {e}", exc_info=True) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, f"Analytics query error: {e}")) + raise HTTPException(status_code=500, detail=f"Analytics query failed: {e}") + except Exception as e: + logger.error(f"Unexpected error in /items_by_user: {e}", exc_info=True) + # Log the PARQUET_DATA_PATH to help diagnose if it's a file not found issue from underlying module + logger.info(f"Current PARQUET_DATA_PATH for analytics module: {PARQUET_DATA_PATH}") + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, f"Unexpected error: {e}")) + raise HTTPException(status_code=500, detail=f"An unexpected error occurred while fetching items by user. {e}") + +@router.get("/active_users", response_model=List[ActiveUser]) +def get_active_users(): + """ + Retrieves the top 10 most active users based on the number of items they own. + Users are ordered by item count in descending order. + Includes users who may not own any items (LEFT JOIN). + """ + with tracer.start_as_current_span("analytics_active_users_handler") as span: + # Query updated to match ActiveUser model: user_id, email, full_name, item_count + query = """ + SELECT u.user_id, u.email, u.full_name, COUNT(i.item_id) AS item_count + FROM users u + LEFT JOIN items i ON u.user_id = i.owner_id + GROUP BY u.user_id, u.email, u.full_name -- Group by all selected non-aggregated columns + ORDER BY item_count DESC + LIMIT 10; + """ + span.set_attribute("analytics.query", query) + try: + logger.info("Executing query for active_users") + df: pl.DataFrame = query_duckdb(query) # query_duckdb is already traced + result = df.to_dicts() + span.set_attribute("response.users_count", len(result)) + logger.info(f"Successfully retrieved {len(result)} records for active_users") + span.set_status(Status(StatusCode.OK)) + return result + except ConnectionError as e: + logger.error(f"ConnectionError in /active_users: {e}. Ensure Parquet files exist at {PARQUET_DATA_PATH} and are readable.", exc_info=True) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, f"Analytics service connection error: {e}")) + raise HTTPException(status_code=503, detail=f"Analytics service unavailable: Database connection failed. 
{e}") + except duckdb.Error as e: # Catch DuckDB specific errors + logger.error(f"DuckDB query error in /active_users: {e}", exc_info=True) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, f"Analytics query error: {e}")) + raise HTTPException(status_code=500, detail=f"Analytics query failed: {e}") + except Exception as e: + logger.error(f"Unexpected error in /active_users: {e}", exc_info=True) + logger.info(f"Current PARQUET_DATA_PATH for analytics module: {PARQUET_DATA_PATH}") + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, f"Unexpected error: {e}")) + raise HTTPException(status_code=500, detail=f"An unexpected error occurred while fetching active users. {e}") diff --git a/backend/app/core/analytics.py b/backend/app/core/analytics.py new file mode 100644 index 0000000000..9958cfee5d --- /dev/null +++ b/backend/app/core/analytics.py @@ -0,0 +1,203 @@ +import duckdb +import polars as pl +import os +import logging +from app.core.config import settings + +# OpenTelemetry Imports +from opentelemetry import trace +from opentelemetry.trace import Status, StatusCode + +# Configure logging +# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(module)s - %(message)s') # BasicConfig is often called by main app +logger = logging.getLogger(__name__) # Use module-level logger + +# Initialize OpenTelemetry Tracer +tracer = trace.get_tracer(__name__) + +# Get PARQUET_DATA_PATH from settings, with a default if not set. +# This assumes that if run, it's from the project root, or PARQUET_DATA_PATH will be an absolute path. +DEFAULT_PARQUET_PATH = os.path.join("backend", "data", "parquet") +PARQUET_DATA_PATH = getattr(settings, "PARQUET_DATA_PATH", DEFAULT_PARQUET_PATH) + +USERS_PARQUET_PATH = os.path.join(PARQUET_DATA_PATH, "users_analytics.parquet") +ITEMS_PARQUET_PATH = os.path.join(PARQUET_DATA_PATH, "items_analytics.parquet") + +# Global DuckDB connection instance +_DUCKDB_CONNECTION: duckdb.DuckDBPyConnection | None = None + +def get_duckdb_connection() -> duckdb.DuckDBPyConnection: + """ + Establishes and returns a DuckDB connection. + If a connection doesn't exist or is closed, it creates a new one. + It also registers Parquet files as tables in DuckDB. + """ + global _DUCKDB_CONNECTION + + # Check if the connection is None or has been closed + # This initial check is outside the span, as the span is for the actual connection/setup attempt. + if _DUCKDB_CONNECTION is not None and not _DUCKDB_CONNECTION.isclosed(): + return _DUCKDB_CONNECTION + + with tracer.start_as_current_span("get_duckdb_connection_and_setup_tables") as span: + span.set_attribute("parquet.users.path", USERS_PARQUET_PATH) + span.set_attribute("parquet.items.path", ITEMS_PARQUET_PATH) + users_table_created = False + items_table_created = False + + try: + logger.info("Attempting to establish a new DuckDB connection and setup tables.") + _DUCKDB_CONNECTION = duckdb.connect(database=':memory:', read_only=False) + logger.info("Successfully connected to DuckDB (in-memory).") + span.set_attribute("duckdb.connection.status", "established") + + # Register users table + if not os.path.exists(USERS_PARQUET_PATH): + logger.error(f"Users parquet file not found at {USERS_PARQUET_PATH}. Table 'users' will not be created.") + span.set_attribute("table.users.error", "File not found") + else: + logger.info(f"Found users parquet file at {USERS_PARQUET_PATH}. 
Creating table 'users'.") + _DUCKDB_CONNECTION.execute(f"CREATE OR REPLACE TABLE users AS SELECT * FROM read_parquet('{USERS_PARQUET_PATH}');") + users_table_created = True + logger.info(f"Table 'users' created successfully from {USERS_PARQUET_PATH}.") + span.set_attribute("table.users.created", users_table_created) + + # Register items table + if not os.path.exists(ITEMS_PARQUET_PATH): + logger.error(f"Items parquet file not found at {ITEMS_PARQUET_PATH}. Table 'items' will not be created.") + span.set_attribute("table.items.error", "File not found") + else: + logger.info(f"Found items parquet file at {ITEMS_PARQUET_PATH}. Creating table 'items'.") + _DUCKDB_CONNECTION.execute(f"CREATE OR REPLACE TABLE items AS SELECT * FROM read_parquet('{ITEMS_PARQUET_PATH}');") + items_table_created = True + logger.info(f"Table 'items' created successfully from {ITEMS_PARQUET_PATH}.") + span.set_attribute("table.items.created", items_table_created) + + if not users_table_created and not items_table_created: + # If neither table could be created because files are missing, this might be considered an error state for the connection setup + # depending on strictness requirements. For now, logging handles it. + logger.warning("Neither users nor items table could be created due to missing Parquet files.") + # span.set_status(Status(StatusCode.ERROR, "Parquet files missing for table creation")) # Optional: set error if critical + + span.set_status(Status(StatusCode.OK)) + return _DUCKDB_CONNECTION + + except Exception as e: + logger.error(f"Failed to connect to DuckDB or register tables: {e}", exc_info=True) + if _DUCKDB_CONNECTION: + _DUCKDB_CONNECTION.close() + _DUCKDB_CONNECTION = None + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + +def query_duckdb(query: str) -> pl.DataFrame: + """ + Executes a SQL query using DuckDB and returns the result as a Polars DataFrame. + + Args: + query: The SQL query string to execute. + + Returns: + A Polars DataFrame containing the query results. + + Raises: + Exception: If the DuckDB connection cannot be established or if the query fails. + """ + with tracer.start_as_current_span("duckdb_query") as span: + span.set_attribute("db.statement", query) + try: + connection = get_duckdb_connection() # This will create a nested span if connection needs init + if connection is None: # Should be handled by get_duckdb_connection raising an error + logger.error("Failed to get DuckDB connection for query (should have been raised by get_duckdb_connection).") + # This case should ideally not be reached if get_duckdb_connection is robust + span.set_status(Status(StatusCode.ERROR, "Failed to obtain DuckDB connection")) + raise ConnectionError("Failed to establish DuckDB connection.") + + logger.info(f"Executing DuckDB query: {query}") + result_df = connection.execute(query).pl() + logger.info(f"Query executed successfully. 
Result shape: {result_df.shape}") + span.set_attribute("db.rows_returned", len(result_df)) + span.set_status(Status(StatusCode.OK)) + return result_df + except duckdb.Error as e: # Catch DuckDB specific errors + logger.error(f"DuckDB error executing query '{query}': {e}", exc_info=True) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + except Exception as e: # Catch any other unexpected errors + logger.error(f"An unexpected error occurred while executing DuckDB query '{query}': {e}", exc_info=True) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + +def close_duckdb_connection(): + """ + Closes the global DuckDB connection if it's open. + Useful for application shutdown hooks. + """ + global _DUCKDB_CONNECTION + if not (_DUCKDB_CONNECTION and not _DUCKDB_CONNECTION.isclosed()): + return # Nothing to do if no connection or already closed + + with tracer.start_as_current_span("close_duckdb_connection") as span: + try: + _DUCKDB_CONNECTION.close() + logger.info("DuckDB connection closed successfully.") + span.set_status(Status(StatusCode.OK)) + except Exception as e: + logger.error(f"Error while closing DuckDB connection: {e}", exc_info=True) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + # Optionally re-raise if the caller needs to know about close failures + finally: + _DUCKDB_CONNECTION = None # Ensure it's reset + +# Example usage (can be removed or kept for testing) +if __name__ == '__main__': + logger.info("Running analytics module directly for testing.") + + # Create dummy Parquet files if they don't exist for testing + # This part is for local testing of this module and might not be needed in production + os.makedirs(PARQUET_DATA_PATH, exist_ok=True) + if not os.path.exists(USERS_PARQUET_PATH): + logger.info(f"Creating dummy {USERS_PARQUET_PATH} for testing.") + pl.DataFrame({"user_id": [1, 2], "email": ["test1@example.com", "test2@example.com"]}).write_parquet(USERS_PARQUET_PATH) + + if not os.path.exists(ITEMS_PARQUET_PATH): + logger.info(f"Creating dummy {ITEMS_PARQUET_PATH} for testing.") + pl.DataFrame({"item_id": [101, 102], "owner_id": [1, 2], "title": ["Item A", "Item B"]}).write_parquet(ITEMS_PARQUET_PATH) + + try: + # Test connection and table creation + conn = get_duckdb_connection() + if conn: + logger.info("DuckDB connection obtained.") + + # Test querying users + users_df = query_duckdb("SELECT * FROM users LIMIT 5;") + logger.info("Users Data:") + logger.info(f"\n{users_df}") + + # Test querying items + items_df = query_duckdb("SELECT * FROM items LIMIT 5;") + logger.info("Items Data:") + logger.info(f"\n{items_df}") + + # Test a join query + join_query = """ + SELECT u.email, COUNT(i.item_id) as item_count + FROM users u + LEFT JOIN items i ON u.user_id = i.owner_id + GROUP BY u.email + ORDER BY item_count DESC; + """ + user_item_counts_df = query_duckdb(join_query) + logger.info("User Item Counts:") + logger.info(f"\n{user_item_counts_df}") + + except Exception as e: + logger.error(f"An error occurred during example usage: {e}") + finally: + close_duckdb_connection() + logger.info("Finished example usage and closed connection.") diff --git a/backend/app/core/config.py b/backend/app/core/config.py index d58e03c87d..aab65951b8 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -1,6 +1,7 @@ import secrets import warnings from typing import Annotated, Any, Literal +import os # Added for os.getenv from pydantic import ( AnyUrl, @@ -37,6 
+38,10 @@ class Settings(BaseSettings): ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8 FRONTEND_HOST: str = "http://localhost:5173" ENVIRONMENT: Literal["local", "staging", "production"] = "local" + + # Analytics settings + PARQUET_DATA_PATH: str = os.getenv("PARQUET_DATA_PATH", "backend/data/parquet/") + SERVICE_NAME: str = os.getenv("SERVICE_NAME", "fastapi-analytics-app") BACKEND_CORS_ORIGINS: Annotated[ list[AnyUrl] | str, BeforeValidator(parse_cors) diff --git a/backend/app/main.py b/backend/app/main.py index 9a95801e74..d4bd4c6570 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -6,6 +6,19 @@ from app.api.main import api_router from app.core.config import settings +# OpenTelemetry Imports +import logging +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, +) +from opentelemetry.trace import set_tracer_provider +from opentelemetry.sdk.resources import Resource +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor +from app.core.db import engine as db_engine # Renamed to avoid conflict with sqlalchemy.engine + def custom_generate_unique_id(route: APIRoute) -> str: return f"{route.tags[0]}-{route.name}" @@ -14,12 +27,44 @@ def custom_generate_unique_id(route: APIRoute) -> str: if settings.SENTRY_DSN and settings.ENVIRONMENT != "local": sentry_sdk.init(dsn=str(settings.SENTRY_DSN), enable_tracing=True) +# Configure OpenTelemetry +def setup_opentelemetry(app: FastAPI): + logger = logging.getLogger(__name__) + try: + SERVICE_NAME = getattr(settings, "SERVICE_NAME", "my-fastapi-app") + resource = Resource.create({"service.name": SERVICE_NAME}) + + provider = TracerProvider(resource=resource) + console_exporter = ConsoleSpanExporter() + processor = SimpleSpanProcessor(console_exporter) + provider.add_span_processor(processor) + set_tracer_provider(provider) + + logger.info(f"OpenTelemetry configured for service: {SERVICE_NAME} with ConsoleExporter") + + FastAPIInstrumentor.instrument_app(app, tracer_provider=provider) + + # Ensure db_engine.engine is the actual SQLAlchemy engine + actual_sqlalchemy_engine = getattr(db_engine, "engine", db_engine) + if actual_sqlalchemy_engine is None: # Should not happen if db_engine is correctly initialized + logger.error("SQLAlchemy engine not found for OpenTelemetry instrumentation.") + return + + SQLAlchemyInstrumentor().instrument(engine=actual_sqlalchemy_engine, tracer_provider=provider) + logger.info("FastAPI and SQLAlchemy instrumented for OpenTelemetry.") + + except Exception as e: + logger.error(f"Failed to configure OpenTelemetry: {e}", exc_info=True) + app = FastAPI( title=settings.PROJECT_NAME, openapi_url=f"{settings.API_V1_STR}/openapi.json", generate_unique_id_function=custom_generate_unique_id, ) +# Call OpenTelemetry setup after app creation +setup_opentelemetry(app) + # Set all CORS enabled origins if settings.all_cors_origins: app.add_middleware( diff --git a/backend/app/scripts/export_to_parquet.py b/backend/app/scripts/export_to_parquet.py new file mode 100644 index 0000000000..64cc9b4bd4 --- /dev/null +++ b/backend/app/scripts/export_to_parquet.py @@ -0,0 +1,250 @@ +import os +import logging +import pandas as pd +from sqlalchemy import create_engine, text +# Assuming models are accessible like this. If not, this import might need adjustment or removal if not used. 
+# from app.models import User, Item +from app.core.config import settings + +# OpenTelemetry Imports +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor +from opentelemetry.sdk.resources import Resource +from opentelemetry.trace import Status, StatusCode + +# Setup Logging +# Using logging.getLogger for better module-level logging control +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s') + +# Define Output Directory +OUTPUT_DIR = "backend/data/parquet/" + +# Global tracer variable, will be initialized by setup_otel_for_script +tracer: trace.Tracer | None = None + +# Define Output Directory (can be overridden by settings.PARQUET_DATA_PATH if available) +# This remains as the default if settings does not provide PARQUET_DATA_PATH +# For testing, we'll primarily mock settings or the OUTPUT_DIR itself. +DEFAULT_OUTPUT_DIR = "backend/data/parquet/" +OUTPUT_DIR = DEFAULT_OUTPUT_DIR # Initialize with default + +def get_output_dir(): + """Gets the output directory, prioritizing settings if available.""" + global OUTPUT_DIR + # In a real app, settings might be more complexly loaded, but for the script's context: + # We assume 'settings' is available and has PARQUET_DATA_PATH. + # The original script's analytics module used: + # PARQUET_DATA_PATH = getattr(settings, "PARQUET_DATA_PATH", DEFAULT_PARQUET_PATH) + # We'll adapt this for the script's OUTPUT_DIR. + # However, the script already uses a global OUTPUT_DIR. Let's ensure it's configurable for tests. + # For now, we'll assume tests will patch 'export_to_parquet.OUTPUT_DIR' or 'settings.PARQUET_DATA_PATH'. + # The script itself doesn't dynamically update OUTPUT_DIR from settings post-init. + # Let's refine this to make it more testable by having create_output_directory use a path. 
+ return getattr(settings, "PARQUET_DATA_PATH", DEFAULT_OUTPUT_DIR) + + +def create_output_directory(): + """Creates the output directory if it doesn't exist.""" + current_output_dir = get_output_dir() + """Creates the output directory if it doesn't exist.""" + # This function could also be traced if it involved more complex operations or I/O + if not os.path.exists(current_output_dir): + os.makedirs(current_output_dir) + logger.info(f"Created output directory: {current_output_dir}") + +def get_db_engine(): + """Creates and returns a SQLAlchemy engine.""" + # Ensuring tracer is available if this function is called outside main context (though unlikely for this script) + global tracer + if not tracer: # Fallback if not initialized, though it should be by main + tracer = trace.get_tracer(__name__) + + with tracer.start_as_current_span("create_db_engine") as span: + try: + engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI)) + span.set_attribute("db.system", "postgresql") # Example attribute + span.set_attribute("db.uri", str(settings.SQLALCHEMY_DATABASE_URI).split('@')[1] if '@' in str(settings.SQLALCHEMY_DATABASE_URI) else "uri_hidden") + logger.info("Database engine created.") + span.set_status(Status(StatusCode.OK)) + return engine + except Exception as e: + logger.error(f"Error creating database engine: {e}") + if span: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + +def export_users_data(engine): + """Exports users data to a Parquet file.""" + global tracer + if not tracer: tracer = trace.get_tracer(__name__) + + query = text('SELECT id AS user_id, email, is_active, is_superuser, full_name FROM "user";') + current_output_dir = get_output_dir() + output_path = os.path.join(current_output_dir, "users_analytics.parquet") + + with tracer.start_as_current_span("export_users_to_parquet") as parent_span: + try: + df = None + with tracer.start_as_current_span("query_users_from_db", links=[trace.Link(parent_span.get_context())]) as query_span: + try: + with engine.connect() as connection: + df = pd.read_sql_query(query, con=connection) + query_span.set_attribute("db.statement", str(query)) + query_span.set_attribute("data.rows.queried", len(df) if df is not None else 0) + query_span.set_status(Status(StatusCode.OK)) + logger.info(f"Successfully queried users data. 
Found {len(df) if df is not None else 0} rows.") + except Exception as e_query: + logger.error(f"Error querying users data from DB: {e_query}") + query_span.record_exception(e_query) + query_span.set_status(Status(StatusCode.ERROR, str(e_query))) + raise # Propagate to parent span's error handling + + if df is not None: # Proceed only if data query was successful + with tracer.start_as_current_span("write_users_to_parquet", links=[trace.Link(parent_span.get_context())]) as write_span: + try: + df.to_parquet(output_path, index=False) + write_span.set_attribute("parquet.file.path", output_path) + write_span.set_attribute("data.rows.written", len(df)) + logger.info(f"Successfully exported users data to {output_path}") + write_span.set_status(Status(StatusCode.OK)) + except Exception as e_write: + logger.error(f"Error writing users data to Parquet: {e_write}") + write_span.record_exception(e_write) + write_span.set_status(Status(StatusCode.ERROR, str(e_write))) + raise # Propagate to parent span's error handling + + parent_span.set_attribute("parquet.file.path", output_path) + parent_span.set_attribute("data.rows.count", len(df) if df is not None else 0) + parent_span.set_status(Status(StatusCode.OK)) + + except Exception as e: + logger.error(f"Error exporting users data: {e}") + parent_span.record_exception(e) + parent_span.set_status(Status(StatusCode.ERROR, str(e))) + # Not re-raising here as the main loop handles logging general errors + +def export_items_data(engine): + """Exports items data to a Parquet file.""" + global tracer + if not tracer: tracer = trace.get_tracer(__name__) + + query = text('SELECT id AS item_id, owner_id, title, description FROM item;') + current_output_dir = get_output_dir() + output_path = os.path.join(current_output_dir, "items_analytics.parquet") + + with tracer.start_as_current_span("export_items_to_parquet") as parent_span: + try: + df = None + with tracer.start_as_current_span("query_items_from_db", links=[trace.Link(parent_span.get_context())]) as query_span: + try: + with engine.connect() as connection: + df = pd.read_sql_query(query, con=connection) + query_span.set_attribute("db.statement", str(query)) + query_span.set_attribute("data.rows.queried", len(df) if df is not None else 0) + query_span.set_status(Status(StatusCode.OK)) + logger.info(f"Successfully queried items data. 
Found {len(df) if df is not None else 0} rows.") + except Exception as e_query: + logger.error(f"Error querying items data from DB: {e_query}") + query_span.record_exception(e_query) + query_span.set_status(Status(StatusCode.ERROR, str(e_query))) + raise + + if df is not None: + with tracer.start_as_current_span("write_items_to_parquet", links=[trace.Link(parent_span.get_context())]) as write_span: + try: + df.to_parquet(output_path, index=False) + write_span.set_attribute("parquet.file.path", output_path) + write_span.set_attribute("data.rows.written", len(df)) + logger.info(f"Successfully exported items data to {output_path}") + write_span.set_status(Status(StatusCode.OK)) + except Exception as e_write: + logger.error(f"Error writing items data to Parquet: {e_write}") + write_span.record_exception(e_write) + write_span.set_status(Status(StatusCode.ERROR, str(e_write))) + raise + + parent_span.set_attribute("parquet.file.path", output_path) + parent_span.set_attribute("data.rows.count", len(df) if df is not None else 0) + parent_span.set_status(Status(StatusCode.OK)) + + except Exception as e: + logger.error(f"Error exporting items data: {e}") + parent_span.record_exception(e) + parent_span.set_status(Status(StatusCode.ERROR, str(e))) + # Not re-raising here + +def setup_otel_for_script(): + """Configures OpenTelemetry for the script if not already configured by main app.""" + global tracer + # Check if a global tracer provider is already set (e.g., by the main FastAPI app if script is imported) + # This simple check might need to be more robust in complex scenarios. + if trace.get_tracer_provider().__class__.__name__ == 'ProxyTracerProvider': # Default if no SDK provider is set + logger.info("Configuring OpenTelemetry for script execution context.") + resource = Resource.create({"service.name": settings.SERVICE_NAME + "-etl-script"}) # Use SERVICE_NAME from settings + provider = TracerProvider(resource=resource) + console_exporter = ConsoleSpanExporter() # For script output, console is fine + processor = SimpleSpanProcessor(console_exporter) + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + tracer = trace.get_tracer("etl-script.opentelemetry") # Specific tracer name + else: + logger.info("OpenTelemetry already configured. Using existing provider.") + tracer = trace.get_tracer("etl-script.opentelemetry.existing_provider") + + +def run_etl_operations(db_engine_instance): + """Runs the main ETL operations: exporting users and items data.""" + # This function assumes tracer is initialized by setup_otel_for_script or already available + global tracer + if not tracer: # Should be initialized by setup_otel_for_script + logger.warning("Tracer not initialized prior to run_etl_operations. 
OTEL setup might be missing.") + tracer = trace.get_tracer("etl-script.opentelemetry.fallback") + + with tracer.start_as_current_span("run_etl_script_operations") as etl_ops_span: + try: + if db_engine_instance: + export_users_data(db_engine_instance) + export_items_data(db_engine_instance) + etl_ops_span.set_status(Status(StatusCode.OK)) + else: + logger.error("Database engine not provided to run_etl_operations.") + etl_ops_span.set_status(Status(StatusCode.ERROR, "DB engine missing")) + except Exception as e: + logger.error(f"Error during ETL operations: {e}", exc_info=True) + etl_ops_span.record_exception(e) + etl_ops_span.set_status(Status(StatusCode.ERROR, str(e))) + raise # Re-raise to be caught by main_etl_flow if needed + +def main_etl_flow(): + """Main flow for the ETL script, including OTEL setup and cleanup.""" + setup_otel_for_script() # Initialize OpenTelemetry + global tracer # Ensure we're using the initialized tracer + + with tracer.start_as_current_span("etl_process_to_parquet") as main_script_span: + db_engine = None + try: + create_output_directory() + db_engine = get_db_engine() # get_db_engine has its own span + run_etl_operations(db_engine) # Core logic wrapped in its own span + main_script_span.set_status(Status(StatusCode.OK)) + except Exception as e: + logger.error(f"An error occurred during the main ETL flow: {e}", exc_info=True) + main_script_span.record_exception(e) + main_script_span.set_status(Status(StatusCode.ERROR, str(e))) + finally: + if db_engine: + with tracer.start_as_current_span("dispose_db_engine_main_flow", links=[trace.Link(main_script_span.get_context())]) as dispose_span: + try: + db_engine.dispose() + logger.info("Database engine disposed.") + dispose_span.set_status(Status(StatusCode.OK)) + except Exception as e_dispose: + logger.error(f"Error disposing database engine: {e_dispose}", exc_info=True) + dispose_span.record_exception(e_dispose) + dispose_span.set_status(Status(StatusCode.ERROR, str(e_dispose))) + +if __name__ == "__main__": + main_etl_flow() diff --git a/backend/app/tests/api/routes/test_analytics.py b/backend/app/tests/api/routes/test_analytics.py new file mode 100644 index 0000000000..a93b4c4736 --- /dev/null +++ b/backend/app/tests/api/routes/test_analytics.py @@ -0,0 +1,133 @@ +from unittest.mock import patch, MagicMock +import pytest +import polars as pl +from fastapi.testclient import TestClient + +# Assuming your main FastAPI app instance is in backend.app.main +from backend.app.main import app +# Import duckdb for duckdb.Error +import duckdb + +# Define the API prefix from settings or main router, if applicable. +# For these tests, we'll assume API_V1_STR is /api/v1 as is common. 
+API_V1_STR = "/api/v1" +ANALYTICS_ENDPOINT_PREFIX = f"{API_V1_STR}/analytics" + +@pytest.fixture(scope="module") +def client(): + with TestClient(app) as c: + yield c + +@pytest.fixture(autouse=True) # Applied to all tests in this module +def mock_otel_tracer_api(monkeypatch): + # Mock the tracer used in analytics API routes to avoid actual tracing + # This path should point to the tracer instance in backend.app.api.routes.analytics + mock_tracer = MagicMock() + mock_span = MagicMock() # Mock for the span object + mock_span.get_context.return_value = MagicMock() # Mock context for Link if used by tracer + mock_tracer.start_as_current_span.return_value.__enter__.return_value = mock_span + monkeypatch.setattr("backend.app.api.routes.analytics.tracer", mock_tracer) + +# --- Tests for /analytics/items_by_user --- + +def test_get_items_by_user_success(client): + sample_df = pl.DataFrame({ + "email": ["user1@example.com", "user2@example.com"], + "item_count": [10, 5] + }) + expected_json = [ + {"email": "user1@example.com", "item_count": 10}, + {"email": "user2@example.com", "item_count": 5} + ] + # The SQL query from the actual route implementation + expected_query = """ + SELECT u.email, COUNT(i.item_id) AS item_count + FROM users u + JOIN items i ON u.user_id = i.owner_id + GROUP BY u.email + ORDER BY item_count DESC; + """ + + with patch("backend.app.api.routes.analytics.query_duckdb", return_value=sample_df) as mock_query: + response = client.get(f"{ANALYTICS_ENDPOINT_PREFIX}/items_by_user") + assert response.status_code == 200 + assert response.json() == expected_json + mock_query.assert_called_once_with(expected_query) + +def test_get_items_by_user_connection_error(client): + with patch("backend.app.api.routes.analytics.query_duckdb", side_effect=ConnectionError("Test connection error")) as mock_query: + response = client.get(f"{ANALYTICS_ENDPOINT_PREFIX}/items_by_user") + assert response.status_code == 503 + json_response = response.json() + assert "Analytics service unavailable: Database connection failed." in json_response["detail"] + assert "Test connection error" in json_response["detail"] + + +def test_get_items_by_user_duckdb_error(client): + with patch("backend.app.api.routes.analytics.query_duckdb", side_effect=duckdb.Error("Test DB error")) as mock_query: + response = client.get(f"{ANALYTICS_ENDPOINT_PREFIX}/items_by_user") + assert response.status_code == 500 + json_response = response.json() + assert "Analytics query failed: Test DB error" in json_response["detail"] + +def test_get_items_by_user_generic_exception(client): + with patch("backend.app.api.routes.analytics.query_duckdb", side_effect=Exception("Test generic error")) as mock_query: + response = client.get(f"{ANALYTICS_ENDPOINT_PREFIX}/items_by_user") + assert response.status_code == 500 + json_response = response.json() + assert "An unexpected error occurred while fetching items by user." 
in json_response["detail"] + assert "Test generic error" in json_response["detail"] + +# --- Tests for /analytics/active_users --- + +def test_get_active_users_success(client): + sample_df = pl.DataFrame({ + "user_id": [1, 2], + "email": ["active1@example.com", "active2@example.com"], + "full_name": ["Active User One", "Active User Two"], + "item_count": [20, 15] + }) + expected_json = [ + {"user_id": 1, "email": "active1@example.com", "full_name": "Active User One", "item_count": 20}, + {"user_id": 2, "email": "active2@example.com", "full_name": "Active User Two", "item_count": 15} + ] + # The SQL query from the actual route implementation + expected_query = """ + SELECT u.user_id, u.email, u.full_name, COUNT(i.item_id) AS item_count + FROM users u + LEFT JOIN items i ON u.user_id = i.owner_id + GROUP BY u.user_id, u.email, u.full_name -- Group by all selected non-aggregated columns + ORDER BY item_count DESC + LIMIT 10; + """ + + with patch("backend.app.api.routes.analytics.query_duckdb", return_value=sample_df) as mock_query: + response = client.get(f"{ANALYTICS_ENDPOINT_PREFIX}/active_users") + assert response.status_code == 200 + assert response.json() == expected_json + mock_query.assert_called_once_with(expected_query) + +def test_get_active_users_connection_error(client): + with patch("backend.app.api.routes.analytics.query_duckdb", side_effect=ConnectionError("Test connection error")) as mock_query: + response = client.get(f"{ANALYTICS_ENDPOINT_PREFIX}/active_users") + assert response.status_code == 503 + json_response = response.json() + assert "Analytics service unavailable: Database connection failed." in json_response["detail"] + assert "Test connection error" in json_response["detail"] + +def test_get_active_users_duckdb_error(client): + with patch("backend.app.api.routes.analytics.query_duckdb", side_effect=duckdb.Error("Test DB error")) as mock_query: + response = client.get(f"{ANALYTICS_ENDPOINT_PREFIX}/active_users") + assert response.status_code == 500 + json_response = response.json() + assert "Analytics query failed: Test DB error" in json_response["detail"] + +def test_get_active_users_generic_exception(client): + with patch("backend.app.api.routes.analytics.query_duckdb", side_effect=Exception("Test generic error")) as mock_query: + response = client.get(f"{ANALYTICS_ENDPOINT_PREFIX}/active_users") + assert response.status_code == 500 + json_response = response.json() + assert "An unexpected error occurred while fetching active users." 
in json_response["detail"] + assert "Test generic error" in json_response["detail"] + +``` diff --git a/backend/app/tests/core/test_analytics.py b/backend/app/tests/core/test_analytics.py new file mode 100644 index 0000000000..ece59690b7 --- /dev/null +++ b/backend/app/tests/core/test_analytics.py @@ -0,0 +1,239 @@ +from unittest.mock import patch, MagicMock, call +import pytest +import polars as pl +from polars.testing import assert_frame_equal +import duckdb # For duckdb.Error and spec for mock connection +import os +import logging # For checking log messages + +# Functions/objects to test from the script +from backend.app.core import analytics +from backend.app.core.analytics import get_duckdb_connection, query_duckdb, close_duckdb_connection, _DUCKDB_CONNECTION as SCRIPT_DUCKDB_CONNECTION + +# Fixture to automatically mock OpenTelemetry tracer for all tests in this module +@pytest.fixture(autouse=True) +def mock_otel_tracer(monkeypatch): + """Mocks the OpenTelemetry tracer used in analytics.py to prevent actual tracing.""" + mock_tracer_instance = MagicMock() + mock_span = MagicMock() + mock_tracer_instance.start_as_current_span.return_value.__enter__.return_value = mock_span + monkeypatch.setattr(analytics, "tracer", mock_tracer_instance) + +@pytest.fixture +def mock_settings_analytics(monkeypatch): + """Mocks settings for analytics.py, specifically PARQUET_DATA_PATH.""" + mock_path = "/mocked/data/parquet" + monkeypatch.setattr(analytics.settings, "PARQUET_DATA_PATH", mock_path) + + # Since USERS_PARQUET_PATH and ITEMS_PARQUET_PATH are module-level variables + # derived from settings.PARQUET_DATA_PATH at import time, we need to update them too. + analytics.USERS_PARQUET_PATH = os.path.join(mock_path, "users_analytics.parquet") + analytics.ITEMS_PARQUET_PATH = os.path.join(mock_path, "items_analytics.parquet") + + # Yield the path for assertions if needed, though settings are directly patched + yield {"PARQUET_DATA_PATH": mock_path} + + +@pytest.fixture(autouse=True) # Ensure this runs for every test to reset global state +def reset_global_duckdb_connection(monkeypatch): + """Resets the global _DUCKDB_CONNECTION in analytics.py before and after each test.""" + # Reset before the test + monkeypatch.setattr(analytics, "_DUCKDB_CONNECTION", None) + yield + # Reset after the test (optional, as next test will reset anyway, but good for cleanup) + monkeypatch.setattr(analytics, "_DUCKDB_CONNECTION", None) + + +# --- Tests for get_duckdb_connection --- + +def test_get_duckdb_connection_success_and_table_creation(mock_settings_analytics, monkeypatch): + """ + Tests successful DuckDB connection and registration of Parquet files as tables. 
+ """ + mock_duckdb_conn_instance = MagicMock(spec=duckdb.DuckDBPyConnection) + mock_duckdb_conn_instance.isclosed.return_value = False # Ensure it's not seen as closed if checked + + mock_duckdb_connect = MagicMock(return_value=mock_duckdb_conn_instance) + monkeypatch.setattr(analytics.duckdb, "connect", mock_duckdb_connect) + + mock_os_path_exists = MagicMock(return_value=True) # Simulate Parquet files exist + monkeypatch.setattr(analytics.os.path, "exists", mock_os_path_exists) + + # Call the function + conn = get_duckdb_connection() + + # Assertions + mock_duckdb_connect.assert_called_once_with(database=':memory:', read_only=False) + + expected_execute_calls = [ + call(f"CREATE OR REPLACE TABLE users AS SELECT * FROM read_parquet('{analytics.USERS_PARQUET_PATH}');"), + call(f"CREATE OR REPLACE TABLE items AS SELECT * FROM read_parquet('{analytics.ITEMS_PARQUET_PATH}');") + ] + mock_duckdb_conn_instance.execute.assert_has_calls(expected_execute_calls, any_order=False) + + assert conn == mock_duckdb_conn_instance, "Returned connection should be the mock connection." + # Access the global variable directly from the analytics module for assertion + assert analytics._DUCKDB_CONNECTION == mock_duckdb_conn_instance, "Global connection variable not set." + + +def test_get_duckdb_connection_parquet_files_not_found(mock_settings_analytics, monkeypatch, caplog): + """ + Tests behavior when Parquet files are not found during connection setup. + """ + mock_duckdb_conn_instance = MagicMock(spec=duckdb.DuckDBPyConnection) + mock_duckdb_conn_instance.isclosed.return_value = False + + mock_duckdb_connect = MagicMock(return_value=mock_duckdb_conn_instance) + monkeypatch.setattr(analytics.duckdb, "connect", mock_duckdb_connect) + + mock_os_path_exists = MagicMock(return_value=False) # Simulate Parquet files DO NOT exist + monkeypatch.setattr(analytics.os.path, "exists", mock_os_path_exists) + + # Capture logs + caplog.set_level(logging.ERROR, logger="backend.app.core.analytics") + + # Call the function + conn = get_duckdb_connection() + + # Assertions + mock_duckdb_connect.assert_called_once_with(database=':memory:', read_only=False) + + # Check that execute was NOT called for table creation + for exec_call in mock_duckdb_conn_instance.execute.call_args_list: + args, _ = exec_call + assert "CREATE OR REPLACE TABLE" not in args[0] + + # Check logs for error messages about missing files + assert f"Users parquet file not found at {analytics.USERS_PARQUET_PATH}" in caplog.text + assert f"Items parquet file not found at {analytics.ITEMS_PARQUET_PATH}" in caplog.text + + assert conn == mock_duckdb_conn_instance, "Should still return a connection object." + assert analytics._DUCKDB_CONNECTION == mock_duckdb_conn_instance + + +def test_get_duckdb_connection_reuse(mock_settings_analytics, monkeypatch): + """ + Tests that an existing connection is reused on subsequent calls. 
+ """ + mock_duckdb_conn_instance = MagicMock(spec=duckdb.DuckDBPyConnection) + mock_duckdb_conn_instance.isclosed.return_value = False + + mock_duckdb_connect = MagicMock(return_value=mock_duckdb_conn_instance) + monkeypatch.setattr(analytics.duckdb, "connect", mock_duckdb_connect) + + mock_os_path_exists = MagicMock(return_value=True) + monkeypatch.setattr(analytics.os.path, "exists", mock_os_path_exists) + + # First call - should create connection + conn1 = get_duckdb_connection() + mock_duckdb_connect.assert_called_once() + assert analytics._DUCKDB_CONNECTION == mock_duckdb_conn_instance + + # Second call - should reuse existing connection + conn2 = get_duckdb_connection() + mock_duckdb_connect.assert_called_once() # Still called only once + assert conn2 == conn1 + + +# --- Tests for query_duckdb --- + +def test_query_duckdb_successful(mock_settings_analytics, monkeypatch): + """ + Tests successful query execution by query_duckdb. + """ + mock_conn = MagicMock(spec=duckdb.DuckDBPyConnection) + sample_df = pl.DataFrame({"id": [1, 2], "data": ["a", "b"]}) + # Mock the chain: connection.execute("query").pl() + mock_conn.execute.return_value.pl.return_value = sample_df + + # Patch get_duckdb_connection to return our mock connection + monkeypatch.setattr(analytics, "get_duckdb_connection", MagicMock(return_value=mock_conn)) + + query_str = "SELECT * FROM users_table;" + result_df = query_duckdb(query_str) + + analytics.get_duckdb_connection.assert_called_once() + mock_conn.execute.assert_called_once_with(query_str) + assert_frame_equal(result_df, sample_df) + + +def test_query_duckdb_execution_failure(mock_settings_analytics, monkeypatch): + """ + Tests query execution failure handling in query_duckdb. + """ + mock_conn = MagicMock(spec=duckdb.DuckDBPyConnection) + # Simulate a DuckDB error on execute + mock_conn.execute.side_effect = duckdb.Error("Simulated DuckDB query error") + + monkeypatch.setattr(analytics, "get_duckdb_connection", MagicMock(return_value=mock_conn)) + + query_str = "SELECT * FROM non_existent_table;" + + with pytest.raises(duckdb.Error, match="Simulated DuckDB query error"): + query_duckdb(query_str) + + analytics.get_duckdb_connection.assert_called_once() + mock_conn.execute.assert_called_once_with(query_str) + + +# --- Tests for close_duckdb_connection --- + +def test_close_duckdb_connection_active_connection(monkeypatch): + """ + Tests closing an active DuckDB connection. + """ + mock_conn_to_close = MagicMock(spec=duckdb.DuckDBPyConnection) + mock_conn_to_close.isclosed.return_value = False + + # Set the global connection to our mock + monkeypatch.setattr(analytics, "_DUCKDB_CONNECTION", mock_conn_to_close) + + close_duckdb_connection() + + mock_conn_to_close.close.assert_called_once() + assert analytics._DUCKDB_CONNECTION is None, "Global connection should be reset to None." + + +def test_close_duckdb_connection_no_connection(monkeypatch): + """ + Tests close_duckdb_connection when there's no active connection. + """ + # Ensure global connection is None (which it should be due to reset_global_duckdb_connection) + assert analytics._DUCKDB_CONNECTION is None + + # Call close - it should do nothing and not raise errors + try: + close_duckdb_connection() + except Exception as e: + pytest.fail(f"close_duckdb_connection raised an exception unexpectedly: {e}") + + +def test_close_duckdb_connection_already_closed(monkeypatch): + """ + Tests close_duckdb_connection when the connection is already closed. 
+ """ + mock_conn_already_closed = MagicMock(spec=duckdb.DuckDBPyConnection) + mock_conn_already_closed.isclosed.return_value = True # Simulate it's already closed + + monkeypatch.setattr(analytics, "_DUCKDB_CONNECTION", mock_conn_already_closed) + + close_duckdb_connection() + + # The close method on the mock connection should NOT be called if the real logic checks isclosed() first. + # The current analytics.py code is: `if _DUCKDB_CONNECTION and not _DUCKDB_CONNECTION.isclosed():` + # So, .close() should not be called. + mock_conn_already_closed.close.assert_not_called() + # _DUCKDB_CONNECTION is set to None regardless in the finally block of the original close_duckdb_connection + # However, the span logic in the provided analytics.py means it returns if not (_DUCKDB_CONNECTION and not _DUCKDB_CONNECTION.isclosed()) + # So, if it's already closed, it does nothing, and _DUCKDB_CONNECTION is not set to None. + # This depends on the exact implementation in analytics.py. + # The provided analytics.py has: + # `if not (_DUCKDB_CONNECTION and not _DUCKDB_CONNECTION.isclosed()): return` + # This means if it's already closed, _DUCKDB_CONNECTION is NOT reset to None by close_duckdb_connection. + # It's only reset if it *was* open and then an attempt to close it was made. + # Let's adjust the assertion based on the provided code. + assert analytics._DUCKDB_CONNECTION == mock_conn_already_closed, "Global connection should not be reset if already closed." + # If the requirement is that it *should* be reset, the analytics.py code would need adjustment. + # For this test, we test the *current* behavior. + # The `reset_global_duckdb_connection` fixture will clean it up for the next test. +``` diff --git a/backend/app/tests/scripts/test_export_to_parquet.py b/backend/app/tests/scripts/test_export_to_parquet.py new file mode 100644 index 0000000000..064a4453aa --- /dev/null +++ b/backend/app/tests/scripts/test_export_to_parquet.py @@ -0,0 +1,286 @@ +import pytest +from unittest.mock import patch, MagicMock, call +import pandas as pd +import os +from sqlalchemy.exc import SQLAlchemyError # For simulating DB errors + +# Functions/objects to test from the script +from backend.app.scripts.export_to_parquet import ( + main_etl_flow, + create_output_directory, + get_db_engine, + export_users_data, + export_items_data, + setup_otel_for_script, + run_etl_operations, + # Import settings to allow monkeypatching its attributes if used by the script directly for config + settings as script_settings +) + +# Mock the opentelemetry trace module entirely for most tests to avoid side effects +# unless specific OTEL functionality is being tested. 
+@pytest.fixture(autouse=True) +def mock_otel_trace(): + with patch('backend.app.scripts.export_to_parquet.trace') as mock_trace_module: + # Setup a mock tracer and span that can be used by the script without error + mock_span = MagicMock() + mock_span.get_context.return_value = MagicMock() # Mock context for Link + mock_tracer_instance = MagicMock() + mock_tracer_instance.start_as_current_span.return_value.__enter__.return_value = mock_span + mock_trace_module.get_tracer.return_value = mock_tracer_instance + mock_trace_module.get_tracer_provider.return_value.__class__.__name__ = 'ProxyTracerProvider' # Default for test_otel_setup_when_no_provider + yield mock_trace_module + +@pytest.fixture +def mock_db_engine_fixture(): + """Provides a MagicMock for the SQLAlchemy engine.""" + return MagicMock() + +@pytest.fixture +def mock_pandas_fixture(): + """Provides MagicMocks for pandas operations.""" + with patch('backend.app.scripts.export_to_parquet.pd.read_sql_query') as mock_read_sql, \ + patch('backend.app.scripts.export_to_parquet.pd.DataFrame.to_parquet') as mock_to_parquet: + yield { + "read_sql_query": mock_read_sql, + "to_parquet": mock_to_parquet + } + +@pytest.fixture +def mock_os_ops_fixture(): + """Provides MagicMocks for os operations.""" + with patch('backend.app.scripts.export_to_parquet.os.path.exists') as mock_exists, \ + patch('backend.app.scripts.export_to_parquet.os.makedirs') as mock_makedirs: + yield { + "exists": mock_exists, + "makedirs": mock_makedirs + } + +@pytest.fixture +def test_output_dir(tmp_path): + """Creates a temporary output directory for tests.""" + return tmp_path / "test_parquet_exports" + +# This fixture will patch the settings object used by the script. +@pytest.fixture +def patched_script_settings(monkeypatch, test_output_dir): + # Patch settings used by the script to control output path for tests + monkeypatch.setattr(script_settings, "SQLALCHEMY_DATABASE_URI", "sqlite:///:memory:dummy") + monkeypatch.setattr(script_settings, "PARQUET_DATA_PATH", str(test_output_dir)) + monkeypatch.setattr(script_settings, "SERVICE_NAME", "test-service") + return script_settings + +# --- Test Cases --- + +def test_successful_export( + patched_script_settings, + mock_db_engine_fixture, + mock_pandas_fixture, + mock_os_ops_fixture, + mock_otel_trace # Ensure OTEL is properly mocked +): + """Test the successful export of users and items data.""" + # Arrange + mock_os_ops_fixture["exists"].return_value = True # Assume directory exists + + sample_users_df = pd.DataFrame({'user_id': [1], 'email': ['user@example.com']}) + sample_items_df = pd.DataFrame({'item_id': [101], 'owner_id': [1]}) + + def read_sql_side_effect(query_text, con): + query_str = str(query_text).lower() # Convert query object to string + if 'from "user"' in query_str: + return sample_users_df + elif 'from item' in query_str: + return sample_items_df + return pd.DataFrame() + mock_pandas_fixture["read_sql_query"].side_effect = read_sql_side_effect + + # Patch get_db_engine to return our mock engine + with patch('backend.app.scripts.export_to_parquet.get_db_engine', return_value=mock_db_engine_fixture): + # Act + main_etl_flow() + + # Assert + # Check create_engine was called (implicitly via get_db_engine being called by main_etl_flow) + # get_db_engine itself is patched, so we check its usage. + # If we wanted to check create_engine inside get_db_engine, we'd need a different patch approach. 
+
+    assert mock_pandas_fixture["read_sql_query"].call_count == 2
+
+    # Check to_parquet calls. Because pd.DataFrame.to_parquet is patched at the class level
+    # with a plain MagicMock (not a bound method), the DataFrame itself is not recorded in
+    # the call args -- only the output path and keyword arguments are. The DataFrame contents
+    # are already pinned down by the read_sql_query side effect above.
+    assert mock_pandas_fixture["to_parquet"].call_count == 2
+
+    # Check users export
+    first_call_path_arg = mock_pandas_fixture["to_parquet"].call_args_list[0][0][0]
+    assert str(first_call_path_arg) == os.path.join(patched_script_settings.PARQUET_DATA_PATH, "users_analytics.parquet")
+
+    # Check items export
+    second_call_path_arg = mock_pandas_fixture["to_parquet"].call_args_list[1][0][0]
+    assert str(second_call_path_arg) == os.path.join(patched_script_settings.PARQUET_DATA_PATH, "items_analytics.parquet")
+
+    mock_os_ops_fixture["makedirs"].assert_not_called()
+
+
+def test_directory_creation(
+    patched_script_settings,
+    mock_os_ops_fixture,
+    mock_otel_trace  # Ensure OTEL is properly mocked
+):
+    """Test that os.makedirs is called when the output directory does not exist."""
+    # Arrange
+    mock_os_ops_fixture["exists"].return_value = False  # Simulate directory does NOT exist
+
+    # Act: exercise create_output_directory directly first
+    create_output_directory()
+
+    # Assert
+    mock_os_ops_fixture["makedirs"].assert_called_once_with(str(patched_script_settings.PARQUET_DATA_PATH))
+
+    # Then test via main_etl_flow (integration check)
+    mock_os_ops_fixture["exists"].return_value = False  # Reset for this part
+    mock_os_ops_fixture["makedirs"].reset_mock()
+
+    # Mock away deeper calls to avoid full execution; focus on the directory-creation part of the main flow
+    with patch('backend.app.scripts.export_to_parquet.get_db_engine', return_value=None), \
+         patch('backend.app.scripts.export_to_parquet.run_etl_operations', return_value=None):
+        main_etl_flow()
+    mock_os_ops_fixture["makedirs"].assert_called_once_with(str(patched_script_settings.PARQUET_DATA_PATH))
+
+
+def test_db_query_failure(
+    patched_script_settings,
+    mock_db_engine_fixture,
+    mock_pandas_fixture,
+    mock_os_ops_fixture,
+    caplog,  # Pytest fixture to capture logs
+    mock_otel_trace
+):
+    """Test handling of a database query failure."""
+    # Arrange
+    mock_os_ops_fixture["exists"].return_value = True
+    mock_pandas_fixture["read_sql_query"].side_effect = SQLAlchemyError("Simulated DB query error")
+
+    with patch('backend.app.scripts.export_to_parquet.get_db_engine', return_value=mock_db_engine_fixture):
+        # Act: main_etl_flow is expected to catch and log the error surfaced by run_etl_operations
+        main_etl_flow()
+
+    # Assert
+    # Check that read_sql_query was attempted (e.g., for users)
+    mock_pandas_fixture["read_sql_query"].assert_called()
+    # Every read_sql_query call raises, so regardless of whether the script also attempts the
+    # items export after the users query fails, nothing should be written to Parquet, and both
+    # the export step and the main ETL flow should log the failure.
+ mock_pandas_fixture["to_parquet"].assert_not_called() + + assert "Error querying users data from DB: Simulated DB query error" in caplog.text + # Check that the main ETL flow also logged an error + assert "An error occurred during the main ETL flow" in caplog.text or "Error during ETL operations" in caplog.text + + +def test_file_writing_failure( + patched_script_settings, + mock_db_engine_fixture, + mock_pandas_fixture, + mock_os_ops_fixture, + caplog, + mock_otel_trace +): + """Test handling of a Parquet file writing failure.""" + # Arrange + mock_os_ops_fixture["exists"].return_value = True + sample_users_df = pd.DataFrame({'user_id': [1], 'email': ['user@example.com']}) + mock_pandas_fixture["read_sql_query"].return_value = sample_users_df # Successful query + mock_pandas_fixture["to_parquet"].side_effect = IOError("Simulated file writing error") + + with patch('backend.app.scripts.export_to_parquet.get_db_engine', return_value=mock_db_engine_fixture): + # Act + main_etl_flow() + + # Assert + mock_pandas_fixture["read_sql_query"].assert_called() # Should be called at least for users + mock_pandas_fixture["to_parquet"].assert_called() # Attempt to write users parquet + + assert "Error writing users data to Parquet: Simulated file writing error" in caplog.text + assert "An error occurred during the main ETL flow" in caplog.text or "Error during ETL operations" in caplog.text + + +def test_otel_setup_when_provider_exists(monkeypatch, mock_otel_trace): + """Test OTEL setup uses existing provider if one is found.""" + # Arrange + # Simulate an existing provider being configured + mock_otel_trace.get_tracer_provider.return_value.__class__.__name__ = 'TracerProviderSDK' # Simulate an actual SDK provider + + mock_set_tracer_provider = MagicMock() + monkeypatch.setattr(mock_otel_trace, "set_tracer_provider", mock_set_tracer_provider) + + # Act + setup_otel_for_script() + + # Assert + # Ensure that set_tracer_provider is NOT called, as we should use the existing one. 
+    mock_set_tracer_provider.assert_not_called()
+    # Ensure get_tracer was called to get a tracer instance from the existing provider
+    mock_otel_trace.get_tracer.assert_called_with("etl-script.opentelemetry.existing_provider")
+
+
+def test_otel_setup_when_no_provider(monkeypatch, mock_otel_trace, patched_script_settings):
+    """Test OTEL setup creates a new provider if none exists."""
+    # Arrange
+    # Ensure the fixture default 'ProxyTracerProvider' is used for get_tracer_provider
+    mock_otel_trace.get_tracer_provider.return_value.__class__.__name__ = 'ProxyTracerProvider'
+
+    # Mock TracerProvider, SimpleSpanProcessor, ConsoleSpanExporter, Resource, set_tracer_provider
+    mock_tracer_provider_class = MagicMock()
+    mock_simple_span_processor_class = MagicMock()
+    mock_console_span_exporter_class = MagicMock()
+    mock_resource_class = MagicMock()
+    mock_set_tracer_provider_func = MagicMock()
+
+    monkeypatch.setattr('backend.app.scripts.export_to_parquet.TracerProvider', mock_tracer_provider_class)
+    monkeypatch.setattr('backend.app.scripts.export_to_parquet.SimpleSpanProcessor', mock_simple_span_processor_class)
+    monkeypatch.setattr('backend.app.scripts.export_to_parquet.ConsoleSpanExporter', mock_console_span_exporter_class)
+    monkeypatch.setattr('backend.app.scripts.export_to_parquet.Resource', mock_resource_class)
+    monkeypatch.setattr(mock_otel_trace, "set_tracer_provider", mock_set_tracer_provider_func)
+
+    # Act
+    setup_otel_for_script()
+
+    # Assert
+    mock_resource_class.create.assert_called_once_with({"service.name": patched_script_settings.SERVICE_NAME + "-etl-script"})
+    mock_tracer_provider_class.assert_called_once()
+    mock_console_span_exporter_class.assert_called_once()
+    mock_simple_span_processor_class.assert_called_once()
+    # Check that the processor was added to the provider
+    mock_tracer_provider_instance = mock_tracer_provider_class.return_value
+    mock_tracer_provider_instance.add_span_processor.assert_called_once()
+    # Check that set_tracer_provider was called with the new provider
+    mock_set_tracer_provider_func.assert_called_once_with(mock_tracer_provider_instance)
+    # Check that get_tracer was called for the new provider
+    mock_otel_trace.get_tracer.assert_called_with("etl-script.opentelemetry")
+
+
+# Example of testing a specific export function directly, though main_etl_flow covers the integration
+@patch('backend.app.scripts.export_to_parquet.pd.DataFrame.to_parquet')
+@patch('backend.app.scripts.export_to_parquet.pd.read_sql_query')
+def test_export_users_data_specific(mock_read_sql, mock_to_parquet, mock_db_engine_fixture, patched_script_settings, caplog):
+    caplog.set_level(logging.INFO)  # Ensure INFO-level log records are captured
+    sample_df = pd.DataFrame({'user_id': [1], 'email': ['test@example.com']})
+    mock_read_sql.return_value = sample_df
+
+    export_users_data(mock_db_engine_fixture)
+
+    mock_read_sql.assert_called_once()
+    mock_to_parquet.assert_called_once_with(
+        os.path.join(patched_script_settings.PARQUET_DATA_PATH, "users_analytics.parquet"),
+        index=False
+    )
+    assert "Successfully exported users data" in caplog.text
```

diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index 1c77b83ded..304f3731b3 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -21,6 +21,14 @@ dependencies = [
     "pydantic-settings<3.0.0,>=2.2.1",
     "sentry-sdk[fastapi]<2.0.0,>=1.40.6",
     "pyjwt<3.0.0,>=2.8.0",
+    "duckdb>=1.3.0,<2.0.0",
+    "polars>=1.30.0,<2.0.0",
+    "opentelemetry-api>=1.33.1,<2.0.0",
+    "opentelemetry-sdk>=1.33.1,<2.0.0",
+    "opentelemetry-exporter-otlp-proto-http>=1.33.1,<2.0.0",
"opentelemetry-instrumentation-fastapi>=0.44b0,<1.0.0", + "opentelemetry-instrumentation-sqlalchemy>=0.44b0,<1.0.0", + "opentelemetry-instrumentation-psycopg2>=0.44b0,<1.0.0", ] [tool.uv]