diff --git a/app/__init__.py b/app/__init__.py index 6c08f22..f261ae5 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -18,7 +18,7 @@ logger = logging.getLogger("multiqc_api") logger.setLevel(logging.DEBUG) -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") fh = RotatingFileHandler(log_path, maxBytes=10_000, backupCount=10) fh.setLevel(logging.DEBUG) fh.setFormatter(formatter) diff --git a/app/main.py b/app/main.py index acd5a6e..b7a8476 100644 --- a/app/main.py +++ b/app/main.py @@ -1,31 +1,24 @@ -import logging - -from typing import List, Dict, Optional - -from pathlib import Path - -import csv import datetime -import os -from threading import Lock - -import uvicorn +import logging +import time from enum import Enum from os import getenv import pandas as pd import plotly.express as px -from fastapi import BackgroundTasks, FastAPI, HTTPException, status +import uvicorn +from fastapi import BackgroundTasks, FastAPI, HTTPException, status, Request from fastapi.responses import HTMLResponse, PlainTextResponse, Response from fastapi_utilities import repeat_every from github import Github from plotly.graph_objs import Layout from sqlalchemy.exc import ProgrammingError +from sqlmodel import Session from app import __version__, db, models +from app.db import engine, VisitStats from app.downloads import daily - logger = logging.getLogger("multiqc_api") logger.info("Starting MultiQC API service") @@ -80,25 +73,25 @@ def update_version(): app.latest_release = get_latest_release() -# Fields to store per visit -visit_fieldnames = [ - "version_multiqc", - "version_python", - "operating_system", - "is_docker", - "is_singularity", - "is_conda", - "is_ci", -] - -# Thread-safe in-memory buffer to accumulate recent visits before writing to the CSV file -visit_buffer: List[Dict[str, str]] = [] -visit_buffer_lock = Lock() +@app.get("/health") +async def health(): + """ + Health check endpoint. Checks if the visits table contains records + in the past 10 minutes. + """ + try: + visits = db.get_visit_stats(start=datetime.datetime.now() - datetime.timedelta(minutes=10)) + except Exception as e: + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) + if not visits: + raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="No recent visits found") + return PlainTextResponse(content=str(len(visits))) -@app.get("/version") # log a visit +@app.get("/version") async def version( background_tasks: BackgroundTasks, + request: Request, version_multiqc: str = "", version_python: str = "", operating_system: str = "", @@ -111,9 +104,9 @@ async def version( Endpoint for MultiQC that returns the latest release, and logs the visit along with basic user environment detail. """ - background_tasks.add_task( - _log_visit, - timestamp=datetime.datetime.now().isoformat(), + _log_visit_endpoint( + background_tasks=background_tasks, + request=request, version_multiqc=version_multiqc, version_python=version_python, operating_system=operating_system, @@ -125,23 +118,41 @@ async def version( return models.VersionResponse(latest_release=app.latest_release) -@app.get("/health") -async def health(): +@app.get("/version.php", response_class=PlainTextResponse) +async def version_legacy( + background_tasks: BackgroundTasks, + request: Request, + v: str = "", +): """ - Health check endpoint. Checks if the visits table contains records - in the past 10 minutes. + Legacy endpoint that mimics response from the old PHP script. + + Accessed by MultiQC versions 1.14 and earlier, + after being redirected to by https://multiqc.info/version.php """ - try: - visits = db.get_visit_stats(start=datetime.datetime.now() - datetime.timedelta(minutes=10)) - except Exception as e: - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) - if not visits: - raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="No recent visits found") - return PlainTextResponse(content=str(len(visits))) + _log_visit_endpoint( + background_tasks=background_tasks, + request=request, + version_multiqc=v, + version_python="", + operating_system="", + is_docker="", + is_singularity="", + is_conda="", + is_ci="", + ) + return app.latest_release.version -def _log_visit( - timestamp: str, +# In-memory storage for rate limiting: number of requests within a window per IP +requests_by_ip = {} +REQUESTS = 1000 # max requests allowed within a window +WINDOW = 1 # seconds + + +def _log_visit_endpoint( + background_tasks: BackgroundTasks, + request: Request, version_multiqc: str = "", version_python: str = "", operating_system: str = "", @@ -150,150 +161,76 @@ def _log_visit( is_conda: str = "", is_ci: str = "", ): - global visit_buffer - with visit_buffer_lock: - visit_buffer.append( - { - "timestamp": timestamp, - "version_multiqc": version_multiqc, - "version_python": version_python, - "operating_system": operating_system, - "is_docker": is_docker, - "is_singularity": is_singularity, - "is_conda": is_conda, - "is_ci": is_ci, - } - ) - logger.debug(f"Logged visit, total visits in buffer: {len(visit_buffer)}") - - -# Path to a buffer CSV file to persist recent visits before dumping to the database -# In the same folder as this script -CSV_FILE_PATH = Path(os.getenv("TMPDIR", "/tmp")) / "visits.csv" - - -def _persist_visits(verbose=False) -> Optional[Response]: """ - Write visits from memory to a CSV file. + Called by /version as well as the legacy /version.php endpoint. """ - global visit_buffer_lock - global visit_buffer - with visit_buffer_lock: - if verbose: - n_visits_file = 0 - if CSV_FILE_PATH.exists(): - with open(CSV_FILE_PATH, mode="r") as file: - n_visits_file = sum(1 for _ in file) - if not visit_buffer: - return PlainTextResponse(content=f"No new visits to persist. File contains {n_visits_file} entries") - logger.debug( - f"Appending {len(visit_buffer)} visits to {CSV_FILE_PATH} that currently contains {n_visits_file} visits" - ) + # Checking previous request time from this IP. If it's exceeded, we do not want to + # log the visit in the database, but still return the latest version. + current_time = float(time.time()) + client_ip = request.client.host + if client_ip not in requests_by_ip: + requests_by_ip[client_ip] = {"timestamp": current_time, "count": 1} + else: + if current_time - requests_by_ip[client_ip]["timestamp"] > WINDOW: + requests_by_ip[client_ip] = {"timestamp": current_time, "count": 1} + else: + requests_by_ip[client_ip]["count"] += 1 + if requests_by_ip[client_ip]["count"] > REQUESTS: + logger.debug(f"Rate limiting requests: skipping logging visit from {client_ip}") + print(requests_by_ip) + return + + for ip in list(requests_by_ip.keys()): + if current_time - requests_by_ip[ip]["timestamp"] > WINDOW: + requests_by_ip.pop(ip) + + print(requests_by_ip) + logger.debug(f"Logging visit from {client_ip}") + background_tasks.add_task( + _log_visit_task, + timestamp=datetime.datetime.now().isoformat(), + version_multiqc=version_multiqc, + version_python=version_python, + operating_system=operating_system, + is_docker=is_docker, + is_singularity=is_singularity, + is_conda=is_conda, + is_ci=is_ci, + ) - with open(CSV_FILE_PATH, mode="a") as file: - writer: csv.DictWriter = csv.DictWriter(file, fieldnames=["timestamp"] + visit_fieldnames) - writer.writerows(visit_buffer) - if verbose: - with open(CSV_FILE_PATH, mode="r") as file: - n_visits_file = sum(1 for _ in file) - msg = ( - f"Successfully persisted {len(visit_buffer)} visits to {CSV_FILE_PATH}, " - f"file now contains {n_visits_file} entries" +def _log_visit_task( + timestamp: str, + version_multiqc: str = "", + version_python: str = "", + operating_system: str = "", + is_docker: str = "", + is_singularity: str = "", + is_conda: str = "", + is_ci: str = "", +): + def strtobool(val) -> bool: + return str(val).lower() in ("y", "yes", "t", "true", "on", "1") + + with Session(engine) as session: + session.add( + VisitStats( + start=timestamp, + end=timestamp, + version_multiqc=version_multiqc, + version_python=version_python, + operating_system=operating_system, + is_docker=strtobool(is_docker), + is_singularity=strtobool(is_singularity), + is_conda=strtobool(is_conda), + is_ci=strtobool(is_ci), + count=1, ) - logger.debug(msg) - - visit_buffer = [] # Reset the buffer - - if verbose: - return PlainTextResponse(content=msg) - - return None - - -@app.on_event("startup") -@repeat_every( - seconds=10, - wait_first=True, - logger=logger, -) -async def persist_visits(): - return _persist_visits(verbose=True) - - -def _summarize_visits(interval="5min") -> Response: - """ - Summarize visits from the CSV file and write to the database - """ - _persist_visits(verbose=True) - global visit_buffer_lock - with visit_buffer_lock: - if not CSV_FILE_PATH.exists(): - msg = f"File {CSV_FILE_PATH} doesn't yet exist, no visits to summarize" - logger.info(msg) - return PlainTextResponse(content=msg) - - df = pd.read_csv( - CSV_FILE_PATH, - sep=",", - names=["timestamp"] + visit_fieldnames, - dtype="string", - na_filter=False, # prevent empty strings from converting to nan or ) - df["start"] = pd.to_datetime(df["timestamp"]) - df["end"] = df["start"] + pd.to_timedelta(interval) - df["start"] = df["start"].dt.strftime("%Y-%m-%d %H:%M") - df["end"] = df["end"].dt.strftime("%Y-%m-%d %H:%M") - - def strtobool(val) -> bool: - return str(val).lower() in ("y", "yes", "t", "true", "on", "1") - - df["is_docker"] = df["is_docker"].apply(strtobool) - df["is_singularity"] = df["is_singularity"].apply(strtobool) - df["is_conda"] = df["is_conda"].apply(strtobool) - df["is_ci"] = df["is_ci"].apply(strtobool) - df = df.drop(columns=["timestamp"]) - - # Summarize visits per user per time interval - interval_summary = df.groupby(["start", "end"] + visit_fieldnames).size().reset_index(name="count") - if len(interval_summary) == 0: - msg = "No new visits to summarize" - logger.info(msg) - return PlainTextResponse(content=msg) - - logger.info( - f"Summarizing {len(df)} visits in {CSV_FILE_PATH} and writing {len(interval_summary)} rows to the DB" - ) - try: - db.insert_visit_stats(interval_summary) - except Exception as e: - msg = f"Failed to write to the database: {e}" - logger.error(msg) - return PlainTextResponse( - status_code=status.INTERNAL_SERVER_ERROR, - content=msg, - ) - else: - msg = f"Successfully summarized {len(df)} visits to {len(interval_summary)} per-interval entries" - logger.info(msg) - open(CSV_FILE_PATH, "w").close() # Clear the CSV file on successful write - return PlainTextResponse(content=msg) - - -@app.on_event("startup") -@repeat_every( - seconds=60 * 60 * 1, # every hour - wait_first=True, - logger=logger, -) -async def summarize_visits(): - """ - Repeated task to summarize visits. - """ - return _summarize_visits() + session.commit() -def _update_download_stats(): +def _update_download_stats_task(): """ Update the daily download statistics in the database """ @@ -327,33 +264,13 @@ async def update_downloads(): """ Repeated task to update the daily download statistics. """ - _update_download_stats() - - -@app.post("/persist_visits") -async def persist_visits_endpoint(): - try: - return _persist_visits(verbose=True) - except Exception as e: - msg = f"Failed to persist the visits: {e}" - logger.error(msg) - raise HTTPException(status_code=status.INTERNAL_SERVER_ERROR, detail=msg) - - -@app.post("/summarize_visits") -async def summarize_visits_endpoint(): - try: - return _summarize_visits() - except Exception as e: - msg = f"Failed to summarize the visits: {e}" - logger.error(msg) - raise HTTPException(status_code=status.INTERNAL_SERVER_ERROR, detail=msg) + _update_download_stats_task() @app.post("/update_downloads") async def update_downloads_endpoint(background_tasks: BackgroundTasks): try: - background_tasks.add_task(_update_download_stats) + background_tasks.add_task(_update_download_stats_task) msg = "Queued updating the download stats in the DB" logger.info(msg) return PlainTextResponse(content=msg) @@ -362,47 +279,6 @@ async def update_downloads_endpoint(background_tasks: BackgroundTasks): raise HTTPException(status_code=status.INTERNAL_SERVER_ERROR, detail=msg) -if os.getenv("ENVIRONMENT") == "DEV": - - @app.post("/clean_visits_csv_file") - async def clean_visits_csv_file(): - try: - if CSV_FILE_PATH.exists(): - CSV_FILE_PATH.unlink() - msg = f"Removed {CSV_FILE_PATH}" - logger.info(msg) - return PlainTextResponse(content=msg) - else: - msg = f"File {CSV_FILE_PATH} doesn't exist" - logger.info(msg) - return PlainTextResponse(content=msg) - except Exception as e: - msg = f"Failed to remove {CSV_FILE_PATH}: {e}" - raise HTTPException(status_code=status.INTERNAL_SERVER_ERROR, detail=msg) - - -@app.get("/version.php", response_class=PlainTextResponse) -async def version_legacy(background_tasks: BackgroundTasks, v: str = ""): - """ - Legacy endpoint that mimics response from the old PHP script. - - Accessed by MultiQC versions 1.14 and earlier, - after being redirected to by https://multiqc.info/version.php - """ - background_tasks.add_task( - _log_visit, - timestamp=datetime.datetime.now().isoformat(), - version_multiqc=v, - version_python="", - operating_system="", - is_docker="", - is_singularity="", - is_conda="", - is_ci="", - ) - return app.latest_release.version - - @app.get("/") async def index(background_tasks: BackgroundTasks): """ diff --git a/requirements.txt b/requirements.txt index 08f196f..9f3759d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ uvicorn python-dotenv pypistats logzio-python-handler +rich