Skip to content

Commit 3f6b4c1

Browse files
authored
Merge pull request #3398 from Agenta-AI/feat/add-data-retention-periods
[feat] Add data retention periods (for `spans`)
2 parents 7350dd4 + f71b45f commit 3f6b4c1

File tree

40 files changed

+1317
-93
lines changed

40 files changed

+1317
-93
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"""Add retention helper indexes on projects/subscriptions
2+
3+
Revision ID: a2b3c4d5e6f7
4+
Revises: c3b2a1d4e5f6
5+
Create Date: 2025-01-06 12:00:00.000000
6+
7+
"""
8+
9+
from typing import Sequence, Union
10+
11+
from alembic import op
12+
from sqlalchemy import text
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = "a2b3c4d5e6f7"
16+
down_revision: Union[str, None] = "c3b2a1d4e5f6"
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
    """Create the retention helper indexes on projects and subscriptions.

    Both statements use IF NOT EXISTS and are issued from inside Alembic's
    autocommit block.
    """
    # Index for projects -> subscriptions join
    projects_organization_index = text("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_projects_organization_id
        ON public.projects (organization_id);
    """)

    # Index for plan-based lookups
    subscriptions_plan_index = text("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_subscriptions_plan
        ON public.subscriptions (plan);
    """)

    # CREATE INDEX CONCURRENTLY must run outside a transaction
    with op.get_context().autocommit_block():
        for statement in (projects_organization_index, subscriptions_plan_index):
            op.execute(statement)
39+
40+
41+
def downgrade() -> None:
    """Drop the two indexes created by upgrade(), if they exist."""
    drop_statements = (
        "DROP INDEX CONCURRENTLY IF EXISTS public.ix_projects_organization_id;",
        "DROP INDEX CONCURRENTLY IF EXISTS public.ix_subscriptions_plan;",
    )

    # Same autocommit block as upgrade(): the drops are issued one by one,
    # in the order listed above.
    with op.get_context().autocommit_block():
        for sql in drop_statements:
            op.execute(text(sql))
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""Add retention helper indexes on spans + autovacuum tuning
2+
3+
Revision ID: a2b3c4d5e6f7
4+
Revises: cfa14a847972
5+
Create Date: 2025-01-06 12:00:00.000000
6+
7+
"""
8+
9+
from typing import Sequence, Union
10+
11+
from alembic import op
12+
from sqlalchemy import text
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = "a2b3c4d5e6f7"
16+
down_revision: Union[str, None] = "cfa14a847972"
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
    """Add the retention helper indexes on spans and tune its autovacuum.

    Three statements are executed, in order, inside Alembic's autocommit
    block: a unique partial index, a plain partial index, and an
    ALTER TABLE that sets four autovacuum storage parameters.
    """
    # Unique partial index: enforce single root span per trace
    # (only rows WHERE parent_id IS NULL are covered).
    # NOTE(review): if a concurrent unique build fails (e.g. on existing
    # duplicates) PostgreSQL may leave an invalid index behind, which
    # IF NOT EXISTS would then skip on re-run -- confirm the recovery path.
    unique_root_per_trace = text("""
        CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS ux_spans_root_per_trace
        ON public.spans (project_id, trace_id)
        WHERE parent_id IS NULL;
    """)

    # Retention selection index (critical for performance)
    root_by_project_created = text("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_spans_root_project_created_trace
        ON public.spans (project_id, created_at, trace_id)
        WHERE parent_id IS NULL;
    """)

    # Autovacuum tuning for high-churn retention workload
    autovacuum_settings = text("""
        ALTER TABLE public.spans SET (
            autovacuum_vacuum_scale_factor = 0.02,
            autovacuum_analyze_scale_factor = 0.01,
            autovacuum_vacuum_cost_delay = 5,
            autovacuum_vacuum_cost_limit = 4000
        );
    """)

    with op.get_context().autocommit_block():
        for statement in (
            unique_root_per_trace,
            root_by_project_created,
            autovacuum_settings,
        ):
            op.execute(statement)
52+
53+
54+
def downgrade() -> None:
    """Undo upgrade(): drop both spans indexes, then reset autovacuum options."""
    drop_unique_root = text(
        "DROP INDEX CONCURRENTLY IF EXISTS public.ux_spans_root_per_trace;"
    )
    drop_root_by_project_created = text(
        "DROP INDEX CONCURRENTLY IF EXISTS public.ix_spans_root_project_created_trace;"
    )
    # RESET removes the per-table overrides that upgrade() set on public.spans.
    reset_autovacuum = text("""
        ALTER TABLE public.spans RESET (
            autovacuum_vacuum_scale_factor,
            autovacuum_analyze_scale_factor,
            autovacuum_vacuum_cost_delay,
            autovacuum_vacuum_cost_limit
        );
    """)

    with op.get_context().autocommit_block():
        for statement in (
            drop_unique_root,
            drop_root_by_project_created,
            reset_autovacuum,
        ):
            op.execute(statement)

api/ee/docker/Dockerfile.dev

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ COPY ./entrypoints /app/entrypoints/
2626

2727
ENV PYTHONPATH=/sdk:$PYTHONPATH
2828

29+
COPY ./oss/src/crons/queries.sh /queries.sh
30+
COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron
31+
RUN sed -i -e '$a\' /etc/cron.d/queries-cron
32+
RUN cat -A /etc/cron.d/queries-cron
33+
34+
RUN chmod +x /queries.sh \
35+
&& chmod 0644 /etc/cron.d/queries-cron
36+
2937
COPY ./ee/src/crons/meters.sh /meters.sh
3038
COPY ./ee/src/crons/meters.txt /etc/cron.d/meters-cron
3139
RUN sed -i -e '$a\' /etc/cron.d/meters-cron
@@ -34,12 +42,12 @@ RUN cat -A /etc/cron.d/meters-cron
3442
RUN chmod +x /meters.sh \
3543
&& chmod 0644 /etc/cron.d/meters-cron
3644

37-
COPY ./oss/src/crons/queries.sh /queries.sh
38-
COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron
39-
RUN sed -i -e '$a\' /etc/cron.d/queries-cron
40-
RUN cat -A /etc/cron.d/queries-cron
45+
COPY ./ee/src/crons/spans.sh /spans.sh
46+
COPY ./ee/src/crons/spans.txt /etc/cron.d/spans-cron
47+
RUN sed -i -e '$a\' /etc/cron.d/spans-cron
48+
RUN cat -A /etc/cron.d/spans-cron
4149

42-
RUN chmod +x /queries.sh \
43-
&& chmod 0644 /etc/cron.d/queries-cron
50+
RUN chmod +x /spans.sh \
51+
&& chmod 0644 /etc/cron.d/spans-cron
4452

4553
EXPOSE 8000

api/ee/docker/Dockerfile.gh

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,15 @@ COPY ./ee /app/ee/
2424
COPY ./oss /app/oss/
2525
COPY ./entrypoints /app/entrypoints/
2626

27-
#
27+
ENV PYTHONPATH=/app
28+
29+
COPY ./oss/src/crons/queries.sh /queries.sh
30+
COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron
31+
RUN sed -i -e '$a\' /etc/cron.d/queries-cron
32+
RUN cat -A /etc/cron.d/queries-cron
33+
34+
RUN chmod +x /queries.sh \
35+
&& chmod 0644 /etc/cron.d/queries-cron
2836

2937
COPY ./ee/src/crons/meters.sh /meters.sh
3038
COPY ./ee/src/crons/meters.txt /etc/cron.d/meters-cron
@@ -34,12 +42,12 @@ RUN cat -A /etc/cron.d/meters-cron
3442
RUN chmod +x /meters.sh \
3543
&& chmod 0644 /etc/cron.d/meters-cron
3644

37-
COPY ./oss/src/crons/queries.sh /queries.sh
38-
COPY ./oss/src/crons/queries.txt /etc/cron.d/queries-cron
39-
RUN sed -i -e '$a\' /etc/cron.d/queries-cron
40-
RUN cat -A /etc/cron.d/queries-cron
45+
COPY ./ee/src/crons/spans.sh /spans.sh
46+
COPY ./ee/src/crons/spans.txt /etc/cron.d/spans-cron
47+
RUN sed -i -e '$a\' /etc/cron.d/spans-cron
48+
RUN cat -A /etc/cron.d/spans-cron
4149

42-
RUN chmod +x /queries.sh \
43-
&& chmod 0644 /etc/cron.d/queries-cron
50+
RUN chmod +x /spans.sh \
51+
&& chmod 0644 /etc/cron.d/spans-cron
4452

4553
EXPOSE 8000

api/ee/src/apis/fastapi/billing/router.py

Lines changed: 80 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from oss.src.utils.common import is_ee
1212
from oss.src.utils.logging import get_module_logger
1313
from oss.src.utils.exceptions import intercept_exceptions
14-
from oss.src.utils.caching import get_cache, set_cache, invalidate_cache
14+
from oss.src.utils.caching import acquire_lock, release_lock
1515
from oss.src.utils.env import env
1616

1717
from oss.src.services.db_manager import (
@@ -24,6 +24,8 @@
2424
from ee.src.models.shared_models import Permission
2525
from ee.src.core.entitlements.types import ENTITLEMENTS, CATALOG, Tracker, Quota
2626
from ee.src.core.subscriptions.types import Event, Plan
27+
from ee.src.core.meters.service import MetersService
28+
from ee.src.core.tracing.service import TracingService
2729
from ee.src.core.subscriptions.service import (
2830
SubscriptionsService,
2931
SwitchException,
@@ -49,12 +51,16 @@
4951
)
5052

5153

52-
class SubscriptionsRouter:
54+
class BillingRouter:
5355
def __init__(
5456
self,
5557
subscription_service: SubscriptionsService,
58+
meters_service: MetersService,
59+
tracing_service: TracingService,
5660
):
5761
self.subscription_service = subscription_service
62+
self.meters_service = meters_service
63+
self.tracing_service = tracing_service
5864

5965
# ROUTER
6066
self.router = APIRouter()
@@ -178,6 +184,13 @@ async def _reset_organization_flags(self, organization_id: str) -> None:
178184
operation_id="admin_report_usage",
179185
)
180186

187+
self.admin_router.add_api_route(
188+
"/usage/flush",
189+
self.flush_usage,
190+
methods=["POST"],
191+
operation_id="admin_flush_usage",
192+
)
193+
181194
# HANDLERS
182195

183196
@intercept_exceptions()
@@ -811,7 +824,7 @@ async def fetch_usage(
811824
content={"status": "error", "message": "Plan not found"},
812825
)
813826

814-
meters = await self.subscription_service.meters_service.fetch(
827+
meters = await self.meters_service.fetch(
815828
organization_id=organization_id,
816829
)
817830

@@ -846,29 +859,24 @@ async def report_usage(
846859
log.info("[report] [endpoint] Trigger")
847860

848861
try:
849-
report_ongoing = await get_cache(
862+
lock_key = await acquire_lock(
850863
namespace="meters:report",
851864
key={},
865+
ttl=3600, # 1 hour
852866
)
853867

854-
if report_ongoing:
868+
if not lock_key:
855869
log.info("[report] [endpoint] Skipped (ongoing)")
856870
return JSONResponse(
857871
status_code=status.HTTP_200_OK,
858872
content={"status": "skipped"},
859873
)
860874

861-
# await set_cache(
862-
# namespace="meters:report",
863-
# key={},
864-
# value=True,
865-
# ttl=60 * 60, # 1 hour
866-
# )
867875
log.info("[report] [endpoint] Lock acquired")
868876

869877
try:
870878
log.info("[report] [endpoint] Reporting usage started")
871-
await self.subscription_service.meters_service.report()
879+
await self.meters_service.report()
872880
log.info("[report] [endpoint] Reporting usage completed")
873881

874882
return JSONResponse(
@@ -887,7 +895,7 @@ async def report_usage(
887895
)
888896

889897
finally:
890-
await invalidate_cache(
898+
await release_lock(
891899
namespace="meters:report",
892900
key={},
893901
)
@@ -904,6 +912,65 @@ async def report_usage(
904912
content={"status": "error", "message": "Fatal error"},
905913
)
906914

915+
@intercept_exceptions()
async def flush_usage(
    self,
):
    """Run span retention once, guarded by the "spans:flush" lock.

    Returns:
        A JSONResponse with one of:
        - 200 {"status": "skipped"} when acquire_lock() returns a falsy value;
        - 200 {"status": "success"} when tracing_service.flush_spans() completes;
        - 500 {"status": "error", "message": "Retention failed"} when
          flush_spans() raises;
        - 500 {"status": "error", "message": "Fatal error"} when any other
          step (acquiring or releasing the lock) raises.
    """
    lock_namespace = "spans:flush"

    log.info("[flush] [endpoint] Trigger")

    try:
        held = await acquire_lock(
            namespace=lock_namespace,
            key={},
            ttl=60 * 60,  # 1 hour
        )

        if not held:
            log.info("[flush] [endpoint] Skipped (ongoing)")
            return JSONResponse(
                status_code=status.HTTP_200_OK,
                content={"status": "skipped"},
            )

        log.info("[flush] [endpoint] Lock acquired")

        try:
            log.info("[flush] [endpoint] Retention started")
            await self.tracing_service.flush_spans()
            log.info("[flush] [endpoint] Retention completed")

            response = JSONResponse(
                status_code=status.HTTP_200_OK,
                content={"status": "success"},
            )

        except Exception:
            log.error(
                "[flush] [endpoint] Retention failed:",
                exc_info=True,
            )
            response = JSONResponse(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                content={"status": "error", "message": "Retention failed"},
            )

        finally:
            # Runs on success and on failure alike; an error raised here
            # falls through to the outer handler below.
            await release_lock(
                namespace=lock_namespace,
                key={},
            )
            log.info("[flush] [endpoint] Lock released")

        return response

    except Exception:
        log.error(
            "[flush] [endpoint] Fatal error:",
            exc_info=True,
        )
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": "Fatal error"},
        )
973+
907974
# ROUTES
908975

909976
@intercept_exceptions()

0 commit comments

Comments
 (0)