Skip to content

Commit c901a49

Browse files
committed
Tenanative first go at removing tableMappings & storing in DB
1 parent e2cce3a commit c901a49

File tree

6 files changed

+112
-12
lines changed

6 files changed

+112
-12
lines changed

flow/activities/flowable.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1822,3 +1822,34 @@ func (a *FlowableActivity) ReportStatusMetric(ctx context.Context, status protos
18221822
)))
18231823
return nil
18241824
}
1825+
1826+
func (a *FlowableActivity) MigrateTableMappingsToCatalog(
1827+
ctx context.Context,
1828+
flowJobName string, tableMappings []*protos.TableMapping, version uint32,
1829+
) error {
1830+
logger := internal.LoggerFromCtx(ctx)
1831+
tx, err := a.CatalogPool.Begin(ctx)
1832+
if err != nil {
1833+
return fmt.Errorf("failed to begin transaction to migrate table mappings to catalog: %w", err)
1834+
}
1835+
defer shared.RollbackTx(tx, logger)
1836+
1837+
tableMappingsBytes := [][]byte{}
1838+
for _, tableMapping := range tableMappings {
1839+
tableMappingBytes, err := proto.Marshal(tableMapping)
1840+
if err != nil {
1841+
return fmt.Errorf("failed to marshal table mapping to migrate to catalog: %w", err)
1842+
}
1843+
tableMappingsBytes = append(tableMappingsBytes, tableMappingBytes)
1844+
}
1845+
1846+
stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3)
1847+
ON CONFLICT (flow_name, version) DO UPDATE SET table_mapping = EXCLUDED.table_mapping`
1848+
_, err = tx.Exec(ctx, stmt, flowJobName, version, tableMappingsBytes)
1849+
1850+
if err := tx.Commit(ctx); err != nil {
1851+
return fmt.Errorf("failed to commit transaction to migrate table mappings to catalog: %w", err)
1852+
}
1853+
1854+
return nil
1855+
}

flow/activities/flowable_core.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,33 @@ func heartbeatRoutine(
5555
)
5656
}
5757

58+
func (a *FlowableActivity) getTableMappings(ctx context.Context, flowName string, version uint) ([]*protos.TableMapping, error) {
59+
rows, err := a.CatalogPool.Query(ctx, "select table_mapping from table_mappings where flow_name = $1 AND version = $2", flowName, version)
60+
if err != nil {
61+
return nil, err
62+
}
63+
64+
var tableMappingsBytes [][]byte
65+
var tableMappings []*protos.TableMapping = []*protos.TableMapping{}
66+
67+
err = rows.Scan([]any{&tableMappingsBytes})
68+
if err != nil {
69+
return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err)
70+
}
71+
72+
for _, tableMappingBytes := range tableMappingsBytes {
73+
74+
var tableMapping *protos.TableMapping
75+
if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil {
76+
return nil, err
77+
}
78+
79+
tableMappings = append(tableMappings, tableMapping)
80+
}
81+
82+
return tableMappings, nil
83+
}
84+
5885
func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowName string) (map[string]*protos.TableSchema, error) {
5986
rows, err := a.CatalogPool.Query(ctx, "select table_name, table_schema from table_schema_mapping where flow_name = $1", flowName)
6087
if err != nil {

flow/internal/flow_configuration_helpers.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,22 @@ import (
1111
"github.com/PeerDB-io/peerdb/flow/shared"
1212
)
1313

14+
func TableNameMapping(tableMappings []*protos.TableMapping, resync bool) map[string]string {
15+
tblNameMapping := make(map[string]string, len(tableMappings))
16+
if resync {
17+
for _, mapping := range tableMappings {
18+
if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL {
19+
mapping.DestinationTableIdentifier += "_resync"
20+
}
21+
}
22+
}
23+
for _, v := range tableMappings {
24+
tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
25+
}
26+
27+
return tblNameMapping
28+
}
29+
1430
func FetchConfigFromDB(ctx context.Context, catalogPool shared.CatalogPool, flowName string) (*protos.FlowConnectionConfigsCore, error) {
1531
var configBytes sql.RawBytes
1632
if err := catalogPool.QueryRow(ctx,

flow/workflows/cdc_flow.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func updateFlowConfigWithLatestSettings(
123123
cloneCfg := proto.CloneOf(cfg)
124124
cloneCfg.MaxBatchSize = state.SyncFlowOptions.BatchSize
125125
cloneCfg.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
126-
cloneCfg.TableMappings = state.SyncFlowOptions.TableMappings
126+
cloneCfg.TableMappingVersion = state.SyncFlowOptions.TableMappingVersion
127127
if state.SnapshotNumRowsPerPartition > 0 {
128128
cloneCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition
129129
}
@@ -221,6 +221,24 @@ func processCDCFlowConfigUpdate(
221221
}
222222
}
223223

224+
// MIGRATION: Move `tableMapping` to the DB
225+
if len(state.SyncFlowOptions.TableMappings) > 0 {
226+
migrateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
227+
StartToCloseTimeout: 5 * time.Minute,
228+
})
229+
if err := workflow.ExecuteActivity(
230+
migrateCtx,
231+
flowable.MigrateTableMappingsToCatalog,
232+
cfg.FlowJobName,
233+
state.SyncFlowOptions.TableMappings,
234+
1, //hardcoding version as 1 for migration purposes.
235+
).Get(migrateCtx, nil); err != nil {
236+
return fmt.Errorf("failed to migrate TableMappings: %w", err)
237+
}
238+
state.SyncFlowOptions.TableMappings = nil
239+
state.SyncFlowOptions.TableMappingVersion = 1
240+
}
241+
224242
syncStateToConfigProtoInCatalog(ctx, cfg, state)
225243
return nil
226244
}
@@ -296,7 +314,7 @@ func processTableAdditions(
296314
alterPublicationAddAdditionalTablesFuture := workflow.ExecuteActivity(
297315
alterPublicationAddAdditionalTablesCtx,
298316
flowable.AddTablesToPublication,
299-
internal.MinimizeFlowConfiguration(cfg), flowConfigUpdate.AdditionalTables)
317+
cfg, flowConfigUpdate.AdditionalTables)
300318

301319
var res *CDCFlowWorkflowResult
302320
var addTablesFlowErr error
@@ -338,7 +356,6 @@ func processTableAdditions(
338356
childAddTablesCDCFlowFuture := workflow.ExecuteChildWorkflow(
339357
childAddTablesCDCFlowCtx,
340358
CDCFlowWorkflow,
341-
// TODO: `additonalTableCfg` this cannot be minimized in the main branch; but the limitation is minimal.
342359
additionalTablesCfg,
343360
nil,
344361
)
@@ -413,7 +430,7 @@ func processTableRemovals(
413430
rawTableCleanupFuture := workflow.ExecuteActivity(
414431
removeTablesCtx,
415432
flowable.RemoveTablesFromRawTable,
416-
internal.MinimizeFlowConfiguration(cfg), state.FlowConfigUpdate.RemovedTables)
433+
cfg, state.FlowConfigUpdate.RemovedTables)
417434
removeTablesSelector.AddFuture(rawTableCleanupFuture, func(f workflow.Future) {
418435
if err := f.Get(ctx, nil); err != nil {
419436
logger.Error("failed to clean up raw table for removed tables", slog.Any("error", err))
@@ -425,7 +442,7 @@ func processTableRemovals(
425442
removeTablesFromCatalogFuture := workflow.ExecuteActivity(
426443
removeTablesCtx,
427444
flowable.RemoveTablesFromCatalog,
428-
internal.MinimizeFlowConfiguration(cfg), state.FlowConfigUpdate.RemovedTables)
445+
cfg, state.FlowConfigUpdate.RemovedTables)
429446
removeTablesSelector.AddFuture(removeTablesFromCatalogFuture, func(f workflow.Future) {
430447
if err := f.Get(ctx, nil); err != nil {
431448
logger.Error("failed to clean up raw table for removed tables", slog.Any("error", err))
@@ -595,7 +612,7 @@ func CDCFlowWorkflow(
595612

596613
logger.Info("mirror resumed", slog.Duration("after", time.Since(startTime)))
597614
state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
598-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state)
615+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
599616
}
600617

601618
originalRunID := workflow.GetInfo(ctx).OriginalRunID
@@ -677,7 +694,7 @@ func CDCFlowWorkflow(
677694
// Resync will rely rely on the `cfg.Resync` flag to rename the tables
678695
// during the snapshot process. This is how we're able to also remove the need
679696
// to sync the config back into the DB / not rely on the `state.TableMappings`.
680-
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, internal.MinimizeFlowConfiguration(cfg))
697+
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg)
681698

682699
var setupFlowOutput *protos.SetupFlowOutput
683700
var setupFlowError error
@@ -813,7 +830,7 @@ func CDCFlowWorkflow(
813830
logger.Info("executed setup flow and snapshot flow, start running")
814831
state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
815832
}
816-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state)
833+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
817834
}
818835

819836
var finished bool
@@ -825,7 +842,7 @@ func CDCFlowWorkflow(
825842
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
826843
}))
827844
state.SyncFlowOptions.TableMappings = []*protos.TableMapping{}
828-
syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, internal.MinimizeFlowConfiguration(cfg), state.SyncFlowOptions)
845+
syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions)
829846

830847
mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop")
831848
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
@@ -946,7 +963,7 @@ func CDCFlowWorkflow(
946963
if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal {
947964
return state, workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput)
948965
}
949-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state)
966+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
950967
}
951968
}
952969
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
CREATE TABLE IF NOT EXISTS table_mappings (
2+
flow_name varchar(255) not null,Expand commentComment on line R2ResolvedCode has comments. Press enter to view.
3+
version int32 not null default 1,
4+
table_mappings []bytea not null,
5+
primary key (flow_name, version)
6+
);

protos/flow.proto

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ message TableMapping {
4141
string sharding_key = 7;
4242
string policy_name = 8;
4343
string partition_by_expr = 9;
44+
uint32 version = 10;
4445
}
4546

4647
message SetupInput {
@@ -52,12 +53,12 @@ message SetupInput {
5253
// FlowConnectionConfigs is for external use by the API, maintaining backwards compatibility
5354
// When adding fields here, add them to FlowConnectionConfigsCore too
5455
message FlowConnectionConfigs {
55-
reserved 2,3,17;
56+
reserved 2,3,17,4;
5657
string flow_job_name = 1;
5758

5859
// config for the CDC flow itself
5960
// currently, TableMappings, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals
60-
repeated TableMapping table_mappings = 4;
61+
//repeatable TableMapping table_mappings = 4;
6162
uint32 max_batch_size = 5;
6263
uint64 idle_timeout_seconds = 6;
6364
string cdc_staging_path = 7;
@@ -91,6 +92,7 @@ message FlowConnectionConfigs {
9192

9293
map<string, string> env = 24;
9394
uint32 version = 25;
95+
uint32 table_mapping_version = 27;
9496
}
9597

9698
// FlowConnectionConfigsCore is used internally in the codebase, it is safe to remove (mark reserved) fields from it
@@ -181,6 +183,7 @@ message SyncFlowOptions {
181183
map<uint32, string> src_table_id_name_mapping = 4;
182184
repeated TableMapping table_mappings = 6;
183185
int32 number_of_syncs = 7;
186+
uint32 table_mapping_version = 8;
184187
}
185188

186189
message EnsurePullabilityBatchInput {

0 commit comments

Comments
 (0)