From 1aa678943b99783ec58fba734729e62c01eb1fa5 Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Wed, 12 Nov 2025 15:02:10 +0100 Subject: [PATCH 1/2] bombard with logs --- api/ee/src/core/meters/service.py | 200 +++++++++++++++++--------- api/ee/src/dbs/postgres/meters/dao.py | 156 +++++++++++++++----- 2 files changed, 247 insertions(+), 109 deletions(-) diff --git a/api/ee/src/core/meters/service.py b/api/ee/src/core/meters/service.py index ed2ef0fa33..a5e5d7ebbd 100644 --- a/api/ee/src/core/meters/service.py +++ b/api/ee/src/core/meters/service.py @@ -68,106 +68,166 @@ async def report(self): log.warn("Missing Stripe API Key.") return + log.info("[report] Starting meter report job") + try: meters = await self.dump() - + log.info(f"[report] Dumped {len(meters)} meters to sync") except Exception as e: # pylint: disable=broad-exception-caught log.error("Error dumping meters: %s", e) return - try: - for meter in meters: - if meter.subscription is None: - continue - - try: - if meter.key.value in REPORTS: - subscription_id = meter.subscription.subscription_id - customer_id = meter.subscription.customer_id - - if not subscription_id: - continue - - if not customer_id: - continue + reported_count = 0 + skipped_count = 0 + error_count = 0 + + for meter in meters: + log.debug( + f"[report] Processing meter {meter.organization_id}/{meter.key} (value={meter.value}, synced={meter.synced})" + ) + + if meter.subscription is None: + log.debug( + f"[report] Skipping meter {meter.organization_id}/{meter.key} - no subscription" + ) + skipped_count += 1 + continue + + try: + if meter.key.value in REPORTS: + subscription_id = meter.subscription.subscription_id + customer_id = meter.subscription.customer_id + + if not subscription_id: + log.warn( + f"[report] Skipping meter {meter.organization_id}/{meter.key} - missing subscription_id" + ) + skipped_count += 1 + continue + + if not customer_id: + log.warn( + f"[report] Skipping meter {meter.organization_id}/{meter.key} - missing customer_id" + ) + skipped_count += 1 + continue + + if meter.key.name in Gauge.__members__.keys(): + try: + price_id = ( + AGENTA_PRICING.get(meter.subscription.plan, {}) + .get("users", {}) + .get("price") + ) - if meter.key.name in Gauge.__members__.keys(): - try: - price_id = ( - AGENTA_PRICING.get(meter.subscription.plan, {}) - .get("users", {}) - .get("price") + if not price_id: + log.warn( + f"[report] Skipping meter {meter.organization_id}/{meter.key} - missing price_id for plan {meter.subscription.plan}" ) + skipped_count += 1 + continue - if not price_id: - continue - - _id = None - for item in stripe.SubscriptionItem.list( - subscription=subscription_id, - ).auto_paging_iter(): - if item.price.id == price_id: - _id = item.id - break - - if not _id: - continue - - quantity = meter.value - - items = [{"id": _id, "quantity": quantity}] - - stripe.Subscription.modify( - subscription_id, - items=items, + _id = None + for item in stripe.SubscriptionItem.list( + subscription=subscription_id, + ).auto_paging_iter(): + if item.price.id == price_id: + _id = item.id + break + + if not _id: + log.warn( + f"[report] Skipping meter {meter.organization_id}/{meter.key} - subscription item not found for price_id {price_id}" ) - - except ( - Exception # pylint: disable=broad-exception-caught - ) as e: - log.error("Error modifying subscription: %s", e) + skipped_count += 1 continue + quantity = meter.value + items = [{"id": _id, "quantity": quantity}] + + stripe.Subscription.modify( + subscription_id, + items=items, + ) + + reported_count += 1 log.info( f"[stripe] updating: {meter.organization_id} | | {'sync ' if meter.key.value in REPORTS else ' '} | {meter.key}: {meter.value}" ) - if meter.key.name in Counter.__members__.keys(): - try: - event_name = meter.key.value - delta = meter.value - meter.synced - payload = {"delta": delta, "customer_id": customer_id} + except Exception as e: # pylint: disable=broad-exception-caught + log.error( + f"Error modifying subscription for {meter.organization_id}/{meter.key}: %s", + e, + ) + error_count += 1 + continue + + if meter.key.name in Counter.__members__.keys(): + try: + event_name = meter.key.value + delta = meter.value - meter.synced - stripe.billing.MeterEvent.create( - event_name=event_name, - payload=payload, + if delta <= 0: + log.debug( + f"[report] Skipping meter {meter.organization_id}/{meter.key} - delta is {delta}" ) - except ( - Exception # pylint: disable=broad-exception-caught - ) as e: - log.error("Error creating meter event: %s", e) + skipped_count += 1 continue + payload = {"delta": delta, "customer_id": customer_id} + + stripe.billing.MeterEvent.create( + event_name=event_name, + payload=payload, + ) + + reported_count += 1 log.info( f"[stripe] reporting: {meter.organization_id} | {(('0' if (meter.month != 0 and meter.month < 10) else '') + str(meter.month)) if meter.month != 0 else ' '}.{meter.year if meter.year else ' '} | {'sync ' if meter.key.value in REPORTS else ' '} | {meter.key}: {meter.value - meter.synced}" ) - except Exception as e: # pylint: disable=broad-exception-caught - log.error("Error reporting meter: %s", e) + except Exception as e: # pylint: disable=broad-exception-caught + log.error( + f"Error creating meter event for {meter.organization_id}/{meter.key}: %s", + e, + ) + error_count += 1 + continue - except Exception as e: # pylint: disable=broad-exception-caught - log.error("Error reporting meters: %s", e) + except Exception as e: # pylint: disable=broad-exception-caught + log.error( + f"Error reporting meter {meter.organization_id}/{meter.key}: %s", e + ) + error_count += 1 - try: - for meter in meters: - meter.synced = meter.value + log.info( + f"[report] Reporting complete: {reported_count} reported, {skipped_count} skipped, {error_count} errors" + ) - except Exception as e: # pylint: disable=broad-exception-caught - log.error("Error syncing meters: %s", e) + log.info(f"[report] Setting synced values for {len(meters)} meters") + synced_count = 0 + sync_error_count = 0 + + for meter in meters: + try: + meter.synced = meter.value + synced_count += 1 + except Exception as e: # pylint: disable=broad-exception-caught + log.error( + f"Error setting synced value for {meter.organization_id}/{meter.key}: %s", + e, + ) + sync_error_count += 1 + + log.info( + f"[report] Set synced values: {synced_count} success, {sync_error_count} errors" + ) try: + log.info(f"[report] Calling bump for {len(meters)} meters") await self.bump(meters=meters) - + log.info(f"[report] Bump completed successfully") except Exception as e: # pylint: disable=broad-exception-caught log.error("Error bumping meters: %s", e) return diff --git a/api/ee/src/dbs/postgres/meters/dao.py b/api/ee/src/dbs/postgres/meters/dao.py index 36a2338060..c570044621 100644 --- a/api/ee/src/dbs/postgres/meters/dao.py +++ b/api/ee/src/dbs/postgres/meters/dao.py @@ -28,47 +28,88 @@ def __init__(self): pass async def dump(self) -> list[MeterDTO]: + log.info("[report] [dump] Starting meter dump") + async with engine.core_session() as session: - stmt = ( - select(MeterDBE) - .filter(MeterDBE.synced != MeterDBE.value) - .options(joinedload(MeterDBE.subscription)) - ) # NO RISK OF DEADLOCK + try: + stmt = ( + select(MeterDBE) + .filter(MeterDBE.synced != MeterDBE.value) + .options(joinedload(MeterDBE.subscription)) + ) # NO RISK OF DEADLOCK - result = await session.execute(stmt) - meters = result.scalars().all() + log.debug( + "[report] [dump] Executing query for meters where synced != value" + ) + result = await session.execute(stmt) + meters = result.scalars().all() - return [ - MeterDTO( - organization_id=meter.organization_id, - year=meter.year, - month=meter.month, - value=meter.value, - key=meter.key, - synced=meter.synced, - subscription=( - SubscriptionDTO( - organization_id=meter.subscription.organization_id, - customer_id=meter.subscription.customer_id, - subscription_id=meter.subscription.subscription_id, - plan=meter.subscription.plan, - active=meter.subscription.active, - anchor=meter.subscription.anchor, + log.info( + f"[report] [dump] Found {len(meters)} meters with synced != value" + ) + + dto_list = [] + for meter in meters: + try: + log.debug( + f"[report] [dump] Processing meter {meter.organization_id}/{meter.key} year={meter.year} month={meter.month} value={meter.value} synced={meter.synced}" + ) + + subscription_dto = None + if meter.subscription: + subscription_dto = SubscriptionDTO( + organization_id=meter.subscription.organization_id, + customer_id=meter.subscription.customer_id, + subscription_id=meter.subscription.subscription_id, + plan=meter.subscription.plan, + active=meter.subscription.active, + anchor=meter.subscription.anchor, + ) + log.debug( + f"[report] [dump] Meter {meter.organization_id}/{meter.key} has subscription (customer_id={meter.subscription.customer_id}, plan={meter.subscription.plan})" + ) + else: + log.debug( + f"[report] [dump] Meter {meter.organization_id}/{meter.key} has NO subscription" + ) + + meter_dto = MeterDTO( + organization_id=meter.organization_id, + year=meter.year, + month=meter.month, + value=meter.value, + key=meter.key, + synced=meter.synced, + subscription=subscription_dto, + ) + dto_list.append(meter_dto) + + except Exception as e: + log.error( + f"[report] [dump] Error converting meter {meter.organization_id if hasattr(meter, 'organization_id') else 'UNKNOWN'} to DTO: %s", + e, ) - if meter.subscription - else None - ), + continue + + log.info( + f"[report] [dump] Successfully converted {len(dto_list)} meters to DTOs" ) - for meter in meters - ] + return dto_list + + except Exception as e: + log.error(f"[report] [dump] Error executing dump query: %s", e) + raise async def bump( self, meters: list[MeterDTO], ) -> None: if not meters: + log.info("[report] [bump] No meters to bump") return + log.info(f"[report] [bump] Starting bump for {len(meters)} meters") + # Sort for consistent lock acquisition sorted_meters = sorted( meters, @@ -80,22 +121,59 @@ async def bump( ), ) + log.info(f"[report] [bump] Sorted {len(sorted_meters)} meters") + async with engine.core_session() as session: + updates_executed = 0 + for meter in sorted_meters: - stmt = ( - update(MeterDBE) - .where( - MeterDBE.organization_id == meter.organization_id, - MeterDBE.key == meter.key, - MeterDBE.year == meter.year, - MeterDBE.month == meter.month, + try: + log.debug( + f"[report] [bump] Updating meter {meter.organization_id}/{meter.key} year={meter.year} month={meter.month} synced={meter.synced}" + ) + + stmt = ( + update(MeterDBE) + .where( + MeterDBE.organization_id == meter.organization_id, + MeterDBE.key == meter.key, + MeterDBE.year == meter.year, + MeterDBE.month == meter.month, + ) + .values(synced=meter.synced) ) - .values(synced=meter.synced) - ) - await session.execute(stmt) + result = await session.execute(stmt) + updates_executed += 1 - await session.commit() + if result.rowcount == 0: + log.warn( + f"[report] [bump] No rows updated for meter {meter.organization_id}/{meter.key} year={meter.year} month={meter.month}" + ) + else: + log.debug( + f"[report] [bump] Updated {result.rowcount} row(s) for meter {meter.organization_id}/{meter.key}" + ) + + except Exception as e: + log.error( + f"[report] [bump] Error executing update for meter {meter.organization_id}/{meter.key}: %s", + e, + ) + raise # Re-raise to trigger rollback + + log.info( + f"[report] [bump] Executed {updates_executed} update statements, committing transaction" + ) + + try: + await session.commit() + log.info(f"[report] [bump] Transaction committed successfully") + except Exception as e: + log.error(f"[report] [bump] Error committing transaction: %s", e) + await session.rollback() + log.error(f"[report] [bump] Transaction rolled back") + raise async def fetch( self, From 15b0d6c958ecb67800af9664ca73a63f6dce100b Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Wed, 12 Nov 2025 15:04:54 +0100 Subject: [PATCH 2/2] fix cron jobs --- api/ee/src/crons/meters.txt | 2 +- api/oss/src/crons/queries.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/api/ee/src/crons/meters.txt b/api/ee/src/crons/meters.txt index f3acd78570..dbdb77a390 100644 --- a/api/ee/src/crons/meters.txt +++ b/api/ee/src/crons/meters.txt @@ -1,2 +1,2 @@ * * * * * root echo "cron test $(date)" >> /proc/1/fd/1 2>&1 -0 * * * * root sh /meters.sh >> /proc/1/fd/1 2>&1 +* * * * * root sh /meters.sh >> /proc/1/fd/1 2>&1 diff --git a/api/oss/src/crons/queries.txt b/api/oss/src/crons/queries.txt index 586a61af8e..b47dc59d42 100644 --- a/api/oss/src/crons/queries.txt +++ b/api/oss/src/crons/queries.txt @@ -1,2 +1,2 @@ * * * * * root echo "cron test $(date)" >> /proc/1/fd/1 2>&1 -*/1 * * * * root sh /queries.sh >> /proc/1/fd/1 2>&1 +* * * * * root sh /queries.sh >> /proc/1/fd/1 2>&1