Skip to content

Commit 9b8481a

Browse files
committed
add entitlements backfill
1 parent 229916f commit 9b8481a

File tree

6 files changed

+337
-36
lines changed

6 files changed

+337
-36
lines changed

apps/api/Dockerfile

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,22 @@ RUN pnpm deploy --filter @hypr/api --prod --legacy /runtime
2121

2222
FROM oven/bun:1 AS runtime
2323
WORKDIR /app
24+
25+
# https://github.com/aptible/supercronic/releases
26+
ENV SUPERCRONIC_URL=https://github.com/aptible/supercronic/releases/download/v0.2.33/supercronic-linux-amd64 \
27+
SUPERCRONIC=supercronic-linux-amd64 \
28+
SUPERCRONIC_SHA1SUM=71b0d58cc53f6bd72cf2f293e09e294b79c666d8
29+
30+
RUN apt-get update && apt-get install -y curl && \
31+
curl -fsSLO "$SUPERCRONIC_URL" && \
32+
echo "${SUPERCRONIC_SHA1SUM} ${SUPERCRONIC}" | sha1sum -c - && \
33+
chmod +x "$SUPERCRONIC" && \
34+
mv "$SUPERCRONIC" "/usr/local/bin/${SUPERCRONIC}" && \
35+
ln -s "/usr/local/bin/${SUPERCRONIC}" /usr/local/bin/supercronic && \
36+
apt-get remove -y curl && apt-get autoremove -y && rm -rf /var/lib/apt/lists/*
37+
2438
COPY --from=build /runtime ./
39+
COPY apps/api/crontab /app/crontab
40+
2541
EXPOSE 8787
2642
CMD ["bun", "src/index.ts"]

apps/api/crontab

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
0 * * * * cd /app && bun src/scripts/stripe-sync-entitlements.ts

apps/api/fly.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@ strategy = "bluegreen"
1616
BUN_ENV = "production"
1717
PORT = "8787"
1818

19+
[processes]
20+
web = "bun src/index.ts"
21+
cron = "supercronic /app/crontab"
22+
1923
[http_service]
20-
processes = ['app']
24+
processes = ['web']
2125
internal_port = 8787
2226
force_https = true
2327
auto_stop_machines = 'stop'
@@ -38,7 +42,6 @@ protocol = "http"
3842
timeout = "4s"
3943

4044
[[vm]]
41-
processes = ['app']
4245
memory = '1gb'
4346
cpu_kind = 'shared'
4447
cpus = 1

apps/api/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
"@supabase/stripe-sync-engine": "^0.45.0",
2121
"@supabase/supabase-js": "^2.89.0",
2222
"@t3-oss/env-core": "^0.13.10",
23+
"effect": "^3.19.13",
2324
"hono": "^4.11.1",
2425
"hono-openapi": "^0.4.8",
2526
"openai": "^6.15.0",
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
// https://github.com/supabase/stripe-sync-engine/blob/main/packages/sync-engine/README.md#syncing-a-single-entity
2+
// Entitlements can not be synced with "stripe-sync-engine". So we need this script.
3+
//
4+
// Syncs entitlements for customers that are "worth looking at":
5+
// 1. Customers with active/trialing/past_due subscriptions (should have entitlements)
6+
// 2. Customers with existing entitlements (might need updates or cleanup)
7+
//
8+
// This handles both backfill (pre-webhook customers) and daily verification.
9+
import { Effect, Schedule } from "effect";
10+
import Stripe from "stripe";
11+
import { parseArgs } from "util";
12+
13+
import { STRIPE_API_VERSION } from "../integration/stripe";
14+
15+
const { values } = parseArgs({
16+
args: Bun.argv.slice(2),
17+
options: {
18+
"skip-recent-hours": {
19+
type: "string",
20+
default: "6",
21+
},
22+
},
23+
strict: true,
24+
allowPositionals: false,
25+
});
26+
27+
const skipRecentHours = parseInt(values["skip-recent-hours"] ?? "6", 10);
28+
29+
const { STRIPE_SECRET_KEY, SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY } = Bun.env;
30+
31+
if (!STRIPE_SECRET_KEY || !SUPABASE_URL || !SUPABASE_SERVICE_ROLE_KEY) {
32+
throw new Error("Missing required environment variables");
33+
}
34+
35+
const { createClient } = await import("@supabase/supabase-js");
36+
37+
const supabaseAdmin = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
38+
const stripe = new Stripe(STRIPE_SECRET_KEY, {
39+
apiVersion: STRIPE_API_VERSION,
40+
});
41+
42+
const isRateLimitError = (error: unknown): boolean =>
43+
error instanceof Stripe.errors.StripeError && error.code === "rate_limit";
44+
45+
const retryPolicy = Schedule.exponential("500 millis").pipe(
46+
Schedule.jittered,
47+
Schedule.whileInput(isRateLimitError),
48+
Schedule.intersect(Schedule.recurs(5)),
49+
);
50+
51+
const fetchRecentlySyncedCustomers = (hours: number) =>
52+
Effect.gen(function* () {
53+
if (hours <= 0) return new Set<string>();
54+
55+
const cutoff = new Date(Date.now() - hours * 60 * 60 * 1000).toISOString();
56+
57+
const { data, error } = yield* Effect.promise(() =>
58+
supabaseAdmin
59+
.schema("stripe")
60+
.from("customers")
61+
.select("id")
62+
.gte("last_synced_at", cutoff),
63+
);
64+
65+
if (error) {
66+
yield* Effect.logWarning(
67+
`Failed to fetch recently synced customers: ${error.message}`,
68+
);
69+
return new Set<string>();
70+
}
71+
72+
return new Set((data ?? []).map((c) => c.id as string).filter(Boolean));
73+
});
74+
75+
const fetchCustomersToSync = Effect.gen(function* () {
76+
const [subscriptionsResult, entitlementsResult, recentlySynced] =
77+
yield* Effect.all([
78+
Effect.promise(() =>
79+
supabaseAdmin
80+
.schema("stripe")
81+
.from("subscriptions")
82+
.select("customer")
83+
.in("status", ["active", "trialing", "past_due"]),
84+
),
85+
Effect.promise(() =>
86+
supabaseAdmin
87+
.schema("stripe")
88+
.from("active_entitlements")
89+
.select("customer"),
90+
),
91+
fetchRecentlySyncedCustomers(skipRecentHours),
92+
]);
93+
94+
if (subscriptionsResult.error) {
95+
return yield* Effect.fail(
96+
new Error(
97+
`Failed to fetch subscriptions: ${subscriptionsResult.error.message}`,
98+
),
99+
);
100+
}
101+
102+
if (entitlementsResult.error) {
103+
return yield* Effect.fail(
104+
new Error(
105+
`Failed to fetch existing entitlements: ${entitlementsResult.error.message}`,
106+
),
107+
);
108+
}
109+
110+
const uniqueIds = new Set([
111+
...(subscriptionsResult.data ?? [])
112+
.map((s) => s.customer as string)
113+
.filter(Boolean),
114+
...(entitlementsResult.data ?? [])
115+
.map((e) => e.customer as string)
116+
.filter(Boolean),
117+
]);
118+
119+
const filtered = Array.from(uniqueIds).filter(
120+
(id) => !recentlySynced.has(id),
121+
);
122+
const skipped = uniqueIds.size - filtered.length;
123+
124+
if (skipped > 0) {
125+
yield* Effect.log(
126+
`Skipping ${skipped} customers synced within the last ${skipRecentHours} hours`,
127+
);
128+
}
129+
130+
return filtered;
131+
});
132+
133+
const fetchCustomerEntitlements = (customerId: string) =>
134+
Effect.tryPromise({
135+
try: async () => {
136+
const entitlements: Stripe.Entitlements.ActiveEntitlement[] = [];
137+
for await (const entitlement of stripe.entitlements.activeEntitlements.list(
138+
{
139+
customer: customerId,
140+
},
141+
)) {
142+
entitlements.push(entitlement);
143+
}
144+
return entitlements;
145+
},
146+
catch: (error) => error,
147+
}).pipe(Effect.retry(retryPolicy));
148+
149+
const deleteAllEntitlements = (customerId: string) =>
150+
Effect.gen(function* () {
151+
const { error, count } = yield* Effect.promise(() =>
152+
supabaseAdmin
153+
.schema("stripe")
154+
.from("active_entitlements")
155+
.delete({ count: "exact" })
156+
.eq("customer", customerId),
157+
);
158+
159+
if (error) {
160+
yield* Effect.logError(
161+
`Failed to delete entitlements for ${customerId}: ${error.message}`,
162+
);
163+
return { updated: 0, deleted: 0, hasError: true };
164+
}
165+
166+
return { updated: 0, deleted: count ?? 0, hasError: false };
167+
});
168+
169+
const syncEntitlements = (
170+
customerId: string,
171+
entitlements: Stripe.Entitlements.ActiveEntitlement[],
172+
) =>
173+
Effect.gen(function* () {
174+
const activeLookupKeys = entitlements.map((e) => e.lookup_key);
175+
176+
const { error: deleteError, count: deleteCount } = yield* Effect.promise(
177+
() =>
178+
supabaseAdmin
179+
.schema("stripe")
180+
.from("active_entitlements")
181+
.delete({ count: "exact" })
182+
.eq("customer", customerId)
183+
.not("lookup_key", "in", `(${activeLookupKeys.join(",")})`),
184+
);
185+
186+
if (deleteError) {
187+
yield* Effect.logError(
188+
`Failed to delete stale entitlements for ${customerId}: ${deleteError.message}`,
189+
);
190+
return { updated: 0, deleted: 0, hasError: true };
191+
}
192+
193+
const records = entitlements.map((entitlement) => ({
194+
id: entitlement.id,
195+
object: entitlement.object,
196+
livemode: entitlement.livemode,
197+
feature: entitlement.feature,
198+
customer: customerId,
199+
lookup_key: entitlement.lookup_key,
200+
last_synced_at: new Date().toISOString(),
201+
}));
202+
203+
const { error: upsertError } = yield* Effect.promise(() =>
204+
supabaseAdmin
205+
.schema("stripe")
206+
.from("active_entitlements")
207+
.upsert(records, { onConflict: "customer,lookup_key" }),
208+
);
209+
210+
if (upsertError) {
211+
yield* Effect.logError(
212+
`Failed to upsert entitlements for ${customerId}: ${upsertError.message}`,
213+
);
214+
return { updated: 0, deleted: deleteCount ?? 0, hasError: true };
215+
}
216+
217+
return {
218+
updated: entitlements.length,
219+
deleted: deleteCount ?? 0,
220+
hasError: false,
221+
};
222+
});
223+
224+
const updateCustomerLastSyncedAt = (customerId: string) =>
225+
Effect.promise(() =>
226+
supabaseAdmin
227+
.schema("stripe")
228+
.from("customers")
229+
.update({ last_synced_at: new Date().toISOString() })
230+
.eq("id", customerId),
231+
);
232+
233+
const processCustomer = (customerId: string) =>
234+
Effect.gen(function* () {
235+
const entitlements = yield* fetchCustomerEntitlements(customerId);
236+
237+
const result =
238+
entitlements.length === 0
239+
? yield* deleteAllEntitlements(customerId)
240+
: yield* syncEntitlements(customerId, entitlements);
241+
242+
if (!result.hasError) {
243+
yield* updateCustomerLastSyncedAt(customerId);
244+
}
245+
246+
return result;
247+
}).pipe(
248+
Effect.catchAll((error) =>
249+
Effect.gen(function* () {
250+
yield* Effect.logError(
251+
`Failed to process customer ${customerId}: ${error}`,
252+
);
253+
return { updated: 0, deleted: 0, hasError: true };
254+
}),
255+
),
256+
);
257+
258+
const program = Effect.gen(function* () {
259+
yield* Effect.log("Starting Stripe entitlements sync...");
260+
yield* Effect.log(
261+
"Fetching customers with active subscriptions or existing entitlements...",
262+
);
263+
264+
const customerIds = yield* fetchCustomersToSync;
265+
266+
yield* Effect.log(`Found ${customerIds.length} customers to process`);
267+
268+
let processed = 0;
269+
let totalUpdated = 0;
270+
let totalDeleted = 0;
271+
let totalErrors = 0;
272+
273+
for (const customerId of customerIds) {
274+
const result = yield* processCustomer(customerId);
275+
processed++;
276+
totalUpdated += result.updated ?? 0;
277+
totalDeleted += result.deleted;
278+
if (result.hasError) totalErrors++;
279+
280+
if (processed % 100 === 0) {
281+
yield* Effect.log(`Progress: ${processed}/${customerIds.length}`);
282+
}
283+
}
284+
285+
yield* Effect.log(
286+
`Sync complete: processed=${processed}, updated=${totalUpdated}, deleted=${totalDeleted}, errors=${totalErrors}`,
287+
);
288+
});
289+
290+
Effect.runPromise(program).catch((error) => {
291+
console.error("Fatal error:", error);
292+
process.exit(1);
293+
});

0 commit comments

Comments
 (0)