diff --git a/apps/cli-go/internal/db/diff/managed_schemas_test.go b/apps/cli-go/internal/db/diff/managed_schemas_test.go new file mode 100644 index 0000000000..022a6181a6 --- /dev/null +++ b/apps/cli-go/internal/db/diff/managed_schemas_test.go @@ -0,0 +1,39 @@ +package diff + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/supabase/cli/internal/utils" +) + +func TestManagedDiffSchemas(t *testing.T) { + t.Run("excludes base managed schemas when stripe sync disabled", func(t *testing.T) { + utils.Config.StripeSync.Enabled = false + schemas := managedDiffSchemas() + assert.Equal(t, managedSchemas, schemas) + assert.NotContains(t, schemas, "stripe") + }) + + t.Run("excludes stripe schema when stripe sync enabled", func(t *testing.T) { + utils.Config.StripeSync.Enabled = true + utils.Config.StripeSync.Schema = "stripe" + t.Cleanup(func() { utils.Config.StripeSync.Enabled = false }) + schemas := managedDiffSchemas() + assert.Contains(t, schemas, "stripe") + // Base managed schemas are preserved and not mutated. + assert.Subset(t, schemas, managedSchemas) + assert.NotContains(t, managedSchemas, "stripe") + }) + + t.Run("respects a custom stripe schema name", func(t *testing.T) { + utils.Config.StripeSync.Enabled = true + utils.Config.StripeSync.Schema = "billing" + t.Cleanup(func() { + utils.Config.StripeSync.Enabled = false + utils.Config.StripeSync.Schema = "stripe" + }) + schemas := managedDiffSchemas() + assert.Contains(t, schemas, "billing") + }) +} diff --git a/apps/cli-go/internal/db/diff/migra.go b/apps/cli-go/internal/db/diff/migra.go index b5c001825b..66bf3107a8 100644 --- a/apps/cli-go/internal/db/diff/migra.go +++ b/apps/cli-go/internal/db/diff/migra.go @@ -4,6 +4,7 @@ import ( "bytes" "context" _ "embed" + "slices" "strings" "github.com/docker/docker/api/types/container" @@ -56,6 +57,17 @@ var ( } ) +// managedDiffSchemas returns the schemas excluded from schema diffing. When the +// Stripe Sync Engine is enabled, its schema is owned by the engine (which +// recreates it via its own migrations), so it must be excluded too — otherwise +// db diff / db pull would try to manage tables the engine owns. +func managedDiffSchemas() []string { + if utils.Config.StripeSync.Enabled && len(utils.Config.StripeSync.Schema) > 0 { + return append(slices.Clone(managedSchemas), utils.Config.StripeSync.Schema) + } + return managedSchemas +} + // Diffs local database schema against shadow, dumps output to stdout. func DiffSchemaMigraBash(ctx context.Context, source, target pgconn.Config, schema []string, options ...func(*pgx.ConnConfig)) (string, error) { // Load all user defined schemas @@ -134,7 +146,7 @@ func DiffSchemaMigra(ctx context.Context, source, target pgconn.Config, schema [ if len(schema) > 0 { env = append(env, "INCLUDED_SCHEMAS="+strings.Join(schema, ",")) } else { - env = append(env, "EXCLUDED_SCHEMAS="+strings.Join(managedSchemas, ",")) + env = append(env, "EXCLUDED_SCHEMAS="+strings.Join(managedDiffSchemas(), ",")) } // Migra also executes via Edge Runtime because the TypeScript implementation // shares the same containerized execution environment as other diff engines. diff --git a/apps/cli-go/internal/db/diff/pgschema.go b/apps/cli-go/internal/db/diff/pgschema.go index 5126ca96ed..6343bddb10 100644 --- a/apps/cli-go/internal/db/diff/pgschema.go +++ b/apps/cli-go/internal/db/diff/pgschema.go @@ -30,7 +30,7 @@ func DiffPgSchema(ctx context.Context, source, target pgconn.Config, schema []st opts = append(opts, pgschema.WithIncludeSchemas(schema...)) } else { opts = append(opts, - pgschema.WithExcludeSchemas(managedSchemas...), + pgschema.WithExcludeSchemas(managedDiffSchemas()...), pgschema.WithExcludeSchemas( "topology", // unsupported due to views "realtime", // unsupported due to partitioned table diff --git a/apps/cli-go/internal/db/reset/reset.go b/apps/cli-go/internal/db/reset/reset.go index 7d841f4ba3..34277c2a34 100644 --- a/apps/cli-go/internal/db/reset/reset.go +++ b/apps/cli-go/internal/db/reset/reset.go @@ -102,6 +102,10 @@ func resetDatabase14(ctx context.Context, version string, fsys afero.Fs, options if err := RestartDatabase(ctx, os.Stderr); err != nil { return err } + // Recreate the Stripe schema before applying migrations that may reference it. + if err := start.StartStripeSyncEngine(ctx, os.Stderr); err != nil { + return err + } conn, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{}, options...) if err != nil { return err diff --git a/apps/cli-go/internal/db/start/start.go b/apps/cli-go/internal/db/start/start.go index 6cd411791c..9b7de8fd49 100644 --- a/apps/cli-go/internal/db/start/start.go +++ b/apps/cli-go/internal/db/start/start.go @@ -365,6 +365,12 @@ func SetupLocalDatabase(ctx context.Context, version string, fsys afero.Fs, w io if err := SetupDatabase(ctx, conn, utils.DbId, w, fsys); err != nil { return err } + // The Stripe Sync Engine owns the `stripe` schema and recreates it via its own + // migrations on startup. Bring it up before applying user migrations so that + // migrations referencing Stripe tables (e.g. foreign keys) can resolve them. + if err := StartStripeSyncEngine(ctx, w); err != nil { + return err + } if err := apply.MigrateAndSeed(ctx, version, conn, fsys); err != nil { return err } @@ -380,6 +386,83 @@ func SetupLocalDatabase(ctx context.Context, version string, fsys afero.Fs, w io return nil } +// stripeSyncEnginePort is the port the Stripe Sync Engine webhook server listens +// on inside the container. The host port is configurable via config.toml. +const stripeSyncEnginePort = 8080 + +// StartStripeSyncEngine (re)creates the Stripe Sync Engine container and waits for +// it to become healthy. The engine runs its own migrations on startup to +// (re)create the configured schema, so it must be started before applying user +// migrations that reference Stripe tables. Any existing container is removed +// first so that `db reset` re-runs those migrations against the freshly +// recreated database. It is a no-op when the service is disabled. +func StartStripeSyncEngine(ctx context.Context, w io.Writer) error { + if !utils.Config.StripeSync.Enabled { + return nil + } + if err := utils.Docker.ContainerRemove(ctx, utils.StripeSyncEngineId, container.RemoveOptions{Force: true}); err != nil && !errdefs.IsNotFound(err) { + return errors.Errorf("failed to remove stripe sync engine container: %w", err) + } + fmt.Fprintln(w, "Starting Stripe Sync Engine...") + cfg := utils.Config.StripeSync + env := []string{ + fmt.Sprintf("DATABASE_URL=postgresql://postgres:%s@%s:%d/postgres", utils.Config.Db.Password, utils.DbId, 5432), + "SCHEMA=" + cfg.Schema, + fmt.Sprintf("PORT=%d", stripeSyncEnginePort), + fmt.Sprintf("AUTO_EXPAND_LISTS=%t", cfg.AutoExpandLists), + } + // Only forward credentials that have been resolved to a concrete value; + // unset env() references are left out so the engine falls back to its own + // defaults instead of receiving the literal "env(...)" string. + for _, s := range []struct { + name string + value string + }{ + {"API_KEY", cfg.ApiKey.Value}, + {"STRIPE_SECRET_KEY", cfg.StripeSecretKey.Value}, + {"STRIPE_WEBHOOK_SECRET", cfg.StripeWebhookSecret.Value}, + } { + if len(s.value) > 0 && !strings.HasPrefix(s.value, "env(") { + env = append(env, s.name+"="+s.value) + } + } + hostPort := nat.Port(strconv.Itoa(stripeSyncEnginePort) + "/tcp") + if _, err := utils.DockerStart( + ctx, + container.Config{ + Image: cfg.Image, + Env: env, + Healthcheck: &container.HealthConfig{ + Test: []string{"CMD", "node", "-e", + fmt.Sprintf(`fetch("http://127.0.0.1:%d/health").then(r=>process.exit(r.ok?0:1)).catch(()=>process.exit(1))`, stripeSyncEnginePort), + }, + Interval: 10 * time.Second, + Timeout: 2 * time.Second, + Retries: 3, + StartPeriod: 10 * time.Second, + }, + ExposedPorts: nat.PortSet{hostPort: {}}, + }, + container.HostConfig{ + PortBindings: nat.PortMap{hostPort: []nat.PortBinding{{ + HostPort: strconv.FormatUint(uint64(cfg.Port), 10), + }}}, + RestartPolicy: container.RestartPolicy{Name: container.RestartPolicyUnlessStopped}, + }, + network.NetworkingConfig{ + EndpointsConfig: map[string]*network.EndpointSettings{ + utils.NetId: { + Aliases: utils.StripeSyncEngineAliases, + }, + }, + }, + utils.StripeSyncEngineId, + ); err != nil { + return err + } + return WaitForHealthyService(ctx, utils.Config.Db.HealthTimeout, utils.StripeSyncEngineId) +} + func SetupDatabase(ctx context.Context, conn *pgx.Conn, host string, w io.Writer, fsys afero.Fs) error { if err := initSchema(ctx, conn, host, w); err != nil { return err diff --git a/apps/cli-go/internal/db/start/stripe_sync_engine_test.go b/apps/cli-go/internal/db/start/stripe_sync_engine_test.go new file mode 100644 index 0000000000..7034ca5e70 --- /dev/null +++ b/apps/cli-go/internal/db/start/stripe_sync_engine_test.go @@ -0,0 +1,64 @@ +package start + +import ( + "context" + "io" + "net/http" + "testing" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/h2non/gock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/supabase/cli/internal/testing/apitest" + "github.com/supabase/cli/internal/utils" +) + +func TestStartStripeSyncEngine(t *testing.T) { + t.Run("noop when disabled", func(t *testing.T) { + utils.Config.StripeSync.Enabled = false + require.NoError(t, apitest.MockDocker(utils.Docker)) + defer gock.OffAll() + // Run test + err := StartStripeSyncEngine(context.Background(), io.Discard) + // Check error + assert.NoError(t, err) + // No docker interaction expected when disabled + assert.False(t, gock.HasUnmatchedRequest()) + }) + + t.Run("recreates container and waits for healthy", func(t *testing.T) { + utils.StripeSyncEngineId = "test-stripe" + utils.NetId = "test-network" + utils.DbId = "test-db" + utils.Config.StripeSync.Enabled = true + utils.Config.StripeSync.Image = "supabase/stripe-sync-engine:test" + utils.Config.StripeSync.Port = 54328 + utils.Config.StripeSync.Schema = "stripe" + t.Cleanup(func() { utils.Config.StripeSync.Enabled = false }) + require.NoError(t, apitest.MockDocker(utils.Docker)) + defer gock.OffAll() + // Removes any container left over from a previous run so a reset re-runs + // the engine's migrations against the freshly recreated database. + gock.New(utils.Docker.DaemonHost()). + Delete("/v" + utils.Docker.ClientVersion() + "/containers/" + utils.StripeSyncEngineId). + Reply(http.StatusOK) + apitest.MockDockerStart(utils.Docker, utils.GetRegistryImageUrl(utils.Config.StripeSync.Image), utils.StripeSyncEngineId) + // Reports healthy on the first probe + gock.New(utils.Docker.DaemonHost()). + Get("/v" + utils.Docker.ClientVersion() + "/containers/" + utils.StripeSyncEngineId + "/json"). + Reply(http.StatusOK). + JSON(container.InspectResponse{ContainerJSONBase: &container.ContainerJSONBase{ + State: &container.State{ + Running: true, + Health: &container.Health{Status: types.Healthy}, + }, + }}) + // Run test + err := StartStripeSyncEngine(context.Background(), io.Discard) + // Check error + assert.NoError(t, err) + assert.Empty(t, apitest.ListUnmatchedRequests()) + }) +} diff --git a/apps/cli-go/internal/start/start.go b/apps/cli-go/internal/start/start.go index 6ce6a4434d..74dc0a04b4 100644 --- a/apps/cli-go/internal/start/start.go +++ b/apps/cli-go/internal/start/start.go @@ -244,6 +244,14 @@ func run(ctx context.Context, fsys afero.Fs, excludedContainers []string, dbConf if err := start.StartDatabase(ctx, "", fsys, os.Stderr, options...); err != nil { return err } + // On a fresh database, SetupLocalDatabase already starts the Stripe Sync + // Engine before applying migrations. When the volume already exists, setup + // is skipped, so start it here to bring the service back up. + if !utils.NoBackupVolume && utils.Config.StripeSync.Enabled && !isContainerExcluded(utils.Config.StripeSync.Image, excluded) { + if err := start.StartStripeSyncEngine(ctx, os.Stderr); err != nil { + return err + } + } } var started []string diff --git a/apps/cli-go/internal/status/status.go b/apps/cli-go/internal/status/status.go index ae0639903f..bf3fc0447e 100644 --- a/apps/cli-go/internal/status/status.go +++ b/apps/cli-go/internal/status/status.go @@ -45,6 +45,7 @@ type CustomName struct { StorageS3AccessKeyId string `env:"storage.s3_access_key_id,default=S3_PROTOCOL_ACCESS_KEY_ID"` StorageS3SecretAccessKey string `env:"storage.s3_secret_access_key,default=S3_PROTOCOL_ACCESS_KEY_SECRET"` StorageS3Region string `env:"storage.s3_region,default=S3_PROTOCOL_REGION"` + StripeSyncEngineURL string `env:"stripe_sync_engine.url,default=STRIPE_SYNC_ENGINE_URL"` } func (c *CustomName) toValues(exclude ...string) map[string]string { @@ -93,6 +94,10 @@ func (c *CustomName) toValues(exclude ...string) map[string]string { values[c.StorageS3SecretAccessKey] = utils.Config.Storage.S3Credentials.SecretAccessKey values[c.StorageS3Region] = utils.Config.Storage.S3Credentials.Region } + stripeSyncEngineEnabled := utils.Config.StripeSync.Enabled && !slices.Contains(exclude, utils.StripeSyncEngineId) && !slices.Contains(exclude, utils.ShortContainerImageName(utils.Config.StripeSync.Image)) + if stripeSyncEngineEnabled { + values[c.StripeSyncEngineURL] = fmt.Sprintf("http://%s:%d", utils.Config.Hostname, utils.Config.StripeSync.Port) + } return values } @@ -245,6 +250,7 @@ func PrettyPrint(w io.Writer, exclude ...string) { Items: []OutputItem{ {Label: "Studio", Value: values[names.StudioURL], Type: Link}, {Label: "Mailpit", Value: values[names.MailpitURL], Type: Link}, + {Label: "Stripe Sync", Value: values[names.StripeSyncEngineURL], Type: Link}, {Label: "MCP", Value: values[names.McpURL], Type: Link}, }, }, diff --git a/apps/cli-go/internal/utils/config.go b/apps/cli-go/internal/utils/config.go index d171d06753..ac8baef270 100644 --- a/apps/cli-go/internal/utils/config.go +++ b/apps/cli-go/internal/utils/config.go @@ -16,37 +16,39 @@ import ( ) var ( - NetId string - DbId string - KongId string - GotrueId string - InbucketId string - RealtimeId string - RestId string - StorageId string - ImgProxyId string - DifferId string - PgmetaId string - StudioId string - EdgeRuntimeId string - LogflareId string - VectorId string - PoolerId string + NetId string + DbId string + KongId string + GotrueId string + InbucketId string + RealtimeId string + RestId string + StorageId string + ImgProxyId string + DifferId string + PgmetaId string + StudioId string + EdgeRuntimeId string + LogflareId string + VectorId string + PoolerId string + StripeSyncEngineId string - DbAliases = []string{"db", "db.supabase.internal"} - KongAliases = []string{"kong", "api.supabase.internal"} - GotrueAliases = []string{"auth"} - InbucketAliases = []string{"inbucket"} - RealtimeAliases = []string{"realtime", Config.Realtime.TenantId} - RestAliases = []string{"rest"} - StorageAliases = []string{"storage"} - ImgProxyAliases = []string{"imgproxy"} - PgmetaAliases = []string{"pg_meta"} - StudioAliases = []string{"studio"} - EdgeRuntimeAliases = []string{"edge_runtime"} - LogflareAliases = []string{"analytics"} - VectorAliases = []string{"vector"} - PoolerAliases = []string{"pooler"} + DbAliases = []string{"db", "db.supabase.internal"} + KongAliases = []string{"kong", "api.supabase.internal"} + GotrueAliases = []string{"auth"} + InbucketAliases = []string{"inbucket"} + RealtimeAliases = []string{"realtime", Config.Realtime.TenantId} + RestAliases = []string{"rest"} + StorageAliases = []string{"storage"} + ImgProxyAliases = []string{"imgproxy"} + PgmetaAliases = []string{"pg_meta"} + StudioAliases = []string{"studio"} + EdgeRuntimeAliases = []string{"edge_runtime"} + LogflareAliases = []string{"analytics"} + VectorAliases = []string{"vector"} + PoolerAliases = []string{"pooler"} + StripeSyncEngineAliases = []string{"stripe_sync_engine"} //go:embed templates/initial_schemas/13.sql InitialSchemaPg13Sql string @@ -77,6 +79,7 @@ func UpdateDockerIds() { LogflareId = GetId(LogflareAliases[0]) VectorId = GetId(VectorAliases[0]) PoolerId = GetId(PoolerAliases[0]) + StripeSyncEngineId = GetId(StripeSyncEngineAliases[0]) } func GetDockerIds() []string { @@ -94,6 +97,7 @@ func GetDockerIds() []string { LogflareId, VectorId, PoolerId, + StripeSyncEngineId, } } @@ -190,6 +194,13 @@ func GetServices() types.Services { PullPolicy: types.PullPolicyMissing, } } + if Config.StripeSync.Enabled { + services["stripeSyncEngine"] = types.ServiceConfig{ + Name: ShortContainerImageName(Config.StripeSync.Image), + Image: GetRegistryImageUrl(Config.StripeSync.Image), + PullPolicy: types.PullPolicyMissing, + } + } return services } diff --git a/apps/cli-go/pkg/config/config.go b/apps/cli-go/pkg/config/config.go index dfd63999cd..784e1a0e9a 100644 --- a/apps/cli-go/pkg/config/config.go +++ b/apps/cli-go/pkg/config/config.go @@ -150,6 +150,7 @@ type ( EdgeRuntime edgeRuntime `toml:"edge_runtime" json:"edge_runtime"` Functions FunctionConfig `toml:"functions" json:"functions"` Analytics analytics `toml:"analytics" json:"analytics"` + StripeSync stripeSync `toml:"stripe_sync_engine" json:"stripe_sync_engine"` Experimental experimental `toml:"experimental" json:"experimental"` } @@ -187,6 +188,22 @@ type ( SenderName string `toml:"sender_name" json:"sender_name"` } + // stripeSync configures the local Stripe Sync Engine container, which keeps a + // `stripe` schema in sync with a Stripe account. The container runs its own + // migrations on startup to (re)create the schema, so the CLI brings it up and + // waits for it to become healthy before applying user migrations during + // `db reset` and the initial `start`. + stripeSync struct { + Enabled bool `toml:"enabled" json:"enabled"` + Image string `toml:"-" json:"-"` + Port uint16 `toml:"port" json:"port"` + Schema string `toml:"schema" json:"schema"` + ApiKey Secret `toml:"api_key" json:"api_key"` + StripeSecretKey Secret `toml:"stripe_secret_key" json:"stripe_secret_key"` + StripeWebhookSecret Secret `toml:"stripe_webhook_secret" json:"stripe_webhook_secret"` + AutoExpandLists bool `toml:"auto_expand_lists" json:"auto_expand_lists"` + } + edgeRuntime struct { Enabled bool `toml:"enabled" json:"enabled"` Image string `toml:"-" json:"-"` @@ -445,6 +462,11 @@ func NewConfig(editors ...ConfigEditor) config { EdgeRuntime: edgeRuntime{ Image: Images.EdgeRuntime, }, + StripeSync: stripeSync{ + Image: Images.StripeSync, + Port: 54328, + Schema: "stripe", + }, }} for _, apply := range editors { apply(&initial) diff --git a/apps/cli-go/pkg/config/constants.go b/apps/cli-go/pkg/config/constants.go index 08d572d2da..26d9a48cbe 100644 --- a/apps/cli-go/pkg/config/constants.go +++ b/apps/cli-go/pkg/config/constants.go @@ -31,6 +31,7 @@ type images struct { Realtime string `mapstructure:"realtime"` Storage string `mapstructure:"storage"` Logflare string `mapstructure:"logflare"` + StripeSync string `mapstructure:"stripesyncengine"` // Append to Jobs when adding new dependencies below Differ string `mapstructure:"differ"` Migra string `mapstructure:"migra"` @@ -72,5 +73,6 @@ func (s images) Services() []string { s.Logflare, s.Vector, s.Supavisor, + s.StripeSync, } } diff --git a/apps/cli-go/pkg/config/templates/Dockerfile b/apps/cli-go/pkg/config/templates/Dockerfile index 4afcc48669..9be92f2e65 100644 --- a/apps/cli-go/pkg/config/templates/Dockerfile +++ b/apps/cli-go/pkg/config/templates/Dockerfile @@ -14,6 +14,7 @@ FROM supabase/gotrue:v2.189.0 AS gotrue FROM supabase/realtime:v2.107.2 AS realtime FROM supabase/storage-api:v1.60.17 AS storage FROM supabase/logflare:1.44.3 AS logflare +FROM supabase/stripe-sync-engine:v0.50.1 AS stripesyncengine # Append to JobImages when adding new dependencies below FROM supabase/pgadmin-schema-diff:cli-0.0.5 AS differ FROM supabase/migra:3.0.1663481299 AS migra diff --git a/apps/cli-go/pkg/config/templates/config.toml b/apps/cli-go/pkg/config/templates/config.toml index 505811216a..21ece41177 100644 --- a/apps/cli-go/pkg/config/templates/config.toml +++ b/apps/cli-go/pkg/config/templates/config.toml @@ -391,6 +391,21 @@ port = 54327 # Configure one of the supported backends: `postgres`, `bigquery`. backend = "postgres" +# Sync data from a Stripe account into a local `stripe` schema. The container runs its own +# migrations on startup, so the CLI starts it and waits for it to become healthy before applying +# your migrations during `supabase start` and `supabase db reset`. This keeps migrations that +# reference Stripe tables (e.g. via foreign keys) working after a reset. +[stripe_sync_engine] +enabled = false +# Port to expose the Stripe Sync Engine webhook server on. +port = 54328 +# Postgres schema the synced Stripe tables are created in. +schema = "stripe" +# DO NOT commit your Stripe credentials to git. Use environment variable substitution instead: +api_key = "env(STRIPE_SYNC_ENGINE_API_KEY)" +stripe_secret_key = "env(STRIPE_SECRET_KEY)" +stripe_webhook_secret = "env(STRIPE_WEBHOOK_SECRET)" + # Experimental features may be deprecated any time [experimental] # Configures Postgres storage engine to use OrioleDB (S3)