diff --git a/packages/sync-engine/src/database/migrations/0032_checkout_sessions.sql b/packages/sync-engine/src/database/migrations/0032_checkout_sessions.sql new file mode 100644 index 0000000..a4b54df --- /dev/null +++ b/packages/sync-engine/src/database/migrations/0032_checkout_sessions.sql @@ -0,0 +1,76 @@ +create table + if not exists "stripe"."checkout_sessions" ( + "id" text primary key, + "object" text, + "adaptive_pricing" jsonb, + "after_expiration" jsonb, + "allow_promotion_codes" boolean, + "amount_subtotal" integer, + "amount_total" integer, + "automatic_tax" jsonb, + "billing_address_collection" text, + "cancel_url" text, + "client_reference_id" text, + "client_secret" text, + "collected_information" jsonb, + "consent" jsonb, + "consent_collection" jsonb, + "created" integer, + "currency" text, + "currency_conversion" jsonb, + "custom_fields" jsonb, + "custom_text" jsonb, + "customer" text, + "customer_creation" text, + "customer_details" jsonb, + "customer_email" text, + "discounts" jsonb, + "expires_at" integer, + "invoice" text, + "invoice_creation" jsonb, + "livemode" boolean, + "locale" text, + "metadata" jsonb, + "mode" text, + "optional_items" jsonb, + "payment_intent" text, + "payment_link" text, + "payment_method_collection" text, + "payment_method_configuration_details" jsonb, + "payment_method_options" jsonb, + "payment_method_types" jsonb, + "payment_status" text, + "permissions" jsonb, + "phone_number_collection" jsonb, + "presentment_details" jsonb, + "recovered_from" text, + "redirect_on_completion" text, + "return_url" text, + "saved_payment_method_options" jsonb, + "setup_intent" text, + "shipping_address_collection" jsonb, + "shipping_cost" jsonb, + "shipping_details" jsonb, + "shipping_options" jsonb, + "status" text, + "submit_type" text, + "subscription" text, + "success_url" text, + "tax_id_collection" jsonb, + "total_details" jsonb, + "ui_mode" text, + "url" text, + "wallet_options" jsonb, + "updated_at" timestamptz default timezone('utc'::text, now()) not null + ); + +create index stripe_checkout_sessions_customer_idx on "stripe"."checkout_sessions" using btree (customer); +create index stripe_checkout_sessions_subscription_idx on "stripe"."checkout_sessions" using btree (subscription); +create index stripe_checkout_sessions_payment_intent_idx on "stripe"."checkout_sessions" using btree (payment_intent); +create index stripe_checkout_sessions_invoice_idx on "stripe"."checkout_sessions" using btree (invoice); + +create trigger handle_updated_at + before update + on stripe.checkout_sessions + for each row + execute procedure set_updated_at(); diff --git a/packages/sync-engine/src/database/migrations/0033_checkout_session_line_items.sql b/packages/sync-engine/src/database/migrations/0033_checkout_session_line_items.sql new file mode 100644 index 0000000..f15b9fe --- /dev/null +++ b/packages/sync-engine/src/database/migrations/0033_checkout_session_line_items.sql @@ -0,0 +1,25 @@ +create table if not exists "stripe"."checkout_session_line_items" ( + "id" text, + "object" text, + "adjustable_quantity" jsonb, + "amount_subtotal" integer, + "amount_total" integer, + "currency" text, + "description" text, + "discounts" jsonb, + "price" text, + "quantity" integer, + "taxes" jsonb, + "checkout_session" text references "stripe"."checkout_sessions", + "updated_at" timestamptz default timezone('utc'::text, now()) not null, + primary key ("checkout_session", "id") +); + +create index stripe_checkout_session_line_items_session_idx on "stripe"."checkout_session_line_items" using btree (checkout_session); +create index stripe_checkout_session_line_items_price_idx on "stripe"."checkout_session_line_items" using btree (price); + +create trigger handle_updated_at + before update + on stripe.checkout_session_line_items + for each row + execute procedure set_updated_at(); \ No newline at end of file diff --git a/packages/sync-engine/src/schemas/checkout_session_line_items.ts b/packages/sync-engine/src/schemas/checkout_session_line_items.ts new file mode 100644 index 0000000..ed110f9 --- /dev/null +++ b/packages/sync-engine/src/schemas/checkout_session_line_items.ts @@ -0,0 +1,18 @@ +import type { EntitySchema } from './types' + +export const checkoutSessionLineItemSchema: EntitySchema = { + properties: [ + 'id', + 'object', + 'adjustable_quantity', + 'amount_subtotal', + 'amount_total', + 'currency', + 'description', + 'discounts', + 'price', + 'quantity', + 'taxes', + 'checkout_session', + ], +} as const diff --git a/packages/sync-engine/src/schemas/checkout_sessions.ts b/packages/sync-engine/src/schemas/checkout_sessions.ts new file mode 100644 index 0000000..742be79 --- /dev/null +++ b/packages/sync-engine/src/schemas/checkout_sessions.ts @@ -0,0 +1,71 @@ +import type { EntitySchema } from './types' + +export const checkoutSessionSchema: EntitySchema = { + properties: [ + 'id', + 'object', + 'adaptive_pricing', + 'after_expiration', + 'allow_promotion_codes', + 'amount_subtotal', + 'amount_total', + 'automatic_tax', + 'billing_address_collection', + 'cancel_url', + 'client_reference_id', + 'client_secret', + 'collected_information', + 'consent', + 'consent_collection', + 'created', + 'currency', + 'currency_conversion', + 'custom_fields', + 'custom_text', + 'customer', + 'customer_creation', + 'customer_details', + 'customer_email', + 'discounts', + 'expires_at', + 'invoice', + 'invoice_creation', + 'livemode', + 'locale', + 'metadata', + 'mode', + 'optional_items', + 'payment_intent', + 'payment_link', + 'payment_method_collection', + 'payment_method_configuration_details', + 'payment_method_options', + 'payment_method_types', + 'payment_status', + 'permissions', + 'phone_number_collection', + 'presentment_details', + 'recovered_from', + 'redirect_on_completion', + 'return_url', + 'saved_payment_method_options', + 'setup_intent', + 'shipping_address_collection', + 'shipping_cost', + 'shipping_details', + 'shipping_options', + 'status', + 'submit_type', + 'subscription', + 'success_url', + 'tax_id_collection', + 'total_details', + 'ui_mode', + 'url', + 'wallet_options', + ], +} as const + +export const checkoutSessionDeletedSchema: EntitySchema = { + properties: ['id', 'object', 'deleted'], +} as const diff --git a/packages/sync-engine/src/stripeSync.ts b/packages/sync-engine/src/stripeSync.ts index 34ba97a..4c1e687 100644 --- a/packages/sync-engine/src/stripeSync.ts +++ b/packages/sync-engine/src/stripeSync.ts @@ -2,6 +2,8 @@ import Stripe from 'stripe' import { pg as sql } from 'yesql' import { PostgresClient } from './database/postgres' import { chargeSchema } from './schemas/charge' +import { checkoutSessionSchema } from './schemas/checkout_sessions' +import { checkoutSessionLineItemSchema } from './schemas/checkout_session_line_items' import { creditNoteSchema } from './schemas/credit_note' import { customerDeletedSchema, customerSchema } from './schemas/customer' import { disputeSchema } from './schemas/dispute' @@ -85,6 +87,22 @@ export class StripeSync { await this.upsertCharges([charge]) break } + case 'checkout.session.async_payment_failed': + case 'checkout.session.async_payment_succeeded': + case 'checkout.session.completed': + case 'checkout.session.expired': { + const checkoutSession = await this.fetchOrUseWebhookData( + event.data.object as Stripe.Checkout.Session, + (id) => this.stripe.checkout.sessions.retrieve(id, { expand: ['line_items'] }) + ) + + this.config.logger?.info( + `Received webhook ${event.id}: ${event.type} for checkout session ${checkoutSession.id}` + ) + + await this.upsertCheckoutSessions([checkoutSession]) + break + } case 'customer.created': case 'customer.deleted': case 'customer.updated': { @@ -477,6 +495,10 @@ export class StripeSync { return this.stripe.reviews.retrieve(stripeId).then((it) => this.upsertReviews([it])) } else if (stripeId.startsWith('re_')) { return this.stripe.refunds.retrieve(stripeId).then((it) => this.upsertRefunds([it])) + } else if (stripeId.startsWith('cs_')) { + return this.stripe.checkout.sessions + .retrieve(stripeId, { expand: ['line_items'] }) + .then((it) => this.upsertCheckoutSessions([it])) } } @@ -497,7 +519,8 @@ export class StripeSync { taxIds, creditNotes, earlyFraudWarnings, - refunds + refunds, + checkoutSessions switch (object) { case 'all': @@ -517,6 +540,7 @@ export class StripeSync { disputes = await this.syncDisputes(params) earlyFraudWarnings = await this.syncEarlyFraudWarnings(params) refunds = await this.syncRefunds(params) + checkoutSessions = await this.syncCheckoutSessions(params) break case 'customer': customers = await this.syncCustomers(params) @@ -565,6 +589,9 @@ export class StripeSync { case 'refund': refunds = await this.syncRefunds(params) break + case 'checkout_sessions': + checkoutSessions = await this.syncCheckoutSessions(params) + break default: break } @@ -586,6 +613,7 @@ export class StripeSync { creditNotes, earlyFraudWarnings, refunds, + checkoutSessions, } } @@ -806,6 +834,21 @@ export class StripeSync { ) } + async syncCheckoutSessions(syncParams?: SyncBackfillParams): Promise { + this.config.logger?.info('Syncing checkout sessions') + + const params: Stripe.Checkout.SessionListParams = { + limit: 100, + expand: ['data.line_items'], + } + if (syncParams?.created) params.created = syncParams.created + + return this.fetchAndUpsert( + () => this.stripe.checkout.sessions.list(params), + (items) => this.upsertCheckoutSessions(items, syncParams?.backfillRelatedEntities) + ) + } + private async fetchAndUpsert( fetch: () => Stripe.ApiListPromise, upsert: (items: T[]) => Promise @@ -886,6 +929,38 @@ export class StripeSync { return this.postgresClient.upsertMany(creditNotes, 'credit_notes', creditNoteSchema) } + private async upsertCheckoutSessions( + checkoutSessions: Stripe.Checkout.Session[], + backfillRelatedEntities?: boolean + ): Promise { + if (backfillRelatedEntities ?? this.config.backfillRelatedEntities) { + await Promise.all([ + this.backfillCustomers(getUniqueIds(checkoutSessions, 'customer')), + this.backfillSubscriptions(getUniqueIds(checkoutSessions, 'subscription')), + this.backfillPaymentIntents(getUniqueIds(checkoutSessions, 'payment_intent')), + this.backfillInvoices(getUniqueIds(checkoutSessions, 'invoice')), + ]) + } + + // Upsert checkout sessions first + const rows = await this.postgresClient.upsertMany( + checkoutSessions, + 'checkout_sessions', + checkoutSessionSchema + ) + + // Upsert line items into separate table + const lineItemsPromises = checkoutSessions + .filter((session) => session.line_items?.data && session.line_items.data.length > 0) + .map((session) => this.upsertCheckoutSessionLineItems(session.line_items!.data, session.id)) + + if (lineItemsPromises.length > 0) { + await Promise.all(lineItemsPromises) + } + + return rows + } + async upsertEarlyFraudWarning( earlyFraudWarnings: Stripe.Radar.EarlyFraudWarning[], backfillRelatedEntities?: boolean @@ -1111,6 +1186,28 @@ export class StripeSync { ) } + async upsertCheckoutSessionLineItems(lineItems: Stripe.LineItem[], checkoutSessionId: string) { + const modifiedLineItems = lineItems.map((lineItem) => { + // Extract price ID if price is an object, otherwise use the string value + const priceId = + typeof lineItem.price === 'object' && lineItem.price?.id + ? lineItem.price.id.toString() + : lineItem.price?.toString() || null + + return { + ...lineItem, + price: priceId, + checkout_session: checkoutSessionId, + } + }) + + await this.postgresClient.upsertMany( + modifiedLineItems, + 'checkout_session_line_items', + checkoutSessionLineItemSchema + ) + } + async markDeletedSubscriptionItems( subscriptionId: string, currentSubItemIds: string[] diff --git a/packages/sync-engine/src/types.ts b/packages/sync-engine/src/types.ts index 8b6c776..e543f07 100644 --- a/packages/sync-engine/src/types.ts +++ b/packages/sync-engine/src/types.ts @@ -58,7 +58,7 @@ export type SyncObject = | 'credit_note' | 'early_fraud_warning' | 'refund' - + | 'checkout_sessions' export interface Sync { synced: number } @@ -80,6 +80,7 @@ export interface SyncBackfill { creditNotes?: Sync earlyFraudWarnings?: Sync refunds?: Sync + checkoutSessions?: Sync } export interface SyncBackfillParams {