Skip to content

Commit 6cf957d

Browse files
committed
use pg instead of supabase for stripe schema ops
1 parent 8e10549 commit 6cf957d

File tree

3 files changed

+188
-147
lines changed

3 files changed

+188
-147
lines changed

apps/api/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
"@supabase/stripe-sync-engine": "^0.45.0",
2121
"@supabase/supabase-js": "^2.89.0",
2222
"@t3-oss/env-core": "^0.13.10",
23+
"@types/pg": "^8.16.0",
2324
"effect": "^3.19.13",
2425
"hono": "^4.11.1",
2526
"hono-openapi": "^0.4.8",
2627
"openai": "^6.15.0",
28+
"pg": "^8.16.3",
2729
"posthog-node": "^5.17.4",
2830
"stripe": "^19.3.1",
2931
"zod": "^4.2.1",

apps/api/src/scripts/stripe-sync-entitlements.ts

Lines changed: 141 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
//
88
// This handles both backfill (pre-webhook customers) and daily verification.
99
import { Effect, Schedule } from "effect";
10+
import pg from "pg";
1011
import Stripe from "stripe";
1112
import { parseArgs } from "util";
1213

@@ -26,15 +27,13 @@ const { values } = parseArgs({
2627

2728
const skipRecentHours = parseInt(values["skip-recent-hours"] ?? "6", 10);
2829

29-
const { STRIPE_SECRET_KEY, SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY } = Bun.env;
30+
const { STRIPE_SECRET_KEY, DATABASE_URL } = Bun.env;
3031

31-
if (!STRIPE_SECRET_KEY || !SUPABASE_URL || !SUPABASE_SERVICE_ROLE_KEY) {
32+
if (!STRIPE_SECRET_KEY || !DATABASE_URL) {
3233
throw new Error("Missing required environment variables");
3334
}
3435

35-
const { createClient } = await import("@supabase/supabase-js");
36-
37-
const supabaseAdmin = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
36+
const pool = new pg.Pool({ connectionString: DATABASE_URL });
3837
const stripe = new Stripe(STRIPE_SECRET_KEY, {
3938
apiVersion: STRIPE_API_VERSION,
4039
});
@@ -48,72 +47,67 @@ const retryPolicy = Schedule.exponential("500 millis").pipe(
4847
Schedule.intersect(Schedule.recurs(5)),
4948
);
5049

50+
class DbError {
51+
readonly _tag = "DbError";
52+
constructor(readonly message: string) {}
53+
}
54+
5155
const fetchRecentlySyncedCustomers = (hours: number) =>
5256
Effect.gen(function* () {
5357
if (hours <= 0) return new Set<string>();
5458

5559
const cutoff = new Date(Date.now() - hours * 60 * 60 * 1000).toISOString();
5660

57-
const { data, error } = yield* Effect.promise(() =>
58-
supabaseAdmin
59-
.schema("stripe")
60-
.from("customers")
61-
.select("id")
62-
.gte("last_synced_at", cutoff),
61+
const result = yield* Effect.tryPromise({
62+
try: () =>
63+
pool.query<{ id: string }>(
64+
`SELECT id FROM stripe.customers WHERE last_synced_at >= $1`,
65+
[cutoff],
66+
),
67+
catch: (e) => new DbError(e instanceof Error ? e.message : String(e)),
68+
}).pipe(
69+
Effect.catchAll((e) =>
70+
Effect.gen(function* () {
71+
yield* Effect.logWarning(
72+
`Failed to fetch recently synced customers: ${e.message}`,
73+
);
74+
return { rows: [] as { id: string }[] };
75+
}),
76+
),
6377
);
6478

65-
if (error) {
66-
yield* Effect.logWarning(
67-
`Failed to fetch recently synced customers: ${error.message}`,
68-
);
69-
return new Set<string>();
70-
}
71-
72-
return new Set((data ?? []).map((c) => c.id as string).filter(Boolean));
79+
return new Set(result.rows.map((c) => c.id));
7380
});
7481

7582
const fetchCustomersToSync = Effect.gen(function* () {
7683
const [subscriptionsResult, entitlementsResult, recentlySynced] =
7784
yield* Effect.all([
78-
Effect.promise(() =>
79-
supabaseAdmin
80-
.schema("stripe")
81-
.from("subscriptions")
82-
.select("customer")
83-
.in("status", ["active", "trialing", "past_due"]),
84-
),
85-
Effect.promise(() =>
86-
supabaseAdmin
87-
.schema("stripe")
88-
.from("active_entitlements")
89-
.select("customer"),
90-
),
85+
Effect.tryPromise({
86+
try: () =>
87+
pool.query<{ customer: string }>(
88+
`SELECT customer FROM stripe.subscriptions WHERE status IN ('active', 'trialing', 'past_due')`,
89+
),
90+
catch: (e) =>
91+
new DbError(
92+
`Failed to fetch subscriptions: ${e instanceof Error ? e.message : String(e)}`,
93+
),
94+
}),
95+
Effect.tryPromise({
96+
try: () =>
97+
pool.query<{ customer: string }>(
98+
`SELECT customer FROM stripe.active_entitlements`,
99+
),
100+
catch: (e) =>
101+
new DbError(
102+
`Failed to fetch existing entitlements: ${e instanceof Error ? e.message : String(e)}`,
103+
),
104+
}),
91105
fetchRecentlySyncedCustomers(skipRecentHours),
92106
]);
93107

94-
if (subscriptionsResult.error) {
95-
return yield* Effect.fail(
96-
new Error(
97-
`Failed to fetch subscriptions: ${subscriptionsResult.error.message}`,
98-
),
99-
);
100-
}
101-
102-
if (entitlementsResult.error) {
103-
return yield* Effect.fail(
104-
new Error(
105-
`Failed to fetch existing entitlements: ${entitlementsResult.error.message}`,
106-
),
107-
);
108-
}
109-
110108
const uniqueIds = new Set([
111-
...(subscriptionsResult.data ?? [])
112-
.map((s) => s.customer as string)
113-
.filter(Boolean),
114-
...(entitlementsResult.data ?? [])
115-
.map((e) => e.customer as string)
116-
.filter(Boolean),
109+
...subscriptionsResult.rows.map((s) => s.customer).filter(Boolean),
110+
...entitlementsResult.rows.map((e) => e.customer).filter(Boolean),
117111
]);
118112

119113
const filtered = Array.from(uniqueIds).filter(
@@ -147,24 +141,27 @@ const fetchCustomerEntitlements = (customerId: string) =>
147141
}).pipe(Effect.retry(retryPolicy));
148142

149143
const deleteAllEntitlements = (customerId: string) =>
150-
Effect.gen(function* () {
151-
const { error, count } = yield* Effect.promise(() =>
152-
supabaseAdmin
153-
.schema("stripe")
154-
.from("active_entitlements")
155-
.delete({ count: "exact" })
156-
.eq("customer", customerId),
157-
);
158-
159-
if (error) {
160-
yield* Effect.logError(
161-
`Failed to delete entitlements for ${customerId}: ${error.message}`,
162-
);
163-
return { updated: 0, deleted: 0, hasError: true };
164-
}
165-
166-
return { updated: 0, deleted: count ?? 0, hasError: false };
167-
});
144+
Effect.tryPromise({
145+
try: () =>
146+
pool.query(`DELETE FROM stripe.active_entitlements WHERE customer = $1`, [
147+
customerId,
148+
]),
149+
catch: (e) => new DbError(e instanceof Error ? e.message : String(e)),
150+
}).pipe(
151+
Effect.map((result) => ({
152+
updated: 0,
153+
deleted: result.rowCount ?? 0,
154+
hasError: false,
155+
})),
156+
Effect.catchAll((e) =>
157+
Effect.gen(function* () {
158+
yield* Effect.logError(
159+
`Failed to delete entitlements for ${customerId}: ${e.message}`,
160+
);
161+
return { updated: 0, deleted: 0, hasError: true };
162+
}),
163+
),
164+
);
168165

169166
const syncEntitlements = (
170167
customerId: string,
@@ -173,62 +170,85 @@ const syncEntitlements = (
173170
Effect.gen(function* () {
174171
const activeLookupKeys = entitlements.map((e) => e.lookup_key);
175172

176-
const { error: deleteError, count: deleteCount } = yield* Effect.promise(
177-
() =>
178-
supabaseAdmin
179-
.schema("stripe")
180-
.from("active_entitlements")
181-
.delete({ count: "exact" })
182-
.eq("customer", customerId)
183-
.not("lookup_key", "in", `(${activeLookupKeys.join(",")})`),
173+
const deleteResult = yield* Effect.tryPromise({
174+
try: () =>
175+
pool.query(
176+
`DELETE FROM stripe.active_entitlements WHERE customer = $1 AND lookup_key != ALL($2)`,
177+
[customerId, activeLookupKeys],
178+
),
179+
catch: (e) => new DbError(e instanceof Error ? e.message : String(e)),
180+
}).pipe(
181+
Effect.catchAll((e) =>
182+
Effect.gen(function* () {
183+
yield* Effect.logError(
184+
`Failed to delete stale entitlements for ${customerId}: ${e.message}`,
185+
);
186+
return null;
187+
}),
188+
),
184189
);
185190

186-
if (deleteError) {
187-
yield* Effect.logError(
188-
`Failed to delete stale entitlements for ${customerId}: ${deleteError.message}`,
189-
);
191+
if (deleteResult === null) {
190192
return { updated: 0, deleted: 0, hasError: true };
191193
}
192194

193-
const records = entitlements.map((entitlement) => ({
194-
id: entitlement.id,
195-
object: entitlement.object,
196-
livemode: entitlement.livemode,
197-
feature: entitlement.feature,
198-
customer: customerId,
199-
lookup_key: entitlement.lookup_key,
200-
last_synced_at: new Date().toISOString(),
201-
}));
202-
203-
const { error: upsertError } = yield* Effect.promise(() =>
204-
supabaseAdmin
205-
.schema("stripe")
206-
.from("active_entitlements")
207-
.upsert(records, { onConflict: "customer,lookup_key" }),
208-
);
209-
210-
if (upsertError) {
211-
yield* Effect.logError(
212-
`Failed to upsert entitlements for ${customerId}: ${upsertError.message}`,
195+
const deleteCount = deleteResult.rowCount ?? 0;
196+
197+
for (const entitlement of entitlements) {
198+
const upsertResult = yield* Effect.tryPromise({
199+
try: () =>
200+
pool.query(
201+
`INSERT INTO stripe.active_entitlements (id, object, livemode, feature, customer, lookup_key, last_synced_at)
202+
VALUES ($1, $2, $3, $4, $5, $6, $7)
203+
ON CONFLICT (customer, lookup_key) DO UPDATE SET
204+
id = EXCLUDED.id,
205+
object = EXCLUDED.object,
206+
livemode = EXCLUDED.livemode,
207+
feature = EXCLUDED.feature,
208+
last_synced_at = EXCLUDED.last_synced_at`,
209+
[
210+
entitlement.id,
211+
entitlement.object,
212+
entitlement.livemode,
213+
entitlement.feature,
214+
customerId,
215+
entitlement.lookup_key,
216+
new Date().toISOString(),
217+
],
218+
),
219+
catch: (e) => new DbError(e instanceof Error ? e.message : String(e)),
220+
}).pipe(
221+
Effect.catchAll((e) =>
222+
Effect.gen(function* () {
223+
yield* Effect.logError(
224+
`Failed to upsert entitlement for ${customerId}: ${e.message}`,
225+
);
226+
return null;
227+
}),
228+
),
213229
);
214-
return { updated: 0, deleted: deleteCount ?? 0, hasError: true };
230+
231+
if (upsertResult === null) {
232+
return { updated: 0, deleted: deleteCount, hasError: true };
233+
}
215234
}
216235

217236
return {
218237
updated: entitlements.length,
219-
deleted: deleteCount ?? 0,
238+
deleted: deleteCount,
220239
hasError: false,
221240
};
222241
});
223242

224243
const updateCustomerLastSyncedAt = (customerId: string) =>
225-
Effect.promise(() =>
226-
supabaseAdmin
227-
.schema("stripe")
228-
.from("customers")
229-
.update({ last_synced_at: new Date().toISOString() })
230-
.eq("id", customerId),
231-
);
244+
Effect.tryPromise({
245+
try: () =>
246+
pool.query(
247+
`UPDATE stripe.customers SET last_synced_at = $1 WHERE id = $2`,
248+
[new Date().toISOString(), customerId],
249+
),
250+
catch: (e) => new DbError(e instanceof Error ? e.message : String(e)),
251+
}).pipe(Effect.catchAll(() => Effect.void));
232252

233253
const processCustomer = (customerId: string) =>
234254
Effect.gen(function* () {
@@ -287,7 +307,11 @@ const program = Effect.gen(function* () {
287307
);
288308
});
289309

290-
Effect.runPromise(program).catch((error) => {
291-
console.error("Fatal error:", error);
292-
process.exit(1);
293-
});
310+
Effect.runPromise(program)
311+
.catch((error) => {
312+
console.error("Fatal error:", error);
313+
process.exit(1);
314+
})
315+
.finally(() => {
316+
pool.end();
317+
});

0 commit comments

Comments
 (0)