Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Add retention helper indexes on projects/subscriptions

Revision ID: a2b3c4d5e6f7
Revises: c3b2a1d4e5f6
Create Date: 2025-01-06 12:00:00.000000

"""

from typing import Sequence, Union

from alembic import op
from sqlalchemy import text

# revision identifiers, used by Alembic.
# NOTE(review): this revision ID ("a2b3c4d5e6f7") also appears in the other new
# migration added in this change set (the one with down_revision "cfa14a847972").
# Alembic requires revision IDs to be unique within a version directory — confirm
# the two files belong to separate Alembic environments (e.g. oss vs ee), otherwise
# regenerate one of the IDs before merging.
revision: str = "a2b3c4d5e6f7"
down_revision: Union[str, None] = "c3b2a1d4e5f6"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create retention helper indexes on projects and subscriptions.

    Both indexes are built with CREATE INDEX CONCURRENTLY, which PostgreSQL
    refuses to run inside a transaction block — hence the autocommit context.
    IF NOT EXISTS keeps the migration re-runnable.
    """
    index_statements = (
        # Supports the projects -> subscriptions join.
        """
        CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_projects_organization_id
        ON public.projects (organization_id);
        """,
        # Supports plan-based subscription lookups.
        """
        CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_subscriptions_plan
        ON public.subscriptions (plan);
        """,
    )

    with op.get_context().autocommit_block():
        for statement in index_statements:
            op.execute(text(statement))


def downgrade() -> None:
    """Drop the retention helper indexes created by upgrade().

    DROP INDEX CONCURRENTLY must also run outside a transaction; IF EXISTS
    makes the downgrade idempotent.
    """
    with op.get_context().autocommit_block():
        for qualified_name in (
            "public.ix_projects_organization_id",
            "public.ix_subscriptions_plan",
        ):
            op.execute(
                text(f"DROP INDEX CONCURRENTLY IF EXISTS {qualified_name};")
            )
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Add retention helper indexes on spans + autovacuum tuning

Revision ID: a2b3c4d5e6f7
Revises: cfa14a847972
Create Date: 2025-01-06 12:00:00.000000

"""

from typing import Sequence, Union

from alembic import op
from sqlalchemy import text

# revision identifiers, used by Alembic.
# NOTE(review): this revision ID ("a2b3c4d5e6f7") also appears in the other new
# migration added in this change set (the one with down_revision "c3b2a1d4e5f6").
# Alembic requires revision IDs to be unique within a version directory — confirm
# the two files belong to separate Alembic environments (e.g. oss vs ee), otherwise
# regenerate one of the IDs before merging.
revision: str = "a2b3c4d5e6f7"
down_revision: Union[str, None] = "cfa14a847972"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add span retention indexes and tune autovacuum on public.spans.

    All DDL runs in an autocommit block because CREATE INDEX CONCURRENTLY
    cannot execute inside a transaction. IF NOT EXISTS keeps index creation
    re-runnable; the ALTER TABLE storage-parameter change is idempotent.
    """
    # Enforce a single root span (parent_id IS NULL) per (project, trace).
    unique_root_index_sql = """
        CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS ux_spans_root_per_trace
        ON public.spans (project_id, trace_id)
        WHERE parent_id IS NULL;
    """

    # Partial index driving retention candidate selection on root spans.
    retention_index_sql = """
        CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_spans_root_project_created_trace
        ON public.spans (project_id, created_at, trace_id)
        WHERE parent_id IS NULL;
    """

    # More aggressive autovacuum/analyze for the high-churn retention workload:
    # lower scale factors trigger sooner, and the cost delay/limit let vacuum
    # keep up with heavy delete traffic.
    autovacuum_sql = """
        ALTER TABLE public.spans SET (
            autovacuum_vacuum_scale_factor = 0.02,
            autovacuum_analyze_scale_factor = 0.01,
            autovacuum_vacuum_cost_delay = 5,
            autovacuum_vacuum_cost_limit = 4000
        );
    """

    with op.get_context().autocommit_block():
        op.execute(text(unique_root_index_sql))
        op.execute(text(retention_index_sql))
        op.execute(text(autovacuum_sql))


def downgrade() -> None:
    """Drop the retention indexes and restore default autovacuum settings.

    Runs in an autocommit block for DROP INDEX CONCURRENTLY; IF EXISTS and
    RESET make the downgrade safe to re-run.
    """
    with op.get_context().autocommit_block():
        for qualified_name in (
            "public.ux_spans_root_per_trace",
            "public.ix_spans_root_project_created_trace",
        ):
            op.execute(
                text(f"DROP INDEX CONCURRENTLY IF EXISTS {qualified_name};")
            )

        # Revert the table to the server-wide autovacuum defaults.
        op.execute(
            text("""
                ALTER TABLE public.spans RESET (
                    autovacuum_vacuum_scale_factor,
                    autovacuum_analyze_scale_factor,
                    autovacuum_vacuum_cost_delay,
                    autovacuum_vacuum_cost_limit
                );
            """)
        )
20 changes: 14 additions & 6 deletions api/ee/docker/Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ COPY ./entrypoints /app/entrypoints/

ENV PYTHONPATH=/sdk:$PYTHONPATH

COPY ./oss/src/crons/queries.sh /queries.sh
COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron
RUN sed -i -e '$a\' /etc/cron.d/queries-cron
RUN cat -A /etc/cron.d/queries-cron

RUN chmod +x /queries.sh \
&& chmod 0644 /etc/cron.d/queries-cron

COPY ./ee/src/crons/meters.sh /meters.sh
COPY ./ee/src/crons/meters.txt /etc/cron.d/meters-cron
RUN sed -i -e '$a\' /etc/cron.d/meters-cron
Expand All @@ -34,12 +42,12 @@ RUN cat -A /etc/cron.d/meters-cron
RUN chmod +x /meters.sh \
&& chmod 0644 /etc/cron.d/meters-cron

COPY ./oss/src/crons/queries.sh /queries.sh
COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron
RUN sed -i -e '$a\' /etc/cron.d/queries-cron
RUN cat -A /etc/cron.d/queries-cron
COPY ./ee/src/crons/spans.sh /spans.sh
COPY ./ee/src/crons/spans.txt /etc/cron.d/spans-cron
RUN sed -i -e '$a\' /etc/cron.d/spans-cron
RUN cat -A /etc/cron.d/spans-cron

RUN chmod +x /queries.sh \
&& chmod 0644 /etc/cron.d/queries-cron
RUN chmod +x /spans.sh \
&& chmod 0644 /etc/cron.d/spans-cron

EXPOSE 8000
22 changes: 15 additions & 7 deletions api/ee/docker/Dockerfile.gh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,15 @@ COPY ./ee /app/ee/
COPY ./oss /app/oss/
COPY ./entrypoints /app/entrypoints/

#
ENV PYTHONPATH=/app

COPY ./oss/src/crons/queries.sh /queries.sh
COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron
RUN sed -i -e '$a\' /etc/cron.d/queries-cron
RUN cat -A /etc/cron.d/queries-cron

RUN chmod +x /queries.sh \
&& chmod 0644 /etc/cron.d/queries-cron

COPY ./ee/src/crons/meters.sh /meters.sh
COPY ./ee/src/crons/meters.txt /etc/cron.d/meters-cron
Expand All @@ -34,12 +42,12 @@ RUN cat -A /etc/cron.d/meters-cron
RUN chmod +x /meters.sh \
&& chmod 0644 /etc/cron.d/meters-cron

COPY ./oss/src/crons/queries.sh /queries.sh
COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron
RUN sed -i -e '$a\' /etc/cron.d/queries-cron
RUN cat -A /etc/cron.d/queries-cron
COPY ./ee/src/crons/spans.sh /spans.sh
COPY ./ee/src/crons/spans.txt /etc/cron.d/spans-cron
RUN sed -i -e '$a\' /etc/cron.d/spans-cron
RUN cat -A /etc/cron.d/spans-cron

RUN chmod +x /queries.sh \
&& chmod 0644 /etc/cron.d/queries-cron
RUN chmod +x /spans.sh \
&& chmod 0644 /etc/cron.d/spans-cron

EXPOSE 8000
93 changes: 80 additions & 13 deletions api/ee/src/apis/fastapi/billing/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from oss.src.utils.common import is_ee
from oss.src.utils.logging import get_module_logger
from oss.src.utils.exceptions import intercept_exceptions
from oss.src.utils.caching import get_cache, set_cache, invalidate_cache
from oss.src.utils.caching import acquire_lock, release_lock
from oss.src.utils.env import env

from oss.src.services.db_manager import (
Expand All @@ -23,6 +23,8 @@
from ee.src.models.shared_models import Permission
from ee.src.core.entitlements.types import ENTITLEMENTS, CATALOG, Tracker, Quota
from ee.src.core.subscriptions.types import Event, Plan
from ee.src.core.meters.service import MetersService
from ee.src.core.tracing.service import TracingService
from ee.src.core.subscriptions.service import (
SubscriptionsService,
SwitchException,
Expand All @@ -47,12 +49,16 @@
)


class SubscriptionsRouter:
class BillingRouter:
def __init__(
self,
subscription_service: SubscriptionsService,
meters_service: MetersService,
tracing_service: TracingService,
):
self.subscription_service = subscription_service
self.meters_service = meters_service
self.tracing_service = tracing_service

# ROUTER
self.router = APIRouter()
Expand Down Expand Up @@ -153,6 +159,13 @@ def __init__(
operation_id="admin_report_usage",
)

self.admin_router.add_api_route(
"/usage/flush",
self.flush_usage,
methods=["POST"],
operation_id="admin_flush_usage",
)

# HANDLERS

@intercept_exceptions()
Expand Down Expand Up @@ -782,7 +795,7 @@ async def fetch_usage(
content={"status": "error", "message": "Plan not found"},
)

meters = await self.subscription_service.meters_service.fetch(
meters = await self.meters_service.fetch(
organization_id=organization_id,
)

Expand Down Expand Up @@ -817,29 +830,24 @@ async def report_usage(
log.info("[report] [endpoint] Trigger")

try:
report_ongoing = await get_cache(
lock_key = await acquire_lock(
namespace="meters:report",
key={},
ttl=3600, # 1 hour
)

if report_ongoing:
if not lock_key:
log.info("[report] [endpoint] Skipped (ongoing)")
return JSONResponse(
status_code=status.HTTP_200_OK,
content={"status": "skipped"},
)

# await set_cache(
# namespace="meters:report",
# key={},
# value=True,
# ttl=60 * 60, # 1 hour
# )
log.info("[report] [endpoint] Lock acquired")

try:
log.info("[report] [endpoint] Reporting usage started")
await self.subscription_service.meters_service.report()
await self.meters_service.report()
log.info("[report] [endpoint] Reporting usage completed")

return JSONResponse(
Expand All @@ -858,7 +866,7 @@ async def report_usage(
)

finally:
await invalidate_cache(
await release_lock(
namespace="meters:report",
key={},
)
Expand All @@ -875,6 +883,65 @@ async def report_usage(
content={"status": "error", "message": "Fatal error"},
)

@intercept_exceptions()
async def flush_usage(
self,
):
log.info("[flush] [endpoint] Trigger")

try:
lock_key = await acquire_lock(
namespace="spans:flush",
key={},
ttl=3600, # 1 hour
)

if not lock_key:
log.info("[flush] [endpoint] Skipped (ongoing)")
return JSONResponse(
status_code=status.HTTP_200_OK,
content={"status": "skipped"},
)

log.info("[flush] [endpoint] Lock acquired")

try:
log.info("[flush] [endpoint] Retention started")
await self.tracing_service.flush_spans()
log.info("[flush] [endpoint] Retention completed")

return JSONResponse(
status_code=status.HTTP_200_OK,
content={"status": "success"},
)

except Exception:
log.error(
"[flush] [endpoint] Retention failed:",
exc_info=True,
)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"status": "error", "message": "Retention failed"},
)

finally:
await release_lock(
namespace="spans:flush",
key={},
)
log.info("[flush] [endpoint] Lock released")

except Exception:
log.error(
"[flush] [endpoint] Fatal error:",
exc_info=True,
)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"status": "error", "message": "Fatal error"},
)

# ROUTES

@intercept_exceptions()
Expand Down
Loading