Skip to content

Commit e6add9b

Browse files
committed
Resolve issues from deprecating TableMappings pt1
1 parent 069d870 commit e6add9b

File tree

10 files changed

+102
-41
lines changed

10 files changed

+102
-41
lines changed

flow/activities/flowable.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1700,13 +1700,9 @@ func (a *FlowableActivity) MigrateTableMappingsToCatalog(
17001700
}
17011701
defer shared.RollbackTx(tx, logger)
17021702

1703-
tableMappingsBytes := [][]byte{}
1704-
for _, tableMapping := range tableMappings {
1705-
tableMappingBytes, err := proto.Marshal(tableMapping)
1706-
if err != nil {
1707-
return fmt.Errorf("failed to marshal table mapping to migrate to catalog: %w", err)
1708-
}
1709-
tableMappingsBytes = append(tableMappingsBytes, tableMappingBytes)
1703+
tableMappingsBytes, err := internal.TableMappingsToBytes(tableMappings)
1704+
if err != nil {
1705+
return fmt.Errorf("unable to marshal table mappings: %w", err)
17101706
}
17111707

17121708
stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3)

flow/activities/flowable_core.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,32 +55,32 @@ func heartbeatRoutine(
5555
)
5656
}
5757

58-
func (a *FlowableActivity) getTableMappings(ctx context.Context, flowName string, version uint) ([]*protos.TableMapping, error) {
59-
rows, err := a.CatalogPool.Query(ctx, "select table_mapping from table_mappings where flow_name = $1 AND version = $2", flowName, version)
60-
if err != nil {
61-
return nil, err
62-
}
58+
//func (a *FlowableActivity) getTableMappings(ctx context.Context, flowName string, version uint) ([]*protos.TableMapping, error) {
59+
//rows, err := a.CatalogPool.Query(ctx, "select table_mapping from table_mappings where flow_name = $1 AND version = $2", flowName, version)
60+
//if err != nil {
61+
//return nil, err
62+
//}
6363

64-
var tableMappingsBytes [][]byte
65-
var tableMappings []*protos.TableMapping = []*protos.TableMapping{}
64+
//var tableMappingsBytes [][]byte
65+
//var tableMappings []*protos.TableMapping = []*protos.TableMapping{}
6666

67-
err = rows.Scan([]any{&tableMappingsBytes})
68-
if err != nil {
69-
return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err)
70-
}
67+
//err = rows.Scan([]any{&tableMappingsBytes})
68+
//if err != nil {
69+
//return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err)
70+
//}
7171

72-
for _, tableMappingBytes := range tableMappingsBytes {
72+
//for _, tableMappingBytes := range tableMappingsBytes {
7373

74-
var tableMapping *protos.TableMapping
75-
if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil {
76-
return nil, err
77-
}
74+
//var tableMapping *protos.TableMapping
75+
//if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil {
76+
//return nil, err
77+
//}
7878

79-
tableMappings = append(tableMappings, tableMapping)
80-
}
79+
//tableMappings = append(tableMappings, tableMapping)
80+
//}
8181

82-
return tableMappings, nil
83-
}
82+
//return tableMappings, nil
83+
//}
8484

8585
func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowName string) (map[string]*protos.TableSchema, error) {
8686
rows, err := a.CatalogPool.Query(ctx, "select table_name, table_schema from table_schema_mapping where flow_name = $1", flowName)

flow/cmd/handler.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,17 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
9494
req.ConnectionConfigs.FlowJobName, err)
9595
}
9696

97+
// Insert the table mappings into the DB
98+
tableMappingsBytes, err := internal.TableMappingsToBytes(req.TableMappings)
99+
if err != nil {
100+
return fmt.Errorf("unable to marshal table mappings: %w", err)
101+
}
102+
103+
stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3)
104+
ON CONFLICT (flow_name, version) DO UPDATE SET table_mapping = EXCLUDED.table_mapping`
105+
version := 1
106+
_, err = h.pool.Exec(ctx, stmt, req.ConnectionConfigs.FlowJobName, version, tableMappingsBytes)
107+
97108
return nil
98109
}
99110

@@ -457,12 +468,18 @@ func (h *FlowRequestHandler) FlowStateChange(
457468
if err != nil {
458469
return nil, NewInternalApiError(fmt.Errorf("unable to get flow config: %w", err))
459470
}
471+
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, h.pool, req.FlowJobName)
472+
473+
if err != nil {
474+
return nil, NewInternalApiError(fmt.Errorf("unable to get table mappings: %w", err))
475+
}
460476

461477
config.Resync = true
462478
config.DoInitialSnapshot = true
463479
// validate mirror first because once the mirror is dropped, there's no going back
464480
if _, err := h.ValidateCDCMirror(ctx, &protos.CreateCDCFlowRequest{
465481
ConnectionConfigs: config,
482+
TableMappings: tableMappings,
466483
}); err != nil {
467484
return nil, NewFailedPreconditionApiError(fmt.Errorf("invalid mirror: %w", err))
468485
}

flow/cmd/validate_mirror.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (h *FlowRequestHandler) validateCDCMirrorImpl(
6363
errors.New("invalid config: initial_snapshot_only is true but do_initial_snapshot is false"))
6464
}
6565

66-
for _, tm := range req.ConnectionConfigs.TableMappings {
66+
for _, tm := range req.TableMappings {
6767
for _, col := range tm.Columns {
6868
if !CustomColumnTypeRegex.MatchString(col.DestinationType) {
6969
return nil, NewInvalidArgumentApiError(errors.New("invalid custom column type " + col.DestinationType))
@@ -102,7 +102,7 @@ func (h *FlowRequestHandler) validateCDCMirrorImpl(
102102
if !req.ConnectionConfigs.Resync {
103103
var getTableSchemaError error
104104
tableSchemaMap, getTableSchemaError = srcConn.GetTableSchema(ctx, req.ConnectionConfigs.Env, req.ConnectionConfigs.Version,
105-
req.ConnectionConfigs.System, req.ConnectionConfigs.TableMappings)
105+
req.ConnectionConfigs.System, req.TableMappings)
106106
if getTableSchemaError != nil {
107107
return nil, NewFailedPreconditionApiError(fmt.Errorf("failed to get source table schema: %w", getTableSchemaError))
108108
}

flow/connectors/clickhouse/validate.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
1616
ctx context.Context,
1717
cfg *protos.FlowConnectionConfigs,
1818
tableNameSchemaMapping map[string]*protos.TableSchema,
19+
tableMappings []*protos.TableMapping,
1920
) error {
2021
if internal.PeerDBOnlyClickHouseAllowed() {
2122
err := chvalidate.CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx, c.logger, c.database)
@@ -34,7 +35,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
3435
}
3536

3637
// this is for handling column exclusion, processed schema does that in a step
37-
processedMapping := internal.BuildProcessedSchemaMapping(cfg.TableMappings, tableNameSchemaMapping, c.logger)
38+
processedMapping := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, c.logger)
3839
dstTableNames := slices.Collect(maps.Keys(processedMapping))
3940

4041
// In the case of resync, we don't need to check the content or structure of the original tables;
@@ -64,7 +65,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
6465
return err
6566
}
6667

67-
for _, tableMapping := range cfg.TableMappings {
68+
for _, tableMapping := range tableMappings {
6869
dstTableName := tableMapping.DestinationTableIdentifier
6970
if _, ok := processedMapping[dstTableName]; !ok {
7071
// if destination table is not a key, that means source table was not a key in the original schema mapping(?)

flow/connectors/core.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ type ValidationConnector interface {
4444
type MirrorSourceValidationConnector interface {
4545
GetTableSchemaConnector
4646

47-
ValidateMirrorSource(context.Context, *protos.FlowConnectionConfigs) error
47+
ValidateMirrorSource(context.Context, *protos.FlowConnectionConfigs, []*protos.TableMapping) error
4848
}
4949

5050
type MirrorDestinationValidationConnector interface {
5151
Connector
5252

53-
ValidateMirrorDestination(context.Context, *protos.FlowConnectionConfigs, map[string]*protos.TableSchema) error
53+
ValidateMirrorDestination(context.Context, *protos.FlowConnectionConfigs, map[string]*protos.TableSchema, []*protos.TableMapping) error
5454
}
5555

5656
type StatActivityConnector interface {

flow/connectors/mysql/validate.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ func (c *MySqlConnector) CheckBinlogSettings(ctx context.Context, requireRowMeta
7575
return errors.New("failed to connect to MySQL server")
7676
}
7777

78-
func (c *MySqlConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs) error {
79-
sourceTables := make([]*utils.SchemaTable, 0, len(cfg.TableMappings))
80-
for _, tableMapping := range cfg.TableMappings {
78+
func (c *MySqlConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs, tableMappings []*protos.TableMapping) error {
79+
sourceTables := make([]*utils.SchemaTable, 0, len(tableMappings))
80+
for _, tableMapping := range tableMappings {
8181
parsedTable, parseErr := utils.ParseSchemaTable(tableMapping.SourceTableIdentifier)
8282
if parseErr != nil {
8383
return fmt.Errorf("invalid source table identifier: %w", parseErr)
@@ -110,7 +110,7 @@ func (c *MySqlConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.F
110110
}
111111

112112
requireRowMetadata := false
113-
for _, tm := range cfg.TableMappings {
113+
for _, tm := range tableMappings {
114114
if len(tm.Exclude) > 0 {
115115
requireRowMetadata = true
116116
break

flow/connectors/postgres/validate.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func (c *PostgresConnector) CheckPublicationCreationPermissions(ctx context.Cont
213213
return nil
214214
}
215215

216-
func (c *PostgresConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs) error {
216+
func (c *PostgresConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigs, tableMappings []*protos.TableMapping) error {
217217
noCDC := cfg.DoInitialSnapshot && cfg.InitialSnapshotOnly
218218
if !noCDC {
219219
// Check replication connectivity
@@ -227,8 +227,8 @@ func (c *PostgresConnector) ValidateMirrorSource(ctx context.Context, cfg *proto
227227
}
228228
}
229229

230-
sourceTables := make([]*utils.SchemaTable, 0, len(cfg.TableMappings))
231-
for _, tableMapping := range cfg.TableMappings {
230+
sourceTables := make([]*utils.SchemaTable, 0, len(tableMappings))
231+
for _, tableMapping := range tableMappings {
232232
parsedTable, parseErr := utils.ParseSchemaTable(tableMapping.SourceTableIdentifier)
233233
if parseErr != nil {
234234
return fmt.Errorf("invalid source table identifier: %w", parseErr)
@@ -249,7 +249,7 @@ func (c *PostgresConnector) ValidateMirrorSource(ctx context.Context, cfg *proto
249249
}
250250
}
251251

252-
if err := c.CheckSourceTables(ctx, sourceTables, cfg.TableMappings, pubName, noCDC); err != nil {
252+
if err := c.CheckSourceTables(ctx, sourceTables, tableMappings, pubName, noCDC); err != nil {
253253
return fmt.Errorf("provided source tables invalidated: %w", err)
254254
}
255255

flow/internal/flow_configuration_helpers.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,49 @@ func FetchConfigFromDB(flowName string, ctx context.Context) (*protos.FlowConnec
4141

4242
return &cfgFromDB, nil
4343
}
44+
45+
func TableMappingsToBytes(tableMappings []*protos.TableMapping) ([][]byte, error) {
46+
tableMappingsBytes := [][]byte{}
47+
for _, tableMapping := range tableMappings {
48+
tableMappingBytes, err := proto.Marshal(tableMapping)
49+
if err != nil {
50+
return nil, fmt.Errorf("failed to marshal table mapping to migrate to catalog: %w", err)
51+
}
52+
tableMappingsBytes = append(tableMappingsBytes, tableMappingBytes)
53+
}
54+
return tableMappingsBytes, nil
55+
}
56+
57+
func FetchTableMappingsFromDB(ctx context.Context, flowJobName string, version uint32) ([]*protos.TableMapping, error) {
58+
pool, err := GetCatalogConnectionPoolFromEnv(ctx)
59+
if err != nil {
60+
return nil, err
61+
}
62+
rows, err := pool.Query(ctx,
63+
"select table_mapping from table_mappings where flow_name = $1 AND version = $2",
64+
flowJobName, version,
65+
)
66+
if err != nil {
67+
return nil, err
68+
}
69+
70+
var tableMappingsBytes [][]byte
71+
var tableMappings []*protos.TableMapping = []*protos.TableMapping{}
72+
73+
err = rows.Scan([]any{&tableMappingsBytes})
74+
if err != nil {
75+
return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err)
76+
}
77+
78+
for _, tableMappingBytes := range tableMappingsBytes {
79+
80+
var tableMapping *protos.TableMapping
81+
if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil {
82+
return nil, err
83+
}
84+
85+
tableMappings = append(tableMappings, tableMapping)
86+
}
87+
88+
return tableMappings, nil
89+
}

protos/route.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ package peerdb_route;
1212
message CreateCDCFlowRequest {
1313
peerdb_flow.FlowConnectionConfigs connection_configs = 1;
1414
bool attach_to_existing = 2;
15+
repeated peerdb_flow.TableMapping table_mappings = 3;
1516
}
1617

1718
message CreateCDCFlowResponse { string workflow_id = 1; }

0 commit comments

Comments
 (0)