Skip to content

Commit 2b9f2ae

Browse files
committed
Migrate order event backfill to a script
To make it easier to control and handle when running.
1 parent 0a2a861 commit 2b9f2ae

File tree

5 files changed

+455
-379
lines changed

5 files changed

+455
-379
lines changed

server/polar/order/tasks.py

Lines changed: 3 additions & 199 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,14 @@
1-
import asyncio
21
import uuid
3-
from collections.abc import Sequence
4-
from typing import Any
52

63
import stripe as stripe_lib
74
import structlog
85
from dramatiq import Retry
9-
from sqlalchemy import String, and_, func, or_, select
10-
from sqlalchemy.ext.asyncio import AsyncSession
11-
from sqlalchemy.orm import joinedload, selectinload
6+
from sqlalchemy.orm import joinedload
127

13-
from polar.config import settings
14-
from polar.event.repository import EventRepository
15-
from polar.event.system import OrderPaidMetadata, OrderRefundedMetadata, SystemEvent
168
from polar.exceptions import PolarTaskError
179
from polar.logging import Logger
18-
from polar.models import Customer, Event, Order
19-
from polar.models.event import EventSource
20-
from polar.models.order import OrderBillingReasonInternal, OrderStatus
21-
from polar.models.refund import Refund, RefundStatus
10+
from polar.models import Customer, Order
11+
from polar.models.order import OrderBillingReasonInternal
2212
from polar.payment_method.repository import PaymentMethodRepository
2313
from polar.product.repository import ProductRepository
2414
from polar.subscription.repository import SubscriptionRepository
@@ -233,189 +223,3 @@ async def process_dunning_order(order_id: uuid.UUID) -> None:
233223
raise OrderDoesNotExist(order_id)
234224

235225
await order_service.process_dunning_order(session, order)
236-
237-
238-
@actor(
    actor_name="order.backfill_order_events",
    priority=TaskPriority.LOW,
    time_limit=3600_000,
)
async def backfill_order_events(
    batch_size: int = settings.DATABASE_STREAM_YIELD_PER, rate_limit_delay: float = 1.0
) -> None:
    """
    Backfill order.paid and order.refunded events for existing orders.

    Only non-deleted orders in the paid, refunded or partially refunded status
    that have no order.paid / order.refunded event referencing them yet are
    processed. Orders are walked in (created_at, id) order, one batch at a
    time; each batch of events is inserted and committed before the next
    batch is fetched.

    Args:
        batch_size: Number of orders fetched per batch
            (default: settings.DATABASE_STREAM_YIELD_PER).
        rate_limit_delay: Seconds to sleep after each batch insert (default: 1.0).
    """
    eligible_statuses = [
        OrderStatus.paid,
        OrderStatus.refunded,
        OrderStatus.partially_refunded,
    ]

    async with AsyncSessionMaker() as session:
        # Order ids (as strings) that already have a paid/refunded event.
        already_backfilled = (
            select(Event.user_metadata["order_id"].as_string().label("order_id"))
            .where(Event.name.in_([SystemEvent.order_paid, SystemEvent.order_refunded]))
            .distinct()
            .subquery()
        )
        # Anti-join: LEFT JOIN on the order id, then keep the rows without a match.
        join_condition = already_backfilled.c.order_id == Order.id.cast(String)
        pending_filters = (
            Order.deleted_at.is_(None),
            already_backfilled.c.order_id.is_(None),
            Order.status.in_(eligible_statuses),
        )

        count_result = await session.execute(
            select(func.count(Order.id))
            .outerjoin(already_backfilled, join_condition)
            .where(*pending_filters)
        )
        total_to_process = count_result.scalar() or 0

        log.info(
            "backfill_order_events.starting",
            total_orders_to_process=total_to_process,
        )

        processed_orders = 0
        inserted_events = 0
        # Keyset pagination cursor: position of the last order of the previous batch.
        cursor_created_at = None
        cursor_id = None

        while True:
            page_statement = (
                select(Order)
                .outerjoin(already_backfilled, join_condition)
                .where(*pending_filters)
                .options(selectinload(Order.customer))
                .order_by(Order.created_at.asc(), Order.id.asc())
                .limit(batch_size)
            )
            if cursor_created_at is not None:
                # Resume strictly after the last order seen; the id breaks
                # ties between orders sharing the same created_at.
                page_statement = page_statement.where(
                    or_(
                        Order.created_at > cursor_created_at,
                        and_(
                            Order.created_at == cursor_created_at,
                            Order.id > cursor_id,
                        ),
                    )
                )

            batch = (await session.execute(page_statement)).scalars().all()
            if not batch:
                break

            final_order = batch[-1]
            cursor_created_at = final_order.created_at
            cursor_id = final_order.id

            batch_events = await _build_events_for_orders(session, batch)
            processed_orders += len(batch)

            if batch_events:
                await EventRepository.from_session(session).insert_batch(batch_events)
                await session.commit()
                inserted_events += len(batch_events)
                await asyncio.sleep(rate_limit_delay)

            if total_to_process > 0:
                progress_pct = processed_orders / total_to_process * 100
            else:
                progress_pct = 100
            log.info(
                "backfill_order_events.progress",
                progress_pct=f"{progress_pct:.1f}%",
                orders=processed_orders,
                events=inserted_events,
                total_to_process=total_to_process,
            )

        log.info(
            "backfill_order_events.completed",
            total_orders=processed_orders,
            total_events=inserted_events,
        )
358-
359-
360-
async def _build_events_for_orders(
    session: AsyncSession, orders: Sequence[Order]
) -> list[dict[str, Any]]:
    """
    Build the event rows to insert for a batch of orders.

    Every order yields one order.paid event timestamped with the order's
    created_at, followed by one order.refunded event per succeeded refund of
    that order, timestamped with the refund's created_at (oldest refund first).

    Args:
        session: Session used to load the refunds of the given orders.
        orders: Orders to build events for; each order's `customer` attribute
            is read to obtain the organization id.

    Returns:
        A list of event dictionaries, all flagged with `backfilled=True` in
        their metadata.
    """
    batch_order_ids = [order.id for order in orders]
    unique_orders = {order.id: order for order in orders}

    refund_query = (
        select(Refund)
        .where(
            Refund.order_id.in_(batch_order_ids),
            Refund.status == RefundStatus.succeeded,
        )
        .order_by(Refund.created_at.asc())
    )
    succeeded_refunds = (await session.execute(refund_query)).scalars().all()

    # Group refunds per order, keeping the created_at ordering of the query.
    refunds_by_order: dict[uuid.UUID, list[Refund]] = {}
    for refund in succeeded_refunds:
        if refund.order_id is not None:
            refunds_by_order.setdefault(refund.order_id, []).append(refund)

    events: list[dict[str, Any]] = []
    for order_id, order in unique_orders.items():
        paid_event = {
            "name": SystemEvent.order_paid,
            "source": EventSource.system,
            "timestamp": order.created_at,
            "customer_id": order.customer_id,
            "organization_id": order.customer.organization_id,
            "user_metadata": OrderPaidMetadata(
                order_id=str(order.id),
                amount=order.total_amount,
                currency=order.currency,
                backfilled=True,
            ),
        }
        events.append(paid_event)

        for refund in refunds_by_order.get(order_id, []):
            refunded_event = {
                "name": SystemEvent.order_refunded,
                "source": EventSource.system,
                "timestamp": refund.created_at,
                "customer_id": order.customer_id,
                "organization_id": order.customer.organization_id,
                "user_metadata": OrderRefundedMetadata(
                    order_id=str(order.id),
                    refunded_amount=refund.amount,
                    currency=order.currency,
                    backfilled=True,
                ),
            }
            events.append(refunded_event)

    return events

0 commit comments

Comments
 (0)