diff --git a/api/ee/databases/postgres/migrations/core/versions/a2b3c4d5e6f7_add_retention_helper_indexes.py b/api/ee/databases/postgres/migrations/core/versions/a2b3c4d5e6f7_add_retention_helper_indexes.py new file mode 100644 index 0000000000..dfcb1b8e9b --- /dev/null +++ b/api/ee/databases/postgres/migrations/core/versions/a2b3c4d5e6f7_add_retention_helper_indexes.py @@ -0,0 +1,50 @@ +"""Add retention helper indexes on projects/subscriptions + +Revision ID: a2b3c4d5e6f7 +Revises: c3b2a1d4e5f6 +Create Date: 2025-01-06 12:00:00.000000 + +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import text + +# revision identifiers, used by Alembic. +revision: str = "a2b3c4d5e6f7" +down_revision: Union[str, None] = "c3b2a1d4e5f6" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # CREATE INDEX CONCURRENTLY must run outside a transaction + with op.get_context().autocommit_block(): + # Index for projects -> subscriptions join + op.execute( + text(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_projects_organization_id + ON public.projects (organization_id); + """) + ) + + # Index for plan-based lookups + op.execute( + text(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_subscriptions_plan + ON public.subscriptions (plan); + """) + ) + + +def downgrade() -> None: + with op.get_context().autocommit_block(): + op.execute( + text( + "DROP INDEX CONCURRENTLY IF EXISTS public.ix_projects_organization_id;" + ) + ) + op.execute( + text("DROP INDEX CONCURRENTLY IF EXISTS public.ix_subscriptions_plan;") + ) diff --git a/api/ee/databases/postgres/migrations/tracing/versions/a2b3c4d5e6f7_add_retention_indexes_and_autovacuum.py b/api/ee/databases/postgres/migrations/tracing/versions/a2b3c4d5e6f7_add_retention_indexes_and_autovacuum.py new file mode 100644 index 0000000000..a88af6166a --- /dev/null +++ 
b/api/ee/databases/postgres/migrations/tracing/versions/a2b3c4d5e6f7_add_retention_indexes_and_autovacuum.py @@ -0,0 +1,73 @@ +"""Add retention helper indexes on spans + autovacuum tuning + +Revision ID: a2b3c4d5e6f7 +Revises: cfa14a847972 +Create Date: 2025-01-06 12:00:00.000000 + +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import text + +# revision identifiers, used by Alembic. +revision: str = "a2b3c4d5e6f7" +down_revision: Union[str, None] = "cfa14a847972" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + with op.get_context().autocommit_block(): + # Unique partial index: enforce single root span per trace + op.execute( + text(""" + CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS ux_spans_root_per_trace + ON public.spans (project_id, trace_id) + WHERE parent_id IS NULL; + """) + ) + + # Retention selection index (critical for performance) + op.execute( + text(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_spans_root_project_created_trace + ON public.spans (project_id, created_at, trace_id) + WHERE parent_id IS NULL; + """) + ) + + # Autovacuum tuning for high-churn retention workload + op.execute( + text(""" + ALTER TABLE public.spans SET ( + autovacuum_vacuum_scale_factor = 0.02, + autovacuum_analyze_scale_factor = 0.01, + autovacuum_vacuum_cost_delay = 5, + autovacuum_vacuum_cost_limit = 4000 + ); + """) + ) + + +def downgrade() -> None: + with op.get_context().autocommit_block(): + op.execute( + text("DROP INDEX CONCURRENTLY IF EXISTS public.ux_spans_root_per_trace;") + ) + op.execute( + text( + "DROP INDEX CONCURRENTLY IF EXISTS public.ix_spans_root_project_created_trace;" + ) + ) + op.execute( + text(""" + ALTER TABLE public.spans RESET ( + autovacuum_vacuum_scale_factor, + autovacuum_analyze_scale_factor, + autovacuum_vacuum_cost_delay, + autovacuum_vacuum_cost_limit + ); + """) + ) diff --git a/api/ee/docker/Dockerfile.dev 
b/api/ee/docker/Dockerfile.dev index bdc5642865..d2e1fadbcc 100644 --- a/api/ee/docker/Dockerfile.dev +++ b/api/ee/docker/Dockerfile.dev @@ -26,6 +26,14 @@ COPY ./entrypoints /app/entrypoints/ ENV PYTHONPATH=/sdk:$PYTHONPATH +COPY ./oss/src/crons/queries.sh /queries.sh +COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron +RUN sed -i -e '$a\' /etc/cron.d/queries-cron +RUN cat -A /etc/cron.d/queries-cron + +RUN chmod +x /queries.sh \ + && chmod 0644 /etc/cron.d/queries-cron + COPY ./ee/src/crons/meters.sh /meters.sh COPY ./ee/src/crons/meters.txt /etc/cron.d/meters-cron RUN sed -i -e '$a\' /etc/cron.d/meters-cron @@ -34,12 +42,12 @@ RUN cat -A /etc/cron.d/meters-cron RUN chmod +x /meters.sh \ && chmod 0644 /etc/cron.d/meters-cron -COPY ./oss/src/crons/queries.sh /queries.sh -COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron -RUN sed -i -e '$a\' /etc/cron.d/queries-cron -RUN cat -A /etc/cron.d/queries-cron +COPY ./ee/src/crons/spans.sh /spans.sh +COPY ./ee/src/crons/spans.txt /etc/cron.d/spans-cron +RUN sed -i -e '$a\' /etc/cron.d/spans-cron +RUN cat -A /etc/cron.d/spans-cron -RUN chmod +x /queries.sh \ - && chmod 0644 /etc/cron.d/queries-cron +RUN chmod +x /spans.sh \ + && chmod 0644 /etc/cron.d/spans-cron EXPOSE 8000 diff --git a/api/ee/docker/Dockerfile.gh b/api/ee/docker/Dockerfile.gh index 4cd9df1251..18644c5f3e 100644 --- a/api/ee/docker/Dockerfile.gh +++ b/api/ee/docker/Dockerfile.gh @@ -24,7 +24,15 @@ COPY ./ee /app/ee/ COPY ./oss /app/oss/ COPY ./entrypoints /app/entrypoints/ -# +ENV PYTHONPATH=/app + +COPY ./oss/src/crons/queries.sh /queries.sh +COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron +RUN sed -i -e '$a\' /etc/cron.d/queries-cron +RUN cat -A /etc/cron.d/queries-cron + +RUN chmod +x /queries.sh \ + && chmod 0644 /etc/cron.d/queries-cron COPY ./ee/src/crons/meters.sh /meters.sh COPY ./ee/src/crons/meters.txt /etc/cron.d/meters-cron @@ -34,12 +42,12 @@ RUN cat -A /etc/cron.d/meters-cron RUN chmod +x /meters.sh \ && chmod 0644 
/etc/cron.d/meters-cron -COPY ./oss/src/crons/queries.sh /queries.sh -COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron -RUN sed -i -e '$a\' /etc/cron.d/queries-cron -RUN cat -A /etc/cron.d/queries-cron +COPY ./ee/src/crons/spans.sh /spans.sh +COPY ./ee/src/crons/spans.txt /etc/cron.d/spans-cron +RUN sed -i -e '$a\' /etc/cron.d/spans-cron +RUN cat -A /etc/cron.d/spans-cron -RUN chmod +x /queries.sh \ - && chmod 0644 /etc/cron.d/queries-cron +RUN chmod +x /spans.sh \ + && chmod 0644 /etc/cron.d/spans-cron EXPOSE 8000 diff --git a/api/ee/src/apis/fastapi/billing/router.py b/api/ee/src/apis/fastapi/billing/router.py index 625604a199..b55bf6ae9b 100644 --- a/api/ee/src/apis/fastapi/billing/router.py +++ b/api/ee/src/apis/fastapi/billing/router.py @@ -11,7 +11,7 @@ from oss.src.utils.common import is_ee from oss.src.utils.logging import get_module_logger from oss.src.utils.exceptions import intercept_exceptions -from oss.src.utils.caching import get_cache, set_cache, invalidate_cache +from oss.src.utils.caching import acquire_lock, release_lock from oss.src.utils.env import env from oss.src.services.db_manager import ( @@ -23,6 +23,8 @@ from ee.src.models.shared_models import Permission from ee.src.core.entitlements.types import ENTITLEMENTS, CATALOG, Tracker, Quota from ee.src.core.subscriptions.types import Event, Plan +from ee.src.core.meters.service import MetersService +from ee.src.core.tracing.service import TracingService from ee.src.core.subscriptions.service import ( SubscriptionsService, SwitchException, @@ -47,12 +49,16 @@ ) -class SubscriptionsRouter: +class BillingRouter: def __init__( self, subscription_service: SubscriptionsService, + meters_service: MetersService, + tracing_service: TracingService, ): self.subscription_service = subscription_service + self.meters_service = meters_service + self.tracing_service = tracing_service # ROUTER self.router = APIRouter() @@ -153,6 +159,13 @@ def __init__( operation_id="admin_report_usage", ) + 
self.admin_router.add_api_route( + "/usage/flush", + self.flush_usage, + methods=["POST"], + operation_id="admin_flush_usage", + ) + # HANDLERS @intercept_exceptions() @@ -782,7 +795,7 @@ async def fetch_usage( content={"status": "error", "message": "Plan not found"}, ) - meters = await self.subscription_service.meters_service.fetch( + meters = await self.meters_service.fetch( organization_id=organization_id, ) @@ -817,29 +830,24 @@ async def report_usage( log.info("[report] [endpoint] Trigger") try: - report_ongoing = await get_cache( + lock_key = await acquire_lock( namespace="meters:report", key={}, + ttl=3600, # 1 hour ) - if report_ongoing: + if not lock_key: log.info("[report] [endpoint] Skipped (ongoing)") return JSONResponse( status_code=status.HTTP_200_OK, content={"status": "skipped"}, ) - # await set_cache( - # namespace="meters:report", - # key={}, - # value=True, - # ttl=60 * 60, # 1 hour - # ) log.info("[report] [endpoint] Lock acquired") try: log.info("[report] [endpoint] Reporting usage started") - await self.subscription_service.meters_service.report() + await self.meters_service.report() log.info("[report] [endpoint] Reporting usage completed") return JSONResponse( @@ -858,7 +866,7 @@ async def report_usage( ) finally: - await invalidate_cache( + await release_lock( namespace="meters:report", key={}, ) @@ -875,6 +883,65 @@ async def report_usage( content={"status": "error", "message": "Fatal error"}, ) + @intercept_exceptions() + async def flush_usage( + self, + ): + log.info("[flush] [endpoint] Trigger") + + try: + lock_key = await acquire_lock( + namespace="spans:flush", + key={}, + ttl=3600, # 1 hour + ) + + if not lock_key: + log.info("[flush] [endpoint] Skipped (ongoing)") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={"status": "skipped"}, + ) + + log.info("[flush] [endpoint] Lock acquired") + + try: + log.info("[flush] [endpoint] Retention started") + await self.tracing_service.flush_spans() + log.info("[flush] 
[endpoint] Retention completed") + + return JSONResponse( + status_code=status.HTTP_200_OK, + content={"status": "success"}, + ) + + except Exception: + log.error( + "[flush] [endpoint] Retention failed:", + exc_info=True, + ) + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"status": "error", "message": "Retention failed"}, + ) + + finally: + await release_lock( + namespace="spans:flush", + key={}, + ) + log.info("[flush] [endpoint] Lock released") + + except Exception: + log.error( + "[flush] [endpoint] Fatal error:", + exc_info=True, + ) + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"status": "error", "message": "Fatal error"}, + ) + # ROUTES + @intercept_exceptions() diff --git a/api/ee/src/core/entitlements/types.py b/api/ee/src/core/entitlements/types.py index d64e90740d..479f153ccd 100644 --- a/api/ee/src/core/entitlements/types.py +++ b/api/ee/src/core/entitlements/types.py @@ -35,11 +35,21 @@ class Constraint(str, Enum): READ_ONLY = "read_only" +class Periods(int, Enum): + EPHEMERAL = 0 # instant + HOURLY = 60 # 1 hour = 60 minutes + DAILY = 1440 # 24 hours = 1 day = 1440 minutes + MONTHLY = 44640 # 31 days = 744 hours = 44640 minutes + QUARTERLY = 131040 # 91 days = 2184 hours = 131040 minutes + YEARLY = 525600 # 365 days = 8760 hours = 525600 minutes + + class Quota(BaseModel): free: Optional[int] = None limit: Optional[int] = None monthly: Optional[bool] = None strict: Optional[bool] = False + retention: Optional[int] = None class Probe(BaseModel): @@ -53,6 +63,7 @@ class Probe(BaseModel): "description": "Great for hobby projects and POCs.", "type": "standard", "plan": Plan.CLOUD_V0_HOBBY.value, + "retention": Periods.MONTHLY.value, "price": { "base": { "type": "flat", @@ -72,6 +83,7 @@ class Probe(BaseModel): "description": "For production projects.", "type": "standard", "plan": Plan.CLOUD_V0_PRO.value, + "retention": Periods.QUARTERLY.value, "price": { "base": { "type": 
"flat", @@ -121,6 +133,7 @@ class Probe(BaseModel): "description": "For scale, security, and support.", "type": "standard", "plan": Plan.CLOUD_V0_BUSINESS.value, + "retention": Periods.YEARLY.value, "price": { "base": { "type": "flat", @@ -208,13 +221,34 @@ class Probe(BaseModel): Flag.RBAC: False, }, Tracker.COUNTERS: { - Counter.TRACES: Quota(limit=5_000, monthly=True, free=5_000), - Counter.EVALUATIONS: Quota(limit=20, monthly=True, free=20, strict=True), - Counter.CREDITS: Quota(limit=100, monthly=True, free=100, strict=True), + Counter.TRACES: Quota( + limit=5_000, + monthly=True, + free=5_000, + retention=Periods.MONTHLY.value, + ), + Counter.EVALUATIONS: Quota( + limit=20, + monthly=True, + free=20, + strict=True, + ), + Counter.CREDITS: Quota( + limit=100, + monthly=True, + free=100, + strict=True, + ), }, Tracker.GAUGES: { - Gauge.USERS: Quota(limit=2, strict=True, free=2), - Gauge.APPLICATIONS: Quota(strict=True), + Gauge.USERS: Quota( + limit=2, + strict=True, + free=2, + ), + Gauge.APPLICATIONS: Quota( + strict=True, + ), }, }, Plan.CLOUD_V0_PRO: { @@ -223,13 +257,31 @@ class Probe(BaseModel): Flag.RBAC: False, }, Tracker.COUNTERS: { - Counter.TRACES: Quota(monthly=True, free=10_000), - Counter.EVALUATIONS: Quota(monthly=True, strict=True), - Counter.CREDITS: Quota(limit=100, monthly=True, free=100, strict=True), + Counter.TRACES: Quota( + monthly=True, + free=10_000, + retention=Periods.QUARTERLY.value, + ), + Counter.EVALUATIONS: Quota( + monthly=True, + strict=True, + ), + Counter.CREDITS: Quota( + limit=100, + monthly=True, + free=100, + strict=True, + ), }, Tracker.GAUGES: { - Gauge.USERS: Quota(limit=10, strict=True, free=3), - Gauge.APPLICATIONS: Quota(strict=True), + Gauge.USERS: Quota( + limit=10, + strict=True, + free=3, + ), + Gauge.APPLICATIONS: Quota( + strict=True, + ), }, }, Plan.CLOUD_V0_BUSINESS: { @@ -238,13 +290,29 @@ class Probe(BaseModel): Flag.RBAC: True, }, Tracker.COUNTERS: { - Counter.TRACES: Quota(monthly=True, free=1_000_000), 
- Counter.EVALUATIONS: Quota(monthly=True, strict=True), - Counter.CREDITS: Quota(limit=100, monthly=True, free=100, strict=True), + Counter.TRACES: Quota( + monthly=True, + free=1_000_000, + retention=Periods.YEARLY.value, + ), + Counter.EVALUATIONS: Quota( + monthly=True, + strict=True, + ), + Counter.CREDITS: Quota( + limit=100, + monthly=True, + free=100, + strict=True, + ), }, Tracker.GAUGES: { - Gauge.USERS: Quota(strict=True), - Gauge.APPLICATIONS: Quota(strict=True), + Gauge.USERS: Quota( + strict=True, + ), + Gauge.APPLICATIONS: Quota( + strict=True, + ), }, }, Plan.CLOUD_V0_HUMANITY_LABS: { @@ -253,12 +321,21 @@ class Probe(BaseModel): Flag.RBAC: True, }, Tracker.COUNTERS: { - Counter.TRACES: Quota(monthly=True), - Counter.EVALUATIONS: Quota(monthly=True, strict=True), + Counter.TRACES: Quota( + monthly=True, + ), + Counter.EVALUATIONS: Quota( + monthly=True, + strict=True, + ), }, Tracker.GAUGES: { - Gauge.USERS: Quota(strict=True), - Gauge.APPLICATIONS: Quota(strict=True), + Gauge.USERS: Quota( + strict=True, + ), + Gauge.APPLICATIONS: Quota( + strict=True, + ), }, }, Plan.CLOUD_V0_X_LABS: { @@ -267,12 +344,21 @@ class Probe(BaseModel): Flag.RBAC: False, }, Tracker.COUNTERS: { - Counter.TRACES: Quota(monthly=True), - Counter.EVALUATIONS: Quota(monthly=True, strict=True), + Counter.TRACES: Quota( + monthly=True, + ), + Counter.EVALUATIONS: Quota( + monthly=True, + strict=True, + ), }, Tracker.GAUGES: { - Gauge.USERS: Quota(strict=True), - Gauge.APPLICATIONS: Quota(strict=True), + Gauge.USERS: Quota( + strict=True, + ), + Gauge.APPLICATIONS: Quota( + strict=True, + ), }, }, Plan.CLOUD_V0_AGENTA_AI: { @@ -281,8 +367,13 @@ class Probe(BaseModel): Flag.RBAC: True, }, Tracker.COUNTERS: { - Counter.TRACES: Quota(monthly=True), - Counter.EVALUATIONS: Quota(monthly=True, strict=True), + Counter.TRACES: Quota( + monthly=True, + ), + Counter.EVALUATIONS: Quota( + monthly=True, + strict=True, + ), Counter.CREDITS: Quota( limit=100_000, monthly=True, @@ -291,8 
+382,12 @@ class Probe(BaseModel): ), }, Tracker.GAUGES: { - Gauge.USERS: Quota(strict=True), - Gauge.APPLICATIONS: Quota(strict=True), + Gauge.USERS: Quota( + strict=True, + ), + Gauge.APPLICATIONS: Quota( + strict=True, + ), }, }, } diff --git a/api/ee/src/core/tracing/__init__.py b/api/ee/src/core/tracing/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/ee/src/core/tracing/service.py b/api/ee/src/core/tracing/service.py new file mode 100644 index 0000000000..ef62344ad0 --- /dev/null +++ b/api/ee/src/core/tracing/service.py @@ -0,0 +1,128 @@ +from datetime import datetime, timezone, timedelta + +from oss.src.utils.logging import get_module_logger + +from ee.src.core.subscriptions.types import Plan +from ee.src.core.entitlements.types import ENTITLEMENTS, Tracker, Counter +from ee.src.dbs.postgres.tracing.dao import TracingDAO + + +log = get_module_logger(__name__) + + +class TracingService: + def __init__( + self, + tracing_dao: TracingDAO, + ): + self.tracing_dao = tracing_dao + + async def flush_spans( + self, + *, + max_projects_per_batch: int = 500, + max_traces_per_batch: int = 5000, + ) -> None: + log.info("[flush] ============================================") + log.info("[flush] Starting spans flush job") + log.info("[flush] ============================================") + + total_plans = 0 + total_skipped = 0 + total_traces = 0 + total_spans = 0 + + for plan in Plan: + total_plans += 1 + + entitlements = ENTITLEMENTS.get(plan) + + if not entitlements: + log.info(f"[flush] [{plan.value}] Skipped (no entitlements)") + total_skipped += 1 + continue + + traces_quota = entitlements.get(Tracker.COUNTERS, {}).get(Counter.TRACES) + + if not traces_quota or traces_quota.retention is None: + log.info(f"[flush] [{plan.value}] Skipped (unlimited retention)") + total_skipped += 1 + continue + + retention_minutes = traces_quota.retention + cutoff = datetime.now(timezone.utc) - timedelta(minutes=retention_minutes) + + log.info( + f"[flush] 
[{plan.value}] Processing with cutoff={cutoff.isoformat()} (retention={retention_minutes} minutes)" + ) + + try: + plan_traces, plan_spans = await self._flush_spans_for_plan( + plan=plan, + cutoff=cutoff, + max_projects_per_batch=max_projects_per_batch, + max_traces_per_batch=max_traces_per_batch, + ) + + total_traces += plan_traces + total_spans += plan_spans + + log.info( + f"[flush] [{plan.value}] ✅ Completed: {plan_traces} traces, {plan_spans} spans" + ) + + except Exception: + log.error( + f"[flush] [{plan.value}] ❌ Failed", + exc_info=True, + ) + + log.info("[flush] ============================================") + log.info("[flush] ✅ FLUSH JOB COMPLETED") + log.info(f"[flush] Total plans covered: {total_plans}") + log.info(f"[flush] Total plans skipped: {total_skipped}") + log.info(f"[flush] Total traces deleted: {total_traces}") + log.info(f"[flush] Total spans deleted: {total_spans}") + log.info("[flush] ============================================") + + async def _flush_spans_for_plan( + self, + *, + plan: Plan, + cutoff: datetime, + max_projects_per_batch: int, + max_traces_per_batch: int, + ) -> tuple[int, int]: + last_project_id = None + batch_idx = 0 + total_traces = 0 + total_spans = 0 + + while True: + project_ids = await self.tracing_dao.fetch_projects_with_plan( + plan=plan.value, + project_id=last_project_id, + max_projects=max_projects_per_batch, + ) + + if not project_ids: + break + + batch_idx += 1 + last_project_id = project_ids[-1] + + traces, spans = await self.tracing_dao.delete_traces_before_cutoff( + cutoff=cutoff, + project_ids=project_ids, + max_traces=max_traces_per_batch, + ) + + total_traces += traces + total_spans += spans + + # if traces > 0: + # log.debug( + # f"[flush] [{plan.value}] Chunk #{batch_idx}: {traces} traces, {spans} spans" + # ) + + return total_traces, total_spans diff --git a/api/ee/src/crons/meters.txt b/api/ee/src/crons/meters.txt index 15fb131882..3bd0528feb 100644 --- a/api/ee/src/crons/meters.txt +++ 
b/api/ee/src/crons/meters.txt @@ -1,2 +1,2 @@ * * * * * root echo "cron test $(date)" >> /proc/1/fd/1 2>&1 -*/15 * * * * root sh /meters.sh >> /proc/1/fd/1 2>&1 +15,45 * * * * root sh /meters.sh >> /proc/1/fd/1 2>&1 \ No newline at end of file diff --git a/api/ee/src/crons/spans.sh b/api/ee/src/crons/spans.sh new file mode 100644 index 0000000000..7f8c22227b --- /dev/null +++ b/api/ee/src/crons/spans.sh @@ -0,0 +1,39 @@ +#!/bin/sh +set -eu + +AGENTA_AUTH_KEY=$(tr '\0' '\n' < /proc/1/environ | grep ^AGENTA_AUTH_KEY= | cut -d= -f2-) + +echo "--------------------------------------------------------" +echo "[$(date)] spans.sh running from cron" >> /proc/1/fd/1 + +# Make POST request with 30 minute timeout (retention can be slow) +RESPONSE=$(curl \ + --max-time 1800 \ + --connect-timeout 10 \ + -s \ + -w "\nHTTP_STATUS:%{http_code}\n" \ + -X POST \ + -H "Authorization: Access ${AGENTA_AUTH_KEY}" \ + "http://api:8000/admin/billing/usage/flush" 2>&1) || CURL_EXIT=$? + +if [ -n "${CURL_EXIT:-}" ]; then + echo "❌ CURL failed with exit code: ${CURL_EXIT}" >> /proc/1/fd/1 + case ${CURL_EXIT} in + 6) echo " Could not resolve host" >> /proc/1/fd/1 ;; + 7) echo " Failed to connect to host" >> /proc/1/fd/1 ;; + 28) echo " Operation timeout (exceeded 1800s / 30 minutes)" >> /proc/1/fd/1 ;; + 52) echo " Empty reply from server" >> /proc/1/fd/1 ;; + 56) echo " Failure in receiving network data" >> /proc/1/fd/1 ;; + *) echo " Unknown curl error" >> /proc/1/fd/1 ;; + esac +else + echo "${RESPONSE}" >> /proc/1/fd/1 + HTTP_CODE=$(echo "${RESPONSE}" | grep "HTTP_STATUS:" | cut -d: -f2) + if [ "${HTTP_CODE}" = "200" ]; then + echo "✅ Spans retention completed successfully" >> /proc/1/fd/1 + else + echo "❌ Spans retention failed with HTTP ${HTTP_CODE}" >> /proc/1/fd/1 + fi +fi + +echo "[$(date)] spans.sh done" >> /proc/1/fd/1 diff --git a/api/ee/src/crons/spans.txt b/api/ee/src/crons/spans.txt new file mode 100644 index 0000000000..b30f889fe7 --- /dev/null +++ b/api/ee/src/crons/spans.txt 
@@ -0,0 +1,2 @@ +* * * * * root echo "cron test $(date)" >> /proc/1/fd/1 2>&1 +0,30 * * * * root sh /spans.sh >> /proc/1/fd/1 2>&1 diff --git a/api/ee/src/dbs/postgres/tracing/__init__.py b/api/ee/src/dbs/postgres/tracing/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/ee/src/dbs/postgres/tracing/dao.py b/api/ee/src/dbs/postgres/tracing/dao.py new file mode 100644 index 0000000000..a3c3b0f91b --- /dev/null +++ b/api/ee/src/dbs/postgres/tracing/dao.py @@ -0,0 +1,226 @@ +from typing import Optional, List, Tuple +from uuid import UUID +from datetime import datetime + +from sqlalchemy import delete, func, literal, select, text, tuple_, bindparam +from sqlalchemy.sql import any_ +from sqlalchemy.dialects.postgresql import ARRAY, UUID as PG_UUID + +from oss.src.utils.logging import get_module_logger + +from oss.src.models.db_models import ProjectDB + +from oss.src.dbs.postgres.shared.engine import engine +from oss.src.dbs.postgres.tracing.dbes import SpanDBE + +from ee.src.dbs.postgres.subscriptions.dbes import SubscriptionDBE + + +log = get_module_logger(__name__) + + +# --------------------------- # +# Raw SQL (text()) statements +# --------------------------- # + +CORE_PROJECTS_PAGE_SQL = text( + """ + SELECT p.id AS project_id + FROM public.projects p + JOIN public.subscriptions s + ON s.organization_id = p.organization_id + WHERE s.plan = :plan + AND (:project_id IS NULL OR p.id > :project_id) + ORDER BY p.id + LIMIT :max_projects; + """ +).bindparams( + bindparam("plan"), # text/varchar; driver will adapt + bindparam("project_id", type_=PG_UUID(as_uuid=True)), + bindparam("max_projects"), +) + +TRACING_DELETE_SQL = text( + """ + WITH expired_traces AS ( + SELECT sp.project_id, sp.trace_id + FROM public.spans sp + WHERE sp.parent_id IS NULL + AND sp.project_id = ANY(:project_ids) + AND sp.created_at < :cutoff + ORDER BY sp.created_at + LIMIT :max_traces + ), + expired_spans AS ( + DELETE FROM public.spans sp + USING expired_traces et + 
WHERE sp.project_id = et.project_id + AND sp.trace_id = et.trace_id + RETURNING 1 + ) + SELECT + (SELECT count(*) FROM expired_traces) AS traces_selected, + (SELECT count(*) FROM expired_spans) AS spans_deleted; + """ +).bindparams( + bindparam("project_ids", type_=ARRAY(PG_UUID(as_uuid=True))), + bindparam("cutoff"), + bindparam("max_traces"), +) + + +class TracingDAO: + # ---------------- # + # Raw-SQL versions + # ---------------- # + + async def _fetch_projects_with_plan( + self, + *, + plan: str, + project_id: Optional[UUID], + max_projects: int, + ) -> List[UUID]: + async with engine.core_session() as session: + result = await session.execute( + CORE_PROJECTS_PAGE_SQL, + { + "plan": plan, + "project_id": project_id if project_id else None, + "max_projects": max_projects, + }, + ) + + rows = result.fetchall() + + return [row[0] for row in rows] + + async def _delete_traces_before_cutoff( + self, + *, + cutoff: datetime, + project_ids: List[UUID], + max_traces: int, + ) -> Tuple[int, int]: + if not project_ids: + return (0, 0) + + async with engine.tracing_session() as session: + result = await session.execute( + TRACING_DELETE_SQL, + { + "project_ids": project_ids, + "cutoff": cutoff, + "max_traces": max_traces, + }, + ) + + row = result.fetchone() + + await session.commit() + + traces_selected = int(row[0]) if row and row[0] is not None else 0 + spans_deleted = int(row[1]) if row and row[1] is not None else 0 + + return (traces_selected, spans_deleted) + + # ------------------- # + # SQLAlchemy versions + # ------------------- # + + async def fetch_projects_with_plan( + self, + *, + plan: str, + project_id: Optional[UUID], + max_projects: int, + ) -> List[UUID]: + async with engine.core_session() as session: + stmt = ( + select(ProjectDB.id) + .select_from( + ProjectDB.__table__.join( + SubscriptionDBE.__table__, + SubscriptionDBE.organization_id == ProjectDB.organization_id, + ) + ) + .where(SubscriptionDBE.plan == plan) + ) + + if project_id: + stmt = 
stmt.where(ProjectDB.id > project_id) + + stmt = stmt.order_by(ProjectDB.id).limit(max_projects) + + result = await session.execute(stmt) + rows = result.fetchall() + + return [row[0] for row in rows] + + async def delete_traces_before_cutoff( + self, + *, + cutoff: datetime, + project_ids: List[UUID], + max_traces: int, + ) -> Tuple[int, int]: + if not project_ids: + return (0, 0) + + async with engine.tracing_session() as session: + project_ids_param = bindparam( + "project_ids", + value=project_ids, + type_=ARRAY(PG_UUID(as_uuid=True)), + ) + + expired_traces = ( + select( + SpanDBE.project_id.label("project_id"), + SpanDBE.trace_id.label("trace_id"), + ) + .where( + SpanDBE.parent_id.is_(None), + SpanDBE.project_id == any_(project_ids_param), + SpanDBE.created_at < bindparam("cutoff", value=cutoff), + ) + .order_by(SpanDBE.created_at) + .limit(bindparam("max_traces", value=max_traces)) + .cte("expired_traces") + ) + + deleted = ( + delete(SpanDBE) + .where( + tuple_(SpanDBE.project_id, SpanDBE.trace_id).in_( + select( + expired_traces.c.project_id, + expired_traces.c.trace_id, + ) + ) + ) + .returning(literal(1).label("deleted")) + .cte("deleted") + ) + + stmt = select( + select(func.count()) + .select_from(expired_traces) + .scalar_subquery() + .label("traces_selected"), + select(func.count()) + .select_from(deleted) + .scalar_subquery() + .label("spans_deleted"), + ) + + result = await session.execute(stmt) + + row = result.fetchone() + + await session.commit() + + traces_selected = int(row[0]) if row and row[0] is not None else 0 + spans_deleted = int(row[1]) if row and row[1] is not None else 0 + + return (traces_selected, spans_deleted) diff --git a/api/ee/src/main.py b/api/ee/src/main.py index 1f82d14c5d..13e9bcff3e 100644 --- a/api/ee/src/main.py +++ b/api/ee/src/main.py @@ -8,12 +8,14 @@ ) from ee.src.dbs.postgres.meters.dao import MetersDAO +from ee.src.dbs.postgres.tracing.dao import TracingDAO from ee.src.dbs.postgres.subscriptions.dao import 
SubscriptionsDAO from ee.src.core.meters.service import MetersService +from ee.src.core.tracing.service import TracingService from ee.src.core.subscriptions.service import SubscriptionsService -from ee.src.apis.fastapi.billing.router import SubscriptionsRouter +from ee.src.apis.fastapi.billing.router import BillingRouter from ee.src.apis.fastapi.organizations.router import ( router as organization_router, ) @@ -23,6 +25,8 @@ meters_dao = MetersDAO() +tracing_dao = TracingDAO() + subscriptions_dao = SubscriptionsDAO() # CORE ------------------------------------------------------------------------- @@ -31,6 +35,10 @@ meters_dao=meters_dao, ) +tracing_service = TracingService( + tracing_dao=tracing_dao, +) + subscription_service = SubscriptionsService( subscriptions_dao=subscriptions_dao, meters_service=meters_service, @@ -38,8 +46,10 @@ # APIS ------------------------------------------------------------------------- -subscriptions_router = SubscriptionsRouter( +billing_router = BillingRouter( subscription_service=subscription_service, + meters_service=meters_service, + tracing_service=tracing_service, ) @@ -50,13 +60,13 @@ def extend_main(app: FastAPI): # ROUTES ------------------------------------------------------------------- app.include_router( - router=subscriptions_router.router, + router=billing_router.router, prefix="/billing", tags=["Billing"], ) app.include_router( - router=subscriptions_router.admin_router, + router=billing_router.admin_router, prefix="/admin/billing", tags=["Admin", "Billing"], ) diff --git a/api/oss/databases/postgres/migrations/core/versions/a2b3c4d5e6f7_add_retention_helper_indexes.py b/api/oss/databases/postgres/migrations/core/versions/a2b3c4d5e6f7_add_retention_helper_indexes.py new file mode 100644 index 0000000000..6eed9ae834 --- /dev/null +++ b/api/oss/databases/postgres/migrations/core/versions/a2b3c4d5e6f7_add_retention_helper_indexes.py @@ -0,0 +1,38 @@ +"""Add retention helper indexes on projects + +Revision ID: a2b3c4d5e6f7 
+Revises: c3b2a1d4e5f6 +Create Date: 2025-01-06 12:00:00.000000 + +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import text + +# revision identifiers, used by Alembic. +revision: str = "a2b3c4d5e6f7" +down_revision: Union[str, None] = "c3b2a1d4e5f6" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # CREATE INDEX CONCURRENTLY must run outside a transaction + with op.get_context().autocommit_block(): + op.execute( + text(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_projects_organization_id + ON public.projects (organization_id); + """) + ) + + +def downgrade() -> None: + with op.get_context().autocommit_block(): + op.execute( + text( + "DROP INDEX CONCURRENTLY IF EXISTS public.ix_projects_organization_id;" + ) + ) diff --git a/api/oss/databases/postgres/migrations/tracing/versions/a2b3c4d5e6f7_add_retention_indexes_and_autovacuum.py b/api/oss/databases/postgres/migrations/tracing/versions/a2b3c4d5e6f7_add_retention_indexes_and_autovacuum.py new file mode 100644 index 0000000000..a88af6166a --- /dev/null +++ b/api/oss/databases/postgres/migrations/tracing/versions/a2b3c4d5e6f7_add_retention_indexes_and_autovacuum.py @@ -0,0 +1,73 @@ +"""Add retention helper indexes on spans + autovacuum tuning + +Revision ID: a2b3c4d5e6f7 +Revises: cfa14a847972 +Create Date: 2025-01-06 12:00:00.000000 + +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import text + +# revision identifiers, used by Alembic. 
+revision: str = "a2b3c4d5e6f7" +down_revision: Union[str, None] = "cfa14a847972" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + with op.get_context().autocommit_block(): + # Unique partial index: enforce single root span per trace + op.execute( + text(""" + CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS ux_spans_root_per_trace + ON public.spans (project_id, trace_id) + WHERE parent_id IS NULL; + """) + ) + + # Retention selection index (critical for performance) + op.execute( + text(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_spans_root_project_created_trace + ON public.spans (project_id, created_at, trace_id) + WHERE parent_id IS NULL; + """) + ) + + # Autovacuum tuning for high-churn retention workload + op.execute( + text(""" + ALTER TABLE public.spans SET ( + autovacuum_vacuum_scale_factor = 0.02, + autovacuum_analyze_scale_factor = 0.01, + autovacuum_vacuum_cost_delay = 5, + autovacuum_vacuum_cost_limit = 4000 + ); + """) + ) + + +def downgrade() -> None: + with op.get_context().autocommit_block(): + op.execute( + text("DROP INDEX CONCURRENTLY IF EXISTS public.ux_spans_root_per_trace;") + ) + op.execute( + text( + "DROP INDEX CONCURRENTLY IF EXISTS public.ix_spans_root_project_created_trace;" + ) + ) + op.execute( + text(""" + ALTER TABLE public.spans RESET ( + autovacuum_vacuum_scale_factor, + autovacuum_analyze_scale_factor, + autovacuum_vacuum_cost_delay, + autovacuum_vacuum_cost_limit + ); + """) + ) diff --git a/api/oss/docker/Dockerfile.dev b/api/oss/docker/Dockerfile.dev index a546dec469..f5b1135329 100644 --- a/api/oss/docker/Dockerfile.dev +++ b/api/oss/docker/Dockerfile.dev @@ -26,6 +26,14 @@ COPY ./entrypoints /app/entrypoints/ ENV PYTHONPATH=/sdk:$PYTHONPATH +COPY ./oss/src/crons/queries.sh /queries.sh +COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron +RUN sed -i -e '$a\' /etc/cron.d/queries-cron +RUN cat -A /etc/cron.d/queries-cron + +RUN chmod 
+x /queries.sh \ + && chmod 0644 /etc/cron.d/queries-cron + # # # @@ -34,12 +42,12 @@ ENV PYTHONPATH=/sdk:$PYTHONPATH # # -COPY ./oss/src/crons/queries.sh /queries.sh -COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron -RUN sed -i -e '$a\' /etc/cron.d/queries-cron -RUN cat -A /etc/cron.d/queries-cron +# +# +# +# -RUN chmod +x /queries.sh \ - && chmod 0644 /etc/cron.d/queries-cron +# +# EXPOSE 8000 diff --git a/api/oss/docker/Dockerfile.gh b/api/oss/docker/Dockerfile.gh index 9221016c1a..f628584453 100644 --- a/api/oss/docker/Dockerfile.gh +++ b/api/oss/docker/Dockerfile.gh @@ -24,10 +24,15 @@ RUN poetry config virtualenvs.create false \ COPY ./oss /app/oss/ COPY ./entrypoints /app/entrypoints/ -# Set PYTHONPATH so worker scripts can import from oss module ENV PYTHONPATH=/app -# +COPY ./oss/src/crons/queries.sh /queries.sh +COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron +RUN sed -i -e '$a\' /etc/cron.d/queries-cron +RUN cat -A /etc/cron.d/queries-cron + +RUN chmod +x /queries.sh \ + && chmod 0644 /etc/cron.d/queries-cron # # @@ -37,12 +42,12 @@ ENV PYTHONPATH=/app # # -COPY ./oss/src/crons/queries.sh /queries.sh -COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron -RUN sed -i -e '$a\' /etc/cron.d/queries-cron -RUN cat -A /etc/cron.d/queries-cron +# +# +# +# -RUN chmod +x /queries.sh \ - && chmod 0644 /etc/cron.d/queries-cron +# +# EXPOSE 8000 diff --git a/docs/designs/data-retention/PR.md b/docs/designs/data-retention/PR.md new file mode 100644 index 0000000000..164139f8eb --- /dev/null +++ b/docs/designs/data-retention/PR.md @@ -0,0 +1,11 @@ +# PR: Data Retention + +## Summary +- Add plan-based trace retention windows. +- Add tracing retention DAO/service and admin endpoint to flush old spans. +- Add cron entry to trigger retention runs. + +## Scope +- EE tracing retention (plans and billing endpoint). +- Core/OSS migrations for retention-related indexes. +- UI exposure of retention settings. 
diff --git a/docs/designs/data-retention/QA.md b/docs/designs/data-retention/QA.md new file mode 100644 index 0000000000..dfe003f50b --- /dev/null +++ b/docs/designs/data-retention/QA.md @@ -0,0 +1,14 @@ +# QA: Data Retention + +## Functional checks +- Verify logs show per-plan retention cutoff and deletion counts. +- Confirm old traces are removed and newer traces remain. + +## Edge cases +- No projects for a plan (should log skip/zero counts). +- Unlimited retention (plan should be skipped). +- Large batch sizes (ensure job completes under cron timeout). + +## Monitoring +- Check logs for `flush` start/end markers. +- Track DB load during retention run. diff --git a/docs/designs/data-retention/README.md b/docs/designs/data-retention/README.md new file mode 100644 index 0000000000..ed3fb6ecbc --- /dev/null +++ b/docs/designs/data-retention/README.md @@ -0,0 +1,26 @@ +# Data Retention + +## Overview +Data retention defines how long trace data is kept per plan. Retention windows are +enforced by a scheduled job that deletes traces older than the configured window +for each plan. + +## Goals +- Enforce plan-based retention windows for traces. +- Keep deletion workloads bounded with batching. +- Provide a safe administrative endpoint for on-demand retention runs. + +## Architecture +- Retention windows are configured in plan entitlements. +- A tracing DAO deletes root traces and their spans before a cutoff. +- A cron triggers the retention job via the admin billing endpoint. + +## Runtime flow +1. Cron calls the admin endpoint `/admin/billing/usage/flush`. +2. The tracing service enumerates plans with finite retention. +3. For each plan, it batches projects and deletes traces older than the cutoff. +4. Logs include per-plan and total deletion counts. + +## Operational notes +- Retention runs should be idempotent and safe to re-run. +- Deletions are limited per batch to keep runtime bounded. 
diff --git a/docs/designs/data-retention/data-retention-periods.initial.specs.md b/docs/designs/data-retention/data-retention-periods.initial.specs.md new file mode 100644 index 0000000000..4787ef9a99 --- /dev/null +++ b/docs/designs/data-retention/data-retention-periods.initial.specs.md @@ -0,0 +1,343 @@ +# Plan-based retention (Postgres) — schema migrations + retention job + +This document captures the **exact schema changes** (indexes + constraints + autovacuum reloptions) and a **Python/SQLAlchemy implementation** for plan-based retention with: + +- **Two databases** + - **Core DB**: `projects`, `subscriptions` (and org/workspace tables, etc.) + - **Tracing DB**: `spans` (no cross-DB foreign keys) +- **Option A per plan** (but split across DBs due to the cross-DB boundary): + 1) In **Core DB**, resolve **eligible projects** for a given `plan` (paged in chunks). + 2) In **Tracing DB**, delete **expired traces** (trace-scoped) for that chunk using a **constant cutoff** and a **max traces per chunk** limit. 
+- **No temp tables** +- **Do not use**: + - `spans.deleted_at` + - `subscriptions.active` + +--- + +## Current schema snapshots (as provided) + +### `public.spans` (Tracing DB) +- Primary key: `PRIMARY KEY (project_id, trace_id, span_id)` +- Relevant columns: + - `project_id uuid NOT NULL` + - `trace_id uuid NOT NULL` + - `span_id uuid NOT NULL` + - `parent_id uuid NULL` (root span when NULL) + - `created_at timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP` +- Existing indexes (subset): + - `ix_project_id_trace_id (project_id, trace_id)` + - various GIN indexes for jsonb columns +- Missing for retention: + - **partial root-span index** for `(project_id, created_at, trace_id) WHERE parent_id IS NULL` + - (recommended) **unique partial root per trace**: `(project_id, trace_id) WHERE parent_id IS NULL` + +### `public.projects` (Core DB) +- PK: `PRIMARY KEY (id)` +- Has `organization_id uuid NULL` with an FK to `organizations(id)` +- Only index currently shown: `projects_pkey (id)` +- Missing for retention: + - index on `organization_id` to support `subscriptions -> projects` joins + +### `public.subscriptions` (Core DB) +- PK: `PRIMARY KEY (organization_id)` +- Columns: + - `plan varchar NOT NULL` + - `organization_id uuid NOT NULL` + - `active boolean NOT NULL` (ignored for retention) +- Only index currently shown: `subscriptions_pkey (organization_id)` +- Missing for retention: + - index on `plan` for “all orgs on plan X” lookup + +--- + +## Retention algorithm (current flow) + +We run this periodically (e.g., every **15–60 minutes**). + +For each **plan** with a finite retention window: + +1) **Core DB**: page through eligible projects in deterministic chunks (`max_projects_per_batch`). +2) For each project chunk: + - **Tracing DB**: delete up to `max_traces_per_batch` expired traces. + - This is **one delete per chunk** (no inner drain loop). + +**Parameters (4)** +- `plan: str` — subscription plan value (enum serialized to text). 
+- `cutoff: datetime` — constant cutoff timestamp (e.g., `now() - retention_minutes`). +- `max_projects_per_batch: int` — number of project IDs to fetch per page from Core DB. +- `max_traces_per_batch: int` — max number of **traces** deleted per project chunk. + +**Plan retention source** +- Retention minutes are defined per plan in entitlements as `Quota.retention` on `Counter.TRACES`. +- Plans without a retention value are skipped. + +--- + +## Schema migrations (DDL) + +Because `CREATE INDEX CONCURRENTLY` cannot run inside a transaction, Alembic migrations must use **autocommit** for those statements. + +### A. Core DB migrations + +#### A.1 `projects`: add index on `organization_id` + +```sql +CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_projects_organization_id +ON public.projects (organization_id); +``` + +#### A.2 `subscriptions`: add index on `plan` (do not include `active`) + +```sql +CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_subscriptions_plan +ON public.subscriptions (plan); +``` + +*(Nothing else is required for retention in Core DB.)* + +--- + +### B. Tracing DB migrations + +#### B.1 `spans`: enforce a single root span per `(project_id, trace_id)` (recommended) + +This is a **unique partial index**, not a table constraint, and is compatible with your composite PK. + +```sql +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS ux_spans_root_per_trace +ON public.spans (project_id, trace_id) +WHERE parent_id IS NULL; +``` + +> This does not affect deletion correctness (you’ll still delete by `(project_id, trace_id)`), but it makes traces deterministic and prevents malformed input. + +#### B.2 `spans`: index for retention selection (critical) + +This is the index that allows Postgres to find expired root spans using an index range scan. 
+ +```sql +CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_spans_root_project_created_trace +ON public.spans (project_id, created_at, trace_id) +WHERE parent_id IS NULL; +``` + +#### B.3 `spans`: autovacuum reloptions (steady-state retention) + +Tune autovacuum at the table level. These defaults are a reasonable starting point for “append + periodic deletes” tables; adjust after observing `pg_stat_all_tables.n_dead_tup` and vacuum timing. + +```sql +ALTER TABLE public.spans SET ( + autovacuum_vacuum_scale_factor = 0.02, + autovacuum_analyze_scale_factor = 0.01, + autovacuum_vacuum_cost_delay = 5, + autovacuum_vacuum_cost_limit = 4000 +); +``` + +Notes: +- Scale factors control *when* autovacuum triggers (lower triggers sooner). +- Cost settings control vacuum aggressiveness. If vacuum lags, increase `cost_limit` or reduce `cost_delay`. +- You can tune other options later (`autovacuum_vacuum_threshold`, `freeze_max_age`, etc.) but start minimal. + +--- + +## Alembic migration templates (current implementation) + +Because Core DB and Tracing DB are separate databases, you typically have **separate Alembic environments** (two `alembic.ini` / env.py) or at least separate branches. + +Below are two minimal revisions. + +### A. Core DB Alembic revision (indexes) + +```python +\"\"\"Add retention helper indexes on projects/subscriptions + +Revision ID: +Revises: +Create Date: +\"\"\" + +from alembic import op + +# revision identifiers, used by Alembic. 
+revision = "" +down_revision = "" +branch_labels = None +depends_on = None + +def upgrade() -> None: + # CREATE INDEX CONCURRENTLY must run outside a transaction + with op.get_context().autocommit_block(): + op.execute(text(\"\"\" + CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_projects_organization_id + ON public.projects (organization_id); + \"\"\")) + op.execute(text(\"\"\" + CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_subscriptions_plan + ON public.subscriptions (plan); + \"\"\")) + +def downgrade() -> None: + with op.get_context().autocommit_block(): + op.execute(text(\"DROP INDEX CONCURRENTLY IF EXISTS public.ix_projects_organization_id;\")) + op.execute(text(\"DROP INDEX CONCURRENTLY IF EXISTS public.ix_subscriptions_plan;\")) +``` + +> If your Alembic version doesn’t expose `op.text`, use `from sqlalchemy import text` and `conn.execute(text(...))`. + +--- + +### B. Tracing DB Alembic revision (spans indexes + autovacuum) + +```python +\"\"\"Add retention helper indexes on spans + autovacuum tuning + +Revision ID: +Revises: +Create Date: +\"\"\" + +from alembic import op +from sqlalchemy import text + +revision = "" +down_revision = "" +branch_labels = None +depends_on = None + +def upgrade() -> None: + with op.get_context().autocommit_block(): + op.execute(text(\"\"\" + CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS ux_spans_root_per_trace + ON public.spans (project_id, trace_id) + WHERE parent_id IS NULL; + \"\"\")) + + op.execute(text(\"\"\" + CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_spans_root_project_created_trace + ON public.spans (project_id, created_at, trace_id) + WHERE parent_id IS NULL; + \"\"\")) + + # autovacuum reloptions (can run in a transaction, but safe here) + op.execute(text(\"\"\" + ALTER TABLE public.spans SET ( + autovacuum_vacuum_scale_factor = 0.02, + autovacuum_analyze_scale_factor = 0.01, + autovacuum_vacuum_cost_delay = 5, + autovacuum_vacuum_cost_limit = 4000 + ); + \"\"\")) + +def downgrade() -> None: + with 
op.get_context().autocommit_block(): + op.execute(text(\"DROP INDEX CONCURRENTLY IF EXISTS public.ux_spans_root_per_trace;\")) + op.execute(text(\"DROP INDEX CONCURRENTLY IF EXISTS public.ix_spans_root_project_created_trace;\")) + # Reset reloptions (NULL clears table-level overrides) + op.execute(text(\"ALTER TABLE public.spans RESET (autovacuum_vacuum_scale_factor, autovacuum_analyze_scale_factor, autovacuum_vacuum_cost_delay, autovacuum_vacuum_cost_limit);\")) +``` + +--- + +## SQL used by the retention job + +### 1) Core DB — page eligible projects for a plan (keyset pagination) + +> We page by `projects.id` (uuid). UUID ordering is arbitrary but total and stable, so it works for pagination. + +```sql +-- Inputs: +-- :plan (text) +-- :project_id (uuid, nullable) +-- :max_projects (int) + +SELECT p.id AS project_id +FROM public.projects p +JOIN public.subscriptions s + ON s.organization_id = p.organization_id +WHERE s.plan = :plan + AND (:project_id IS NULL OR p.id > :project_id) +ORDER BY p.id +LIMIT :max_projects; +``` + +### 2) Tracing DB — delete expired traces for a chunk (single statement) + +This statement: +- selects up to `:max_traces` **root spans** in the given projects chunk that are older than `:cutoff` +- deletes **all spans** for those `(project_id, trace_id)` pairs +- returns **two counters**: + - `traces_selected` (<= `max_traces`) + - `spans_deleted` (total spans removed) + +```sql +-- Inputs: +-- :project_ids (uuid[]) +-- :cutoff (timestamptz) +-- :max_traces (int) + +WITH expired_traces AS ( + SELECT sp.project_id, sp.trace_id + FROM public.spans sp + WHERE sp.parent_id IS NULL + AND sp.project_id = ANY(:project_ids) + AND sp.created_at < :cutoff + ORDER BY sp.created_at + LIMIT :max_traces +), +deleted AS ( + DELETE FROM public.spans sp + USING expired_traces et + WHERE sp.project_id = et.project_id + AND sp.trace_id = et.trace_id + RETURNING 1 +) +SELECT + (SELECT count(*) FROM expired_traces) AS traces_selected, + (SELECT count(*) FROM 
deleted) AS spans_deleted; +``` + +--- + +## Implementation (SQLAlchemy, async) + +Current implementation lives in: +- `ee/src/dbs/postgres/tracing/dao.py` (raw SQL + SQLAlchemy CTE versions). +- `ee/src/core/tracing/service.py` (plan loop + batch orchestration). + +Key points: +- Async SQLAlchemy sessions for Core and Tracing DBs. +- Project pagination uses `project_id` cursor (keyset pagination by `projects.id`). +- One delete statement per project chunk (no inner drain loop). +- SQLAlchemy version uses a CTE to select expired traces and delete spans in one statement. +- Raw SQL version uses the same CTE shape (`expired_traces` → delete). + +## Entrypoints + +- Admin endpoint: `POST /admin/billing/usage/flush` in `ee/src/apis/fastapi/billing/router.py`. +- Cron: `ee/src/crons/spans.sh` calls the endpoint (30 minute timeout). +- Locking: `acquire_lock`/`release_lock` guard the flush to avoid overlaps. + +--- + +## Operational recommendations (minimal) + +- Run per plan in code; you can sequence plans from smallest retention to largest or vice versa. +- Current defaults: + - `max_projects_per_batch = 500` + - `max_traces_per_batch = 5000` +- Keep transactions short: + - commit after each delete statement +- Verify index usage once: + - `EXPLAIN (ANALYZE, BUFFERS)` on the Tracing DB delete CTE (replace binds with literals) + +--- + +## What this setup guarantees + +- Deletes are **trace-scoped** (by `(project_id, trace_id)`), preserving trace integrity. +- Eligibility is **evaluated at deletion time** (projects are resolved dynamically by plan). +- Selection is **index-driven** on root spans via the partial index. +- No reliance on `subscriptions.active` or `spans.deleted_at`. +- No temp tables. 
diff --git a/web/oss/src/components/EvalRunDetails/components/FocusDrawerHeader.tsx b/web/oss/src/components/EvalRunDetails/components/FocusDrawerHeader.tsx index 7d830b330d..ebfda7d2fb 100644 --- a/web/oss/src/components/EvalRunDetails/components/FocusDrawerHeader.tsx +++ b/web/oss/src/components/EvalRunDetails/components/FocusDrawerHeader.tsx @@ -74,7 +74,7 @@ const FocusDrawerHeader = ({runId, scenarioId, onScenarioChange}: FocusDrawerHea loadNextPage, ]) - const scenarioLabel = evalType === "human" ? "Scenario" : "Test case" + const scenarioLabel = evalType === "human" ? "Scenario" : "Testcase" const options = useMemo(() => { const base = loadedScenarios.map((row) => ({ diff --git a/web/oss/src/components/EvalRunDetails/components/FocusDrawerSidePanel.tsx b/web/oss/src/components/EvalRunDetails/components/FocusDrawerSidePanel.tsx index fb78d62c5a..27c49826b4 100644 --- a/web/oss/src/components/EvalRunDetails/components/FocusDrawerSidePanel.tsx +++ b/web/oss/src/components/EvalRunDetails/components/FocusDrawerSidePanel.tsx @@ -47,7 +47,7 @@ const FocusDrawerSidePanel = ({runId, scenarioId}: FocusDrawerSidePanelProps) => [rows, scenarioId], ) const scenarioIndex: number | undefined = scenarioRow?.scenarioIndex - const scenarioBase = evalType === "human" ? "Scenario" : "Test case" + const scenarioBase = evalType === "human" ? "Scenario" : "Testcase" const parentTitle = scenarioIndex ? `${scenarioBase} #${scenarioIndex}` : scenarioId diff --git a/web/oss/src/components/EvalRunDetails/components/views/ConfigurationView/components/TestsetSection.tsx b/web/oss/src/components/EvalRunDetails/components/views/ConfigurationView/components/TestsetSection.tsx index 3501f18754..a04395403a 100644 --- a/web/oss/src/components/EvalRunDetails/components/views/ConfigurationView/components/TestsetSection.tsx +++ b/web/oss/src/components/EvalRunDetails/components/views/ConfigurationView/components/TestsetSection.tsx @@ -100,7 +100,7 @@ const TestsetCard = ({
- Test cases + Testcases {testcaseCount ?? "—"} diff --git a/web/oss/src/components/EvalRunDetails/components/views/ConfigurationView/index.tsx b/web/oss/src/components/EvalRunDetails/components/views/ConfigurationView/index.tsx index 8cf0a00c2f..29845e8702 100644 --- a/web/oss/src/components/EvalRunDetails/components/views/ConfigurationView/index.tsx +++ b/web/oss/src/components/EvalRunDetails/components/views/ConfigurationView/index.tsx @@ -263,7 +263,7 @@ const sectionDefinitions: SectionDefinition[] = [ }, { key: "testsets", - title: "Test sets", + title: "Testsets", hasData: (summary) => summary.hasTestsets, getSubtitle: (summary) => summary?.testsetSubtitle, render: (runId) => , diff --git a/web/oss/src/components/EvalRunDetails/components/views/OverviewView/components/MetadataSummaryTable.tsx b/web/oss/src/components/EvalRunDetails/components/views/OverviewView/components/MetadataSummaryTable.tsx index 2f405dadf6..32fab8831e 100644 --- a/web/oss/src/components/EvalRunDetails/components/views/OverviewView/components/MetadataSummaryTable.tsx +++ b/web/oss/src/components/EvalRunDetails/components/views/OverviewView/components/MetadataSummaryTable.tsx @@ -381,7 +381,7 @@ const InvocationErrorsCell = makeMetricCell("attributes.ag.metrics.errors.cumula const METADATA_ROWS: MetadataRowRecord[] = [ { key: "testsets", - label: "Test set", + label: "Testset", Cell: LegacyTestsetsCell, shouldDisplay: ({snapshots}) => snapshots.some(({testsetIds}) => (testsetIds?.length ?? 
0) > 0), diff --git a/web/oss/src/components/EvalRunDetails/components/views/OverviewView/components/RunNameTag.tsx b/web/oss/src/components/EvalRunDetails/components/views/OverviewView/components/RunNameTag.tsx index 4ffa5ff94f..43a80c3863 100644 --- a/web/oss/src/components/EvalRunDetails/components/views/OverviewView/components/RunNameTag.tsx +++ b/web/oss/src/components/EvalRunDetails/components/views/OverviewView/components/RunNameTag.tsx @@ -135,7 +135,7 @@ const RunNameTag = ({runId, label, accentColor}: RunNameTagProps) => { />
- Test sets + Testsets { size="small" />
- Test case {currentRowIndex + 1} + Testcase {currentRowIndex + 1}
} mainContent={} diff --git a/web/oss/src/components/Playground/Components/PlaygroundGenerations/assets/GenerationCompletionRow/SingleView.tsx b/web/oss/src/components/Playground/Components/PlaygroundGenerations/assets/GenerationCompletionRow/SingleView.tsx index 3ae93a2704..fc0aa37bd1 100644 --- a/web/oss/src/components/Playground/Components/PlaygroundGenerations/assets/GenerationCompletionRow/SingleView.tsx +++ b/web/oss/src/components/Playground/Components/PlaygroundGenerations/assets/GenerationCompletionRow/SingleView.tsx @@ -127,7 +127,7 @@ const SingleView = ({ size="small" /> {testCaseNumber && ( - Test case {testCaseNumber} + Testcase {testCaseNumber} )}
@@ -196,7 +196,7 @@ const SingleView = ({ size="small" /> {testCaseNumber && ( - Test case {testCaseNumber} + Testcase {testCaseNumber} )}
diff --git a/web/oss/src/components/SharedDrawers/TraceDrawer/components/TraceSidePanel/TraceReferences/index.tsx b/web/oss/src/components/SharedDrawers/TraceDrawer/components/TraceSidePanel/TraceReferences/index.tsx index e112ff8b09..986f15b24f 100644 --- a/web/oss/src/components/SharedDrawers/TraceDrawer/components/TraceSidePanel/TraceReferences/index.tsx +++ b/web/oss/src/components/SharedDrawers/TraceDrawer/components/TraceSidePanel/TraceReferences/index.tsx @@ -22,7 +22,7 @@ const labelMap: Record = { application: "Applications", application_variant: "Variants", environment: "Environments", - testset: "Test sets", + testset: "Testsets", } const TraceReferences = () => { diff --git a/web/oss/src/components/TestcasesTableNew/hooks/useTestcaseActions.ts b/web/oss/src/components/TestcasesTableNew/hooks/useTestcaseActions.ts index 4db5a4d76e..e34d9cc31f 100644 --- a/web/oss/src/components/TestcasesTableNew/hooks/useTestcaseActions.ts +++ b/web/oss/src/components/TestcasesTableNew/hooks/useTestcaseActions.ts @@ -43,7 +43,7 @@ export interface UseTestcaseActionsResult { // Navigation blocker skipBlockerRef: React.MutableRefObject - // Test case actions + // Testcase actions handleAddTestcase: () => void handleDeleteSelected: (selectedRowKeys: React.Key[]) => void handleRowClick: (record: TestcaseTableRow) => void