diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 17e87fa62d..4ac3252c86 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" "log/slog" + "maps" "net" "os" + "slices" "strconv" "sync/atomic" "time" @@ -125,6 +127,12 @@ func (a *FlowableActivity) EnsurePullability( } defer connectors.CloseConnector(ctx, srcConn) + cfg, err := internal.FetchConfigFromDB(config.FlowJobName) + if err != nil { + return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to fetch config from DB: %w", err)) + } + + config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(cfg.TableMappings))) output, err := srcConn.EnsurePullability(ctx, config) if err != nil { return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to ensure pullability: %w", err)) @@ -166,6 +174,18 @@ func (a *FlowableActivity) SetupTableSchema( }) defer shutdown() + // have to fetch config from the DB + cfg, err := internal.FetchConfigFromDB(config.FlowName) + if err != nil { + return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to fetch config from DB: %w", err)) + } + tableMappings := cfg.TableMappings + if len(config.FilteredTableMappings) > 0 { + // we use the filtered table mappings if provided. they are provided from + // the sync flow which includes changes to the schema. 
+ tableMappings = config.FilteredTableMappings + } + logger := internal.LoggerFromCtx(ctx) ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) srcConn, err := connectors.GetByNameAs[connectors.GetTableSchemaConnector](ctx, config.Env, a.CatalogPool, config.PeerName) @@ -174,11 +194,11 @@ func (a *FlowableActivity) SetupTableSchema( } defer connectors.CloseConnector(ctx, srcConn) - tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, config.TableMappings) + tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, tableMappings) if err != nil { return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err)) } - processed := internal.BuildProcessedSchemaMapping(config.TableMappings, tableNameSchemaMapping, logger) + processed := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, logger) tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{}) if err != nil { @@ -214,6 +234,13 @@ func (a *FlowableActivity) CreateNormalizedTable( numTablesSetup := atomic.Uint32{} numTablesToSetup := atomic.Int32{} + cfg, err := internal.FetchConfigFromDB(config.FlowName) + if err != nil { + return nil, a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to fetch config from DB: %w", err)) + } + + tableMappings := cfg.TableMappings + shutdown := heartbeatRoutine(ctx, func() string { return fmt.Sprintf("setting up normalized tables - %d of %d done", numTablesSetup.Load(), numTablesToSetup.Load()) }) @@ -245,7 +272,7 @@ func (a *FlowableActivity) CreateNormalizedTable( numTablesToSetup.Store(int32(len(tableNameSchemaMapping))) tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping)) - for _, tableMapping := range config.TableMappings { + for _, tableMapping := range tableMappings { tableIdentifier := tableMapping.DestinationTableIdentifier tableSchema := 
tableNameSchemaMapping[tableIdentifier] existing, err := conn.SetupNormalizedTable( @@ -293,6 +320,12 @@ func (a *FlowableActivity) SyncFlow( var syncingBatchID atomic.Int64 var syncState atomic.Pointer[string] syncState.Store(shared.Ptr("setup")) + + config, err := internal.FetchConfigFromDB(config.FlowJobName) + if err != nil { + return fmt.Errorf("unable to query flow config from catalog: %w", err) + } + shutdown := heartbeatRoutine(ctx, func() string { // Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch sBatchID := syncingBatchID.Load() @@ -364,10 +397,10 @@ func (a *FlowableActivity) SyncFlow( var syncResponse *model.SyncResponse var syncErr error if config.System == protos.TypeSystem_Q { - syncResponse, syncErr = a.syncRecords(groupCtx, config, options, srcConn.(connectors.CDCPullConnector), + syncResponse, syncErr = a.syncRecords(groupCtx, config, srcConn.(connectors.CDCPullConnector), normRequests, normResponses, normBufferSize, &syncingBatchID, &syncState) } else { - syncResponse, syncErr = a.syncPg(groupCtx, config, options, srcConn.(connectors.CDCPullPgConnector), + syncResponse, syncErr = a.syncPg(groupCtx, config, srcConn.(connectors.CDCPullPgConnector), normRequests, normResponses, normBufferSize, &syncingBatchID, &syncState) } @@ -415,7 +448,6 @@ func (a *FlowableActivity) SyncFlow( func (a *FlowableActivity) syncRecords( ctx context.Context, config *protos.FlowConnectionConfigs, - options *protos.SyncFlowOptions, srcConn connectors.CDCPullConnector, normRequests *concurrency.LastChan, normResponses *concurrency.LastChan, @@ -452,7 +484,7 @@ func (a *FlowableActivity) syncRecords( return stream, nil } } - return syncCore(ctx, a, config, options, srcConn, + return syncCore(ctx, a, config, srcConn, normRequests, normResponses, normBufferSize, syncingBatchID, syncWaiting, adaptStream, connectors.CDCPullConnector.PullRecords, @@ -462,7 +494,6 @@ func (a *FlowableActivity) syncRecords( func (a 
*FlowableActivity) syncPg( ctx context.Context, config *protos.FlowConnectionConfigs, - options *protos.SyncFlowOptions, srcConn connectors.CDCPullPgConnector, normRequests *concurrency.LastChan, normResponses *concurrency.LastChan, @@ -470,7 +501,7 @@ func (a *FlowableActivity) syncPg( syncingBatchID *atomic.Int64, syncWaiting *atomic.Pointer[string], ) (*model.SyncResponse, error) { - return syncCore(ctx, a, config, options, srcConn, + return syncCore(ctx, a, config, srcConn, normRequests, normResponses, normBufferSize, syncingBatchID, syncWaiting, nil, connectors.CDCPullPgConnector.PullPg, @@ -1209,13 +1240,12 @@ func (a *FlowableActivity) emitLogRetentionHours( } var activeFlowStatuses = map[protos.FlowStatus]struct{}{ - protos.FlowStatus_STATUS_RUNNING: {}, - protos.FlowStatus_STATUS_PAUSED: {}, - protos.FlowStatus_STATUS_PAUSING: {}, - protos.FlowStatus_STATUS_SETUP: {}, - protos.FlowStatus_STATUS_SNAPSHOT: {}, - protos.FlowStatus_STATUS_RESYNC: {}, - protos.FlowStatus_STATUS_MODIFYING: {}, + protos.FlowStatus_STATUS_RUNNING: {}, + protos.FlowStatus_STATUS_PAUSED: {}, + protos.FlowStatus_STATUS_PAUSING: {}, + protos.FlowStatus_STATUS_SETUP: {}, + protos.FlowStatus_STATUS_SNAPSHOT: {}, + protos.FlowStatus_STATUS_RESYNC: {}, } func (a *FlowableActivity) QRepHasNewRows(ctx context.Context, @@ -1431,9 +1461,13 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, } } -func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigs, +func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, flowJobName string, additionalTableMappings []*protos.TableMapping, ) error { + cfg, err := internal.FetchConfigFromDB(flowJobName) + if err != nil { + return fmt.Errorf("unable to query flow config from catalog: %w", err) + } ctx = context.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName) srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, cfg.Env, a.CatalogPool, 
cfg.SourceName) if err != nil { diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 532985292d..58b1a97138 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -80,11 +80,10 @@ func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowNa func (a *FlowableActivity) applySchemaDeltas( ctx context.Context, config *protos.FlowConnectionConfigs, - options *protos.SyncFlowOptions, schemaDeltas []*protos.TableSchemaDelta, ) error { filteredTableMappings := make([]*protos.TableMapping, 0, len(schemaDeltas)) - for _, tableMapping := range options.TableMappings { + for _, tableMapping := range config.TableMappings { if slices.ContainsFunc(schemaDeltas, func(schemaDelta *protos.TableSchemaDelta) bool { return schemaDelta.SrcTableName == tableMapping.SourceTableIdentifier && schemaDelta.DstTableName == tableMapping.DestinationTableIdentifier @@ -95,12 +94,12 @@ func (a *FlowableActivity) applySchemaDeltas( if len(schemaDeltas) > 0 { if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{ - PeerName: config.SourceName, - TableMappings: filteredTableMappings, - FlowName: config.FlowJobName, - System: config.System, - Env: config.Env, - Version: config.Version, + PeerName: config.SourceName, + FilteredTableMappings: filteredTableMappings, + FlowName: config.FlowJobName, + System: config.System, + Env: config.Env, + Version: config.Version, }); err != nil { return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to execute schema update at source: %w", err)) } @@ -112,7 +111,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon ctx context.Context, a *FlowableActivity, config *protos.FlowConnectionConfigs, - options *protos.SyncFlowOptions, srcConn TPull, normRequests *concurrency.LastChan, normResponses *concurrency.LastChan, @@ -127,8 +125,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon ctx 
= context.WithValue(ctx, shared.FlowNameKey, flowName) logger := internal.LoggerFromCtx(ctx) - tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings)) - for _, v := range options.TableMappings { + // we should be able to rely on `config` here. + tblNameMapping := make(map[string]model.NameAndExclude, len(config.TableMappings)) + for _, v := range config.TableMappings { tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude) } @@ -136,7 +135,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) } - batchSize := options.BatchSize + batchSize := config.MaxBatchSize if batchSize == 0 { batchSize = 250_000 } @@ -190,13 +189,13 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon errGroup.Go(func() error { return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{ FlowJobName: flowName, - SrcTableIDNameMapping: options.SrcTableIdNameMapping, + SrcTableIDNameMapping: config.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, LastOffset: lastOffset, ConsumedOffset: &consumedOffset, MaxBatchSize: batchSize, IdleTimeout: internal.PeerDBCDCIdleTimeoutSeconds( - int(options.IdleTimeoutSeconds), + int(config.IdleTimeoutSeconds), ), TableNameSchemaMapping: tableNameSchemaMapping, OverridePublicationName: config.PublicationName, @@ -234,11 +233,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon defer connectors.CloseConnector(ctx, dstConn) syncState.Store(shared.Ptr("updating schema")) - if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas); err != nil { + if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, config.TableMappings, recordBatchSync.SchemaDeltas); err != nil { return nil, 
fmt.Errorf("failed to sync schema: %w", err) } - return nil, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas) + return nil, a.applySchemaDeltas(ctx, config, recordBatchSync.SchemaDeltas) } var res *model.SyncResponse @@ -271,7 +270,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon Records: recordBatchSync, ConsumedOffset: &consumedOffset, FlowJobName: flowName, - TableMappings: options.TableMappings, + TableMappings: config.TableMappings, StagingPath: config.CdcStagingPath, Script: config.Script, TableNameSchemaMapping: tableNameSchemaMapping, @@ -334,7 +333,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon a.OtelManager.Metrics.CurrentBatchIdGauge.Record(ctx, res.CurrentSyncBatchID) syncState.Store(shared.Ptr("updating schema")) - if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil { + if err := a.applySchemaDeltas(ctx, config, res.TableSchemaDeltas); err != nil { return nil, err } diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 2213636f65..99e05b50c1 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -86,7 +86,7 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, } if _, err = h.pool.Exec(ctx, - `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, description) + `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, description) VALUES ($1,$2,$3,$4,$5,$6,'gRPC')`, workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP, ); err != nil && !(idempotent && shared.IsSQLStateError(err, pgerrcode.UniqueViolation)) { @@ -207,7 +207,7 @@ func (h *FlowRequestHandler) createCDCFlow( return nil, fmt.Errorf("unable to create flow job entry: %w", err) } - if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg, nil); err != nil 
{ + if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg.FlowJobName, nil); err != nil { slog.ErrorContext(ctx, "unable to start PeerFlow workflow", slog.Any("error", err)) return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err) } diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 58069a98c0..522ee8fbbf 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -148,23 +148,7 @@ func (h *FlowRequestHandler) cdcFlowStatus( slog.ErrorContext(ctx, "unable to query flow config from catalog", slog.Any("error", err)) return nil, err } - workflowID, err := h.getWorkflowID(ctx, req.FlowJobName) - if err != nil { - slog.ErrorContext(ctx, "unable to get the workflow ID of mirror", slog.Any("error", err)) - return nil, err - } - state, err := h.getCDCWorkflowState(ctx, workflowID) - if err != nil { - slog.ErrorContext(ctx, "unable to get the state of mirror", slog.Any("error", err)) - return nil, err - } - - // patching config to show latest values from state - if state.SyncFlowOptions != nil { - config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds - config.MaxBatchSize = state.SyncFlowOptions.BatchSize - config.TableMappings = state.SyncFlowOptions.TableMappings - } + // The config is now always sourced from DB, so no state patching needed srcType, err := connectors.LoadPeerType(ctx, h.pool, config.SourceName) if err != nil { @@ -473,7 +457,7 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog( ) (*protos.FlowConnectionConfigs, error) { var configBytes sql.RawBytes if err := h.pool.QueryRow(ctx, - "SELECT config_proto FROM flows WHERE name = $1", flowJobName, + "SELECT config_proto FROM flows WHERE name = $1 LIMIT 1", flowJobName, ).Scan(&configBytes); err != nil { slog.ErrorContext(ctx, "unable to query flow config from catalog", slog.Any("error", err)) return nil, fmt.Errorf("unable to query flow config from catalog: %w", err) diff --git 
a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 19d8ac6402..3b5ca2052e 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -90,7 +90,12 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable( tmEngine := protos.TableEngine_CH_ENGINE_REPLACING_MERGE_TREE var tableMapping *protos.TableMapping - for _, tm := range config.TableMappings { + + cfg, err := internal.FetchConfigFromDB(config.FlowName) + if err != nil { + return nil, fmt.Errorf("failed to fetch config from DB: %w", err) + } + for _, tm := range cfg.TableMappings { if tm.DestinationTableIdentifier == tableIdentifier { tmEngine = tm.Engine tableMapping = tm diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index e832d818a7..ac19bb9e66 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -457,10 +457,15 @@ func pullCore[Items model.Items]( return fmt.Errorf("failed to get get setting for originMetaAsDestinationColumn: %w", err) } + cfgFromDB, err := internal.FetchConfigFromDB(req.FlowJobName) + if err != nil { + return fmt.Errorf("unable to query flow config from catalog: %w", err) + } + cdc, err := c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{ CatalogPool: catalogPool, OtelManager: otelManager, - SrcTableIDNameMapping: req.SrcTableIDNameMapping, + SrcTableIDNameMapping: cfgFromDB.SrcTableIdNameMapping, TableNameMapping: req.TableNameMapping, TableNameSchemaMapping: req.TableNameSchemaMapping, RelationMessageMapping: c.relationMessageMapping, diff --git a/flow/e2e/postgres_test.go b/flow/e2e/postgres_test.go index 8e13408126..da38594c88 100644 --- a/flow/e2e/postgres_test.go +++ b/flow/e2e/postgres_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/internal" "github.com/PeerDB-io/peerdb/flow/model" 
"github.com/PeerDB-io/peerdb/flow/shared" peerflow "github.com/PeerDB-io/peerdb/flow/workflows" diff --git a/flow/internal/flow_configuration_helpers.go b/flow/internal/flow_configuration_helpers.go new file mode 100644 index 0000000000..08bd589742 --- /dev/null +++ b/flow/internal/flow_configuration_helpers.go @@ -0,0 +1,46 @@ +package internal + +import ( + "context" + "database/sql" + "fmt" + + "google.golang.org/protobuf/proto" + + "github.com/PeerDB-io/peerdb/flow/generated/protos" +) + +// TableNameMapping maps each source table identifier to its destination +// table identifier. +func TableNameMapping(tableMappings []*protos.TableMapping) map[string]string { + tblNameMapping := make(map[string]string, len(tableMappings)) + for _, v := range tableMappings { + tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier + } + return tblNameMapping +} + +// FetchConfigFromDB reads the config_proto column of the flows row named +// flowName from the catalog and unmarshals it into a FlowConnectionConfigs. +// It returns an error if the catalog pool cannot be obtained, the query +// fails, or the stored bytes do not unmarshal. +func FetchConfigFromDB(flowName string) (*protos.FlowConnectionConfigs, error) { + var configBytes sql.RawBytes + ctx := context.Background() + pool, err := GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + return nil, fmt.Errorf("unable to get catalog connection pool: %w", err) + } + if err := pool.QueryRow(ctx, + "SELECT config_proto FROM flows WHERE name = $1 LIMIT 1", flowName, + ).Scan(&configBytes); err != nil { + return nil, fmt.Errorf("unable to query flow config from catalog: %w", err) + } + + var cfgFromDB protos.FlowConnectionConfigs + if err := proto.Unmarshal(configBytes, &cfgFromDB); err != nil { + return nil, fmt.Errorf("unable to unmarshal flow config: %w", err) + } + + return &cfgFromDB, nil +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 2b9f66fe53..2bb5fec103 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -25,8 +25,6 @@ import ( type CDCFlowWorkflowState struct { // flow config update request, set to nil after processed FlowConfigUpdate *protos.CDCFlowConfigUpdate - // options passed to all SyncFlows - SyncFlowOptions *protos.SyncFlowOptions // for becoming DropFlow DropFlowInput *protos.DropFlowInput // used for computing backoff timeout @@ -35,33 +33,14 @@ type CDCFlowWorkflowState struct { // Current
signalled state of the peer flow. ActiveSignal model.CDCFlowSignal CurrentFlowStatus protos.FlowStatus - - // Initial load settings - SnapshotNumRowsPerPartition uint32 - SnapshotNumPartitionsOverride uint32 - SnapshotMaxParallelWorkers uint32 - SnapshotNumTablesInParallel uint32 } // returns a new empty PeerFlowState func NewCDCFlowWorkflowState(ctx workflow.Context, logger log.Logger, cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflowState { - tableMappings := make([]*protos.TableMapping, 0, len(cfg.TableMappings)) - for _, tableMapping := range cfg.TableMappings { - tableMappings = append(tableMappings, proto.CloneOf(tableMapping)) - } state := CDCFlowWorkflowState{ ActiveSignal: model.NoopSignal, CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, FlowConfigUpdate: nil, - SyncFlowOptions: &protos.SyncFlowOptions{ - BatchSize: cfg.MaxBatchSize, - IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, - TableMappings: tableMappings, - }, - SnapshotNumRowsPerPartition: cfg.SnapshotNumRowsPerPartition, - SnapshotNumPartitionsOverride: cfg.SnapshotNumPartitionsOverride, - SnapshotMaxParallelWorkers: cfg.SnapshotMaxParallelWorkers, - SnapshotNumTablesInParallel: cfg.SnapshotNumTablesInParallel, } syncStatusToCatalog(ctx, logger, state.CurrentFlowStatus) return &state @@ -100,23 +79,33 @@ func GetChildWorkflowID( +// updateFlowConfigWithLatestSettings returns a clone of cfg with the non-zero +// settings of flowConfigUpdate applied; a nil update yields an unmodified clone. func updateFlowConfigWithLatestSettings( cfg *protos.FlowConnectionConfigs, - state *CDCFlowWorkflowState, + flowConfigUpdate *protos.CDCFlowConfigUpdate, ) *protos.FlowConnectionConfigs { cloneCfg := proto.CloneOf(cfg) - cloneCfg.MaxBatchSize = state.SyncFlowOptions.BatchSize - cloneCfg.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds - cloneCfg.TableMappings = state.SyncFlowOptions.TableMappings - if state.SnapshotNumRowsPerPartition > 0 { - cloneCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition - } - if state.SnapshotNumPartitionsOverride > 0 { - cloneCfg.SnapshotNumPartitionsOverride = state.SnapshotNumPartitionsOverride - } - if
state.SnapshotMaxParallelWorkers > 0 { - cloneCfg.SnapshotMaxParallelWorkers = state.SnapshotMaxParallelWorkers - } - if state.SnapshotNumTablesInParallel > 0 { - cloneCfg.SnapshotNumTablesInParallel = state.SnapshotNumTablesInParallel + if flowConfigUpdate == nil { + return cloneCfg + } + // Only non-zero fields override the stored config, so an update that leaves + // a setting unset does not reset the existing value to zero. + if flowConfigUpdate.BatchSize > 0 { + cloneCfg.MaxBatchSize = flowConfigUpdate.BatchSize + } + if flowConfigUpdate.IdleTimeout > 0 { + cloneCfg.IdleTimeoutSeconds = flowConfigUpdate.IdleTimeout + } + if flowConfigUpdate.SnapshotNumRowsPerPartition > 0 { + cloneCfg.SnapshotNumRowsPerPartition = flowConfigUpdate.SnapshotNumRowsPerPartition + } + if flowConfigUpdate.SnapshotNumPartitionsOverride > 0 { + cloneCfg.SnapshotNumPartitionsOverride = flowConfigUpdate.SnapshotNumPartitionsOverride + } + if flowConfigUpdate.SnapshotMaxParallelWorkers > 0 { + cloneCfg.SnapshotMaxParallelWorkers = flowConfigUpdate.SnapshotMaxParallelWorkers + } + if flowConfigUpdate.SnapshotNumTablesInParallel > 0 { + cloneCfg.SnapshotNumTablesInParallel = flowConfigUpdate.SnapshotNumTablesInParallel } return cloneCfg } @@ -127,9 +99,9 @@ type CDCFlowWorkflowResult = CDCFlowWorkflowState func syncStateToConfigProtoInCatalog( ctx workflow.Context, cfg *protos.FlowConnectionConfigs, - state *CDCFlowWorkflowState, + flowConfigUpdate *protos.CDCFlowConfigUpdate, ) *protos.FlowConnectionConfigs { - cloneCfg := updateFlowConfigWithLatestSettings(cfg, state) + cloneCfg := updateFlowConfigWithLatestSettings(cfg, flowConfigUpdate) uploadConfigToCatalog(ctx, cloneCfg) return cloneCfg } @@ -157,34 +129,19 @@ func processCDCFlowConfigUpdate( ) error { flowConfigUpdate := state.FlowConfigUpdate - // only modify for options since SyncFlow uses it - if flowConfigUpdate.BatchSize > 0 { - state.SyncFlowOptions.BatchSize = flowConfigUpdate.BatchSize - } - if flowConfigUpdate.IdleTimeout > 0 { - state.SyncFlowOptions.IdleTimeoutSeconds = flowConfigUpdate.IdleTimeout - } if flowConfigUpdate.UpdatedEnv != nil { if cfg.Env == nil { cfg.Env = make(map[string]string, len(flowConfigUpdate.UpdatedEnv)) } maps.Copy(cfg.Env, flowConfigUpdate.UpdatedEnv) } - if flowConfigUpdate.SnapshotNumRowsPerPartition > 0 { - state.SnapshotNumRowsPerPartition = flowConfigUpdate.SnapshotNumRowsPerPartition - } - if flowConfigUpdate.SnapshotNumPartitionsOverride > 0 { -
state.SnapshotNumPartitionsOverride = flowConfigUpdate.SnapshotNumPartitionsOverride - } - if flowConfigUpdate.SnapshotMaxParallelWorkers > 0 { - state.SnapshotMaxParallelWorkers = flowConfigUpdate.SnapshotMaxParallelWorkers - } - if flowConfigUpdate.SnapshotNumTablesInParallel > 0 { - state.SnapshotNumTablesInParallel = flowConfigUpdate.SnapshotNumTablesInParallel - } + cfg = syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) tablesAreAdded := len(flowConfigUpdate.AdditionalTables) > 0 tablesAreRemoved := len(flowConfigUpdate.RemovedTables) > 0 + if !tablesAreAdded && !tablesAreRemoved { + return nil + } if tablesAreAdded || tablesAreRemoved { logger.Info("processing CDCFlowConfigUpdate", slog.Any("updatedState", flowConfigUpdate)) @@ -203,7 +160,6 @@ func processCDCFlowConfigUpdate( } } - syncStateToConfigProtoInCatalog(ctx, cfg, state) return nil } @@ -219,7 +175,7 @@ func handleFlowSignalStateChange( case protos.FlowStatus_STATUS_TERMINATING: logger.Info("terminating CDCFlow", slog.String("operation", op)) state.ActiveSignal = model.TerminateSignal - dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) state.DropFlowInput = &protos.DropFlowInput{ FlowJobName: dropCfg.FlowJobName, FlowConnectionConfigs: dropCfg, @@ -256,12 +212,7 @@ func processTableAdditions( ) error { flowConfigUpdate := state.FlowConfigUpdate if len(flowConfigUpdate.AdditionalTables) == 0 { - syncStateToConfigProtoInCatalog(ctx, cfg, state) - return nil - } - if internal.AdditionalTablesHasOverlap(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables) { - logger.Warn("duplicate source/destination tables found in additionalTables") - syncStateToConfigProtoInCatalog(ctx, cfg, state) + syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) return nil } state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_SNAPSHOT) @@ -278,7 +229,7 @@ func processTableAdditions( 
alterPublicationAddAdditionalTablesFuture := workflow.ExecuteActivity( alterPublicationAddAdditionalTablesCtx, flowable.AddTablesToPublication, - cfg, flowConfigUpdate.AdditionalTables) + cfg.FlowJobName, flowConfigUpdate.AdditionalTables) var res *CDCFlowWorkflowResult var addTablesFlowErr error @@ -289,23 +240,26 @@ func processTableAdditions( additionalTablesUUID := GetUUID(ctx) childAdditionalTablesCDCFlowID := GetChildWorkflowID("additional-cdc-flow", cfg.FlowJobName, additionalTablesUUID) additionalTablesCfg := proto.CloneOf(cfg) - additionalTablesCfg.DoInitialSnapshot = !flowConfigUpdate.SkipInitialSnapshotForTableAdditions + // Default to doing initial snapshot for additional tables + additionalTablesCfg.DoInitialSnapshot = true additionalTablesCfg.InitialSnapshotOnly = true - additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables + additionalTablesCfg.TableMappings = append(additionalTablesCfg.TableMappings, flowConfigUpdate.AdditionalTables...) additionalTablesCfg.Resync = false - if state.SnapshotNumRowsPerPartition > 0 { - additionalTablesCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition + if flowConfigUpdate.SnapshotNumRowsPerPartition > 0 { + additionalTablesCfg.SnapshotNumRowsPerPartition = flowConfigUpdate.SnapshotNumRowsPerPartition } - if state.SnapshotNumPartitionsOverride > 0 { - additionalTablesCfg.SnapshotNumPartitionsOverride = state.SnapshotNumPartitionsOverride + if flowConfigUpdate.SnapshotNumPartitionsOverride > 0 { + additionalTablesCfg.SnapshotNumPartitionsOverride = flowConfigUpdate.SnapshotNumPartitionsOverride } - if state.SnapshotMaxParallelWorkers > 0 { - additionalTablesCfg.SnapshotMaxParallelWorkers = state.SnapshotMaxParallelWorkers + if flowConfigUpdate.SnapshotMaxParallelWorkers > 0 { + additionalTablesCfg.SnapshotMaxParallelWorkers = flowConfigUpdate.SnapshotMaxParallelWorkers } - if state.SnapshotNumTablesInParallel > 0 { - additionalTablesCfg.SnapshotNumTablesInParallel = 
state.SnapshotNumTablesInParallel + if flowConfigUpdate.SnapshotNumTablesInParallel > 0 { + additionalTablesCfg.SnapshotNumTablesInParallel = flowConfigUpdate.SnapshotNumTablesInParallel } + uploadConfigToCatalog(ctx, additionalTablesCfg) + // execute the sync flow as a child workflow childAddTablesCDCFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: childAdditionalTablesCDCFlowID, @@ -317,11 +271,12 @@ func processTableAdditions( WaitForCancellation: true, } childAddTablesCDCFlowCtx := workflow.WithChildOptions(ctx, childAddTablesCDCFlowOpts) + childAddTablesCDCFlowFuture := workflow.ExecuteChildWorkflow( childAddTablesCDCFlowCtx, CDCFlowWorkflow, - additionalTablesCfg, - nil, + additionalTablesCfg.FlowJobName, + nil, // nil is passed to trigger `setup` flow. ) addTablesSelector.AddFuture(childAddTablesCDCFlowFuture, func(f workflow.Future) { addTablesFlowErr = f.Get(childAddTablesCDCFlowCtx, &res) @@ -329,14 +284,12 @@ func processTableAdditions( } }) - // additional tables should also be resynced, we don't know how much was done so far - state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...) 
- for res == nil { addTablesSelector.Select(ctx) if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal { if state.ActiveSignal == model.ResyncSignal { - resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) + state.DropFlowInput.FlowJobName = resyncCfg.FlowJobName state.DropFlowInput.FlowConnectionConfigs = resyncCfg } return workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput) @@ -351,8 +304,6 @@ func processTableAdditions( } } - maps.Copy(state.SyncFlowOptions.SrcTableIdNameMapping, res.SyncFlowOptions.SrcTableIdNameMapping) - logger.Info("additional tables added to sync flow") return nil } @@ -425,11 +376,11 @@ func processTableRemovals( for _, removedTable := range state.FlowConfigUpdate.RemovedTables { removedTables[removedTable.SourceTableIdentifier] = struct{}{} } - maps.DeleteFunc(state.SyncFlowOptions.SrcTableIdNameMapping, func(k uint32, v string) bool { + maps.DeleteFunc(cfg.SrcTableIdNameMapping, func(k uint32, v string) bool { _, removed := removedTables[v] return removed }) - state.SyncFlowOptions.TableMappings = slices.DeleteFunc(state.SyncFlowOptions.TableMappings, func(tm *protos.TableMapping) bool { + cfg.TableMappings = slices.DeleteFunc(cfg.TableMappings, func(tm *protos.TableMapping) bool { _, removed := removedTables[tm.SourceTableIdentifier] return removed }) @@ -438,7 +389,7 @@ func processTableRemovals( removeTablesSelector.Select(ctx) if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal { if state.ActiveSignal == model.ResyncSignal { - resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) state.DropFlowInput.FlowConnectionConfigs = resyncCfg } return workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput) @@ -452,7 +403,6 @@ func processTableRemovals( 
return fmt.Errorf("failed to execute child CDCFlow for additional tables: %w", removeTablesFlowErr) } } - return nil } @@ -467,8 +417,6 @@ func addCdcPropertiesSignalListener( // do this irrespective of additional tables being present, for auto unpausing state.FlowConfigUpdate = cdcConfigUpdate logger.Info("CDC Signal received", - slog.Uint64("BatchSize", uint64(state.SyncFlowOptions.BatchSize)), - slog.Uint64("IdleTimeout", state.SyncFlowOptions.IdleTimeoutSeconds), slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables), slog.Any("RemovedTables", cdcConfigUpdate.RemovedTables), slog.Any("UpdatedEnv", cdcConfigUpdate.UpdatedEnv), @@ -483,9 +431,14 @@ func addCdcPropertiesSignalListener( func CDCFlowWorkflow( ctx workflow.Context, - cfg *protos.FlowConnectionConfigs, + flowJobName string, state *CDCFlowWorkflowState, ) (*CDCFlowWorkflowResult, error) { + // Fetch full config from DB using the flowJobName + cfg, err := internal.FetchConfigFromDB(flowJobName) + if err != nil { + return nil, fmt.Errorf("unable to fetch flow config from DB: %w", err) + } if cfg == nil { return nil, errors.New("invalid connection configs") } @@ -523,7 +476,7 @@ func CDCFlowWorkflow( switch val.RequestedFlowState { case protos.FlowStatus_STATUS_TERMINATING: state.ActiveSignal = model.TerminateSignal - dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) state.DropFlowInput = &protos.DropFlowInput{ FlowJobName: dropCfg.FlowJobName, FlowConnectionConfigs: dropCfg, @@ -534,7 +487,7 @@ func CDCFlowWorkflow( state.ActiveSignal = model.ResyncSignal cfg.Resync = true cfg.DoInitialSnapshot = true - resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) state.DropFlowInput = &protos.DropFlowInput{ FlowJobName: resyncCfg.FlowJobName, FlowConnectionConfigs: resyncCfg, @@ -576,11 +529,10 @@ func CDCFlowWorkflow( 
logger.Info("mirror resumed", slog.Duration("after", time.Since(startTime))) state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING) - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg.FlowJobName, state) } originalRunID := workflow.GetInfo(ctx).OriginalRunID - state.SyncFlowOptions.NumberOfSyncs = 0 // removed feature for { if err := ctx.Err(); err != nil { @@ -609,6 +561,7 @@ func CDCFlowWorkflow( // for safety, rely on the idempotency of SetupFlow instead // also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here. if state.CurrentFlowStatus != protos.FlowStatus_STATUS_RUNNING { + // have to get cfg from DB. originalTableMappings := make([]*protos.TableMapping, 0, len(cfg.TableMappings)) for _, tableMapping := range cfg.TableMappings { originalTableMappings = append(originalTableMappings, proto.CloneOf(tableMapping)) @@ -616,13 +569,9 @@ func CDCFlowWorkflow( // if resync is true, alter the table name schema mapping to temporarily add // a suffix to the table names. if cfg.Resync { - for _, mapping := range state.SyncFlowOptions.TableMappings { - if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL { - mapping.DestinationTableIdentifier += "_resync" - } - } - // because we have renamed the tables. - cfg.TableMappings = state.SyncFlowOptions.TableMappings + return nil, errors.New("cannot start CDCFlow with Resync enabled, please drop the flow and start again") + // TODOAS: this will need to be resolved somehow, as we cannot pass all of + // table mappings. 
} // start the SetupFlow workflow as a child workflow, and wait for it to complete @@ -637,7 +586,7 @@ func CDCFlowWorkflow( logger.Warn("pause requested during setup, ignoring") case protos.FlowStatus_STATUS_TERMINATING: state.ActiveSignal = model.TerminateSignal - dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) state.DropFlowInput = &protos.DropFlowInput{ FlowJobName: dropCfg.FlowJobName, FlowConnectionConfigs: dropCfg, @@ -672,7 +621,7 @@ func CDCFlowWorkflow( WaitForCancellation: true, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) - setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) + setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg.FlowJobName) var setupFlowOutput *protos.SetupFlowOutput var setupFlowError error @@ -695,9 +644,40 @@ func CDCFlowWorkflow( } } - state.SyncFlowOptions.SrcTableIdNameMapping = setupFlowOutput.SrcTableIdNameMapping state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_SNAPSHOT) + if cfg.SrcTableIdNameMapping == nil { + cfg.SrcTableIdNameMapping = make(map[uint32]string, len(setupFlowOutput.SrcTableIdNameMapping)) + } + // list of table names which are in cfg but not in the setupFlowOutput; are + // the ones which have been added to the flow. 
+ + var newTables []string + + if cfg.SrcTableIdNameMapping == nil { + cfg.SrcTableIdNameMapping = make(map[uint32]string) + } + + for k, v := range setupFlowOutput.SrcTableIdNameMapping { + if _, exists := cfg.SrcTableIdNameMapping[k]; !exists { + newTables = append(newTables, v) + cfg.SrcTableIdNameMapping[k] = v + } + } + + // compute additional tables by selecting + + var additionalTables []*protos.TableMapping + for _, tableMapping := range cfg.TableMappings { + if slices.Contains(newTables, tableMapping.SourceTableIdentifier) { + additionalTables = append(additionalTables, tableMapping) + } + } + + // here we will also store the table mappings in the state. + maps.Copy(cfg.SrcTableIdNameMapping, setupFlowOutput.SrcTableIdNameMapping) + uploadConfigToCatalog(ctx, cfg) + // next part of the setup is to snapshot-initial-copy and setup replication slots. snapshotFlowID := GetChildWorkflowID("snapshot-flow", cfg.FlowJobName, originalRunID) @@ -716,7 +696,17 @@ func CDCFlowWorkflow( // so we can use the same cfg for snapshot flow, and then rely on being state being saved to catalog // during any operation that triggers another snapshot (INCLUDING add tables). // this could fail for very weird Temporal resets - snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) + + // This will send the additionalTables to `temporal`, meaning + // that we cannot add too many tables at once, or we risk the blob is too + // large (2MB limit). 
+ snapshotFlowFuture := workflow.ExecuteChildWorkflow( + snapshotFlowCtx, + SnapshotFlowWorkflow, + cfg.FlowJobName, + additionalTables, + ) + var snapshotDone bool var snapshotError error setupSnapshotSelector.AddFuture(snapshotFlowFuture, func(f workflow.Future) { @@ -748,7 +738,7 @@ func CDCFlowWorkflow( SoftDeleteColName: cfg.SoftDeleteColName, } - for _, mapping := range state.SyncFlowOptions.TableMappings { + for _, mapping := range cfg.TableMappings { if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL { oldName := mapping.DestinationTableIdentifier newName := strings.TrimSuffix(oldName, "_resync") @@ -772,6 +762,7 @@ func CDCFlowWorkflow( InitialInterval: 1 * time.Minute, }, }) + // renameOpts will need to be computed again as it holds list of tables. renameTablesFuture := workflow.ExecuteActivity(renameTablesCtx, flowable.RenameTables, renameOpts) var renameTablesDone bool var renameTablesError error @@ -808,7 +799,7 @@ func CDCFlowWorkflow( logger.Info("executed setup flow and snapshot flow, start running") state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING) } - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg.FlowJobName, state) } var finished bool @@ -819,7 +810,7 @@ func CDCFlowWorkflow( WaitForCancellation: true, RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1}, })) - syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions) + syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, nil) mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop") mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { @@ -874,7 +865,7 @@ func CDCFlowWorkflow( switch val.RequestedFlowState { case protos.FlowStatus_STATUS_TERMINATING: state.ActiveSignal = model.TerminateSignal - dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + dropCfg := 
syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) state.DropFlowInput = &protos.DropFlowInput{ FlowJobName: dropCfg.FlowJobName, FlowConnectionConfigs: dropCfg, @@ -885,7 +876,7 @@ func CDCFlowWorkflow( state.ActiveSignal = model.ResyncSignal cfg.Resync = true cfg.DoInitialSnapshot = true - resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) state.DropFlowInput = &protos.DropFlowInput{ FlowJobName: resyncCfg.FlowJobName, FlowConnectionConfigs: resyncCfg, @@ -938,7 +929,7 @@ func CDCFlowWorkflow( if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal { return state, workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput) } - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg.FlowJobName, state) } } } diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index 72a1e67872..5ca1035001 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -213,7 +213,7 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { } if input.Resync { - return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, input.FlowConnectionConfigs, nil) + return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, input.FlowConnectionConfigs.FlowJobName, nil) } return nil diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 05eedf84b7..7a96b20384 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -108,12 +108,6 @@ func (q *QRepFlowExecution) setupTableSchema(ctx workflow.Context, tableName str tableSchemaInput := &protos.SetupTableSchemaBatchInput{ PeerName: q.config.SourceName, - TableMappings: []*protos.TableMapping{ - { - SourceTableIdentifier: tableName, - DestinationTableIdentifier: q.config.DestinationTableIdentifier, - 
}, - }, FlowName: q.config.FlowJobName, System: q.config.System, Env: q.config.Env, @@ -146,15 +140,7 @@ func (q *QRepFlowExecution) setupWatermarkTableOnDestination(ctx workflow.Contex // now setup the normalized tables on the destination peer setupConfig := &protos.SetupNormalizedTableBatchInput{ - PeerName: q.config.DestinationName, - TableMappings: []*protos.TableMapping{ - { - SourceTableIdentifier: q.config.WatermarkTable, - DestinationTableIdentifier: q.config.DestinationTableIdentifier, - Exclude: q.config.Exclude, - Columns: q.config.Columns, - }, - }, + PeerName: q.config.DestinationName, SyncedAtColName: q.config.SyncedAtColName, SoftDeleteColName: q.config.SoftDeleteColName, FlowName: q.config.FlowJobName, diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index f78371b346..fb85f64806 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -13,6 +13,7 @@ import ( "github.com/PeerDB-io/peerdb/flow/activities" "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/internal" "github.com/PeerDB-io/peerdb/flow/shared" ) @@ -33,18 +34,16 @@ import ( // - creating the normalized table on the destination peer type SetupFlowExecution struct { log.Logger - tableNameMapping map[string]string - cdcFlowName string - executionID string + cdcFlowName string + executionID string } // NewSetupFlowExecution creates a new instance of SetupFlowExecution. 
-func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string) *SetupFlowExecution { +func NewSetupFlowExecution(ctx workflow.Context, cdcFlowName string) *SetupFlowExecution { return &SetupFlowExecution{ - Logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cdcFlowName)), - tableNameMapping: tableNameMapping, - cdcFlowName: cdcFlowName, - executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, + Logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cdcFlowName)), + cdcFlowName: cdcFlowName, + executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, } } @@ -121,7 +120,7 @@ func (s *SetupFlowExecution) ensurePullability( ensurePullabilityInput := &protos.EnsurePullabilityBatchInput{ PeerName: config.SourceName, FlowJobName: s.cdcFlowName, - SourceTableIdentifiers: slices.Sorted(maps.Keys(s.tableNameMapping)), + SourceTableIdentifiers: []string{}, CheckConstraints: checkConstraints, } @@ -164,7 +163,7 @@ func (s *SetupFlowExecution) createRawTable( createRawTblInput := &protos.CreateRawTableInput{ PeerName: config.DestinationName, FlowJobName: s.cdcFlowName, - TableNameMapping: s.tableNameMapping, + TableNameMapping: internal.TableNameMapping(config.TableMappings), } rawTblFuture := workflow.ExecuteActivity(ctx, flowable.CreateRawTable, createRawTblInput) @@ -191,12 +190,11 @@ func (s *SetupFlowExecution) setupNormalizedTables( }) tableSchemaInput := &protos.SetupTableSchemaBatchInput{ - PeerName: flowConnectionConfigs.SourceName, - TableMappings: flowConnectionConfigs.TableMappings, - FlowName: s.cdcFlowName, - System: flowConnectionConfigs.System, - Env: flowConnectionConfigs.Env, - Version: flowConnectionConfigs.Version, + PeerName: flowConnectionConfigs.SourceName, + FlowName: s.cdcFlowName, + System: flowConnectionConfigs.System, + Env: flowConnectionConfigs.Env, + Version: flowConnectionConfigs.Version, } if err := workflow.ExecuteActivity(ctx, 
flowable.SetupTableSchema, tableSchemaInput).Get(ctx, nil); err != nil { @@ -207,7 +205,6 @@ func (s *SetupFlowExecution) setupNormalizedTables( s.Info("setting up normalized tables on destination peer", slog.String("destination", flowConnectionConfigs.DestinationName)) setupConfig := &protos.SetupNormalizedTableBatchInput{ PeerName: flowConnectionConfigs.DestinationName, - TableMappings: flowConnectionConfigs.TableMappings, SoftDeleteColName: flowConnectionConfigs.SoftDeleteColName, SyncedAtColName: flowConnectionConfigs.SyncedAtColName, FlowName: flowConnectionConfigs.FlowJobName, @@ -227,9 +224,14 @@ func (s *SetupFlowExecution) setupNormalizedTables( // executeSetupFlow executes the setup flow. func (s *SetupFlowExecution) executeSetupFlow( ctx workflow.Context, - config *protos.FlowConnectionConfigs, + flowJobName string, ) (*protos.SetupFlowOutput, error) { s.Info("executing setup flow") + // gotta fetch the config from the catalog. + config, err := internal.FetchConfigFromDB(flowJobName) + if err != nil { + return nil, fmt.Errorf("unable to fetch config from DB: %w", err) + } // first check the connectionsAndSetupMetadataTables if err := s.checkConnectionsAndSetupMetadataTables(ctx, config); err != nil { @@ -260,17 +262,12 @@ func (s *SetupFlowExecution) executeSetupFlow( } // SetupFlowWorkflow is the workflow that sets up the flow. 
-func SetupFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) (*protos.SetupFlowOutput, error) { - tblNameMapping := make(map[string]string, len(config.TableMappings)) - for _, v := range config.TableMappings { - tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier - } - +func SetupFlowWorkflow(ctx workflow.Context, flowJobName string) (*protos.SetupFlowOutput, error) { // create the setup flow execution - setupFlowExecution := NewSetupFlowExecution(ctx, tblNameMapping, config.FlowJobName) + setupFlowExecution := NewSetupFlowExecution(ctx, flowJobName) // execute the setup flow - setupFlowOutput, err := setupFlowExecution.executeSetupFlow(ctx, config) + setupFlowOutput, err := setupFlowExecution.executeSetupFlow(ctx, flowJobName) if err != nil { return nil, fmt.Errorf("failed to execute setup flow: %w", err) } diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 3b5e7ff6ba..bec4da3c7b 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -27,8 +27,9 @@ const ( ) type SnapshotFlowExecution struct { - config *protos.FlowConnectionConfigs - logger log.Logger + FlowJobName string + logger log.Logger + AdditionalTables []*protos.TableMapping } func getPeerType(wCtx workflow.Context, name string) (protos.DBType, error) { @@ -44,7 +45,7 @@ func getPeerType(wCtx workflow.Context, name string) (protos.DBType, error) { func (s *SnapshotFlowExecution) setupReplication( ctx workflow.Context, ) (*protos.SetupReplicationOutput, error) { - flowName := s.config.FlowJobName + flowName := s.FlowJobName s.logger.Info("setting up replication on source for peer flow") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -55,19 +56,24 @@ func (s *SnapshotFlowExecution) setupReplication( }, }) - tblNameMapping := make(map[string]string, len(s.config.TableMappings)) - for _, v := range s.config.TableMappings { + config, err := internal.FetchConfigFromDB(s.FlowJobName) + 
if err != nil { + return nil, fmt.Errorf("unable to fetch config from DB for flow-job-name %s; err : %w", s.FlowJobName, err) + } + + tblNameMapping := make(map[string]string, len(s.AdditionalTables)) + for _, v := range s.AdditionalTables { tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier } setupReplicationInput := &protos.SetupReplicationInput{ - PeerName: s.config.SourceName, + PeerName: config.SourceName, FlowJobName: flowName, TableNameMapping: tblNameMapping, - DoInitialSnapshot: s.config.DoInitialSnapshot, - ExistingPublicationName: s.config.PublicationName, - ExistingReplicationSlotName: s.config.ReplicationSlotName, - Env: s.config.Env, + DoInitialSnapshot: config.DoInitialSnapshot, + ExistingPublicationName: config.PublicationName, + ExistingReplicationSlotName: config.ReplicationSlotName, + Env: config.Env, } var res *protos.SetupReplicationOutput @@ -92,7 +98,7 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive( }, }) - if err := workflow.ExecuteActivity(ctx, snapshot.CloseSlotKeepAlive, s.config.FlowJobName).Get(ctx, nil); err != nil { + if err := workflow.ExecuteActivity(ctx, snapshot.CloseSlotKeepAlive, s.FlowJobName).Get(ctx, nil); err != nil { return fmt.Errorf("failed to close slot keep alive for peer flow: %w", err) } @@ -109,11 +115,16 @@ func (s *SnapshotFlowExecution) cloneTable( sourcePeerType protos.DBType, destinationPeerType protos.DBType, ) error { - flowName := s.config.FlowJobName + flowName := s.FlowJobName cloneLog := slog.Group("clone-log", slog.String(string(shared.FlowNameKey), flowName), slog.String("snapshotName", snapshotName)) + config, err := internal.FetchConfigFromDB(s.FlowJobName) + if err != nil { + return fmt.Errorf("unable to fetch config from DB for flow-job-name %s; err : %w", s.FlowJobName, err) + } + srcName := mapping.SourceTableIdentifier dstName := mapping.DestinationTableIdentifier originalRunID := workflow.GetInfo(ctx).OriginalRunID @@ -148,7 +159,7 @@ func (s *SnapshotFlowExecution) 
cloneTable( return workflow.ExecuteActivity( schemaCtx, snapshot.LoadTableSchema, - s.config.FlowJobName, + s.FlowJobName, dstName, ).Get(ctx, &tableSchema) } @@ -175,23 +186,26 @@ func (s *SnapshotFlowExecution) cloneTable( // usually MySQL supports double quotes with ANSI_QUOTES, but Vitess doesn't // Vitess currently only supports initial load so change here is enough srcTableEscaped := parsedSrcTable.String() - if sourcePeerType == protos.DBType_MYSQL { + + if dbtype, err := getPeerType(ctx, config.SourceName); err != nil { + return err + } else if dbtype == protos.DBType_MYSQL { srcTableEscaped = parsedSrcTable.MySQL() } numWorkers := uint32(8) - if s.config.SnapshotMaxParallelWorkers > 0 { - numWorkers = s.config.SnapshotMaxParallelWorkers + if config.SnapshotMaxParallelWorkers > 0 { + numWorkers = config.SnapshotMaxParallelWorkers } numRowsPerPartition := uint32(250000) - if s.config.SnapshotNumRowsPerPartition > 0 { - numRowsPerPartition = s.config.SnapshotNumRowsPerPartition + if config.SnapshotNumRowsPerPartition > 0 { + numRowsPerPartition = config.SnapshotNumRowsPerPartition } numPartitionsOverride := uint32(0) - if s.config.SnapshotNumPartitionsOverride > 0 { - numPartitionsOverride = s.config.SnapshotNumPartitionsOverride + if config.SnapshotNumPartitionsOverride > 0 { + numPartitionsOverride = config.SnapshotNumPartitionsOverride } snapshotWriteMode := &protos.QRepWriteMode{ @@ -208,7 +222,9 @@ func (s *SnapshotFlowExecution) cloneTable( // ensure document IDs are synchronized across initial load and CDC // for the same document - if destinationPeerType == protos.DBType_ELASTICSEARCH { + if dbtype, err := getPeerType(ctx, config.DestinationName); err != nil { + return err + } else if dbtype == protos.DBType_ELASTICSEARCH { if err := initTableSchema(); err != nil { return err } @@ -218,10 +234,10 @@ func (s *SnapshotFlowExecution) cloneTable( } } - config := &protos.QRepConfig{ + qRepConfig := &protos.QRepConfig{ FlowJobName: childWorkflowID, - 
SourceName: s.config.SourceName, - DestinationName: s.config.DestinationName, + SourceName: config.SourceName, + DestinationName: config.DestinationName, Query: query, WatermarkColumn: mapping.PartitionKey, WatermarkTable: srcName, @@ -231,20 +247,20 @@ func (s *SnapshotFlowExecution) cloneTable( NumRowsPerPartition: numRowsPerPartition, NumPartitionsOverride: numPartitionsOverride, MaxParallelWorkers: numWorkers, - StagingPath: s.config.SnapshotStagingPath, - SyncedAtColName: s.config.SyncedAtColName, - SoftDeleteColName: s.config.SoftDeleteColName, + StagingPath: config.SnapshotStagingPath, + SyncedAtColName: config.SyncedAtColName, + SoftDeleteColName: config.SoftDeleteColName, WriteMode: snapshotWriteMode, - System: s.config.System, - Script: s.config.Script, - Env: s.config.Env, + System: config.System, + Script: config.Script, + Env: config.Env, ParentMirrorName: flowName, Exclude: mapping.Exclude, Columns: mapping.Columns, - Version: s.config.Version, + Version: config.Version, } - return boundSelector.SpawnChild(childCtx, QRepFlowWorkflow, nil, config, nil) + return boundSelector.SpawnChild(childCtx, QRepFlowWorkflow, nil, qRepConfig, nil) } func (s *SnapshotFlowExecution) cloneTables( @@ -267,24 +283,28 @@ func (s *SnapshotFlowExecution) cloneTables( }, }) - var res *protos.GetDefaultPartitionKeyForTablesOutput - if err := workflow.ExecuteActivity(getParallelLoadKeyForTablesCtx, - snapshot.GetDefaultPartitionKeyForTables, s.config).Get(ctx, &res); err != nil { - return fmt.Errorf("failed to get default partition keys for tables: %w", err) + config, err := internal.FetchConfigFromDB(s.FlowJobName) + if err != nil { + return fmt.Errorf("unable to query flow config from catalog: %w", err) } - boundSelector := shared.NewBoundSelector(ctx, "CloneTablesSelector", maxParallelClones) - - sourcePeerType, err := getPeerType(ctx, s.config.SourceName) + sourcePeerType, err := getPeerType(ctx, config.SourceName) if err != nil { return err } - destinationPeerType, err 
:= getPeerType(ctx, s.config.DestinationName) + destinationPeerType, err := getPeerType(ctx, config.DestinationName) if err != nil { return err } - for _, v := range s.config.TableMappings { + var res *protos.GetDefaultPartitionKeyForTablesOutput + if err := workflow.ExecuteActivity(getParallelLoadKeyForTablesCtx, + snapshot.GetDefaultPartitionKeyForTables, config).Get(ctx, &res); err != nil { + return fmt.Errorf("failed to get default partition keys for tables: %w", err) + } + + boundSelector := shared.NewBoundSelector(ctx, "CloneTablesSelector", maxParallelClones) + for _, v := range s.AdditionalTables { source := v.SourceTableIdentifier destination := v.DestinationTableIdentifier s.logger.Info( @@ -348,10 +368,17 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( func SnapshotFlowWorkflow( ctx workflow.Context, - config *protos.FlowConnectionConfigs, + flowJobName string, + additionalTables []*protos.TableMapping, ) error { + config, err := internal.FetchConfigFromDB(flowJobName) + if err != nil { + return fmt.Errorf("unable to fetch config from DB: %w", err) + } + se := &SnapshotFlowExecution{ - config: config, + FlowJobName: flowJobName, + AdditionalTables: additionalTables, logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName), slog.String("sourcePeer", config.SourceName)), diff --git a/protos/flow.proto b/protos/flow.proto index 7dec8ed46b..044a8c3465 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -89,6 +89,7 @@ message FlowConnectionConfigs { map env = 24; uint32 version = 25; + map src_table_id_name_mapping = 27; } message RenameTableOption { @@ -127,11 +128,9 @@ message CreateTablesFromExistingOutput { } message SyncFlowOptions { - reserved 5; + reserved 4,5,6; uint32 batch_size = 1; uint64 idle_timeout_seconds = 3; - map src_table_id_name_mapping = 4; - repeated TableMapping table_mappings = 6; int32 number_of_syncs = 7; } @@ -195,13 +194,13 @@ message FieldDescription { } message 
SetupTableSchemaBatchInput { - reserved 2; + reserved 2,6; map env = 1; string flow_name = 3; TypeSystem system = 4; string peer_name = 5; - repeated TableMapping table_mappings = 6; uint32 version = 7; + repeated TableMapping filtered_table_mappings = 8; } message SetupNormalizedTableBatchInput {