From e2cce3aa40169837e6318c6e4a8c3fc9f3b69d29 Mon Sep 17 00:00:00 2001 From: Stoica Alexandru Date: Tue, 7 Oct 2025 13:09:15 +0100 Subject: [PATCH 1/6] Proposal: Activity to fetch config from the DB This uses a new helper in `internal.FetchConfigFromDB` to fetch a fully hydrated `protos.FlowConnectionConfigs`. When passing this config to Temporal we strip the `tableMappings` array element which can cause it to go over the 2MB limit. This contains a subset of the changes which were originally proposed in: https://github.com/PeerDB-io/peerdb/pull/3407 --- flow/activities/flowable.go | 39 +++++++++++++++++++++++++--- flow/activities/snapshot_activity.go | 14 +++++++++- flow/workflows/cdc_flow.go | 38 ++++++++++----------------- flow/workflows/setup_flow.go | 30 ++++++++++----------- flow/workflows/snapshot_flow.go | 21 ++++++++------- 5 files changed, 88 insertions(+), 54 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index e0e46a8777..2241f5bb7c 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" "log/slog" + "maps" "net" "os" + "slices" "strconv" "sync/atomic" "time" @@ -125,6 +127,13 @@ func (a *FlowableActivity) EnsurePullability( } defer connectors.CloseConnector(ctx, srcConn) + // We can fetch from the DB, as we are in the activity + cfg, err := internal.FetchConfigFromDB(config.FlowJobName, ctx) + if err != nil { + return nil, err + } + config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(cfg.TableMappings, cfg.Resync))) + output, err := srcConn.EnsurePullability(ctx, config) if err != nil { return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to ensure pullability: %w", err)) @@ -145,6 +154,12 @@ func (a *FlowableActivity) CreateRawTable( } defer connectors.CloseConnector(ctx, dstConn) + cfg, err := internal.FetchConfigFromDB(config.FlowJobName, ctx) + if err != nil { + return nil, err + } + config.TableNameMapping = internal.TableNameMapping(cfg.TableMappings, cfg.Resync) + res, err := dstConn.CreateRawTable(ctx, config) if err != nil { return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, err) @@ -174,11 +189,15 @@ func (a *FlowableActivity) SetupTableSchema( } defer connectors.CloseConnector(ctx, srcConn) - tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, config.TableMappings) + cfg, err := internal.FetchConfigFromDB(config.FlowName, ctx) + if err != nil { + return err + } + tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, cfg.TableMappings) if err != nil { return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err)) } - processed := internal.BuildProcessedSchemaMapping(config.TableMappings, tableNameSchemaMapping, logger) + processed := internal.BuildProcessedSchemaMapping(cfg.TableMappings, tableNameSchemaMapping, logger) tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{}) if err != nil { @@ -243,9 +262,13 @@ func (a *FlowableActivity) CreateNormalizedTable( return nil, err } + cfg, err := internal.FetchConfigFromDB(config.FlowName, ctx) + if err != nil { + return nil, err + } numTablesToSetup.Store(int32(len(tableNameSchemaMapping))) tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping)) - for _, tableMapping := range config.TableMappings { + for _, tableMapping := range cfg.TableMappings { tableIdentifier := 
tableMapping.DestinationTableIdentifier tableSchema := tableNameSchemaMapping[tableIdentifier] existing, err := conn.SetupNormalizedTable( @@ -292,6 +315,16 @@ func (a *FlowableActivity) SyncFlow( var normalizeWaiting atomic.Bool var syncingBatchID atomic.Int64 var syncState atomic.Pointer[string] + + cfg, err := internal.FetchConfigFromDB(config.FlowJobName, ctx) + if err != nil { + return err + } + + // Override config with DB values to deal with the large fields. + config = cfg + options.TableMappings = cfg.TableMappings + syncState.Store(shared.Ptr("setup")) shutdown := heartbeatRoutine(ctx, func() string { // Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index a8c7cb5837..074e4479f0 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -65,6 +65,14 @@ func (a *SnapshotActivity) SetupReplication( return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err)) } + configCtx := context.Background() + defer configCtx.Done() + cfg, err := internal.FetchConfigFromDB(config.FlowJobName, configCtx) + if err != nil { + return nil, err + } + config.TableNameMapping = internal.TableNameMapping(cfg.TableMappings, cfg.Resync) + logger.Info("waiting for slot to be created...") slotInfo, err := conn.SetupReplication(ctx, config) @@ -180,8 +188,12 @@ func (a *SnapshotActivity) GetDefaultPartitionKeyForTables( } defer connectors.CloseConnector(ctx, connector) + cfg, err := internal.FetchConfigFromDB(input.FlowJobName, ctx) + if err != nil { + return nil, err + } output, err := connector.GetDefaultPartitionKeyForTables(ctx, &protos.GetDefaultPartitionKeyForTablesInput{ - TableMappings: input.TableMappings, + TableMappings: cfg.TableMappings, }) if err != nil { return nil, a.Alerter.LogFlowError(ctx, input.FlowJobName, fmt.Errorf("failed to check if tables can parallel load: %w", err)) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 9303a3cc8c..96fe98a984 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -296,7 +296,7 @@ func processTableAdditions( alterPublicationAddAdditionalTablesFuture := workflow.ExecuteActivity( alterPublicationAddAdditionalTablesCtx, flowable.AddTablesToPublication, - cfg, flowConfigUpdate.AdditionalTables) + internal.MinimizeFlowConfiguration(cfg), flowConfigUpdate.AdditionalTables) var res *CDCFlowWorkflowResult var addTablesFlowErr error @@ -338,6 +338,7 @@ func processTableAdditions( childAddTablesCDCFlowFuture := workflow.ExecuteChildWorkflow( childAddTablesCDCFlowCtx, CDCFlowWorkflow, + // TODO: `additonalTableCfg` this cannot be minimized in the main branch; but the limitation is minimal. 
additionalTablesCfg, nil, ) @@ -412,7 +413,7 @@ func processTableRemovals( rawTableCleanupFuture := workflow.ExecuteActivity( removeTablesCtx, flowable.RemoveTablesFromRawTable, - cfg, state.FlowConfigUpdate.RemovedTables) + internal.MinimizeFlowConfiguration(cfg), state.FlowConfigUpdate.RemovedTables) removeTablesSelector.AddFuture(rawTableCleanupFuture, func(f workflow.Future) { if err := f.Get(ctx, nil); err != nil { logger.Error("failed to clean up raw table for removed tables", slog.Any("error", err)) @@ -424,7 +425,7 @@ removeTablesFromCatalogFuture := workflow.ExecuteActivity( removeTablesCtx, flowable.RemoveTablesFromCatalog, - cfg, state.FlowConfigUpdate.RemovedTables) + internal.MinimizeFlowConfiguration(cfg), state.FlowConfigUpdate.RemovedTables) removeTablesSelector.AddFuture(removeTablesFromCatalogFuture, func(f workflow.Future) { if err := f.Get(ctx, nil); err != nil { logger.Error("failed to clean up raw table for removed tables", slog.Any("error", err)) @@ -594,7 +595,7 @@ func CDCFlowWorkflow( logger.Info("mirror resumed", slog.Duration("after", time.Since(startTime))) state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING) - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state) } originalRunID := workflow.GetInfo(ctx).OriginalRunID @@ -627,22 +628,6 @@ func CDCFlowWorkflow( // for safety, rely on the idempotency of SetupFlow instead // also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here. if state.CurrentFlowStatus != protos.FlowStatus_STATUS_RUNNING { - originalTableMappings := make([]*protos.TableMapping, 0, len(cfg.TableMappings)) - for _, tableMapping := range cfg.TableMappings { - originalTableMappings = append(originalTableMappings, proto.CloneOf(tableMapping)) - } - // if resync is true, alter the table name schema mapping to temporarily add - // a suffix to the table names. - if cfg.Resync { - for _, mapping := range state.SyncFlowOptions.TableMappings { - if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL { - mapping.DestinationTableIdentifier += "_resync" - } - } - // because we have renamed the tables. - cfg.TableMappings = state.SyncFlowOptions.TableMappings - } - // start the SetupFlow workflow as a child workflow, and wait for it to complete // it should return the table schema for the source peer setupFlowID := GetChildWorkflowID("setup-flow", cfg.FlowJobName, originalRunID) @@ -666,7 +651,6 @@ func CDCFlowWorkflow( state.ActiveSignal = model.ResyncSignal cfg.Resync = true cfg.DoInitialSnapshot = true - cfg.TableMappings = originalTableMappings // this is the only place where we can have a resync during a resync // so we need to NOT sync the tableMappings to catalog to preserve original names uploadConfigToCatalog(ctx, cfg) @@ -690,7 +674,10 @@ func CDCFlowWorkflow( WaitForCancellation: true, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) - setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) + // Resync will rely on the `cfg.Resync` flag to rename the tables + // during the snapshot process. This is how we're able to also remove the need + // to sync the config back into the DB / not rely on the `state.TableMappings`.
+ setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, internal.MinimizeFlowConfiguration(cfg)) var setupFlowOutput *protos.SetupFlowOutput var setupFlowError error @@ -826,7 +813,7 @@ func CDCFlowWorkflow( logger.Info("executed setup flow and snapshot flow, start running") state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING) } - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state) } var finished bool @@ -837,7 +824,8 @@ func CDCFlowWorkflow( WaitForCancellation: true, RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1}, })) - syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions) + state.SyncFlowOptions.TableMappings = []*protos.TableMapping{} + syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, internal.MinimizeFlowConfiguration(cfg), state.SyncFlowOptions) mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop") mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { @@ -958,7 +946,7 @@ func CDCFlowWorkflow( if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal { return state, workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput) } - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state) } } } diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index f4c79eb67c..96876f93dd 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -36,15 +36,17 @@ type SetupFlowExecution struct { tableNameMapping map[string]string cdcFlowName string executionID string + resync bool } // NewSetupFlowExecution creates a new instance of SetupFlowExecution. -func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string) *SetupFlowExecution { +func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string, resync bool) *SetupFlowExecution { return &SetupFlowExecution{ Logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cdcFlowName)), tableNameMapping: tableNameMapping, cdcFlowName: cdcFlowName, executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, + resync: resync, } } @@ -119,10 +121,9 @@ func (s *SetupFlowExecution) ensurePullability( // create EnsurePullabilityInput for the srcTableName ensurePullabilityInput := &protos.EnsurePullabilityBatchInput{ - PeerName: config.SourceName, - FlowJobName: s.cdcFlowName, - SourceTableIdentifiers: slices.Sorted(maps.Keys(s.tableNameMapping)), - CheckConstraints: checkConstraints, + PeerName: config.SourceName, + FlowJobName: s.cdcFlowName, + CheckConstraints: checkConstraints, } future := workflow.ExecuteActivity(ctx, flowable.EnsurePullability, ensurePullabilityInput) @@ -162,9 +163,8 @@ func (s *SetupFlowExecution) createRawTable( // attempt to create the tables. 
createRawTblInput := &protos.CreateRawTableInput{ - PeerName: config.DestinationName, - FlowJobName: s.cdcFlowName, - TableNameMapping: s.tableNameMapping, + PeerName: config.DestinationName, + FlowJobName: s.cdcFlowName, } rawTblFuture := workflow.ExecuteActivity(ctx, flowable.CreateRawTable, createRawTblInput) @@ -191,12 +191,11 @@ func (s *SetupFlowExecution) setupNormalizedTables( }) tableSchemaInput := &protos.SetupTableSchemaBatchInput{ - PeerName: flowConnectionConfigs.SourceName, - TableMappings: flowConnectionConfigs.TableMappings, - FlowName: s.cdcFlowName, - System: flowConnectionConfigs.System, - Env: flowConnectionConfigs.Env, - Version: flowConnectionConfigs.Version, + PeerName: flowConnectionConfigs.SourceName, + FlowName: s.cdcFlowName, + System: flowConnectionConfigs.System, + Env: flowConnectionConfigs.Env, + Version: flowConnectionConfigs.Version, } if err := workflow.ExecuteActivity(ctx, flowable.SetupTableSchema, tableSchemaInput).Get(ctx, nil); err != nil { @@ -207,7 +206,6 @@ func (s *SetupFlowExecution) setupNormalizedTables( s.Info("setting up normalized tables on destination peer", slog.String("destination", flowConnectionConfigs.DestinationName)) setupConfig := &protos.SetupNormalizedTableBatchInput{ PeerName: flowConnectionConfigs.DestinationName, - TableMappings: flowConnectionConfigs.TableMappings, SoftDeleteColName: flowConnectionConfigs.SoftDeleteColName, SyncedAtColName: flowConnectionConfigs.SyncedAtColName, FlowName: flowConnectionConfigs.FlowJobName, @@ -267,7 +265,7 @@ func SetupFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfig } // create the setup flow execution - setupFlowExecution := NewSetupFlowExecution(ctx, tblNameMapping, config.FlowJobName) + setupFlowExecution := NewSetupFlowExecution(ctx, tblNameMapping, config.FlowJobName, config.Resync) // execute the setup flow setupFlowOutput, err := setupFlowExecution.executeSetupFlow(ctx, config) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 80431bb536..40d96efff3 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -1,6 +1,7 @@ package peerflow import ( + "context" "fmt" "log/slog" "slices" @@ -55,15 +56,10 @@ func (s *SnapshotFlowExecution) setupReplication( }, }) - tblNameMapping := make(map[string]string, len(s.config.TableMappings)) - for _, v := range s.config.TableMappings { - tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier - } - setupReplicationInput := &protos.SetupReplicationInput{ - PeerName: s.config.SourceName, - FlowJobName: flowName, - TableNameMapping: tblNameMapping, + PeerName: s.config.SourceName, + FlowJobName: flowName, + // TableNameMapping: tblNameMapping, DoInitialSnapshot: s.config.DoInitialSnapshot, ExistingPublicationName: s.config.PublicationName, ExistingReplicationSlotName: s.config.ReplicationSlotName, @@ -284,7 +280,7 @@ func (s *SnapshotFlowExecution) cloneTables( return err } - for _, v := range s.config.TableMappings { + configCtx := context.Background() + defer configCtx.Done() + cfg, err := internal.FetchConfigFromDB(s.config.FlowJobName, configCtx) + if err != nil { + return err + } + + for _, v := range cfg.TableMappings { source := v.SourceTableIdentifier destination := v.DestinationTableIdentifier s.logger.Info( From c901a49341e9a07dd0ed0366129e5c983fccfebf Mon Sep 17 00:00:00 2001 From: Stoica Alexandru Date: Mon, 13 Oct 2025 14:49:12 +0100 Subject: [PATCH 2/6] Tentative first go at removing `tableMappings` & storing in DB ---
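Note for reviewers: a minimal sketch of the catalog round-trip this patch aims at, assuming the final V49 schema (a `bytea[]` column named `table_mappings`, keyed by `flow_name` and `version`), a pgx v5 pool, and the repo's generated `protos` package. The helper names (`storeTableMappings`, `loadTableMappings`) and the exact import path for `protos` are illustrative, not part of the patch; note also that the INSERT statements inside the patches still reference a `table_mapping` column, while the sketch follows the migration's `table_mappings` name.

package catalogsketch

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
	"google.golang.org/protobuf/proto"

	"github.com/PeerDB-io/peerdb/flow/generated/protos"
)

// storeTableMappings marshals each TableMapping and upserts the batch as a single bytea[] row.
func storeTableMappings(ctx context.Context, pool *pgxpool.Pool, flowName string, version uint32, tms []*protos.TableMapping) error {
	rows := make([][]byte, 0, len(tms))
	for _, tm := range tms {
		b, err := proto.Marshal(tm)
		if err != nil {
			return fmt.Errorf("marshal table mapping: %w", err)
		}
		rows = append(rows, b)
	}
	_, err := pool.Exec(ctx,
		`INSERT INTO table_mappings (flow_name, version, table_mappings) VALUES ($1, $2, $3)
		 ON CONFLICT (flow_name, version) DO UPDATE SET table_mappings = EXCLUDED.table_mappings`,
		flowName, version, rows)
	return err
}

// loadTableMappings reads the bytea[] back for one (flow_name, version) and unmarshals each element.
func loadTableMappings(ctx context.Context, pool *pgxpool.Pool, flowName string, version uint32) ([]*protos.TableMapping, error) {
	var raw [][]byte
	if err := pool.QueryRow(ctx,
		`SELECT table_mappings FROM table_mappings WHERE flow_name = $1 AND version = $2`,
		flowName, version).Scan(&raw); err != nil {
		return nil, err
	}
	tms := make([]*protos.TableMapping, 0, len(raw))
	for _, b := range raw {
		tm := &protos.TableMapping{}
		if err := proto.Unmarshal(b, tm); err != nil {
			return nil, err
		}
		tms = append(tms, tm)
	}
	return tms, nil
}

Keeping the whole batch in one bytea[] row makes the upsert atomic per (flow_name, version); the trade-off is that any change rewrites the full array rather than touching individual mapping rows.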
flow/activities/flowable.go | 31 ++++++++++++++++ flow/activities/flowable_core.go | 27 ++++++++++++++ flow/internal/flow_configuration_helpers.go | 16 ++++++++ flow/workflows/cdc_flow.go | 37 ++++++++++++++----- .../migrations/V49__table_mappings_table.sql | 6 +++ protos/flow.proto | 7 +++- 6 files changed, 112 insertions(+), 12 deletions(-) create mode 100644 nexus/catalog/migrations/V49__table_mappings_table.sql diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 2241f5bb7c..fe1edfb10a 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -1822,3 +1822,34 @@ func (a *FlowableActivity) ReportStatusMetric(ctx context.Context, status protos ))) return nil } + +func (a *FlowableActivity) MigrateTableMappingsToCatalog( + ctx context.Context, + flowJobName string, tableMappings []*protos.TableMapping, version uint32, +) error { + logger := internal.LoggerFromCtx(ctx) + tx, err := a.CatalogPool.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction to migrate table mappings to catalog: %w", err) + } + defer shared.RollbackTx(tx, logger) + + tableMappingsBytes := [][]byte{} + for _, tableMapping := range tableMappings { + tableMappingBytes, err := proto.Marshal(tableMapping) + if err != nil { + return fmt.Errorf("failed to marshal table mapping to migrate to catalog: %w", err) + } + tableMappingsBytes = append(tableMappingsBytes, tableMappingBytes) + } + + stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3) + ON CONFLICT (flow_name, version) DO UPDATE SET table_mapping = EXCLUDED.table_mapping` + _, err = tx.Exec(ctx, stmt, flowJobName, version, tableMappingsBytes) + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction to migrate table mappings to catalog: %w", err) + } + + return nil +} diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 15179a1603..cd00ae4b48 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -55,6 +55,33 @@ func heartbeatRoutine( ) } +func (a *FlowableActivity) getTableMappings(ctx context.Context, flowName string, version uint) ([]*protos.TableMapping, error) { + rows, err := a.CatalogPool.Query(ctx, "select table_mapping from table_mappings where flow_name = $1 AND version = $2", flowName, version) + if err != nil { + return nil, err + } + + var tableMappingsBytes [][]byte + var tableMappings []*protos.TableMapping = []*protos.TableMapping{} + + err = rows.Scan([]any{&tableMappingsBytes}) + if err != nil { + return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err) + } + + for _, tableMappingBytes := range tableMappingsBytes { + + var tableMapping *protos.TableMapping + if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil { + return nil, err + } + + tableMappings = append(tableMappings, tableMapping) + } + + return tableMappings, nil +} + func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowName string) (map[string]*protos.TableSchema, error) { rows, err := a.CatalogPool.Query(ctx, "select table_name, table_schema from table_schema_mapping where flow_name = $1", flowName) if err != nil { diff --git a/flow/internal/flow_configuration_helpers.go b/flow/internal/flow_configuration_helpers.go index ee1b331da0..250bbc13a2 100644 --- a/flow/internal/flow_configuration_helpers.go +++ b/flow/internal/flow_configuration_helpers.go @@ -11,6 +11,22 @@ import ( 
"github.com/PeerDB-io/peerdb/flow/shared" ) +func TableNameMapping(tableMappings []*protos.TableMapping, resync bool) map[string]string { + tblNameMapping := make(map[string]string, len(tableMappings)) + if resync { + for _, mapping := range tableMappings { + if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL { + mapping.DestinationTableIdentifier += "_resync" + } + } + } + for _, v := range tableMappings { + tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier + } + + return tblNameMapping +} + func FetchConfigFromDB(ctx context.Context, catalogPool shared.CatalogPool, flowName string) (*protos.FlowConnectionConfigsCore, error) { var configBytes sql.RawBytes if err := catalogPool.QueryRow(ctx, diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 96fe98a984..8a99745f9f 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -123,7 +123,7 @@ func updateFlowConfigWithLatestSettings( cloneCfg := proto.CloneOf(cfg) cloneCfg.MaxBatchSize = state.SyncFlowOptions.BatchSize cloneCfg.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds - cloneCfg.TableMappings = state.SyncFlowOptions.TableMappings + cloneCfg.TableMappingVersion = state.SyncFlowOptions.TableMappingVersion if state.SnapshotNumRowsPerPartition > 0 { cloneCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition } @@ -221,6 +221,24 @@ func processCDCFlowConfigUpdate( } } + // MIGRATION: Move `tableMapping` to the DB + if len(state.SyncFlowOptions.TableMappings) > 0 { + migrateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + if err := workflow.ExecuteActivity( + migrateCtx, + flowable.MigrateTableMappingsToCatalog, + cfg.FlowJobName, + state.SyncFlowOptions.TableMappings, + 1, //hardcoding version as 1 for migration purposes. + ).Get(migrateCtx, nil); err != nil { + return fmt.Errorf("failed to migrate TableMappings: %w", err) + } + state.SyncFlowOptions.TableMappings = nil + state.SyncFlowOptions.TableMappingVersion = 1 + } + syncStateToConfigProtoInCatalog(ctx, cfg, state) return nil } @@ -296,7 +314,7 @@ func processTableAdditions( alterPublicationAddAdditionalTablesFuture := workflow.ExecuteActivity( alterPublicationAddAdditionalTablesCtx, flowable.AddTablesToPublication, - internal.MinimizeFlowConfiguration(cfg), flowConfigUpdate.AdditionalTables) + cfg, flowConfigUpdate.AdditionalTables) var res *CDCFlowWorkflowResult var addTablesFlowErr error @@ -338,7 +356,6 @@ func processTableAdditions( childAddTablesCDCFlowFuture := workflow.ExecuteChildWorkflow( childAddTablesCDCFlowCtx, CDCFlowWorkflow, - // TODO: `additonalTableCfg` this cannot be minimized in the main branch; but the limitation is minimal. 
additionalTablesCfg, nil, ) @@ -413,7 +430,7 @@ func processTableRemovals( rawTableCleanupFuture := workflow.ExecuteActivity( removeTablesCtx, flowable.RemoveTablesFromRawTable, - internal.MinimizeFlowConfiguration(cfg), state.FlowConfigUpdate.RemovedTables) + cfg, state.FlowConfigUpdate.RemovedTables) removeTablesSelector.AddFuture(rawTableCleanupFuture, func(f workflow.Future) { if err := f.Get(ctx, nil); err != nil { logger.Error("failed to clean up raw table for removed tables", slog.Any("error", err)) @@ -425,7 +442,7 @@ removeTablesFromCatalogFuture := workflow.ExecuteActivity( removeTablesCtx, flowable.RemoveTablesFromCatalog, - internal.MinimizeFlowConfiguration(cfg), state.FlowConfigUpdate.RemovedTables) + cfg, state.FlowConfigUpdate.RemovedTables) removeTablesSelector.AddFuture(removeTablesFromCatalogFuture, func(f workflow.Future) { if err := f.Get(ctx, nil); err != nil { logger.Error("failed to clean up raw table for removed tables", slog.Any("error", err)) @@ -595,7 +612,7 @@ func CDCFlowWorkflow( logger.Info("mirror resumed", slog.Duration("after", time.Since(startTime))) state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING) - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) } originalRunID := workflow.GetInfo(ctx).OriginalRunID @@ -677,7 +694,7 @@ // Resync will rely on the `cfg.Resync` flag to rename the tables // during the snapshot process. This is how we're able to also remove the need // to sync the config back into the DB / not rely on the `state.TableMappings`. - setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, internal.MinimizeFlowConfiguration(cfg)) + setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) var setupFlowOutput *protos.SetupFlowOutput var setupFlowError error @@ -813,7 +830,7 @@ logger.Info("executed setup flow and snapshot flow, start running") state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING) } - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) } var finished bool @@ -825,7 +842,7 @@ RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1}, })) state.SyncFlowOptions.TableMappings = []*protos.TableMapping{} - syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, internal.MinimizeFlowConfiguration(cfg), state.SyncFlowOptions) + syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions) mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop") mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { @@ -946,7 +963,7 @@ if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal { return state, workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput) } - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) } } } diff --git a/nexus/catalog/migrations/V49__table_mappings_table.sql b/nexus/catalog/migrations/V49__table_mappings_table.sql new file mode 100644 index 0000000000..a0f3649d7a
--- /dev/null +++ b/nexus/catalog/migrations/V49__table_mappings_table.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS table_mappings ( + flow_name varchar(255) not null, + version int32 not null default 1, + table_mappings []bytea not null, + primary key (flow_name, version) +); diff --git a/protos/flow.proto b/protos/flow.proto index fe0cdb853d..5329a70058 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -41,6 +41,7 @@ message TableMapping { string sharding_key = 7; string policy_name = 8; string partition_by_expr = 9; + uint32 version = 10; } message SetupInput { @@ -52,12 +53,12 @@ // FlowConnectionConfigs is for external use by the API, maintaining backwards compatibility // When adding fields here, add them to FlowConnectionConfigsCore too message FlowConnectionConfigs { - reserved 2,3,17; + reserved 2,3,17,4; string flow_job_name = 1; // config for the CDC flow itself // currently, TableMappings, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals - repeated TableMapping table_mappings = 4; + //repeated TableMapping table_mappings = 4; uint32 max_batch_size = 5; uint64 idle_timeout_seconds = 6; string cdc_staging_path = 7; @@ -91,6 +92,7 @@ message FlowConnectionConfigs { map env = 24; uint32 version = 25; + uint32 table_mapping_version = 27; } // FlowConnectionConfigsCore is used internally in the codebase, it is safe to remove (mark reserved) fields from it @@ -181,6 +183,7 @@ message SyncFlowOptions { map src_table_id_name_mapping = 4; repeated TableMapping table_mappings = 6; int32 number_of_syncs = 7; + uint32 table_mapping_version = 8; } message EnsurePullabilityBatchInput { From b8f06ba5d20d328dcc9278bade4cd3dcdf500003 Mon Sep 17 00:00:00 2001 From: Stoica Alexandru Date: Mon, 13 Oct 2025 16:53:28 +0100 Subject: [PATCH 3/6] Resolve issues from deprecating TableMappings pt1 --- flow/activities/flowable.go | 10 ++--- flow/activities/flowable_core.go | 40 +++++++++--------- flow/cmd/handler.go | 17 ++++++++ flow/connectors/clickhouse/validate.go | 5 ++- flow/internal/flow_configuration_helpers.go | 46 +++++++++++++++++++++ protos/route.proto | 1 + 6 files changed, 90 insertions(+), 29 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index fe1edfb10a..8870916075 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -1834,13 +1834,9 @@ func (a *FlowableActivity) MigrateTableMappingsToCatalog( } defer shared.RollbackTx(tx, logger) - tableMappingsBytes := [][]byte{} - for _, tableMapping := range tableMappings { - tableMappingBytes, err := proto.Marshal(tableMapping) - if err != nil { - return fmt.Errorf("failed to marshal table mapping to migrate to catalog: %w", err) - } - tableMappingsBytes = append(tableMappingsBytes, tableMappingBytes) + tableMappingsBytes, err := internal.TableMappingsToBytes(tableMappings) + if err != nil { + return fmt.Errorf("unable to marshal table mappings: %w", err) } stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index cd00ae4b48..3efaa94a7f 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -55,32 +55,32 @@ func heartbeatRoutine( ) } -func (a *FlowableActivity) getTableMappings(ctx context.Context, flowName string, version uint) ([]*protos.TableMapping, error) { - rows, err := a.CatalogPool.Query(ctx, "select
table_mapping from table_mappings where flow_name = $1 AND version = $2", flowName, version) - if err != nil { - return nil, err - } +//func (a *FlowableActivity) getTableMappings(ctx context.Context, flowName string, version uint) ([]*protos.TableMapping, error) { +//rows, err := a.CatalogPool.Query(ctx, "select table_mapping from table_mappings where flow_name = $1 AND version = $2", flowName, version) +//if err != nil { +//return nil, err +//} - var tableMappingsBytes [][]byte - var tableMappings []*protos.TableMapping = []*protos.TableMapping{} +//var tableMappingsBytes [][]byte +//var tableMappings []*protos.TableMapping = []*protos.TableMapping{} - err = rows.Scan([]any{&tableMappingsBytes}) - if err != nil { - return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err) - } +//err = rows.Scan([]any{&tableMappingsBytes}) +//if err != nil { +//return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err) +//} - for _, tableMappingBytes := range tableMappingsBytes { +//for _, tableMappingBytes := range tableMappingsBytes { - var tableMapping *protos.TableMapping - if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil { - return nil, err - } +//var tableMapping *protos.TableMapping +//if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil { +//return nil, err +//} - tableMappings = append(tableMappings, tableMapping) - } +//tableMappings = append(tableMappings, tableMapping) +//} - return tableMappings, nil -} +//return tableMappings, nil +//} func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowName string) (map[string]*protos.TableSchema, error) { rows, err := a.CatalogPool.Query(ctx, "select table_name, table_schema from table_schema_mapping where flow_name = $1", flowName) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index e98613a958..e7d67f294b 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -95,6 +95,17 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, connectionConfigs.FlowJobName, err) } + // Insert the table mappings into the DB + tableMappingsBytes, err := internal.TableMappingsToBytes(req.TableMappings) + if err != nil { + return fmt.Errorf("unable to marshal table mappings: %w", err) + } + + stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3) + ON CONFLICT (flow_name, version) DO UPDATE SET table_mapping = EXCLUDED.table_mapping` + version := 1 + _, err = h.pool.Exec(ctx, stmt, req.ConnectionConfigs.FlowJobName, version, tableMappingsBytes) + return nil } @@ -458,12 +469,18 @@ func (h *FlowRequestHandler) FlowStateChange( if err != nil { return nil, NewInternalApiError(fmt.Errorf("unable to get flow config: %w", err)) } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, h.pool, req.FlowJobName) + + if err != nil { + return nil, NewInternalApiError(fmt.Errorf("unable to get table mappings: %w", err)) + } config.Resync = true config.DoInitialSnapshot = true // validate mirror first because once the mirror is dropped, there's no going back if _, err := h.ValidateCDCMirror(ctx, &protos.CreateCDCFlowRequest{ ConnectionConfigs: config, + TableMappings: tableMappings, }); err != nil { return nil, NewFailedPreconditionApiError(fmt.Errorf("invalid mirror: %w", err)) } diff --git a/flow/connectors/clickhouse/validate.go b/flow/connectors/clickhouse/validate.go index e527a23bdd..9ea8fe2dc8 100644 --- a/flow/connectors/clickhouse/validate.go +++ b/flow/connectors/clickhouse/validate.go @@ 
-16,6 +16,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination( ctx context.Context, cfg *protos.FlowConnectionConfigsCore, tableNameSchemaMapping map[string]*protos.TableSchema, + tableMappings []*protos.TableMapping, ) error { if internal.PeerDBOnlyClickHouseAllowed() { err := chvalidate.CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx, c.logger, c.database) @@ -34,7 +35,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination( } // this is for handling column exclusion, processed schema does that in a step - processedMapping := internal.BuildProcessedSchemaMapping(cfg.TableMappings, tableNameSchemaMapping, c.logger) + processedMapping := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, c.logger) dstTableNames := slices.Collect(maps.Keys(processedMapping)) // In the case of resync, we don't need to check the content or structure of the original tables; @@ -64,7 +65,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination( return err } - for _, tableMapping := range cfg.TableMappings { + for _, tableMapping := range tableMappings { dstTableName := tableMapping.DestinationTableIdentifier if _, ok := processedMapping[dstTableName]; !ok { // if destination table is not a key, that means source table was not a key in the original schema mapping(?) diff --git a/flow/internal/flow_configuration_helpers.go b/flow/internal/flow_configuration_helpers.go index 250bbc13a2..8cdb0f954a 100644 --- a/flow/internal/flow_configuration_helpers.go +++ b/flow/internal/flow_configuration_helpers.go @@ -42,3 +42,49 @@ func FetchConfigFromDB(ctx context.Context, catalogPool shared.CatalogPool, flow return &cfgFromDB, nil } + +func TableMappingsToBytes(tableMappings []*protos.TableMapping) ([][]byte, error) { + tableMappingsBytes := [][]byte{} + for _, tableMapping := range tableMappings { + tableMappingBytes, err := proto.Marshal(tableMapping) + if err != nil { + return nil, fmt.Errorf("failed to marshal table mapping to migrate to catalog: %w", err) + } + tableMappingsBytes = append(tableMappingsBytes, tableMappingBytes) + } + return tableMappingsBytes, nil +} + +func FetchTableMappingsFromDB(ctx context.Context, flowJobName string, version uint32) ([]*protos.TableMapping, error) { + pool, err := GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + return nil, err + } + rows, err := pool.Query(ctx, + "select table_mapping from table_mappings where flow_name = $1 AND version = $2", + flowJobName, version, + ) + if err != nil { + return nil, err + } + + var tableMappingsBytes [][]byte + var tableMappings []*protos.TableMapping = []*protos.TableMapping{} + + err = rows.Scan([]any{&tableMappingsBytes}) + if err != nil { + return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err) + } + + for _, tableMappingBytes := range tableMappingsBytes { + + var tableMapping *protos.TableMapping + if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil { + return nil, err + } + + tableMappings = append(tableMappings, tableMapping) + } + + return tableMappings, nil +} diff --git a/protos/route.proto b/protos/route.proto index e051e0c94d..8f8f05d296 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -12,6 +12,7 @@ package peerdb_route; message CreateCDCFlowRequest { peerdb_flow.FlowConnectionConfigs connection_configs = 1; bool attach_to_existing = 2; + repeated peerdb_flow.TableMapping table_mappings = 3; } message CreateCDCFlowResponse { string workflow_id = 1; } From fdf4dc8eaa912068f3d94c75ba044d77ea665c60 Mon Sep 17 00:00:00 
2001 From: Stoica Alexandru Date: Mon, 13 Oct 2025 17:27:34 +0100 Subject: [PATCH 4/6] Dealing with the removed tableMappings - pt2; build is passing --- flow/activities/flowable.go | 37 +++++++++++++++++++--------- flow/activities/flowable_core.go | 7 +++++- flow/activities/snapshot_activity.go | 12 +++++++-- flow/cmd/handler.go | 3 +-- flow/cmd/mirror_status.go | 3 ++- flow/workflows/cdc_flow.go | 6 ++++- flow/workflows/snapshot_flow.go | 9 ++++++- 7 files changed, 57 insertions(+), 20 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 8870916075..c575fbe4cc 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -132,7 +132,12 @@ func (a *FlowableActivity) EnsurePullability( if err != nil { return nil, err } - config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(cfg.TableMappings, cfg.Resync))) + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return nil, err + } + + config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(tableMappings, cfg.Resync))) output, err := srcConn.EnsurePullability(ctx, config) if err != nil { @@ -154,12 +159,6 @@ func (a *FlowableActivity) CreateRawTable( } defer connectors.CloseConnector(ctx, dstConn) - cfg, err := internal.FetchConfigFromDB(config.FlowJobName, ctx) - if err != nil { - return nil, err - } - config.TableNameMapping = internal.TableNameMapping(cfg.TableMappings, cfg.Resync) - res, err := dstConn.CreateRawTable(ctx, config) if err != nil { return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, err) @@ -193,11 +192,15 @@ func (a *FlowableActivity) SetupTableSchema( if err != nil { return err } - tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, cfg.TableMappings) + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return err + } + tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, tableMappings) if err != nil { return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err)) } - processed := internal.BuildProcessedSchemaMapping(cfg.TableMappings, tableNameSchemaMapping, logger) + processed := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, logger) tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{}) if err != nil { @@ -266,9 +269,13 @@ func (a *FlowableActivity) CreateNormalizedTable( if err != nil { return nil, err } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return nil, err + } numTablesToSetup.Store(int32(len(tableNameSchemaMapping))) tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping)) - for _, tableMapping := range cfg.TableMappings { + for _, tableMapping := range tableMappings { tableIdentifier := tableMapping.DestinationTableIdentifier tableSchema := tableNameSchemaMapping[tableIdentifier] existing, err := conn.SetupNormalizedTable( @@ -321,9 +328,14 @@ func (a *FlowableActivity) SyncFlow( return err } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return err + } + // Override config with DB values to deal with the large fields. 
config = cfg - options.TableMappings = cfg.TableMappings + options.TableMappings = tableMappings syncState.Store(shared.Ptr("setup")) shutdown := heartbeatRoutine(ctx, func() string { @@ -1073,7 +1085,8 @@ func (a *FlowableActivity) RecordMetricsCritical(ctx context.Context) error { if isActive { activeFlows = append(activeFlows, info) } - a.OtelManager.Metrics.SyncedTablesGauge.Record(ctx, int64(len(info.config.TableMappings))) + //TODO: this will need a special query as we can extract this straight from the DB. + //a.OtelManager.Metrics.SyncedTablesGauge.Record(ctx, int64(len(info.config.TableMappings))) a.OtelManager.Metrics.FlowStatusGauge.Record(ctx, 1, metric.WithAttributeSet(attribute.NewSet( attribute.String(otel_metrics.FlowStatusKey, info.status.String()), attribute.Bool(otel_metrics.IsFlowActiveKey, isActive), diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 3efaa94a7f..a87c9efc58 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -675,13 +675,18 @@ func (a *FlowableActivity) startNormalize( return fmt.Errorf("failed to get table name schema mapping: %w", err) } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, config.TableMappingVersion) + if err != nil { + return err + } + for { logger.Info("normalizing batch", slog.Int64("syncBatchID", batchID)) res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{ FlowJobName: config.FlowJobName, Env: config.Env, TableNameSchemaMapping: tableNameSchemaMapping, - TableMappings: config.TableMappings, + TableMappings: tableMappings, SoftDeleteColName: config.SoftDeleteColName, SyncedAtColName: config.SyncedAtColName, SyncBatchID: batchID, diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 074e4479f0..c8f0c024a9 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -71,7 +71,11 @@ func (a *SnapshotActivity) SetupReplication( if err != nil { return nil, err } - config.TableNameMapping = internal.TableNameMapping(cfg.TableMappings, cfg.Resync) + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return nil, err + } + config.TableNameMapping = internal.TableNameMapping(tableMappings, cfg.Resync) logger.Info("waiting for slot to be created...") slotInfo, err := conn.SetupReplication(ctx, config) @@ -192,8 +196,12 @@ func (a *SnapshotActivity) GetDefaultPartitionKeyForTables( if err != nil { return nil, err } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return nil, err + } output, err := connector.GetDefaultPartitionKeyForTables(ctx, &protos.GetDefaultPartitionKeyForTablesInput{ - TableMappings: cfg.TableMappings, + TableMappings: tableMappings, }) if err != nil { return nil, a.Alerter.LogFlowError(ctx, input.FlowJobName, fmt.Errorf("failed to check if tables can parallel load: %w", err)) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index e7d67f294b..311e33e924 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -469,8 +469,7 @@ func (h *FlowRequestHandler) FlowStateChange( if err != nil { return nil, NewInternalApiError(fmt.Errorf("unable to get flow config: %w", err)) } - tableMappings, err := internal.FetchTableMappingsFromDB(ctx, h.pool, req.FlowJobName) - + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, config.TableMappingVersion) if 
err != nil { return nil, NewInternalApiError(fmt.Errorf("unable to get table mappings: %w", err)) } diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 58069a98c0..368eb10718 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -163,7 +163,8 @@ func (h *FlowRequestHandler) cdcFlowStatus( if state.SyncFlowOptions != nil { config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds config.MaxBatchSize = state.SyncFlowOptions.BatchSize - config.TableMappings = state.SyncFlowOptions.TableMappings + //TODO: this will need to fetch from the DB? + //config.TableMappings = state.SyncFlowOptions.TableMappings } srcType, err := connectors.LoadPeerType(ctx, h.pool, config.SourceName) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 8a99745f9f..de1b7132e0 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -327,7 +327,11 @@ func processTableAdditions( additionalTablesCfg := proto.CloneOf(cfg) additionalTablesCfg.DoInitialSnapshot = !flowConfigUpdate.SkipInitialSnapshotForTableAdditions additionalTablesCfg.InitialSnapshotOnly = true - additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables + // TODO: thought - maybe we need to pass additionalTables and then in the + // CDCWorkflow we persist them to the DB and send an incremented `tableMappingVersion` + // to the new workflow? + //additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables + additionalTablesCfg.Resync = false if state.SnapshotNumRowsPerPartition > 0 { additionalTablesCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 40d96efff3..382b99edff 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -287,7 +287,14 @@ func (s *SnapshotFlowExecution) cloneTables( return err } - for _, v := range cfg.TableMappings { + tableMappingsCtx := context.Background() + defer tableMappingsCtx.Done() + tableMappings, err := internal.FetchTableMappingsFromDB(tableMappingsCtx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return err + } + + for _, v := range tableMappings { source := v.SourceTableIdentifier destination := v.DestinationTableIdentifier s.logger.Info( From ae6f8216057f4f216df324a6376dad2372a54a1d Mon Sep 17 00:00:00 2001 From: Stoica Alexandru Date: Tue, 14 Oct 2025 17:47:54 +0100 Subject: [PATCH 5/6] ff do not push --- flow/internal/flow_configuration_helpers.go | 2 +- nexus/catalog/migrations/V49__table_mappings_table.sql | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/internal/flow_configuration_helpers.go b/flow/internal/flow_configuration_helpers.go index 8cdb0f954a..282a55358b 100644 --- a/flow/internal/flow_configuration_helpers.go +++ b/flow/internal/flow_configuration_helpers.go @@ -61,7 +61,7 @@ func FetchTableMappingsFromDB(ctx context.Context, flowJobName string, version u return nil, err } rows, err := pool.Query(ctx, - "select table_mapping from table_mappings where flow_name = $1 AND version = $2", + "select table_mappings from table_mappings where flow_name = $1 AND version = $2", flowJobName, version, ) if err != nil { diff --git a/nexus/catalog/migrations/V49__table_mappings_table.sql b/nexus/catalog/migrations/V49__table_mappings_table.sql index a0f3649d7a..9b0a9cbe89 100644 --- a/nexus/catalog/migrations/V49__table_mappings_table.sql +++ b/nexus/catalog/migrations/V49__table_mappings_table.sql @@ -1,6 +1,6 @@ CREATE TABLE IF 
NOT EXISTS table_mappings ( - flow_name varchar(255) not null, - version int32 not null default 1, - table_mappings []bytea not null, + flow_name varchar(255) not null, + version int not null default 1, + table_mappings bytea[] not null, primary key (flow_name, version) ); From d4c3aebefbd99789302b6e172f25aa5f55bd970e Mon Sep 17 00:00:00 2001 From: Stoica Alexandru Date: Thu, 30 Oct 2025 10:03:38 +0000 Subject: [PATCH 6/6] ff rebase --- flow/activities/flowable.go | 8 ++++---- flow/activities/snapshot_activity.go | 4 ++-- flow/cmd/handler.go | 4 ++-- flow/connectors/clickhouse/validate.go | 2 +- flow/e2e/congen.go | 1 - flow/workflows/snapshot_flow.go | 8 +++++++- nexus/catalog/migrations/V49__table_mappings_table.sql | 2 +- protos/flow.proto | 1 + 8 files changed, 18 insertions(+), 12 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index c575fbe4cc..ba9c791758 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -128,7 +128,7 @@ func (a *FlowableActivity) EnsurePullability( defer connectors.CloseConnector(ctx, srcConn) // We can fetch from the DB, as we are in the activity - cfg, err := internal.FetchConfigFromDB(config.FlowJobName, ctx) + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowJobName) if err != nil { return nil, err } @@ -188,7 +188,7 @@ func (a *FlowableActivity) SetupTableSchema( } defer connectors.CloseConnector(ctx, srcConn) - cfg, err := internal.FetchConfigFromDB(config.FlowName, ctx) + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowName) if err != nil { return err } @@ -265,7 +265,7 @@ func (a *FlowableActivity) CreateNormalizedTable( return nil, err } - cfg, err := internal.FetchConfigFromDB(config.FlowName, ctx) + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowName) if err != nil { return nil, err } @@ -323,7 +323,7 @@ func (a *FlowableActivity) SyncFlow( var syncingBatchID atomic.Int64 var syncState atomic.Pointer[string] - cfg, err := internal.FetchConfigFromDB(config.FlowJobName, ctx) + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowJobName) if err != nil { return err } diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index c8f0c024a9..7ab8cf588a 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -67,7 +67,7 @@ func (a *SnapshotActivity) SetupReplication( configCtx := context.Background() defer configCtx.Done() - cfg, err := internal.FetchConfigFromDB(config.FlowJobName, configCtx) + cfg, err := internal.FetchConfigFromDB(configCtx, a.CatalogPool, config.FlowJobName) if err != nil { return nil, err } @@ -192,7 +192,7 @@ func (a *SnapshotActivity) GetDefaultPartitionKeyForTables( } defer connectors.CloseConnector(ctx, connector) - cfg, err := internal.FetchConfigFromDB(input.FlowJobName, ctx) + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, input.FlowJobName) if err != nil { return nil, err } diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 311e33e924..6df9cfda55 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -96,7 +96,7 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, } // Insert the table mappings into the DB - tableMappingsBytes, err := internal.TableMappingsToBytes(req.TableMappings) + tableMappingsBytes, err := internal.TableMappingsToBytes(connectionConfigs.TableMappings) if err != nil {
return fmt.Errorf("unable to marshal table mappings: %w", err) } @@ -104,7 +104,7 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3) ON CONFLICT (flow_name, version) DO UPDATE SET table_mapping = EXCLUDED.table_mapping` version := 1 - _, err = h.pool.Exec(ctx, stmt, req.ConnectionConfigs.FlowJobName, version, tableMappingsBytes) + _, err = h.pool.Exec(ctx, stmt, connectionConfigs.FlowJobName, version, tableMappingsBytes) return nil } diff --git a/flow/connectors/clickhouse/validate.go b/flow/connectors/clickhouse/validate.go index 9ea8fe2dc8..8f7e01c3b7 100644 --- a/flow/connectors/clickhouse/validate.go +++ b/flow/connectors/clickhouse/validate.go @@ -16,8 +16,8 @@ func (c *ClickHouseConnector) ValidateMirrorDestination( ctx context.Context, cfg *protos.FlowConnectionConfigsCore, tableNameSchemaMapping map[string]*protos.TableSchema, - tableMappings []*protos.TableMapping, ) error { + tableMappings := cfg.TableMappings if internal.PeerDBOnlyClickHouseAllowed() { err := chvalidate.CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx, c.logger, c.database) if err != nil { diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index cc579fbc7e..049842aec2 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -72,7 +72,6 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs(s Suite) ret := &protos.FlowConnectionConfigs{ FlowJobName: c.FlowJobName, - TableMappings: tblMappings, SourceName: s.Source().GeneratePeer(t).Name, DestinationName: c.Destination, SyncedAtColName: "_PEERDB_SYNCED_AT", diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 382b99edff..2ffb248c8e 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -282,7 +282,13 @@ func (s *SnapshotFlowExecution) cloneTables( configCtx := context.Background() defer configCtx.Done() - cfg, err := internal.FetchConfigFromDB(s.config.FlowJobName, configCtx) + pool, err := internal.GetCatalogConnectionPoolFromEnv(configCtx) + if err != nil { + return err + } + defer pool.Pool.Close() + + cfg, err := internal.FetchConfigFromDB(configCtx, pool, s.config.FlowJobName) if err != nil { return err } diff --git a/nexus/catalog/migrations/V49__table_mappings_table.sql b/nexus/catalog/migrations/V49__table_mappings_table.sql index 9b0a9cbe89..5a4e78cc64 100644 --- a/nexus/catalog/migrations/V49__table_mappings_table.sql +++ b/nexus/catalog/migrations/V49__table_mappings_table.sql @@ -1,6 +1,6 @@ CREATE TABLE IF NOT EXISTS table_mappings ( flow_name varchar(255) not null, - version int not null default 1, + version bigint not null default 1, table_mappings bytea[] not null, primary key (flow_name, version) ); diff --git a/protos/flow.proto b/protos/flow.proto index 5329a70058..a7f6a1d035 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -139,6 +139,7 @@ message FlowConnectionConfigsCore { map env = 24; uint32 version = 25; + uint32 table_mapping_version = 27; } message RenameTableOption {
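Taken together, the activity-side lookup the series converges on is: ship only the flow name (plus the small config fields) through Temporal, then hydrate the large pieces from the catalog inside the activity. Below is a condensed sketch, assuming the helpers as they stand after PATCH 3-6 (`internal.FetchConfigFromDB(ctx, pool, flowName)` returning a config that carries `TableMappingVersion`, and `internal.FetchTableMappingsFromDB(ctx, flowName, version)`); `resolveMappings` itself is illustrative, not a function in the patches, and the import paths are assumed to match the repo layout.

package activities

import (
	"context"
	"fmt"

	"github.com/PeerDB-io/peerdb/flow/generated/protos"
	"github.com/PeerDB-io/peerdb/flow/internal"
)

// resolveMappings shows the hydration path an activity follows once table
// mappings live in the catalog instead of the Temporal payload.
func (a *FlowableActivity) resolveMappings(ctx context.Context, flowJobName string) ([]*protos.TableMapping, error) {
	// Hydrated config (minus the large fields) is read back from the catalog.
	cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, flowJobName) // signature per PATCH 6
	if err != nil {
		return nil, fmt.Errorf("fetch flow config: %w", err)
	}
	// Table mappings are versioned rows in the new table_mappings catalog table.
	tms, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) // per PATCH 3/4
	if err != nil {
		return nil, fmt.Errorf("fetch table mappings (version %d): %w", cfg.TableMappingVersion, err)
	}
	return tms, nil
}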