diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index e0e46a8777..ba9c791758 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" "log/slog" + "maps" "net" "os" + "slices" "strconv" "sync/atomic" "time" @@ -125,6 +127,18 @@ func (a *FlowableActivity) EnsurePullability( } defer connectors.CloseConnector(ctx, srcConn) + // We can fetch from the DB, as we are in the activity + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowJobName) + if err != nil { + return nil, err + } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return nil, err + } + + config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(tableMappings, cfg.Resync))) + output, err := srcConn.EnsurePullability(ctx, config) if err != nil { return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to ensure pullability: %w", err)) @@ -174,11 +188,19 @@ func (a *FlowableActivity) SetupTableSchema( } defer connectors.CloseConnector(ctx, srcConn) - tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, config.TableMappings) + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowName) + if err != nil { + return err + } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return err + } + tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, tableMappings) if err != nil { return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err)) } - processed := internal.BuildProcessedSchemaMapping(config.TableMappings, tableNameSchemaMapping, logger) + processed := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, logger) tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{}) if err != nil { @@ -243,9 +265,17 @@ func (a *FlowableActivity) CreateNormalizedTable( return nil, err } + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowName) + if err != nil { + return nil, err + } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return nil, err + } numTablesToSetup.Store(int32(len(tableNameSchemaMapping))) tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping)) - for _, tableMapping := range config.TableMappings { + for _, tableMapping := range tableMappings { tableIdentifier := tableMapping.DestinationTableIdentifier tableSchema := tableNameSchemaMapping[tableIdentifier] existing, err := conn.SetupNormalizedTable( @@ -292,6 +322,21 @@ func (a *FlowableActivity) SyncFlow( var normalizeWaiting atomic.Bool var syncingBatchID atomic.Int64 var syncState atomic.Pointer[string] + + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowJobName) + if err != nil { + return err + } + + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return err + } + + // Override config with DB values to deal with the large fields. + config = cfg + options.TableMappings = tableMappings + syncState.Store(shared.Ptr("setup")) shutdown := heartbeatRoutine(ctx, func() string { // Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch @@ -1040,7 +1085,8 @@ func (a *FlowableActivity) RecordMetricsCritical(ctx context.Context) error { if isActive { activeFlows = append(activeFlows, info) } - a.OtelManager.Metrics.SyncedTablesGauge.Record(ctx, int64(len(info.config.TableMappings))) + //TODO: this will need a special query as we can extract this straight from the DB. + //a.OtelManager.Metrics.SyncedTablesGauge.Record(ctx, int64(len(info.config.TableMappings))) a.OtelManager.Metrics.FlowStatusGauge.Record(ctx, 1, metric.WithAttributeSet(attribute.NewSet( attribute.String(otel_metrics.FlowStatusKey, info.status.String()), attribute.Bool(otel_metrics.IsFlowActiveKey, isActive), @@ -1789,3 +1835,30 @@ func (a *FlowableActivity) ReportStatusMetric(ctx context.Context, status protos ))) return nil } + +func (a *FlowableActivity) MigrateTableMappingsToCatalog( + ctx context.Context, + flowJobName string, tableMappings []*protos.TableMapping, version uint32, +) error { + logger := internal.LoggerFromCtx(ctx) + tx, err := a.CatalogPool.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction to migrate table mappings to catalog: %w", err) + } + defer shared.RollbackTx(tx, logger) + + tableMappingsBytes, err := internal.TableMappingsToBytes(tableMappings) + if err != nil { + return fmt.Errorf("unable to marshal table mappings: %w", err) + } + + stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3) + ON CONFLICT (flow_name, version) DO UPDATE SET table_mapping = EXCLUDED.table_mapping` + _, err = tx.Exec(ctx, stmt, flowJobName, version, tableMappingsBytes) + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction to migrate table mappings to catalog: %w", err) + } + + return nil +} diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 15179a1603..a87c9efc58 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -55,6 +55,33 @@ func heartbeatRoutine( ) } +//func (a *FlowableActivity) getTableMappings(ctx context.Context, flowName string, version uint) ([]*protos.TableMapping, error) { +//rows, err := a.CatalogPool.Query(ctx, "select table_mapping from table_mappings where flow_name = $1 AND version = $2", flowName, version) +//if err != nil { +//return nil, err +//} + +//var tableMappingsBytes [][]byte +//var tableMappings []*protos.TableMapping = []*protos.TableMapping{} + +//err = rows.Scan([]any{&tableMappingsBytes}) +//if err != nil { +//return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err) +//} + +//for _, tableMappingBytes := range tableMappingsBytes { + +//var tableMapping *protos.TableMapping +//if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil { +//return nil, err +//} + +//tableMappings = append(tableMappings, tableMapping) +//} + +//return tableMappings, nil +//} + func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowName string) (map[string]*protos.TableSchema, error) { rows, err := a.CatalogPool.Query(ctx, "select table_name, table_schema from table_schema_mapping where flow_name = $1", flowName) if err != nil { @@ -648,13 +675,18 @@ func (a *FlowableActivity) startNormalize( return fmt.Errorf("failed to get table name schema mapping: %w", err) } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, config.TableMappingVersion) + if err != nil { + return err + } + for { logger.Info("normalizing batch", slog.Int64("syncBatchID", batchID)) res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{ FlowJobName: config.FlowJobName, Env: config.Env, TableNameSchemaMapping: tableNameSchemaMapping, - TableMappings: config.TableMappings, + TableMappings: tableMappings, SoftDeleteColName: config.SoftDeleteColName, SyncedAtColName: config.SyncedAtColName, SyncBatchID: batchID, diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index a8c7cb5837..7ab8cf588a 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -65,6 +65,18 @@ func (a *SnapshotActivity) SetupReplication( return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err)) } + configCtx := context.Background() + defer configCtx.Done() + cfg, err := internal.FetchConfigFromDB(configCtx, a.CatalogPool, config.FlowJobName) + if err != nil { + return nil, err + } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return nil, err + } + config.TableNameMapping = internal.TableNameMapping(tableMappings, cfg.Resync) + logger.Info("waiting for slot to be created...") slotInfo, err := conn.SetupReplication(ctx, config) @@ -180,8 +192,16 @@ func (a *SnapshotActivity) GetDefaultPartitionKeyForTables( } defer connectors.CloseConnector(ctx, connector) + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, input.FlowJobName) + if err != nil { + return nil, err + } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return nil, err + } output, err := connector.GetDefaultPartitionKeyForTables(ctx, &protos.GetDefaultPartitionKeyForTablesInput{ - TableMappings: input.TableMappings, + TableMappings: tableMappings, }) if err != nil { return nil, a.Alerter.LogFlowError(ctx, input.FlowJobName, fmt.Errorf("failed to check if tables can parallel load: %w", err)) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index e98613a958..6df9cfda55 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -95,6 +95,17 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, connectionConfigs.FlowJobName, err) } + // Insert the table mappings into the DB + tableMappingsBytes, err := internal.TableMappingsToBytes(connectionConfigs.TableMappings) + if err != nil { + return fmt.Errorf("unable to marshal table mappings: %w", err) + } + + stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3) + ON CONFLICT (flow_name, version) DO UPDATE SET table_mapping = EXCLUDED.table_mapping` + version := 1 + _, err = h.pool.Exec(ctx, stmt, connectionConfigs.FlowJobName, version, tableMappingsBytes) + return nil } @@ -458,12 +469,17 @@ func (h *FlowRequestHandler) FlowStateChange( if err != nil { return nil, NewInternalApiError(fmt.Errorf("unable to get flow config: %w", err)) } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, config.TableMappingVersion) + if err != nil { + return nil, NewInternalApiError(fmt.Errorf("unable to get table mappings: %w", err)) + } config.Resync = true config.DoInitialSnapshot = true // validate mirror first because once the mirror is dropped, there's no going back if _, err := h.ValidateCDCMirror(ctx, &protos.CreateCDCFlowRequest{ ConnectionConfigs: config, + TableMappings: tableMappings, }); err != nil { return nil, NewFailedPreconditionApiError(fmt.Errorf("invalid mirror: %w", err)) } diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 58069a98c0..368eb10718 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -163,7 +163,8 @@ func (h *FlowRequestHandler) cdcFlowStatus( if state.SyncFlowOptions != nil { config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds config.MaxBatchSize = state.SyncFlowOptions.BatchSize - config.TableMappings = state.SyncFlowOptions.TableMappings + //TODO: this will need to fetch from the DB? + //config.TableMappings = state.SyncFlowOptions.TableMappings } srcType, err := connectors.LoadPeerType(ctx, h.pool, config.SourceName) diff --git a/flow/connectors/clickhouse/validate.go b/flow/connectors/clickhouse/validate.go index e527a23bdd..8f7e01c3b7 100644 --- a/flow/connectors/clickhouse/validate.go +++ b/flow/connectors/clickhouse/validate.go @@ -17,6 +17,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination( cfg *protos.FlowConnectionConfigsCore, tableNameSchemaMapping map[string]*protos.TableSchema, ) error { + tableMappings := cfg.TableMappings if internal.PeerDBOnlyClickHouseAllowed() { err := chvalidate.CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx, c.logger, c.database) if err != nil { @@ -34,7 +35,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination( } // this is for handling column exclusion, processed schema does that in a step - processedMapping := internal.BuildProcessedSchemaMapping(cfg.TableMappings, tableNameSchemaMapping, c.logger) + processedMapping := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, c.logger) dstTableNames := slices.Collect(maps.Keys(processedMapping)) // In the case of resync, we don't need to check the content or structure of the original tables; @@ -64,7 +65,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination( return err } - for _, tableMapping := range cfg.TableMappings { + for _, tableMapping := range tableMappings { dstTableName := tableMapping.DestinationTableIdentifier if _, ok := processedMapping[dstTableName]; !ok { // if destination table is not a key, that means source table was not a key in the original schema mapping(?) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index cc579fbc7e..049842aec2 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -72,7 +72,6 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs(s Suite) ret := &protos.FlowConnectionConfigs{ FlowJobName: c.FlowJobName, - TableMappings: tblMappings, SourceName: s.Source().GeneratePeer(t).Name, DestinationName: c.Destination, SyncedAtColName: "_PEERDB_SYNCED_AT", diff --git a/flow/internal/flow_configuration_helpers.go b/flow/internal/flow_configuration_helpers.go index ee1b331da0..282a55358b 100644 --- a/flow/internal/flow_configuration_helpers.go +++ b/flow/internal/flow_configuration_helpers.go @@ -11,6 +11,22 @@ import ( "github.com/PeerDB-io/peerdb/flow/shared" ) +func TableNameMapping(tableMappings []*protos.TableMapping, resync bool) map[string]string { + tblNameMapping := make(map[string]string, len(tableMappings)) + if resync { + for _, mapping := range tableMappings { + if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL { + mapping.DestinationTableIdentifier += "_resync" + } + } + } + for _, v := range tableMappings { + tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier + } + + return tblNameMapping +} + func FetchConfigFromDB(ctx context.Context, catalogPool shared.CatalogPool, flowName string) (*protos.FlowConnectionConfigsCore, error) { var configBytes sql.RawBytes if err := catalogPool.QueryRow(ctx, @@ -26,3 +42,49 @@ func FetchConfigFromDB(ctx context.Context, catalogPool shared.CatalogPool, flow return &cfgFromDB, nil } + +func TableMappingsToBytes(tableMappings []*protos.TableMapping) ([][]byte, error) { + tableMappingsBytes := [][]byte{} + for _, tableMapping := range tableMappings { + tableMappingBytes, err := proto.Marshal(tableMapping) + if err != nil { + return nil, fmt.Errorf("failed to marshal table mapping to migrate to catalog: %w", err) + } + tableMappingsBytes = append(tableMappingsBytes, tableMappingBytes) + } + return tableMappingsBytes, nil +} + +func FetchTableMappingsFromDB(ctx context.Context, flowJobName string, version uint32) ([]*protos.TableMapping, error) { + pool, err := GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + return nil, err + } + rows, err := pool.Query(ctx, + "select table_mappings from table_mappings where flow_name = $1 AND version = $2", + flowJobName, version, + ) + if err != nil { + return nil, err + } + + var tableMappingsBytes [][]byte + var tableMappings []*protos.TableMapping = []*protos.TableMapping{} + + err = rows.Scan([]any{&tableMappingsBytes}) + if err != nil { + return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err) + } + + for _, tableMappingBytes := range tableMappingsBytes { + + var tableMapping *protos.TableMapping + if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil { + return nil, err + } + + tableMappings = append(tableMappings, tableMapping) + } + + return tableMappings, nil +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 9303a3cc8c..de1b7132e0 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -123,7 +123,7 @@ func updateFlowConfigWithLatestSettings( cloneCfg := proto.CloneOf(cfg) cloneCfg.MaxBatchSize = state.SyncFlowOptions.BatchSize cloneCfg.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds - cloneCfg.TableMappings = state.SyncFlowOptions.TableMappings + cloneCfg.TableMappingVersion = state.SyncFlowOptions.TableMappingVersion if state.SnapshotNumRowsPerPartition > 0 { cloneCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition } @@ -221,6 +221,24 @@ func processCDCFlowConfigUpdate( } } + // MIGRATION: Move `tableMapping` to the DB + if len(state.SyncFlowOptions.TableMappings) > 0 { + migrateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + if err := workflow.ExecuteActivity( + migrateCtx, + flowable.MigrateTableMappingsToCatalog, + cfg.FlowJobName, + state.SyncFlowOptions.TableMappings, + 1, //hardcoding version as 1 for migration purposes. + ).Get(migrateCtx, nil); err != nil { + return fmt.Errorf("failed to migrate TableMappings: %w", err) + } + state.SyncFlowOptions.TableMappings = nil + state.SyncFlowOptions.TableMappingVersion = 1 + } + syncStateToConfigProtoInCatalog(ctx, cfg, state) return nil } @@ -309,7 +327,11 @@ func processTableAdditions( additionalTablesCfg := proto.CloneOf(cfg) additionalTablesCfg.DoInitialSnapshot = !flowConfigUpdate.SkipInitialSnapshotForTableAdditions additionalTablesCfg.InitialSnapshotOnly = true - additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables + // TODO: thought - maybe we need to pass additionalTables and then in the + // CDCWorkflow we persist them to the DB and send an incremented `tableMappingVersion` + // to the new workflow? + //additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables + additionalTablesCfg.Resync = false if state.SnapshotNumRowsPerPartition > 0 { additionalTablesCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition @@ -627,22 +649,6 @@ func CDCFlowWorkflow( // for safety, rely on the idempotency of SetupFlow instead // also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here. if state.CurrentFlowStatus != protos.FlowStatus_STATUS_RUNNING { - originalTableMappings := make([]*protos.TableMapping, 0, len(cfg.TableMappings)) - for _, tableMapping := range cfg.TableMappings { - originalTableMappings = append(originalTableMappings, proto.CloneOf(tableMapping)) - } - // if resync is true, alter the table name schema mapping to temporarily add - // a suffix to the table names. - if cfg.Resync { - for _, mapping := range state.SyncFlowOptions.TableMappings { - if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL { - mapping.DestinationTableIdentifier += "_resync" - } - } - // because we have renamed the tables. - cfg.TableMappings = state.SyncFlowOptions.TableMappings - } - // start the SetupFlow workflow as a child workflow, and wait for it to complete // it should return the table schema for the source peer setupFlowID := GetChildWorkflowID("setup-flow", cfg.FlowJobName, originalRunID) @@ -666,7 +672,6 @@ func CDCFlowWorkflow( state.ActiveSignal = model.ResyncSignal cfg.Resync = true cfg.DoInitialSnapshot = true - cfg.TableMappings = originalTableMappings // this is the only place where we can have a resync during a resync // so we need to NOT sync the tableMappings to catalog to preserve original names uploadConfigToCatalog(ctx, cfg) @@ -690,6 +695,9 @@ func CDCFlowWorkflow( WaitForCancellation: true, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) + // Resync will rely rely on the `cfg.Resync` flag to rename the tables + // during the snapshot process. This is how we're able to also remove the need + // to sync the config back into the DB / not rely on the `state.TableMappings`. setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) var setupFlowOutput *protos.SetupFlowOutput @@ -837,6 +845,7 @@ func CDCFlowWorkflow( WaitForCancellation: true, RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1}, })) + state.SyncFlowOptions.TableMappings = []*protos.TableMapping{} syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions) mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop") diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index f4c79eb67c..96876f93dd 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -36,15 +36,17 @@ type SetupFlowExecution struct { tableNameMapping map[string]string cdcFlowName string executionID string + resync bool } // NewSetupFlowExecution creates a new instance of SetupFlowExecution. -func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string) *SetupFlowExecution { +func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string, resync bool) *SetupFlowExecution { return &SetupFlowExecution{ Logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cdcFlowName)), tableNameMapping: tableNameMapping, cdcFlowName: cdcFlowName, executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, + resync: resync, } } @@ -119,10 +121,9 @@ func (s *SetupFlowExecution) ensurePullability( // create EnsurePullabilityInput for the srcTableName ensurePullabilityInput := &protos.EnsurePullabilityBatchInput{ - PeerName: config.SourceName, - FlowJobName: s.cdcFlowName, - SourceTableIdentifiers: slices.Sorted(maps.Keys(s.tableNameMapping)), - CheckConstraints: checkConstraints, + PeerName: config.SourceName, + FlowJobName: s.cdcFlowName, + CheckConstraints: checkConstraints, } future := workflow.ExecuteActivity(ctx, flowable.EnsurePullability, ensurePullabilityInput) @@ -162,9 +163,8 @@ func (s *SetupFlowExecution) createRawTable( // attempt to create the tables. createRawTblInput := &protos.CreateRawTableInput{ - PeerName: config.DestinationName, - FlowJobName: s.cdcFlowName, - TableNameMapping: s.tableNameMapping, + PeerName: config.DestinationName, + FlowJobName: s.cdcFlowName, } rawTblFuture := workflow.ExecuteActivity(ctx, flowable.CreateRawTable, createRawTblInput) @@ -191,12 +191,11 @@ func (s *SetupFlowExecution) setupNormalizedTables( }) tableSchemaInput := &protos.SetupTableSchemaBatchInput{ - PeerName: flowConnectionConfigs.SourceName, - TableMappings: flowConnectionConfigs.TableMappings, - FlowName: s.cdcFlowName, - System: flowConnectionConfigs.System, - Env: flowConnectionConfigs.Env, - Version: flowConnectionConfigs.Version, + PeerName: flowConnectionConfigs.SourceName, + FlowName: s.cdcFlowName, + System: flowConnectionConfigs.System, + Env: flowConnectionConfigs.Env, + Version: flowConnectionConfigs.Version, } if err := workflow.ExecuteActivity(ctx, flowable.SetupTableSchema, tableSchemaInput).Get(ctx, nil); err != nil { @@ -207,7 +206,6 @@ func (s *SetupFlowExecution) setupNormalizedTables( s.Info("setting up normalized tables on destination peer", slog.String("destination", flowConnectionConfigs.DestinationName)) setupConfig := &protos.SetupNormalizedTableBatchInput{ PeerName: flowConnectionConfigs.DestinationName, - TableMappings: flowConnectionConfigs.TableMappings, SoftDeleteColName: flowConnectionConfigs.SoftDeleteColName, SyncedAtColName: flowConnectionConfigs.SyncedAtColName, FlowName: flowConnectionConfigs.FlowJobName, @@ -267,7 +265,7 @@ func SetupFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfig } // create the setup flow execution - setupFlowExecution := NewSetupFlowExecution(ctx, tblNameMapping, config.FlowJobName) + setupFlowExecution := NewSetupFlowExecution(ctx, tblNameMapping, config.FlowJobName, config.Resync) // execute the setup flow setupFlowOutput, err := setupFlowExecution.executeSetupFlow(ctx, config) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 80431bb536..2ffb248c8e 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -1,6 +1,7 @@ package peerflow import ( + "context" "fmt" "log/slog" "slices" @@ -55,15 +56,10 @@ func (s *SnapshotFlowExecution) setupReplication( }, }) - tblNameMapping := make(map[string]string, len(s.config.TableMappings)) - for _, v := range s.config.TableMappings { - tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier - } - setupReplicationInput := &protos.SetupReplicationInput{ - PeerName: s.config.SourceName, - FlowJobName: flowName, - TableNameMapping: tblNameMapping, + PeerName: s.config.SourceName, + FlowJobName: flowName, + // TableNameMapping: tblNameMapping, DoInitialSnapshot: s.config.DoInitialSnapshot, ExistingPublicationName: s.config.PublicationName, ExistingReplicationSlotName: s.config.ReplicationSlotName, @@ -284,7 +280,27 @@ func (s *SnapshotFlowExecution) cloneTables( return err } - for _, v := range s.config.TableMappings { + configCtx := context.Background() + defer configCtx.Done() + pool, err := internal.GetCatalogConnectionPoolFromEnv(configCtx) + if err != nil { + return err + } + defer pool.Pool.Close() + + cfg, err := internal.FetchConfigFromDB(configCtx, pool, s.config.FlowJobName) + if err != nil { + return err + } + + tableMappingsCtx := context.Background() + defer tableMappingsCtx.Done() + tableMappings, err := internal.FetchTableMappingsFromDB(tableMappingsCtx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return err + } + + for _, v := range tableMappings { source := v.SourceTableIdentifier destination := v.DestinationTableIdentifier s.logger.Info( diff --git a/nexus/catalog/migrations/V49__table_mappings_table.sql b/nexus/catalog/migrations/V49__table_mappings_table.sql new file mode 100644 index 0000000000..5a4e78cc64 --- /dev/null +++ b/nexus/catalog/migrations/V49__table_mappings_table.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS table_mappings ( + flow_name varchar(255) not null, + version bigint not null default 1, + table_mappings bytea[] not null, + primary key (flow_name, version) +); diff --git a/protos/flow.proto b/protos/flow.proto index fe0cdb853d..a7f6a1d035 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -41,6 +41,7 @@ message TableMapping { string sharding_key = 7; string policy_name = 8; string partition_by_expr = 9; + uint32 version = 10; } message SetupInput { @@ -52,12 +53,12 @@ message SetupInput { // FlowConnectionConfigs is for external use by the API, maintaining backwards compatibility // When adding fields here, add them to FlowConnectionConfigsCore too message FlowConnectionConfigs { - reserved 2,3,17; + reserved 2,3,17,4; string flow_job_name = 1; // config for the CDC flow itself // currently, TableMappings, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals - repeated TableMapping table_mappings = 4; + //repeatable TableMapping table_mappings = 4; uint32 max_batch_size = 5; uint64 idle_timeout_seconds = 6; string cdc_staging_path = 7; @@ -91,6 +92,7 @@ message FlowConnectionConfigs { map env = 24; uint32 version = 25; + uint32 table_mapping_version = 27; } // FlowConnectionConfigsCore is used internally in the codebase, it is safe to remove (mark reserved) fields from it @@ -137,6 +139,7 @@ message FlowConnectionConfigsCore { map env = 24; uint32 version = 25; + uint32 table_mapping_version = 27; } message RenameTableOption { @@ -181,6 +184,7 @@ message SyncFlowOptions { map src_table_id_name_mapping = 4; repeated TableMapping table_mappings = 6; int32 number_of_syncs = 7; + uint32 table_mapping_version = 8; } message EnsurePullabilityBatchInput { diff --git a/protos/route.proto b/protos/route.proto index e051e0c94d..8f8f05d296 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -12,6 +12,7 @@ package peerdb_route; message CreateCDCFlowRequest { peerdb_flow.FlowConnectionConfigs connection_configs = 1; bool attach_to_existing = 2; + repeated peerdb_flow.TableMapping table_mappings = 3; } message CreateCDCFlowResponse { string workflow_id = 1; }