Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 130 additions & 70 deletions api/ee/src/core/meters/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,106 +68,166 @@ async def report(self):
log.warn("Missing Stripe API Key.")
return

log.info("[report] Starting meter report job")

try:
meters = await self.dump()

log.info(f"[report] Dumped {len(meters)} meters to sync")
except Exception as e: # pylint: disable=broad-exception-caught
log.error("Error dumping meters: %s", e)
return

try:
for meter in meters:
if meter.subscription is None:
continue

try:
if meter.key.value in REPORTS:
subscription_id = meter.subscription.subscription_id
customer_id = meter.subscription.customer_id

if not subscription_id:
continue

if not customer_id:
continue
reported_count = 0
skipped_count = 0
error_count = 0

for meter in meters:
log.debug(
f"[report] Processing meter {meter.organization_id}/{meter.key} (value={meter.value}, synced={meter.synced})"
)

if meter.subscription is None:
log.debug(
f"[report] Skipping meter {meter.organization_id}/{meter.key} - no subscription"
)
skipped_count += 1
continue

try:
if meter.key.value in REPORTS:
subscription_id = meter.subscription.subscription_id
customer_id = meter.subscription.customer_id

if not subscription_id:
log.warn(
f"[report] Skipping meter {meter.organization_id}/{meter.key} - missing subscription_id"
)
skipped_count += 1
continue

if not customer_id:
log.warn(
f"[report] Skipping meter {meter.organization_id}/{meter.key} - missing customer_id"
)
skipped_count += 1
continue

if meter.key.name in Gauge.__members__.keys():
try:
price_id = (
AGENTA_PRICING.get(meter.subscription.plan, {})
.get("users", {})
.get("price")
)

if meter.key.name in Gauge.__members__.keys():
try:
price_id = (
AGENTA_PRICING.get(meter.subscription.plan, {})
.get("users", {})
.get("price")
if not price_id:
log.warn(
f"[report] Skipping meter {meter.organization_id}/{meter.key} - missing price_id for plan {meter.subscription.plan}"
)
skipped_count += 1
continue

if not price_id:
continue

_id = None
for item in stripe.SubscriptionItem.list(
subscription=subscription_id,
).auto_paging_iter():
if item.price.id == price_id:
_id = item.id
break

if not _id:
continue

quantity = meter.value

items = [{"id": _id, "quantity": quantity}]

stripe.Subscription.modify(
subscription_id,
items=items,
_id = None
for item in stripe.SubscriptionItem.list(
subscription=subscription_id,
).auto_paging_iter():
if item.price.id == price_id:
_id = item.id
break

if not _id:
log.warn(
f"[report] Skipping meter {meter.organization_id}/{meter.key} - subscription item not found for price_id {price_id}"
)

except (
Exception # pylint: disable=broad-exception-caught
) as e:
log.error("Error modifying subscription: %s", e)
skipped_count += 1
continue

quantity = meter.value
items = [{"id": _id, "quantity": quantity}]

stripe.Subscription.modify(
subscription_id,
items=items,
)

reported_count += 1
log.info(
f"[stripe] updating: {meter.organization_id} | | {'sync ' if meter.key.value in REPORTS else ' '} | {meter.key}: {meter.value}"
)

if meter.key.name in Counter.__members__.keys():
try:
event_name = meter.key.value
delta = meter.value - meter.synced
payload = {"delta": delta, "customer_id": customer_id}
except Exception as e: # pylint: disable=broad-exception-caught
log.error(
f"Error modifying subscription for {meter.organization_id}/{meter.key}: %s",
e,
)
error_count += 1
continue

if meter.key.name in Counter.__members__.keys():
try:
event_name = meter.key.value
delta = meter.value - meter.synced

stripe.billing.MeterEvent.create(
event_name=event_name,
payload=payload,
if delta <= 0:
log.debug(
f"[report] Skipping meter {meter.organization_id}/{meter.key} - delta is {delta}"
)
except (
Exception # pylint: disable=broad-exception-caught
) as e:
log.error("Error creating meter event: %s", e)
skipped_count += 1
continue

payload = {"delta": delta, "customer_id": customer_id}

stripe.billing.MeterEvent.create(
event_name=event_name,
payload=payload,
)

reported_count += 1
log.info(
f"[stripe] reporting: {meter.organization_id} | {(('0' if (meter.month != 0 and meter.month < 10) else '') + str(meter.month)) if meter.month != 0 else ' '}.{meter.year if meter.year else ' '} | {'sync ' if meter.key.value in REPORTS else ' '} | {meter.key}: {meter.value - meter.synced}"
)

except Exception as e: # pylint: disable=broad-exception-caught
log.error("Error reporting meter: %s", e)
except Exception as e: # pylint: disable=broad-exception-caught
log.error(
f"Error creating meter event for {meter.organization_id}/{meter.key}: %s",
e,
)
error_count += 1
continue

except Exception as e: # pylint: disable=broad-exception-caught
log.error("Error reporting meters: %s", e)
except Exception as e: # pylint: disable=broad-exception-caught
log.error(
f"Error reporting meter {meter.organization_id}/{meter.key}: %s", e
)
error_count += 1

try:
for meter in meters:
meter.synced = meter.value
log.info(
f"[report] Reporting complete: {reported_count} reported, {skipped_count} skipped, {error_count} errors"
)

except Exception as e: # pylint: disable=broad-exception-caught
log.error("Error syncing meters: %s", e)
log.info(f"[report] Setting synced values for {len(meters)} meters")
synced_count = 0
sync_error_count = 0

for meter in meters:
try:
meter.synced = meter.value
synced_count += 1
except Exception as e: # pylint: disable=broad-exception-caught
log.error(
f"Error setting synced value for {meter.organization_id}/{meter.key}: %s",
e,
)
sync_error_count += 1

log.info(
f"[report] Set synced values: {synced_count} success, {sync_error_count} errors"
)

try:
log.info(f"[report] Calling bump for {len(meters)} meters")
await self.bump(meters=meters)

log.info(f"[report] Bump completed successfully")
except Exception as e: # pylint: disable=broad-exception-caught
log.error("Error bumping meters: %s", e)
return
2 changes: 1 addition & 1 deletion api/ee/src/crons/meters.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
* * * * * root echo "cron test $(date)" >> /proc/1/fd/1 2>&1
0 * * * * root sh /meters.sh >> /proc/1/fd/1 2>&1
* * * * * root sh /meters.sh >> /proc/1/fd/1 2>&1
Loading