20 changes: 18 additions & 2 deletions flow/activities/flowable.go
@@ -1862,13 +1862,29 @@ func (a *FlowableActivity) MigratePostgresTableOIDs(
}
destinationTableOidMap[destinationTableIdentifier] = oid
}
tableNames := make([]string, 0, len(destinationTableOidMap))
for tableName := range destinationTableOidMap {
tableNames = append(tableNames, tableName)
}
tableSchemas, err := internal.LoadTableSchemasFromCatalog(ctx, a.CatalogPool, flowName, tableNames)
if err != nil {
return fmt.Errorf("failed to load table schemas from catalog: %w", err)
}

for destinationTableName, oid := range destinationTableOidMap {
schema, ok := tableSchemas[destinationTableName]
if !ok {
return fmt.Errorf("table schema not found for table %s", destinationTableName)
}
schema.TableOid = oid
}

err := internal.UpdateTableOIDsInTableSchemaInCatalog(
err = internal.UpdateTableSchemasInCatalog(
@jgao54 (Contributor) commented on Dec 5, 2025:

For MigratePostgresTableOIDs:

	// MIGRATION: Migrate Postgres table OIDs to catalog before starting/resuming the flow
	migrateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 1 * time.Hour,
		HeartbeatTimeout:    2 * time.Minute,
	})

IIUC, on pause/restart we also sync what's in the source db to our catalog, so there may be a chance here too that, if there are schema changes between pause and resume, we end up syncing the catalog to the latest schema and ApplySchemaDelta could miss column additions.

I'm wondering what the motivation is for always syncing the schema to latest on pause/restart, vs. just letting CDC do the catch-up and ApplySchemaDelta.

Note this doesn't block your PR; just an observation and maybe a follow-up item.

@Amogh-Bharadwaj (Contributor, Author) commented on Dec 5, 2025:

Re "we also sync what's in the source db to our catalog":

Actually this activity doesn't touch Postgres: it moves the OIDs from the state object to the catalog. The OIDs in state are populated during the initial setup flow and can be extended by setup flows for table additions, but they are independent of schema changes, so we should be good?

P.S.: This migration is what enables cancellation of table additions (added recently); it can be removed after a certain period of time.

ctx,
a.CatalogPool,
logger,
flowName,
destinationTableOidMap,
tableSchemas,
)
if err != nil {
return fmt.Errorf("failed to update table OIDs in catalog: %w", err)
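To make the thread above easier to follow, here is a small, self-contained Go sketch of the pattern MigratePostgresTableOIDs now follows: collect the destination table names from the OID map, load their schemas from the catalog, set each TableOid through the map's pointers, and persist the whole map. The types and helpers below are invented stand-ins for this sketch, not PeerDB's actual internal API; only the shape mirrors the diff.

```go
package main

import "fmt"

// tableSchema is a toy stand-in for protos.TableSchema.
type tableSchema struct {
	TableOid uint32
}

// loadSchemas mimics LoadTableSchemasFromCatalog: it returns a map from
// destination table name to a *pointer* to that table's schema.
func loadSchemas(names []string) map[string]*tableSchema {
	schemas := make(map[string]*tableSchema, len(names))
	for _, name := range names {
		schemas[name] = &tableSchema{}
	}
	return schemas
}

func main() {
	// OIDs carried in the workflow state object (values invented here).
	destinationTableOidMap := map[string]uint32{
		"public.users":  16385,
		"public.orders": 16402,
	}

	// 1. Collect the destination table names from the OID map.
	tableNames := make([]string, 0, len(destinationTableOidMap))
	for name := range destinationTableOidMap {
		tableNames = append(tableNames, name)
	}

	// 2. Load schemas keyed by destination table name.
	schemas := loadSchemas(tableNames)

	// 3. Set the OID through the map's pointers, mutating the loaded schemas.
	for name, oid := range destinationTableOidMap {
		if schema, ok := schemas[name]; ok {
			schema.TableOid = oid
		}
	}

	// 4. The map is now ready to be persisted in one call
	//    (UpdateTableSchemasInCatalog in the PR).
	for name, schema := range schemas {
		fmt.Printf("%s -> oid %d\n", name, schema.TableOid)
	}
}
```

Because the loaded map holds pointers, the schema.TableOid = oid assignment in the diff mutates the same objects that are later handed to UpdateTableSchemasInCatalog, so no copy-back step is needed.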
75 changes: 50 additions & 25 deletions flow/activities/flowable_core.go
@@ -6,7 +6,6 @@
"errors"
"fmt"
"log/slog"
"slices"
"sync/atomic"
"time"

@@ -64,31 +63,57 @@

func (a *FlowableActivity) applySchemaDeltas(
Contributor:
Did a quick code search and I see a few other places that are using options.tableMapping (i.e. the latest schema). Wondering if we have any other logic that is relying on the latest schema when it should be relying on the catalog schema, and whether they introduce any subtle edge cases like this one.

Contributor Author:
Good call-out, will take a look.

ctx context.Context,
config *protos.FlowConnectionConfigsCore,
options *protos.SyncFlowOptions,
flowJobName string,
schemaDeltas []*protos.TableSchemaDelta,
) error {
filteredTableMappings := make([]*protos.TableMapping, 0, len(schemaDeltas))
for _, tableMapping := range options.TableMappings {
if slices.ContainsFunc(schemaDeltas, func(schemaDelta *protos.TableSchemaDelta) bool {
return schemaDelta.SrcTableName == tableMapping.SourceTableIdentifier &&
schemaDelta.DstTableName == tableMapping.DestinationTableIdentifier
}) {
filteredTableMappings = append(filteredTableMappings, tableMapping)
}
}

if len(schemaDeltas) > 0 {
if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{
PeerName: config.SourceName,
TableMappings: filteredTableMappings,
FlowName: config.FlowJobName,
System: config.System,
Env: config.Env,
Version: config.Version,
}); err != nil {
return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to execute schema update at source: %w", err))
logger := internal.LoggerFromCtx(ctx)
destinationNameListInDeltas := make([]string, 0, len(schemaDeltas))
for _, tableSchemaDelta := range schemaDeltas {
destinationNameListInDeltas = append(destinationNameListInDeltas, tableSchemaDelta.DstTableName)
}
logger.Info("loading table schemas from catalog for applying schema deltas",
slog.Int("numDeltas", len(schemaDeltas)),
slog.String("flowJobName", flowJobName),
)
schemasInCatalog, err := internal.LoadTableSchemasFromCatalog(ctx, a.CatalogPool, flowJobName, destinationNameListInDeltas)
if err != nil {
return fmt.Errorf("failed to load table schemas from catalog for applying schema deltas: %w", err)
}
for _, tableSchemaDelta := range schemaDeltas {
columnsInCatalog := schemasInCatalog[tableSchemaDelta.DstTableName].GetColumns()
addedColumns := tableSchemaDelta.GetAddedColumns()

// Create a map to track existing column names to avoid duplicates
existingColumnNames := make(map[string]bool)
for _, col := range columnsInCatalog {
existingColumnNames[col.Name] = true
}

// Only add columns that don't already exist
var newColumnsToAdd []*protos.FieldDescription
for _, addedCol := range addedColumns {
if !existingColumnNames[addedCol.Name] {
newColumnsToAdd = append(newColumnsToAdd, addedCol)
existingColumnNames[addedCol.Name] = true
}
}
updatedColumnsInCatalog := append(columnsInCatalog, newColumnsToAdd...)

Check failure on line 100 in flow/activities/flowable_core.go (GitHub Actions / lint): appendAssign: append result not assigned to the same slice (gocritic)
schemasInCatalog[tableSchemaDelta.DstTableName].Columns = updatedColumnsInCatalog
}
logger.Info("applying schema deltas to catalog",
slog.Int("numTables", len(schemasInCatalog)),
slog.Int("numDeltas", len(schemaDeltas)),
slog.String("flowJobName", flowJobName),
)
err = internal.UpdateTableSchemasInCatalog(
ctx,
a.CatalogPool,
logger,
flowJobName,
schemasInCatalog,
)
if err != nil {
return fmt.Errorf("failed to update table schemas in catalog: %w", err)
}
return nil
}
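As a footnote to the lint failure flagged in this hunk, here is a self-contained sketch of the column-merge step together with the append shape that gocritic's appendAssign check accepts, namely appending back onto the same slice variable. The column names are invented for the example; this is not the PR's eventual fix, just the general pattern.

```go
package main

import "fmt"

func main() {
	// Columns already recorded in the catalog, plus columns reported by a
	// schema delta ("name" is a duplicate on purpose).
	columnsInCatalog := []string{"id", "name"}
	addedColumns := []string{"name", "email"}

	// Track existing names so the same column is never appended twice.
	existing := make(map[string]bool, len(columnsInCatalog))
	for _, col := range columnsInCatalog {
		existing[col] = true
	}

	// Appending back onto the same slice variable is the shape the linter
	// expects, unlike updated := append(columnsInCatalog, ...).
	for _, col := range addedColumns {
		if !existing[col] {
			columnsInCatalog = append(columnsInCatalog, col)
			existing[col] = true
		}
	}

	fmt.Println(columnsInCatalog) // [id name email]
}
```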
@@ -223,7 +248,7 @@
return nil, fmt.Errorf("failed to sync schema: %w", err)
}

return nil, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas)
return nil, a.applySchemaDeltas(ctx, config.FlowJobName, recordBatchSync.SchemaDeltas)
}

var res *model.SyncResponse
@@ -319,7 +344,7 @@
a.OtelManager.Metrics.CurrentBatchIdGauge.Record(ctx, res.CurrentSyncBatchID)

syncState.Store(shared.Ptr("updating schema"))
if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil {
if err := a.applySchemaDeltas(ctx, config.FlowJobName, res.TableSchemaDeltas); err != nil {
return nil, err
}

45 changes: 13 additions & 32 deletions flow/internal/postgres.go
@@ -74,31 +74,22 @@ func LoadTableSchemaFromCatalog(
return tableSchema, proto.Unmarshal(tableSchemaBytes, tableSchema)
}

func UpdateTableOIDsInTableSchemaInCatalog(
func UpdateTableSchemasInCatalog(
ctx context.Context,
pool shared.CatalogPool,
logger log.Logger,
flowName string,
tableOIDs map[string]uint32, // map[destinationTableName]tableOID
destinationTableSourceSchemaMap map[string]*protos.TableSchema, // map[destinationTableName]tableSchema
) error {
if len(tableOIDs) == 0 {
logger.Info("no table OIDs to update, skipping migration",
if len(destinationTableSourceSchemaMap) == 0 {
logger.Info("no schema deltas to update, skipping migration",
slog.String("flowName", flowName))
return nil
}

logger.Info("updating table OIDs in catalog",
logger.Info("updating schema deltas in catalog",
slog.String("flowName", flowName),
slog.Int("numTables", len(tableOIDs)))

tableNames := make([]string, 0, len(tableOIDs))
for tableName := range tableOIDs {
tableNames = append(tableNames, tableName)
}
tableSchemas, err := LoadTableSchemasFromCatalog(ctx, pool, flowName, tableNames)
if err != nil {
return fmt.Errorf("failed to load table schemas from catalog: %w", err)
}
slog.Int("numTables", len(destinationTableSourceSchemaMap)))

tx, err := pool.Pool.Begin(ctx)
if err != nil {
@@ -107,36 +98,26 @@ func UpdateTableOIDsInTableSchemaInCatalog(
defer shared.RollbackTx(tx, logger)

batch := &pgx.Batch{}
for tableName, tableOID := range tableOIDs {
tableSchema, exists := tableSchemas[tableName]
if !exists {
logger.Error("table schema not found in catalog",
slog.String("flowName", flowName),
slog.String("tableName", tableName))
return fmt.Errorf("table schema not found for table: %s", tableName)
}

tableSchema.TableOid = tableOID
for tableName, tableSchema := range destinationTableSourceSchemaMap {
tableSchemaBytes, err := proto.Marshal(tableSchema)
if err != nil {
return fmt.Errorf("unable to marshal updated table schema for %s: %w", tableName, err)
return fmt.Errorf("unable to marshal table schema for %s: %w", tableName, err)
}

batch.Queue(
"UPDATE table_schema_mapping SET table_schema=$1 WHERE flow_name=$2 AND table_name=$3",
tableSchemaBytes, flowName, tableName,
)

logger.Info("queued table OID update",
logger.Info("queued schema delta update",
slog.String("flowName", flowName),
slog.String("tableName", tableName),
slog.Uint64("tableOID", uint64(tableOID)))
slog.String("tableName", tableName))
}

results := tx.SendBatch(ctx, batch)
defer results.Close() // Ensure resources are freed in case of early return

for i := range len(tableOIDs) {
for i := range len(destinationTableSourceSchemaMap) {
if _, err := results.Exec(); err != nil {
logger.Error("failed to update table schema in catalog",
slog.Any("error", err),
Expand All @@ -157,9 +138,9 @@ func UpdateTableOIDsInTableSchemaInCatalog(
return fmt.Errorf("failed to commit transaction: %w", err)
}

logger.Info("successfully updated all table OIDs in catalog",
logger.Info("successfully updated all schema deltas in catalog",
slog.String("flowName", flowName),
slog.Int("numTables", len(tableOIDs)))
slog.Int("numTables", len(destinationTableSourceSchemaMap)))

return nil
}
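For completeness, here is a minimal, self-contained sketch of the pgx batch-update-inside-a-transaction pattern that UpdateTableSchemasInCatalog uses. The connection string, flow name, and payload are placeholders, and error handling is collapsed to log.Fatal for brevity; only the pgx call sequence (Begin, Batch.Queue, SendBatch, one Exec per queued statement, Close, Commit) mirrors the function above.

```go
package main

import (
	"context"
	"log"
	"os"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()

	// Placeholder connection; in PeerDB this is the catalog pool.
	conn, err := pgx.Connect(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	tx, err := conn.Begin(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer tx.Rollback(ctx) // effectively a no-op after a successful Commit

	// Placeholder payload: destination table name -> serialized schema bytes.
	updates := map[string][]byte{"public.users": []byte("...")}

	batch := &pgx.Batch{}
	for tableName, schemaBytes := range updates {
		batch.Queue(
			"UPDATE table_schema_mapping SET table_schema=$1 WHERE flow_name=$2 AND table_name=$3",
			schemaBytes, "my_flow", tableName,
		)
	}

	// One Exec per queued statement, then Close before reusing the connection.
	results := tx.SendBatch(ctx, batch)
	for range len(updates) {
		if _, err := results.Exec(); err != nil {
			log.Fatal(err)
		}
	}
	if err := results.Close(); err != nil {
		log.Fatal(err)
	}
	if err := tx.Commit(ctx); err != nil {
		log.Fatal(err)
	}
	log.Println("schema updates committed")
}
```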