Skip to content

Commit 069d870

Browse files
committed
Tentative first go at removing tableMappings & storing in DB
1 parent 1525dfe commit 069d870

File tree

6 files changed

+96
-21
lines changed

6 files changed

+96
-21
lines changed

flow/activities/flowable.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1688,3 +1688,34 @@ func (a *FlowableActivity) ReportStatusMetric(ctx context.Context, status protos
16881688
)))
16891689
return nil
16901690
}
1691+
1692+
func (a *FlowableActivity) MigrateTableMappingsToCatalog(
1693+
ctx context.Context,
1694+
flowJobName string, tableMappings []*protos.TableMapping, version uint32,
1695+
) error {
1696+
logger := internal.LoggerFromCtx(ctx)
1697+
tx, err := a.CatalogPool.Begin(ctx)
1698+
if err != nil {
1699+
return fmt.Errorf("failed to begin transaction to migrate table mappings to catalog: %w", err)
1700+
}
1701+
defer shared.RollbackTx(tx, logger)
1702+
1703+
tableMappingsBytes := [][]byte{}
1704+
for _, tableMapping := range tableMappings {
1705+
tableMappingBytes, err := proto.Marshal(tableMapping)
1706+
if err != nil {
1707+
return fmt.Errorf("failed to marshal table mapping to migrate to catalog: %w", err)
1708+
}
1709+
tableMappingsBytes = append(tableMappingsBytes, tableMappingBytes)
1710+
}
1711+
1712+
stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3)
1713+
ON CONFLICT (flow_name, version) DO UPDATE SET table_mapping = EXCLUDED.table_mapping`
1714+
_, err = tx.Exec(ctx, stmt, flowJobName, version, tableMappingsBytes)
1715+
1716+
if err := tx.Commit(ctx); err != nil {
1717+
return fmt.Errorf("failed to commit transaction to migrate table mappings to catalog: %w", err)
1718+
}
1719+
1720+
return nil
1721+
}

flow/activities/flowable_core.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,33 @@ func heartbeatRoutine(
5555
)
5656
}
5757

58+
func (a *FlowableActivity) getTableMappings(ctx context.Context, flowName string, version uint) ([]*protos.TableMapping, error) {
59+
rows, err := a.CatalogPool.Query(ctx, "select table_mapping from table_mappings where flow_name = $1 AND version = $2", flowName, version)
60+
if err != nil {
61+
return nil, err
62+
}
63+
64+
var tableMappingsBytes [][]byte
65+
var tableMappings []*protos.TableMapping = []*protos.TableMapping{}
66+
67+
err = rows.Scan([]any{&tableMappingsBytes})
68+
if err != nil {
69+
return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err)
70+
}
71+
72+
for _, tableMappingBytes := range tableMappingsBytes {
73+
74+
var tableMapping *protos.TableMapping
75+
if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil {
76+
return nil, err
77+
}
78+
79+
tableMappings = append(tableMappings, tableMapping)
80+
}
81+
82+
return tableMappings, nil
83+
}
84+
5885
func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowName string) (map[string]*protos.TableSchema, error) {
5986
rows, err := a.CatalogPool.Query(ctx, "select table_name, table_schema from table_schema_mapping where flow_name = $1", flowName)
6087
if err != nil {

flow/internal/flow_configuration_helpers.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,6 @@ func TableNameMapping(tableMappings []*protos.TableMapping, resync bool) map[str
2525
return tblNameMapping
2626
}
2727

28-
// MinimizeFlowConfiguration sets cfg.TableMappings to nil and returns the same
// pointer it was given; a nil cfg yields nil. The config is modified in place,
// no copy is made, so the caller's value loses its table mappings as well.
func MinimizeFlowConfiguration(cfg *protos.FlowConnectionConfigs) *protos.FlowConnectionConfigs {
29-
if cfg == nil {
30-
return nil
31-
}
32-
cfg.TableMappings = nil
33-
return cfg
34-
}
35-
3628
func FetchConfigFromDB(flowName string, ctx context.Context) (*protos.FlowConnectionConfigs, error) {
3729
var configBytes sql.RawBytes
3830
pool, _ := GetCatalogConnectionPoolFromEnv(ctx)

flow/workflows/cdc_flow.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func updateFlowConfigWithLatestSettings(
101101
cloneCfg := proto.CloneOf(cfg)
102102
cloneCfg.MaxBatchSize = state.SyncFlowOptions.BatchSize
103103
cloneCfg.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
104-
cloneCfg.TableMappings = state.SyncFlowOptions.TableMappings
104+
cloneCfg.TableMappingVersion = state.SyncFlowOptions.TableMappingVersion
105105
if state.SnapshotNumRowsPerPartition > 0 {
106106
cloneCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition
107107
}
@@ -125,7 +125,6 @@ func syncStateToConfigProtoInCatalog(
125125
cfg *protos.FlowConnectionConfigs,
126126
state *CDCFlowWorkflowState,
127127
) *protos.FlowConnectionConfigs {
128-
// TODO: make sure that `cfg` includes table mappings.
129128
cloneCfg := updateFlowConfigWithLatestSettings(cfg, state)
130129
uploadConfigToCatalog(ctx, cloneCfg)
131130
return cloneCfg
@@ -200,6 +199,24 @@ func processCDCFlowConfigUpdate(
200199
}
201200
}
202201

202+
// MIGRATION: Move `tableMapping` to the DB
203+
if len(state.SyncFlowOptions.TableMappings) > 0 {
204+
migrateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
205+
StartToCloseTimeout: 5 * time.Minute,
206+
})
207+
if err := workflow.ExecuteActivity(
208+
migrateCtx,
209+
flowable.MigrateTableMappingsToCatalog,
210+
cfg.FlowJobName,
211+
state.SyncFlowOptions.TableMappings,
212+
1, //hardcoding version as 1 for migration purposes.
213+
).Get(migrateCtx, nil); err != nil {
214+
return fmt.Errorf("failed to migrate TableMappings: %w", err)
215+
}
216+
state.SyncFlowOptions.TableMappings = nil
217+
state.SyncFlowOptions.TableMappingVersion = 1
218+
}
219+
203220
syncStateToConfigProtoInCatalog(ctx, cfg, state)
204221
return nil
205222
}
@@ -275,7 +292,7 @@ func processTableAdditions(
275292
alterPublicationAddAdditionalTablesFuture := workflow.ExecuteActivity(
276293
alterPublicationAddAdditionalTablesCtx,
277294
flowable.AddTablesToPublication,
278-
internal.MinimizeFlowConfiguration(cfg), flowConfigUpdate.AdditionalTables)
295+
cfg, flowConfigUpdate.AdditionalTables)
279296

280297
var res *CDCFlowWorkflowResult
281298
var addTablesFlowErr error
@@ -317,7 +334,6 @@ func processTableAdditions(
317334
childAddTablesCDCFlowFuture := workflow.ExecuteChildWorkflow(
318335
childAddTablesCDCFlowCtx,
319336
CDCFlowWorkflow,
320-
// TODO: `additonalTableCfg` this cannot be minimized in the main branch; but the limitation is minimal.
321337
additionalTablesCfg,
322338
nil,
323339
)
@@ -392,7 +408,7 @@ func processTableRemovals(
392408
rawTableCleanupFuture := workflow.ExecuteActivity(
393409
removeTablesCtx,
394410
flowable.RemoveTablesFromRawTable,
395-
internal.MinimizeFlowConfiguration(cfg), state.FlowConfigUpdate.RemovedTables)
411+
cfg, state.FlowConfigUpdate.RemovedTables)
396412
removeTablesSelector.AddFuture(rawTableCleanupFuture, func(f workflow.Future) {
397413
if err := f.Get(ctx, nil); err != nil {
398414
logger.Error("failed to clean up raw table for removed tables", slog.Any("error", err))
@@ -404,7 +420,7 @@ func processTableRemovals(
404420
removeTablesFromCatalogFuture := workflow.ExecuteActivity(
405421
removeTablesCtx,
406422
flowable.RemoveTablesFromCatalog,
407-
internal.MinimizeFlowConfiguration(cfg), state.FlowConfigUpdate.RemovedTables)
423+
cfg, state.FlowConfigUpdate.RemovedTables)
408424
removeTablesSelector.AddFuture(removeTablesFromCatalogFuture, func(f workflow.Future) {
409425
if err := f.Get(ctx, nil); err != nil {
410426
logger.Error("failed to clean up raw table for removed tables", slog.Any("error", err))
@@ -574,7 +590,7 @@ func CDCFlowWorkflow(
574590

575591
logger.Info("mirror resumed", slog.Duration("after", time.Since(startTime)))
576592
state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
577-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state)
593+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
578594
}
579595

580596
originalRunID := workflow.GetInfo(ctx).OriginalRunID
@@ -656,7 +672,7 @@ func CDCFlowWorkflow(
656672
// Resync will rely on the `cfg.Resync` flag to rename the tables
657673
// during the snapshot process. This is how we're able to also remove the need
658674
// to sync the config back into the DB / not rely on the `state.TableMappings`.
659-
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, internal.MinimizeFlowConfiguration(cfg))
675+
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg)
660676

661677
var setupFlowOutput *protos.SetupFlowOutput
662678
var setupFlowError error
@@ -792,7 +808,7 @@ func CDCFlowWorkflow(
792808
logger.Info("executed setup flow and snapshot flow, start running")
793809
state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
794810
}
795-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state)
811+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
796812
}
797813

798814
var finished bool
@@ -804,7 +820,7 @@ func CDCFlowWorkflow(
804820
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
805821
}))
806822
state.SyncFlowOptions.TableMappings = []*protos.TableMapping{}
807-
syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, internal.MinimizeFlowConfiguration(cfg), state.SyncFlowOptions)
823+
syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions)
808824

809825
mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop")
810826
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
@@ -925,7 +941,7 @@ func CDCFlowWorkflow(
925941
if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal {
926942
return state, workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput)
927943
}
928-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state)
944+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
929945
}
930946
}
931947
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
-- Versioned table mappings per flow. Each row holds, for one (flow_name, version)
-- pair, an array of serialized TableMapping protobuf messages.
-- The column is named table_mapping to match the queries issued by
-- MigrateTableMappingsToCatalog and getTableMappings.
CREATE TABLE IF NOT EXISTS table_mappings (
    flow_name varchar(255) not null,
    version integer not null default 1,
    table_mapping bytea[] not null,
    primary key (flow_name, version)
);

protos/flow.proto

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ message TableMapping {
4141
string sharding_key = 7;
4242
string policy_name = 8;
4343
string partition_by_expr = 9;
44+
uint32 version = 10;
4445
}
4546

4647
message SetupInput {
@@ -50,12 +51,12 @@ message SetupInput {
5051
}
5152

5253
message FlowConnectionConfigs {
53-
reserved 2,3,17;
54+
reserved 2,3,17,4;
5455
string flow_job_name = 1;
5556

5657
// config for the CDC flow itself
5758
// currently, TableMappings, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals
58-
repeated TableMapping table_mappings = 4;
59+
//repeated TableMapping table_mappings = 4;
5960
uint32 max_batch_size = 5;
6061
uint64 idle_timeout_seconds = 6;
6162
string cdc_staging_path = 7;
@@ -89,6 +90,7 @@ message FlowConnectionConfigs {
8990

9091
map<string, string> env = 24;
9192
uint32 version = 25;
93+
uint32 table_mapping_version = 27;
9294
}
9395

9496
message RenameTableOption {
@@ -133,6 +135,7 @@ message SyncFlowOptions {
133135
map<uint32, string> src_table_id_name_mapping = 4;
134136
repeated TableMapping table_mappings = 6;
135137
int32 number_of_syncs = 7;
138+
uint32 table_mapping_version = 8;
136139
}
137140

138141
message EnsurePullabilityBatchInput {

0 commit comments

Comments
 (0)