
Conversation


@Amogh-Bharadwaj (Contributor) commented Dec 5, 2025

Why

Suppose you have a Postgres mirror running in CDC with a pull batch size of 1.
On the source, the following operations are performed on a table t1 in quick succession:

```sql
ALTER TABLE t1 ADD COLUMN good_column TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
INSERT INTO t1 DEFAULT VALUES;
ALTER TABLE t1 ADD COLUMN lost_column TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
```

PeerDB syncs these operations as follows:

  1. Receives the relation message for the addition of good_column and the INSERT.
    1.1) Checks the schema of this table as stored in catalog, sees that good_column is not present, and marks good_column as a delta to be synced (see the sketch after this list).
  2. Does not receive a relation message for the addition of lost_column, because no DML followed it (Postgres only emits a relation message ahead of DML that touches the table).
  3. Syncs the INSERT to the destination and adds good_column to the destination.
  4. Performs applySchemaDelta, which fetches the schema of table t1 from Postgres and stores it in catalog. Note that lost_column is stored here as well, since it is part of the source table at this point.
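
To make step 1.1 concrete, here is a minimal sketch of the delta-detection idea in Go; the names are hypothetical, not PeerDB's actual types:

```go
// Hypothetical sketch of step 1.1: compare the columns in a relation
// message against the schema recorded in catalog; any column the
// catalog does not know about becomes a delta to sync downstream.
type ColumnInfo struct {
	Name string
	Type string
}

func columnsToAdd(relationCols []ColumnInfo, catalogCols map[string]ColumnInfo) []ColumnInfo {
	var deltas []ColumnInfo
	for _, col := range relationCols {
		if _, known := catalogCols[col.Name]; !known {
			deltas = append(deltas, col)
		}
	}
	return deltas
}
```

The bug below follows directly: once lost_column lands in catalogCols via applySchemaDelta's fetch from Postgres, columnsToAdd can never report it again.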

Now, when another insert is performed:

```sql
INSERT INTO t1 DEFAULT VALUES;
```

PeerDB now:

  1. Receives the relation message for the addition of lost_column and the INSERT.
    1.1) Checks the schema of this table as stored in catalog, sees that lost_column is already present, and moves on.
  2. Errors out when inserting into the target tables, with a message resembling column "lost_column" not found in destination.

This happens because we never identified lost_column as a delta to be added to the destination.

What

This PR changes the implementation of applySchemaDeltas so that it no longer touches Postgres and instead simply does:

updated columns for t1 in catalog = current columns of t1 in catalog + columns added in this batch's delta

This way, when the second INSERT comes along in the example above, PeerDB sees that lost_column is not present in catalog and adds it to the list of schema deltas to be synced.
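
In Go terms, the core of the new behavior is a plain merge. A minimal sketch, reusing the hypothetical ColumnInfo type from the sketch above:

```go
// Merge this batch's deltas into the catalog schema without consulting
// Postgres. Columns that were never surfaced as a delta (like
// lost_column above) stay absent from catalog, so a later relation
// message still detects them as new.
func applyDeltaToCatalog(catalogCols map[string]ColumnInfo, delta []ColumnInfo) map[string]ColumnInfo {
	updated := make(map[string]ColumnInfo, len(catalogCols)+len(delta))
	for name, col := range catalogCols {
		updated[name] = col
	}
	for _, col := range delta {
		updated[col.Name] = col
	}
	return updated
}
```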

Note: Refactors the table OID migration code so that we can share a common function to update table schemas in catalog.

  • E2E tests pending
  • Functional reproduction pending

@jgao54 (Contributor) commented Dec 5, 2025

@Amogh-Bharadwaj love this change! -- i.e. rely on our own state (of schema) as the base instead of fetching from the source db to apply schema deltas, since many things can result in schema changes between SyncRecords runs.

Review thread on the following code:

```go
	return tableNameSchemaMapping, nil
}

func (a *FlowableActivity) applySchemaDeltas(
```
@jgao54 commented:

did a quick code search and i see a few other places that are using options.tableMapping (i.e. latest schema). wondering if we have any other logic that relies on the latest schema when it should rely on the catalog schema, and whether those introduce any subtle edge cases like this one

@Amogh-Bharadwaj (Author) replied:

Good call out, will take a look

Review thread on this diff:

```diff
 }

-	err := internal.UpdateTableOIDsInTableSchemaInCatalog(
+	err = internal.UpdateTableSchemasInCatalog(
```
@jgao54 commented Dec 5, 2025:

For MigratePostgresTableOIDs:

```go
// MIGRATION: Migrate Postgres table OIDs to catalog before starting/resuming the flow
migrateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
	StartToCloseTimeout: 1 * time.Hour,
	HeartbeatTimeout:    2 * time.Minute,
})
```

iiuc, on pause-restart we also sync what's in the source db to our catalog. so there may be a chance here too that, if there are schema changes between pause and resume, we end up syncing catalog to the latest schema, and ApplySchemaDelta could miss column additions.

I'm wondering what the motivation is for always syncing the schema to latest on pause/restart, vs. just letting CDC do the catch-up via ApplySchemaDelta.

Note this doesn't block your PR, just an observation and maybe a follow-up item.

@Amogh-Bharadwaj (Author) commented Dec 5, 2025:

> we also sync what's in the source db to our catalog

Actually, this activity doesn't touch Postgres: it moves the OIDs from the state object to catalog. The OIDs in state are populated during the initial setup flow and can be extended by the setup flows of table additions, but they are independent of schema changes, so we should be good?

P.S.: This migration is what enables cancellation of table additions (added recently); it can be removed after a certain period of time.
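
For illustration, a hypothetical sketch of the distinction being made here: the migration only copies OIDs that already live in workflow state into catalog, and never queries Postgres:

```go
// Hypothetical sketch: copy table OIDs from the workflow state object
// into the catalog entries. No Postgres query is involved, so schema
// changes on the source cannot leak into catalog through this path.
type CatalogTableEntry struct {
	OID     uint32
	Columns []string
}

func migrateOIDsToCatalog(stateOIDs map[string]uint32, catalog map[string]CatalogTableEntry) {
	for table, oid := range stateOIDs {
		entry := catalog[table]
		entry.OID = oid // columns stay exactly as previously recorded
		catalog[table] = entry
	}
}
```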

jgao54 added a commit that referenced this pull request Dec 12, 2025
This pull request introduces a more robust approach for applying schema
deltas to the catalog. It is a follow-up to
#3768 (and addresses the race
condition described in the PR description), with e2e test coverage, and
with the new approach shadow-applied for validation before it is fully
enabled, since this PR introduces some non-trivial changes to db state
that are hard to debug/roll back.

Changes:
- Moved the previous `applySchemaDeltas` logic to `applySchemaDeltasV1` and
introduced a separate `applySchemaDeltasV2`. Note that
`applySchemaDeltasV2` applies changes in-memory and works in conjunction
with a `ReadModifyWriteTableSchemasToCatalog` method to support
transactions with read-modify-write patterns.
- Added a feature flag (`PEERDB_APPLY_SCHEMA_DELTA_TO_CATALOG`) and
logic to choose between the legacy (v1) and new (v2) approaches for
applying schema deltas to the catalog. For now the FF is **disabled** by
default, which means the old approach continues to be used as the source
of truth.
- Shadow-applied the new v2 method by adding temporary validation
utilities that compare the results of the v1 and v2 schema delta
approaches, logging discrepancies and reporting metrics for monitoring.
- Improved end-to-end tests to cover the race condition where columns
added without subsequent DML operations could be lost in the destination
schema, verifying catalog correctness after each sync.
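
As a rough sketch of the gating and shadow validation described above (the flag name comes from this commit message; the types and signatures here are hypothetical and simplified):

```go
import (
	"context"
	"log"
	"os"
	"reflect"
)

// SchemaDelta is a hypothetical stand-in for the real delta type.
type SchemaDelta struct{ TableName string }

// applySchemaDeltas chooses between the two approaches. v1 re-reads
// schemas from the source (legacy); v2 merges deltas in memory. Both
// are passed in here so the sketch stays self-contained.
func applySchemaDeltas(
	ctx context.Context,
	deltas []SchemaDelta,
	v1, v2 func(context.Context, []SchemaDelta) (map[string][]string, error),
) error {
	v2Result, v2Err := v2(ctx, deltas)
	if os.Getenv("PEERDB_APPLY_SCHEMA_DELTA_TO_CATALOG") == "true" {
		return v2Err // flag on: v2 is the source of truth
	}
	v1Result, v1Err := v1(ctx, deltas)
	// Shadow validation: v1 stays authoritative; discrepancies are logged.
	if v1Err == nil && v2Err == nil && !reflect.DeepEqual(v1Result, v2Result) {
		log.Printf("schema delta shadow mismatch: v1=%v v2=%v", v1Result, v2Result)
	}
	return v1Err
}
```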
