diff --git a/README.md b/README.md index 98cdd48..4dede72 100644 --- a/README.md +++ b/README.md @@ -141,3 +141,4 @@ To deploy the sync-engine to a Supabase edge function, follow this [guide](./edg - [x] `subscription_schedule.expiring` 🟢 - [x] `subscription_schedule.released` 🟢 - [x] `subscription_schedule.updated` 🟢 +- [x] `entitlements.active_entitlement_summary.updated` 🟢 diff --git a/packages/fastify-app/src/test/stripe/active_entitlement_summary_updated.json b/packages/fastify-app/src/test/stripe/active_entitlement_summary_updated.json new file mode 100644 index 0000000..9d4b6a3 --- /dev/null +++ b/packages/fastify-app/src/test/stripe/active_entitlement_summary_updated.json @@ -0,0 +1,39 @@ +{ + "id": "evt_1RqDanBRSrwpzKxHOnFoInVh", + "object": "event", + "api_version": "2020-03-02", + "created": 1753795761, + "data": { + "object": { + "object": "entitlements.active_entitlement_summary", + "customer": "cus_Sll319ReSxCmEi", + "entitlements": { + "object": "list", + "data": [ + { + "id": "ent_test_61SzbY9Pui5KGP3yf41BRSrwpzKxHTFY", + "object": "entitlements.active_entitlement", + "feature": "feat_test_61SzbVaDzF2GIc5ng41BRSrwpzKxHABs", + "livemode": false, + "lookup_key": "journeys" + } + ], + "has_more": false, + "url": "/v1/customer/cus_Sll319ReSxCmEi/entitlements" + }, + "livemode": false + }, + "previous_attributes": { + "entitlements": { + "data": [] + } + } + }, + "livemode": false, + "pending_webhooks": 1, + "request": { + "id": null, + "idempotency_key": null + }, + "type": "entitlements.active_entitlement_summary.updated" +} diff --git a/packages/fastify-app/src/test/webhooks.test.ts b/packages/fastify-app/src/test/webhooks.test.ts index 13f62be..b665758 100644 --- a/packages/fastify-app/src/test/webhooks.test.ts +++ b/packages/fastify-app/src/test/webhooks.test.ts @@ -103,6 +103,7 @@ describe('POST /webhooks', () => { 'refund_created', 'refund_failed', 'refund_updated', + 'active_entitlement_summary_updated', ])('process event %s', async (jsonFile) => { const eventBody = await import(`./stripe/${jsonFile}`).then(({ default: myData }) => myData) const signature = createHmac('sha256', stripeWebhookSecret) diff --git a/packages/sync-engine/README.md b/packages/sync-engine/README.md index 3867fc8..116b15d 100644 --- a/packages/sync-engine/README.md +++ b/packages/sync-engine/README.md @@ -76,7 +76,7 @@ You can sync or update a single Stripe entity by its ID using the `syncSingleEnt await sync.syncSingleEntity('cus_12345') ``` -The entity type is detected automatically based on the Stripe ID prefix (e.g., `cus_` for customer, `prod_` for product). +The entity type is detected automatically based on the Stripe ID prefix (e.g., `cus_` for customer, `prod_` for product). `ent_` is not supported at the moment. ### Backfilling Data diff --git a/packages/sync-engine/src/database/migrations/0032_add_features.sql b/packages/sync-engine/src/database/migrations/0032_add_features.sql new file mode 100644 index 0000000..65c2d53 --- /dev/null +++ b/packages/sync-engine/src/database/migrations/0032_add_features.sql @@ -0,0 +1,17 @@ +create table + if not exists "stripe"."features" ( + "id" text primary key, + object text, + livemode boolean, + name text, + lookup_key text unique, + active boolean, + metadata jsonb, + updated_at timestamptz default timezone('utc'::text, now()) not null + ); + +create trigger handle_updated_at + before update + on stripe.features + for each row + execute procedure set_updated_at(); diff --git a/packages/sync-engine/src/database/migrations/0033_active_entitlement.sql b/packages/sync-engine/src/database/migrations/0033_active_entitlement.sql new file mode 100644 index 0000000..6abbd48 --- /dev/null +++ b/packages/sync-engine/src/database/migrations/0033_active_entitlement.sql @@ -0,0 +1,19 @@ +create table + if not exists "stripe"."active_entitlements" ( + "id" text primary key, + "object" text, + "livemode" boolean, + "feature" text, + "customer" text, + "lookup_key" text unique, + "updated_at" timestamptz default timezone('utc'::text, now()) not null + ); + +create index stripe_active_entitlements_customer_idx on "stripe"."active_entitlements" using btree (customer); +create index stripe_active_entitlements_feature_idx on "stripe"."active_entitlements" using btree (feature); + +create trigger handle_updated_at + before update + on stripe.active_entitlements + for each row + execute procedure set_updated_at(); diff --git a/packages/sync-engine/src/schemas/active_entitlement.ts b/packages/sync-engine/src/schemas/active_entitlement.ts new file mode 100644 index 0000000..f69c11e --- /dev/null +++ b/packages/sync-engine/src/schemas/active_entitlement.ts @@ -0,0 +1,5 @@ +import type { EntitySchema } from './types' + +export const activeEntitlementSchema: EntitySchema = { + properties: ['id', 'object', 'feature', 'lookup_key', 'livemode', 'customer'], +} as const diff --git a/packages/sync-engine/src/schemas/feature.ts b/packages/sync-engine/src/schemas/feature.ts new file mode 100644 index 0000000..32af8fb --- /dev/null +++ b/packages/sync-engine/src/schemas/feature.ts @@ -0,0 +1,5 @@ +import type { EntitySchema } from './types' + +export const featureSchema: EntitySchema = { + properties: ['id', 'object', 'livemode', 'name', 'lookup_key', 'active', 'metadata'], +} as const diff --git a/packages/sync-engine/src/stripeSync.ts b/packages/sync-engine/src/stripeSync.ts index 34ba97a..4699dd6 100644 --- a/packages/sync-engine/src/stripeSync.ts +++ b/packages/sync-engine/src/stripeSync.ts @@ -16,10 +16,19 @@ import { taxIdSchema } from './schemas/tax_id' import { subscriptionItemSchema } from './schemas/subscription_item' import { subscriptionScheduleSchema } from './schemas/subscription_schedules' import { subscriptionSchema } from './schemas/subscription' -import { StripeSyncConfig, Sync, SyncBackfill, SyncBackfillParams } from './types' +import { + StripeSyncConfig, + Sync, + SyncBackfill, + SyncBackfillParams, + SyncEntitlementsParams, + SyncFeaturesParams, +} from './types' import { earlyFraudWarningSchema } from './schemas/early_fraud_warning' import { reviewSchema } from './schemas/review' import { refundSchema } from './schemas/refund' +import { activeEntitlementSchema } from './schemas/active_entitlement' +import { featureSchema } from './schemas/feature' function getUniqueIds(entries: T[], key: string): string[] { const set = new Set( @@ -415,7 +424,30 @@ export class StripeSync { break } + case 'entitlements.active_entitlement_summary.updated': { + const activeEntitlementSummary = event.data + .object as Stripe.Entitlements.ActiveEntitlementSummary + let entitlements = activeEntitlementSummary.entitlements + + if (this.config.revalidateEntityViaStripeApi) { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { lastResponse, ...rest } = await this.stripe.entitlements.activeEntitlements.list({ + customer: activeEntitlementSummary.customer, + }) + entitlements = rest + } + + this.config.logger?.info( + `Received webhook ${event.id}: ${event.type} for activeEntitlementSummary for customer ${activeEntitlementSummary.customer}` + ) + await this.deleteRemovedActiveEntitlements( + activeEntitlementSummary.customer, + entitlements.data.map((entitlement) => entitlement.id) + ) + await this.upsertActiveEntitlements(activeEntitlementSummary.customer, entitlements.data) + break + } default: throw new Error('Unhandled webhook event') } @@ -477,6 +509,10 @@ export class StripeSync { return this.stripe.reviews.retrieve(stripeId).then((it) => this.upsertReviews([it])) } else if (stripeId.startsWith('re_')) { return this.stripe.refunds.retrieve(stripeId).then((it) => this.upsertRefunds([it])) + } else if (stripeId.startsWith('feat_')) { + return this.stripe.entitlements.features + .retrieve(stripeId) + .then((it) => this.upsertFeatures([it])) } } @@ -806,6 +842,28 @@ export class StripeSync { ) } + async syncFeatures(syncParams?: SyncFeaturesParams): Promise { + this.config.logger?.info('Syncing features') + const params: Stripe.Entitlements.FeatureListParams = { limit: 100, ...syncParams?.pagination } + return this.fetchAndUpsert( + () => this.stripe.entitlements.features.list(params), + (features) => this.upsertFeatures(features) + ) + } + + async syncEntitlements(customerId: string, syncParams?: SyncEntitlementsParams): Promise { + this.config.logger?.info('Syncing entitlements') + const params: Stripe.Entitlements.ActiveEntitlementListParams = { + customer: customerId, + limit: 100, + ...syncParams?.pagination, + } + return this.fetchAndUpsert( + () => this.stripe.entitlements.activeEntitlements.list(params), + (entitlements) => this.upsertActiveEntitlements(customerId, entitlements) + ) + } + private async fetchAndUpsert( fetch: () => Stripe.ApiListPromise, upsert: (items: T[]) => Promise @@ -1198,6 +1256,77 @@ export class StripeSync { return rows } + async deleteRemovedActiveEntitlements( + customerId: string, + currentActiveEntitlementIds: string[] + ): Promise<{ rowCount: number }> { + let prepared = sql(` + select id from "${this.config.schema}"."active_entitlements" + where customer = :customerId; + `)({ customerId }) + const { rows } = await this.postgresClient.query(prepared.text, prepared.values) + const deletedIds = rows.filter( + ({ id }: { id: string }) => currentActiveEntitlementIds.includes(id) === false + ) + + if (deletedIds.length > 0) { + const ids = deletedIds.map(({ id }: { id: string }) => id) + prepared = sql(` + delete from "${this.config.schema}"."active_entitlements" + where id=any(:ids::text[]); + `)({ ids }) + const { rowCount } = await this.postgresClient.query(prepared.text, prepared.values) + return { rowCount: rowCount || 0 } + } else { + return { rowCount: 0 } + } + } + + async upsertFeatures(features: Stripe.Entitlements.Feature[]) { + return this.postgresClient.upsertMany(features, 'features', featureSchema) + } + + async backfillFeatures(featureIds: string[]) { + const missingFeatureIds = await this.postgresClient.findMissingEntries('features', featureIds) + await this.fetchMissingEntities(missingFeatureIds, (id) => + this.stripe.entitlements.features.retrieve(id) + ) + .then((features) => this.upsertFeatures(features)) + .catch((err) => { + this.config.logger?.error(err, 'Failed to backfill features') + throw err + }) + } + + async upsertActiveEntitlements( + customerId: string, + activeEntitlements: Stripe.Entitlements.ActiveEntitlement[], + backfillRelatedEntities?: boolean + ) { + if (backfillRelatedEntities ?? this.config.backfillRelatedEntities) { + await Promise.all([ + this.backfillCustomers(getUniqueIds(activeEntitlements, 'customer')), + this.backfillFeatures(getUniqueIds(activeEntitlements, 'feature')), + ]) + } + + const entitlements = activeEntitlements.map((entitlement) => ({ + id: entitlement.id, + object: entitlement.object, + feature: + typeof entitlement.feature === 'string' ? entitlement.feature : entitlement.feature.id, + customer: customerId, + livemode: entitlement.livemode, + lookup_key: entitlement.lookup_key, + })) + + return this.postgresClient.upsertMany( + entitlements, + 'active_entitlements', + activeEntitlementSchema + ) + } + async backfillSubscriptions(subscriptionIds: string[]) { const missingSubscriptionIds = await this.postgresClient.findMissingEntries( 'subscriptions', diff --git a/packages/sync-engine/src/types.ts b/packages/sync-engine/src/types.ts index 8b6c776..f38b290 100644 --- a/packages/sync-engine/src/types.ts +++ b/packages/sync-engine/src/types.ts @@ -1,4 +1,5 @@ import pino from 'pino' +import Stripe from 'stripe' export type StripeSyncConfig = { /** Postgres database URL including authentication */ @@ -43,6 +44,7 @@ export type StripeSyncConfig = { export type SyncObject = | 'all' | 'customer' + | 'customer_with_entitlements' | 'invoice' | 'price' | 'product' @@ -107,3 +109,13 @@ export interface SyncBackfillParams { object?: SyncObject backfillRelatedEntities?: boolean } + +export interface SyncEntitlementsParams { + object: 'entitlements' + customerId: string + pagination?: Pick +} +export interface SyncFeaturesParams { + object: 'features' + pagination?: Pick +}