feat(dashboard-api): supabase auth users sync background runner#2247
ben-fornefeld wants to merge 25 commits into `main` from `feature/supabase-users-sync-worker`
Conversation
PR Summary: High Risk Overview. Written by Cursor Bugbot for commit 0e7f147; updates automatically on new commits.
packages/db/pkg/auth/sql_queries/supabase_auth_user_sync/get_auth_user.sql
```sql
CREATE INDEX auth_user_sync_queue_pending_idx
    ON auth.user_sync_queue (id)
```
The partial index only covers dead_lettered_at IS NULL AND locked_at IS NULL, so every unlocked non-dead-lettered row satisfies it — including rows with a future next_attempt_at. The claim query then discards those rows in a post-index filter. Under sustained retry load (many rows waiting with next_attempt_at > now()), the index returns a large candidate set that must be filtered in-memory before the LIMIT is applied. Adding next_attempt_at as the leading column — ON auth.user_sync_queue (next_attempt_at, id) WHERE dead_lettered_at IS NULL AND locked_at IS NULL — lets PostgreSQL range-scan only rows that are actually ready to be claimed.
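The suggested index from the comment above can be sketched as DDL. This is a sketch only: the column names (`next_attempt_at`, `dead_lettered_at`, `locked_at`) are taken from the review comment, not from the migration file itself.

```sql
-- Sketch of the suggested partial index: leading on next_attempt_at lets the
-- claim query range-scan only rows whose retry time has already passed,
-- instead of filtering a large candidate set in memory after the index scan.
CREATE INDEX auth_user_sync_queue_pending_idx
    ON auth.user_sync_queue (next_attempt_at, id)
    WHERE dead_lettered_at IS NULL
      AND locked_at IS NULL;
```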
```go
r.l.Debug(ctx, "claimed queue batch", zap.Int("count", len(items)))

for _, item := range items {
```
Items in a batch are processed sequentially. Each item does 2–3 synchronous DB round-trips (GetAuthUser + UpsertPublicUser/DeletePublicUser + Ack/Retry), so batch throughput is proportional to batchSize × DB latency. With the default BatchSize=50 and typical Postgres latency, a full batch can take several seconds — longer than PollInterval=2s. If sustained write volume is expected, consider processing items in a small goroutine pool.
… while enqueuing for processing
…om/e2b-dev/infra into feature/supabase-users-sync-worker
…d recovery mechanisms
- Updated the sync runner to use `RunWithRestart` for improved error recovery.
- Introduced a new `UserSyncQueue` model to manage user synchronization tasks.
- Added SQL migration for creating the `user_sync_queue` table with the necessary triggers.
- Implemented tests for the processor and supervisor to ensure robust handling of retries and panics.
- Refactored existing queries to target the new `public.user_sync_queue` table.
packages/dashboard-api/internal/supabaseauthusersync/runner_test.go
…agement
- Introduced the `supabase_auth_user_sync_enabled` variable to control user synchronization.
- Updated the Nomad job configuration to include the new sync setting.
- Added Google Secret Manager resources for managing the sync configuration securely.
- Enhanced the dashboard API to utilize the new sync configuration in processing logic.
- Refactored related components to improve error handling and logging for the sync process.
packages/dashboard-api/internal/supabaseauthusersync/runner_test.go
- Replaced the previous `Store` implementation with a new structure that integrates both authentication and main database queries.
- Updated the `Runner` and `NewRunner` functions to accommodate the new database client structure.
- Removed obsolete SQL queries and migration files related to the `user_sync_queue` table.
- Enhanced the test suite to reflect changes in the runner's initialization and database interactions.
- Updated `TestSupabaseAuthUserSyncRunner_EndToEnd` to apply the necessary database migrations before running tests.
- Refactored the `SetupDatabase` function to include a new `ApplyMigrations` method for better migration management.
Made-with: Cursor
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3f56979748
```sql
-- +goose Up
-- +goose StatementBegin
CREATE TABLE public.user_sync_queue (
```
Ensure auth queue migration is executed by db-migrator
The queue/table trigger migration was added under packages/db/pkg/auth/migrations, but the deployed migrator workflow only runs ./migrations from packages/db/scripts/migrator.go (and the db Dockerfile only copies db/migrations). That means the standard prestart migration job will never create public.user_sync_queue or the new triggers, so enabling the sync worker will fail at runtime when it tries to claim from a non-existent table.
we want to execute this manually, and it should not be tied to the main DB migrations, since authDB will be separate going forward
- Introduced a new process outcome `ready_to_ack` to streamline acknowledgment handling.
- Refactored the `process` method to prepare for batch acknowledgment of processed items.
- Added a new `AckBatch` method in the store to handle multiple acknowledgments efficiently.
- Updated the `Runner` to process items in batches and finalize acknowledgments accordingly.
- Removed the obsolete SQL query for single-item acknowledgment as part of the refactor.
- Enhanced tests to cover the new deletion logic and acknowledgment scenarios.
```sql
-- name: GetAuthUserByID :one
SELECT id, email
FROM auth.users
WHERE id = sqlc.arg(user_id)::uuid;
```
🔴 The SQL query selects email from auth.users without null handling, and the generated AuthUser.Email field is a non-nullable string; Supabase supports phone-only and anonymous auth where email is NULL, so pgx returns a scan error (not pgx.ErrNoRows) for those users. In reconcile(), that error triggers the generic retry path, causing indefinite retries until the item is dead-lettered and the user is never synced to public.users. Fix by changing the query to COALESCE(email, '') or by mapping Email to *string and skipping the upsert when absent.
Extended reasoning...
What the bug is and how it manifests
The file packages/db/pkg/auth/sql_queries/supabase_auth_user_sync/get_auth_user.sql (lines 1-4) contains:
```sql
-- name: GetAuthUserByID :one
SELECT id, email
FROM auth.users
WHERE id = sqlc.arg(user_id)::uuid;
```

The generated code in get_auth_user.sql.go scans the result into AuthUser, whose Email field is declared as a non-nullable string in models.go. In pgx v5, scanning a SQL NULL into a plain Go string returns a scan error; it does not return pgx.ErrNoRows.
The specific code path that triggers it
In processor.go, reconcile() calls p.store.GetAuthUser(ctx, item.UserID). It checks errors.Is(err, pgx.ErrNoRows) to decide whether to delete the public user. A NULL-email scan error is not pgx.ErrNoRows, so it falls into the generic if err != nil { return "", fmt.Errorf(...) } branch, which propagates back to process(). Since the item is not yet at maxAttempts, process() calls store.Retry(...), scheduling the item for another attempt with exponential backoff.
Why existing code does not prevent it
The reconcile() function only has two exit paths for errors from GetAuthUser: the pgx.ErrNoRows special case (leading to a delete) and a generic retry path. There is no handling for scan errors caused by NULL columns, and no defensive COALESCE or nullable type in the generated SQL/struct.
Impact
Any Supabase deployment using phone-only or anonymous authentication will have users with auth.users.email = NULL. Every such user that enters the sync queue will be retried up to MaxAttempts (default 20) times with growing backoffs and then dead-lettered. Those users are permanently excluded from public.users, causing any feature that relies on public.users to silently omit them.
How to fix it
The simplest fix is to add COALESCE in the SQL query:
```sql
SELECT id, COALESCE(email, '') AS email
FROM auth.users
WHERE id = sqlc.arg(user_id)::uuid;
```

Alternatively, change the sqlc config so email maps to *string, update AuthUser.Email to *string, and skip the upsert (or pass an empty string) when Email is nil.
Step-by-step proof
1. A Supabase phone-only user signs up. The `auth.users` row has `id = <uuid>`, `email = NULL`.
2. The `sync_insert_auth_users_to_public_users_trigger` fires, inserting `(user_id, 'upsert')` into `public.user_sync_queue`.
3. The worker calls `ClaimBatch`, picks up the item, and increments `attempt_count` to 1.
4. `processor.process()` -> `processOnce()` -> `reconcile()` -> `store.GetAuthUser(ctx, userID)`.
5. `GetAuthUserByID` executes `SELECT id, email FROM auth.users WHERE id = $1`. The row exists, so pgx does not return `pgx.ErrNoRows`. Instead `row.Scan(&i.ID, &i.Email)` attempts to scan NULL into `string` and returns a pgx scan error.
6. Back in `reconcile()`, `errors.Is(err, pgx.ErrNoRows)` is `false`. The error is returned as `fmt.Errorf("get auth.users %s: %w", ...)`.
7. `process()` sees `item.AttemptCount (1) < maxAttempts (20)`, so it calls `store.Retry(...)` with a 5s backoff.
8. Steps 3-7 repeat until `attempt_count >= 20`, at which point `DeadLetter` is called.
9. The user is now dead-lettered and will never be synced to `public.users`.
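The `*string` alternative described above can be sketched in isolation. This is an illustration only: `authUser`, `errNoRows`, and `decide` are hypothetical stand-ins for the sqlc-generated model, `pgx.ErrNoRows`, and the `reconcile()` branching.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the sqlc-generated model and pgx.ErrNoRows.
type authUser struct {
	ID    string
	Email *string // nullable: NULL for phone-only / anonymous users
}

var errNoRows = errors.New("no rows in result set")

// decide mirrors the three-way outcome reconcile() would need once Email is
// nullable: delete when the auth user is gone, surface real errors to the
// retry path, and skip the upsert when there is no email to sync.
func decide(u authUser, err error) (string, error) {
	switch {
	case errors.Is(err, errNoRows):
		return "delete", nil
	case err != nil:
		return "", fmt.Errorf("get auth.users: %w", err)
	case u.Email == nil:
		return "ack_without_upsert", nil
	default:
		return "upsert", nil
	}
}

func main() {
	out, _ := decide(authUser{ID: "u1"}, nil) // Email is nil
	fmt.Println(out) // ack_without_upsert
}
```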
fine, we require emails for auth
```hcl
%{ for key, val in env }
${ key } = "${ val }"
%{ endfor }
```
🟡 The %{ for key, val in env } loop in dashboard-api.hcl renders env var values directly inside double-quoted HCL strings without escaping. A value containing a double-quote produces syntactically invalid HCL (Nomad job submission fails), and a value containing ${ causes Terraform templatefile() to attempt expression evaluation at render time (plan-time error). DASHBOARD_API_ENV_VARS is documented as accepting arbitrary operator-supplied values, making these characters plausible; fix by escaping double-quotes in values or passing extra env vars via a Nomad template stanza instead.
Extended reasoning...
What the bug is and how it manifests
In iac/modules/job-dashboard-api/jobs/dashboard-api.hcl lines 86-88, a Terraform templatefile() directive renders arbitrary env var values directly inside double-quoted HCL strings:
```hcl
%{ for key, val in env }
${ key } = "${ val }"
%{ endfor }
```
Two distinct injection vectors exist. First, if val contains a double-quote character, the rendered output becomes syntactically invalid HCL (e.g. MY_VAR = "conn"string" breaks the HCL parser), causing Nomad to reject the job spec at submit time. Second, if val contains ${, Terraform's templatefile() function interprets it as an expression interpolation during template rendering and produces a plan-time error before the job is ever submitted.
The specific code path that triggers it
The value flows through: DASHBOARD_API_ENV_VARS env var -> parsed as map(string) in iac/provider-gcp/variables.tf -> passed to module.nomad as dashboard_api_env_vars -> passed to module.dashboard_api as env -> filtered in local.env in main.tf (removes null/empty only) -> passed to templatefile() -> rendered at line 87 with no escaping.
Why existing code does not prevent it
The locals block in main.tf only filters out null/empty values (if value != null && value != ""); it does not escape or sanitize values. Terraform's templatefile() does not automatically escape template variable contents when they are interpolated. The HCL string delimiters are hard-coded double quotes with no escaping mechanism.
What the impact would be
For a double-quote injection (e.g. a PostgreSQL DSN like postgresql://user:p"ass@host/db), the Nomad job submission fails at terraform apply time with an HCL parse error -- the deployment is blocked. For a ${ injection (e.g. a JWT secret or template string containing ${), Terraform fails during plan or apply with an evaluation error. Neither failure is silent, but both are surprising and difficult to debug without knowing the root cause.
How to fix it
The safest fix is to use replace(val, "\"", "\\\"") to escape double-quotes within the current templatefile() approach, or pass extra env vars through a Nomad template stanza which has proper escaping semantics. The ${ vector can be addressed by replacing ${ with $${ (Terraform's escape sequence for a literal ${).
Step-by-step proof
1. An operator sets `DASHBOARD_API_ENV_VARS='{"DB_URL":"postgresql://user:p\"ass@host/db"}'` in their env file.
2. Terraform receives `var.dashboard_api_env_vars = {DB_URL = "postgresql://user:p\"ass@host/db"}`.
3. `local.env` passes it through unchanged (non-null, non-empty).
4. `templatefile()` renders line 87 as `DB_URL = "postgresql://user:p"ass@host/db"`; the embedded `"` terminates the HCL string prematurely.
5. Nomad's job spec parser receives invalid HCL and rejects the job with a parse error.
6. Alternatively, for a value containing `${SOME_VAR}`: `templatefile()` evaluates `${SOME_VAR}` as a Terraform expression, returning an undefined variable error during `terraform apply`.
Note: the identical ${ key } = "${ value }" pattern exists in job-api and job-orchestrator for their job_env_vars, but those maps contain hardcoded infrastructure constants (VOLUME_TOKEN_ISSUER, etc.) unlikely to contain special characters. The new DASHBOARD_API_ENV_VARS is explicitly documented as a freeform map for arbitrary operator-supplied values, removing that safety assumption.
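The escaping fix can be sketched in Terraform. This is a sketch under the assumptions above (a `local.env` map of operator-supplied values); note that inside Terraform's own string literals a literal `${` is written `$${`.

```hcl
locals {
  # Sketch: sanitize operator-supplied values before interpolating them into
  # the rendered HCL job file. First turn a literal "${" into "$${" so
  # templatefile() and Nomad treat it as text, then backslash-escape
  # double-quotes so the rendered HCL string stays well-formed.
  safe_env = {
    for key, val in local.env :
    key => replace(replace(val, "$${", "$$$${"), "\"", "\\\"")
  }
}
```

A Nomad `template` stanza remains the more robust option, since it avoids embedding arbitrary values in the job spec's own syntax entirely.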
…references
- Deleted the `auth_db_connection_string` secret and its version from the GCP configuration.
- Updated references in `main.tf` and `nomad/main.tf` to use `postgres_connection_string` instead.
- Removed the corresponding variable declaration from `variables.tf` to clean up unused configurations.
- Updated the dashboard API module to separate base and extra environment variables.
- Introduced a precondition to prevent conflicts with reserved keys in extra environment variables.
- Modified the HCL job configuration to iterate over environment variables dynamically.
- Adjusted variable declarations to reflect the new structure for extra environment variables.
dobrac
left a comment
In general, I would feel more confident using a library for all this logic. Here is, for example, a guide on how to do it using River: https://riverqueue.com/docs/sql
```sql
-- +goose Up
-- +goose StatementBegin
CREATE TABLE public.user_sync_queue (
```
let's put it in the auth schema if we can. If not, then let's create a custom schema for this
custom schema then
…0 across multiple packages
…om/e2b-dev/infra into feature/supabase-users-sync-worker
Cursor Bugbot has reviewed your changes and found 2 potential issues.
packages/dashboard-api/internal/backgroundworker/auth_user_sync.go
…om/e2b-dev/infra into feature/supabase-users-sync-worker
dobrac
left a comment
Looks so much simpler with the library!
```go
	metric.WithUnit("{job}"),
)
if err != nil {
	l.Warn(context.Background(), "failed to initialize auth user sync metric", zap.Error(err))
```
let's pass the context as a parameter
```sql
-- +goose Down
-- +goose StatementBegin

/* We don't want to drop the schema, as it is used by other services. */
```
why not drop the schema here if the dashboard-api is the source of truth?
what are these files generated from?
```makefile
make build
NODE_ID=$(HOSTNAME) ./bin/dashboard-api
@EXTRA_ENV=$(DASHBOARD_API_EXTRA_ENV); \
eval "env NODE_ID=$(HOSTNAME) $$EXTRA_ENV ./bin/dashboard-api"
```
is the usage of eval safe (secure) here? Can we avoid it?
```makefile
HOSTNAME := $(shell hostname 2> /dev/null || hostnamectl hostname 2> /dev/null)
$(if $(HOSTNAME),,$(error Failed to determine hostname: both 'hostname' and 'hostnamectl' failed))

define DASHBOARD_API_EXTRA_ENV
```
I think it might have been simpler to expose the env variable directly rather than having it nested under extra env
```go
if config.SupabaseAuthUserSyncEnabled {
	workerLogger := l.With(zap.String("worker", "supabase_auth_user_sync"))

	authPool := authDB.WritePool()
```
the authDB is not the Supabase DB; it is preparation for a future split of `public.users`, `public.teams`, etc. into their own database

No description provided.