diff --git a/api/ee/src/apis/fastapi/billing/router.py b/api/ee/src/apis/fastapi/billing/router.py index 3facdde2fa..028980a220 100644 --- a/api/ee/src/apis/fastapi/billing/router.py +++ b/api/ee/src/apis/fastapi/billing/router.py @@ -809,48 +809,64 @@ async def report_usage( ): log.info("[report] [endpoint] Trigger") - report_ongoing = await get_cache( - namespace="meters:report", - key={}, - ) - - if report_ongoing: - log.info("[report] [endpoint] Skipped (ongoing)") - - return JSONResponse( - status_code=status.HTTP_200_OK, - content={"status": "skipped"}, + try: + report_ongoing = await get_cache( + namespace="meters:report", + key={}, ) - await set_cache( - namespace="meters:report", - key={}, - value=True, - ttl=60 * 60, # 1 hour - ) + if report_ongoing: + log.info("[report] [endpoint] Skipped (ongoing)") + return JSONResponse( + status_code=status.HTTP_200_OK, + content={"status": "skipped"}, + ) - log.info("[report] [endpoint] Lock acquired") + await set_cache( + namespace="meters:report", + key={}, + value=True, + ttl=60 * 60, # 1 hour + ) + log.info("[report] [endpoint] Lock acquired") - try: - log.info("[report] [endpoint] Reporting usage started") - await self.subscription_service.meters_service.report() - log.info("[report] [endpoint] Reporting usage completed") + try: + log.info("[report] [endpoint] Reporting usage started") + await self.subscription_service.meters_service.report() + log.info("[report] [endpoint] Reporting usage completed") - return JSONResponse( - status_code=status.HTTP_200_OK, - content={"status": "success"}, - ) + return JSONResponse( + status_code=status.HTTP_200_OK, + content={"status": "success"}, + ) - except Exception as e: - raise HTTPException(status_code=500, detail="unexpected error") from e + except Exception: + log.error( + "[report] [endpoint] Report failed:", + exc_info=True, + ) + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"status": "error", "message": "Report 
failed"}, + ) - finally: - await invalidate_cache( - namespace="meters:report", - key={}, - ) + finally: + await invalidate_cache( + namespace="meters:report", + key={}, + ) + log.info("[report] [endpoint] Lock released") - log.info("[report] [endpoint] Lock released") + except Exception: + # Catch-all for any errors, including cache errors + log.error( + "[report] [endpoint] Fatal error:", + exc_info=True, + ) + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"status": "error", "message": "Fatal error"}, + ) # ROUTES diff --git a/api/ee/src/core/meters/interfaces.py b/api/ee/src/core/meters/interfaces.py index 9f4e66605d..263c3f0e77 100644 --- a/api/ee/src/core/meters/interfaces.py +++ b/api/ee/src/core/meters/interfaces.py @@ -11,6 +11,7 @@ def __init__(self): async def dump( self, + limit: Optional[int] = None, ) -> list[MeterDTO]: """ Dump all meters where 'synced' != 'value'. diff --git a/api/ee/src/core/meters/service.py b/api/ee/src/core/meters/service.py index 5dd0544e4e..2a14464714 100644 --- a/api/ee/src/core/meters/service.py +++ b/api/ee/src/core/meters/service.py @@ -28,8 +28,9 @@ def __init__( async def dump( self, + limit: Optional[int] = None, ) -> List[MeterDTO]: - return await self.meters_dao.dump() + return await self.meters_dao.dump(limit=limit) async def bump( self, @@ -68,168 +69,191 @@ async def report(self): log.warn("[report] Missing Stripe API Key.") return + log.info("[report] ============================================") log.info("[report] Starting meter report job") + log.info("[report] ============================================") - try: - log.info("[report] Dumping meters to sync") - meters = await self.dump() - log.info(f"[report] Dumped {len(meters)} meters to sync") - except Exception as e: # pylint: disable=broad-exception-caught - log.error("[report] Error dumping meters: %s", e) - return + BATCH_SIZE = 100 + MAX_BATCHES = 50 # Safety limit: 50 batches * 100 meters = 5000 meters max + 
total_reported = 0 + total_skipped = 0 + total_errors = 0 + batch_number = 0 - reported_count = 0 - skipped_count = 0 - error_count = 0 + while True: + batch_number += 1 - for meter in meters: - log.info( - f"[report] Processing meter {meter.organization_id}/{meter.key} (year={meter.year}, month={meter.month}) (value={meter.value}, synced={meter.synced})" - ) + if batch_number > MAX_BATCHES: + log.error( + f"[report] ⚠️ Reached maximum batch limit ({MAX_BATCHES}), stopping to prevent infinite loop" + ) + break + + log.info(f"[report] Processing batch #{batch_number}") - if meter.subscription is None: + try: + meters = await self.dump(limit=BATCH_SIZE) log.info( - f"[report] Skipping meter {meter.organization_id}/{meter.key} - no subscription" + f"[report] Dumped {len(meters)} meters for batch #{batch_number}" ) - skipped_count += 1 - continue - try: - if meter.key.value in REPORTS: - subscription_id = meter.subscription.subscription_id - customer_id = meter.subscription.customer_id - - if not subscription_id: - log.warn( - f"[report] Skipping meter {meter.organization_id}/{meter.key} - missing subscription_id" - ) - skipped_count += 1 - continue - - if not customer_id: - log.warn( - f"[report] Skipping meter {meter.organization_id}/{meter.key} - missing customer_id" - ) - skipped_count += 1 - continue - - if meter.key.name in Gauge.__members__.keys(): - try: - price_id = ( - AGENTA_PRICING.get(meter.subscription.plan, {}) - .get("users", {}) - .get("price") - ) + if not meters: + log.info(f"[report] No more meters to process") + break - if not price_id: - log.warn( - f"[report] Skipping meter {meter.organization_id}/{meter.key} - missing price_id for plan {meter.subscription.plan}" - ) - skipped_count += 1 - continue + except Exception as e: # pylint: disable=broad-exception-caught + log.error( + f"[report] Error dumping meters for batch #{batch_number}:", + exc_info=True, + ) + break - _id = None - for item in stripe.SubscriptionItem.list( - 
subscription=subscription_id, - ).auto_paging_iter(): - if item.price.id == price_id: - _id = item.id - break - - if not _id: - log.warn( - f"[report] Skipping meter {meter.organization_id}/{meter.key} - subscription item not found for price_id {price_id}" - ) - skipped_count += 1 - continue + reported_count = 0 + skipped_count = 0 + error_count = 0 - quantity = meter.value - items = [{"id": _id, "quantity": quantity}] + for idx, meter in enumerate(meters, 1): + if meter.subscription is None: + skipped_count += 1 + continue - stripe.Subscription.modify( - subscription_id, - items=items, - ) + try: + if meter.key.value in REPORTS: + subscription_id = meter.subscription.subscription_id + customer_id = meter.subscription.customer_id - reported_count += 1 - log.info( - f"[report] [stripe] {meter.organization_id} | | {'sync ' if meter.key.value in REPORTS else ' '} | {meter.key}: {meter.value}" + if not subscription_id: + log.warn( + f"[report] Skipping meter {meter.organization_id}/{meter.key} - missing subscription_id" ) + skipped_count += 1 + continue - except Exception as e: # pylint: disable=broad-exception-caught - log.error( - f"[report] Error modifying subscription for {meter.organization_id}/{meter.key}: %s", - e, + if not customer_id: + log.warn( + f"[report] Skipping meter {meter.organization_id}/{meter.key} - missing customer_id" ) - error_count += 1 + skipped_count += 1 continue - if meter.key.name in Counter.__members__.keys(): - try: - event_name = meter.key.value - delta = meter.value - meter.synced + if meter.key.name in Gauge.__members__.keys(): + try: + price_id = ( + AGENTA_PRICING.get(meter.subscription.plan, {}) + .get("users", {}) + .get("price") + ) + + if not price_id: + log.warn( + f"[report] Skipping meter {meter.organization_id}/{meter.key} - missing price_id" + ) + skipped_count += 1 + continue + + _id = None + for item in stripe.SubscriptionItem.list( + subscription=subscription_id, + ).auto_paging_iter(): + if item.price.id == price_id: + 
_id = item.id + break + + if not _id: + log.warn( + f"[report] Skipping meter {meter.organization_id}/{meter.key} - subscription item not found" + ) + skipped_count += 1 + continue + + quantity = meter.value + items = [{"id": _id, "quantity": quantity}] + + stripe.Subscription.modify( + subscription_id, + items=items, + ) - if delta <= 0: + reported_count += 1 log.info( - f"[report] Skipping meter {meter.organization_id}/{meter.key} - delta is {delta}" + f"[stripe] updating: {meter.organization_id} | | {'sync ' if meter.key.value in REPORTS else ' '} | {meter.key}: {meter.value}" + ) + + except Exception as e: # pylint: disable=broad-exception-caught + log.error( + f"[report] Error modifying subscription for {meter.organization_id}/{meter.key}:", + exc_info=True, ) - skipped_count += 1 + error_count += 1 continue - payload = {"delta": delta, "customer_id": customer_id} + if meter.key.name in Counter.__members__.keys(): + try: + event_name = meter.key.value + delta = meter.value - meter.synced - stripe.billing.MeterEvent.create( - event_name=event_name, - payload=payload, - ) + if delta <= 0: + skipped_count += 1 + continue - reported_count += 1 - log.info( - f"[report] [stripe] {meter.organization_id} | {(('0' if (meter.month != 0 and meter.month < 10) else '') + str(meter.month)) if meter.month != 0 else ' '}.{meter.year if meter.year else ' '} | {'sync ' if meter.key.value in REPORTS else ' '} | {meter.key}: {meter.value - meter.synced}" - ) + payload = {"delta": delta, "customer_id": customer_id} - except Exception as e: # pylint: disable=broad-exception-caught - log.error( - f"[report] Error creating meter event for {meter.organization_id}/{meter.key}: %s", - e, - ) - error_count += 1 - continue + stripe.billing.MeterEvent.create( + event_name=event_name, + payload=payload, + ) - except Exception as e: # pylint: disable=broad-exception-caught - log.error( - f"E[report] rror reporting meter {meter.organization_id}/{meter.key}: %s", - e, - ) - error_count += 1 + 
reported_count += 1 + log.info( + f"[stripe] reporting: {meter.organization_id} | {(('0' if (meter.month != 0 and meter.month < 10) else '') + str(meter.month)) if meter.month != 0 else ' '}.{meter.year if meter.year else ' '} | {'sync ' if meter.key.value in REPORTS else ' '} | {meter.key}: {meter.value - meter.synced}" + ) - log.info( - f"[report] Reporting complete: {reported_count} reported, {skipped_count} skipped, {error_count} errors" - ) + except Exception as e: # pylint: disable=broad-exception-caught + log.error( + f"[report] Error creating meter event for {meter.organization_id}/{meter.key}:", + exc_info=True, + ) + error_count += 1 + continue - log.info(f"[report] Setting synced values for {len(meters)} meters") - synced_count = 0 - sync_error_count = 0 + except Exception as e: # pylint: disable=broad-exception-caught + log.error( + f"[report] Error reporting meter {meter.organization_id}/{meter.key}:", + exc_info=True, + ) + error_count += 1 - for meter in meters: - try: + log.info( + f"[report] Batch #{batch_number}: {reported_count} reported, {skipped_count} skipped, {error_count} errors" + ) + + # Set synced values for this batch + for meter in meters: meter.synced = meter.value - synced_count += 1 + + # Commit this batch to DB + try: + log.info( + f"[report] Bumping batch #{batch_number} ({len(meters)} meters)" + ) + await self.bump(meters=meters) + log.info(f"[report] ✅ Batch #{batch_number} completed successfully") except Exception as e: # pylint: disable=broad-exception-caught log.error( - f"[report] Error setting synced value for {meter.organization_id}/{meter.key}: %s", - e, + f"[report] ❌ Error bumping batch #{batch_number}:", exc_info=True ) - sync_error_count += 1 - - log.info( - f"[report] Set synced values: {synced_count} success, {sync_error_count} errors" - ) - - try: - log.info(f"[report] Bumping {len(meters)} meters") - await self.bump(meters=meters) - log.info("[report] Bumped successfully") - except Exception as e: # pylint: 
disable=broad-exception-caught - log.error("[report] Error bumping meters: %s", e) - return + total_errors += len(meters) + break # stop: the next dump would return these same unsynced meters and re-send them to Stripe + + # Update totals + total_reported += reported_count + total_skipped += skipped_count + total_errors += error_count + + log.info(f"[report] ============================================") + log.info(f"[report] ✅ REPORT JOB COMPLETED") + log.info(f"[report] Total batches: {batch_number}") + log.info(f"[report] Total reported: {total_reported}") + log.info(f"[report] Total skipped: {total_skipped}") + log.info(f"[report] Total errors: {total_errors}") + log.info(f"[report] ============================================") diff --git a/api/ee/src/crons/meters.sh b/api/ee/src/crons/meters.sh index c0f7d8c5ae..e5144d6947 100644 --- a/api/ee/src/crons/meters.sh +++ b/api/ee/src/crons/meters.sh @@ -6,12 +6,34 @@ AGENTA_AUTH_KEY=$(tr '\0' '\n' < /proc/1/environ | grep ^AGENTA_AUTH_KEY= | cut echo "--------------------------------------------------------" echo "[$(date)] meters.sh running from cron" >> /proc/1/fd/1 -# Make POST request, show status and response -curl \ +# Make POST request with 15 minute timeout +RESPONSE=$(curl \ + --max-time 900 \ + --connect-timeout 10 \ -s \ -w "\nHTTP_STATUS:%{http_code}\n" \ -X POST \ -H "Authorization: Access ${AGENTA_AUTH_KEY}" \ - "http://api:8000/admin/billing/usage/report" || echo "❌ CURL failed" + "http://api:8000/admin/billing/usage/report" 2>&1) || CURL_EXIT=$? 
+ +if [ -n "${CURL_EXIT:-}" ]; then + echo "❌ CURL failed with exit code: ${CURL_EXIT}" >> /proc/1/fd/1 + case ${CURL_EXIT} in + 6) echo " Could not resolve host" >> /proc/1/fd/1 ;; + 7) echo " Failed to connect to host" >> /proc/1/fd/1 ;; + 28) echo " Operation timeout (exceeded 900s / 15 minutes)" >> /proc/1/fd/1 ;; + 52) echo " Empty reply from server (server closed connection)" >> /proc/1/fd/1 ;; + 56) echo " Failure in receiving network data" >> /proc/1/fd/1 ;; + *) echo " Unknown curl error" >> /proc/1/fd/1 ;; + esac +else + echo "${RESPONSE}" >> /proc/1/fd/1 + HTTP_CODE=$(echo "${RESPONSE}" | grep "HTTP_STATUS:" | cut -d: -f2) + if [ "${HTTP_CODE}" = "200" ]; then + echo "✅ Report completed successfully" >> /proc/1/fd/1 + else + echo "❌ Report failed with HTTP ${HTTP_CODE}" >> /proc/1/fd/1 + fi +fi echo "[$(date)] meters.sh done" >> /proc/1/fd/1 \ No newline at end of file diff --git a/api/ee/src/crons/meters.txt b/api/ee/src/crons/meters.txt index dbdb77a390..15fb131882 100644 --- a/api/ee/src/crons/meters.txt +++ b/api/ee/src/crons/meters.txt @@ -1,2 +1,2 @@ * * * * * root echo "cron test $(date)" >> /proc/1/fd/1 2>&1 -* * * * * root sh /meters.sh >> /proc/1/fd/1 2>&1 +*/15 * * * * root sh /meters.sh >> /proc/1/fd/1 2>&1 diff --git a/api/ee/src/dbs/postgres/meters/dao.py b/api/ee/src/dbs/postgres/meters/dao.py index a9575cb2d3..bb8771feef 100644 --- a/api/ee/src/dbs/postgres/meters/dao.py +++ b/api/ee/src/dbs/postgres/meters/dao.py @@ -27,8 +27,11 @@ class MetersDAO(MetersDAOInterface): def __init__(self): pass - async def dump(self) -> list[MeterDTO]: - log.info("[report] [dump] Starting meter dump") + async def dump( + self, + limit: Optional[int] = None, + ) -> list[MeterDTO]: + log.info(f"[report] [dump] Starting (limit={limit or 'none'})") async with engine.core_session() as session: try: @@ -36,25 +39,25 @@ async def dump(self) -> list[MeterDTO]: select(MeterDBE) .filter(MeterDBE.synced != MeterDBE.value) .options(joinedload(MeterDBE.subscription)) - 
) # NO RISK OF DEADLOCK - - log.info( - "[report] [dump] Executing query for meters where synced != value" + .order_by( + MeterDBE.organization_id, + MeterDBE.key, + MeterDBE.year, + MeterDBE.month, + ) ) + + if limit: + stmt = stmt.limit(limit) + result = await session.execute(stmt) meters = result.scalars().all() - log.info( - f"[report] [dump] Found {len(meters)} meters with synced != value" - ) + log.info(f"[report] [dump] Found {len(meters)} unsynced meters") dto_list = [] for meter in meters: try: - log.info( - f"[report] [dump] Processing meter {meter.organization_id}/{meter.key} year={meter.year} month={meter.month} value={meter.value} synced={meter.synced}" - ) - subscription_dto = None if meter.subscription: subscription_dto = SubscriptionDTO( @@ -65,13 +68,6 @@ async def dump(self) -> list[MeterDTO]: active=meter.subscription.active, anchor=meter.subscription.anchor, ) - log.info( - f"[report] [dump] Meter {meter.organization_id}/{meter.key} has subscription (customer_id={meter.subscription.customer_id}, plan={meter.subscription.plan})" - ) - else: - log.warn( - f"[report] [dump] Meter {meter.organization_id}/{meter.key} has NO subscription" - ) meter_dto = MeterDTO( organization_id=meter.organization_id, @@ -84,20 +80,18 @@ async def dump(self) -> list[MeterDTO]: ) dto_list.append(meter_dto) - except Exception as e: + except Exception: log.error( - f"[report] [dump] Error converting meter {meter.organization_id if hasattr(meter, 'organization_id') else 'UNKNOWN'} to DTO: %s", - e, + f"[report] [dump] Error converting meter to DTO", + exc_info=True, ) continue - log.info( - f"[report] [dump] Successfully converted {len(dto_list)} meters to DTOs" - ) + log.info(f"[report] [dump] Converted {len(dto_list)} meters to DTOs") return dto_list - except Exception as e: - log.error(f"[report] [dump] Error executing dump query: %s", e) + except Exception: + log.error("[report] [dump] Error executing query", exc_info=True) raise async def bump( @@ -105,33 +99,20 @@ 
async def bump( meters: list[MeterDTO], ) -> None: if not meters: - log.info("[report] [bump] No meters to bump") return - log.info(f"[report] [bump] Starting bump for {len(meters)} meters") + log.info(f"[report] [bump] Starting for {len(meters)} meters") - # Sort for consistent lock acquisition sorted_meters = sorted( meters, - key=lambda m: ( - m.organization_id, - m.key, - m.year, - m.month, - ), + key=lambda m: (m.organization_id, m.key, m.year, m.month), ) - log.info(f"[report] [bump] Sorted {len(sorted_meters)} meters") - async with engine.core_session() as session: - updates_executed = 0 + updated_count = 0 for meter in sorted_meters: try: - log.info( - f"[report] [bump] Updating meter {meter.organization_id}/{meter.key} year={meter.year} month={meter.month} synced={meter.synced}" - ) - stmt = ( update(MeterDBE) .where( @@ -144,35 +125,29 @@ async def bump( ) result = await session.execute(stmt) - updates_executed += 1 if result.rowcount == 0: log.warn( - f"[report] [bump] No rows updated for meter {meter.organization_id}/{meter.key} year={meter.year} month={meter.month}" + f"[report] [bump] No rows updated for {meter.organization_id}/{meter.key}" ) else: - log.info( - f"[report] [bump] Updated {result.rowcount} row(s) for meter {meter.organization_id}/{meter.key}" - ) + updated_count += result.rowcount - except Exception as e: + except Exception: log.error( - f"[report] [bump] Error executing update for meter {meter.organization_id}/{meter.key}: %s", - e, + f"[report] [bump] Error updating meter {meter.organization_id}/{meter.key}", + exc_info=True, ) - raise # Re-raise to trigger rollback + raise - log.info( - f"[report] [bump] Executed {updates_executed} update statements, committing transaction" - ) + log.info(f"[report] [bump] Committing {updated_count} updates") try: await session.commit() - log.info(f"[report] [bump] Transaction committed successfully") - except Exception as e: - log.error(f"[report] [bump] Error committing transaction: %s", e) + 
log.info(f"[report] [bump] ✅ Committed successfully") + except Exception: + log.error("[report] [bump] ❌ Commit failed", exc_info=True) await session.rollback() - log.error(f"[report] [bump] Transaction rolled back") raise async def fetch(