Skip to content

Commit b8f06ba

Browse files
committed
Resolve issues from deprecating TableMappings pt1
1 parent c901a49 commit b8f06ba

File tree

6 files changed

+90
-29
lines changed

6 files changed

+90
-29
lines changed

flow/activities/flowable.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1834,13 +1834,9 @@ func (a *FlowableActivity) MigrateTableMappingsToCatalog(
18341834
}
18351835
defer shared.RollbackTx(tx, logger)
18361836

1837-
tableMappingsBytes := [][]byte{}
1838-
for _, tableMapping := range tableMappings {
1839-
tableMappingBytes, err := proto.Marshal(tableMapping)
1840-
if err != nil {
1841-
return fmt.Errorf("failed to marshal table mapping to migrate to catalog: %w", err)
1842-
}
1843-
tableMappingsBytes = append(tableMappingsBytes, tableMappingBytes)
1837+
tableMappingsBytes, err := internal.TableMappingsToBytes(tableMappings)
1838+
if err != nil {
1839+
return fmt.Errorf("unable to marshal table mappings: %w", err)
18441840
}
18451841

18461842
stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3)

flow/activities/flowable_core.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,32 +55,32 @@ func heartbeatRoutine(
5555
)
5656
}
5757

58-
func (a *FlowableActivity) getTableMappings(ctx context.Context, flowName string, version uint) ([]*protos.TableMapping, error) {
59-
rows, err := a.CatalogPool.Query(ctx, "select table_mapping from table_mappings where flow_name = $1 AND version = $2", flowName, version)
60-
if err != nil {
61-
return nil, err
62-
}
58+
//func (a *FlowableActivity) getTableMappings(ctx context.Context, flowName string, version uint) ([]*protos.TableMapping, error) {
59+
//rows, err := a.CatalogPool.Query(ctx, "select table_mapping from table_mappings where flow_name = $1 AND version = $2", flowName, version)
60+
//if err != nil {
61+
//return nil, err
62+
//}
6363

64-
var tableMappingsBytes [][]byte
65-
var tableMappings []*protos.TableMapping = []*protos.TableMapping{}
64+
//var tableMappingsBytes [][]byte
65+
//var tableMappings []*protos.TableMapping = []*protos.TableMapping{}
6666

67-
err = rows.Scan([]any{&tableMappingsBytes})
68-
if err != nil {
69-
return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err)
70-
}
67+
//err = rows.Scan([]any{&tableMappingsBytes})
68+
//if err != nil {
69+
//return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err)
70+
//}
7171

72-
for _, tableMappingBytes := range tableMappingsBytes {
72+
//for _, tableMappingBytes := range tableMappingsBytes {
7373

74-
var tableMapping *protos.TableMapping
75-
if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil {
76-
return nil, err
77-
}
74+
//var tableMapping *protos.TableMapping
75+
//if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil {
76+
//return nil, err
77+
//}
7878

79-
tableMappings = append(tableMappings, tableMapping)
80-
}
79+
//tableMappings = append(tableMappings, tableMapping)
80+
//}
8181

82-
return tableMappings, nil
83-
}
82+
//return tableMappings, nil
83+
//}
8484

8585
func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowName string) (map[string]*protos.TableSchema, error) {
8686
rows, err := a.CatalogPool.Query(ctx, "select table_name, table_schema from table_schema_mapping where flow_name = $1", flowName)

flow/cmd/handler.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,17 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
9595
connectionConfigs.FlowJobName, err)
9696
}
9797

98+
// Insert the table mappings into the DB
99+
tableMappingsBytes, err := internal.TableMappingsToBytes(req.TableMappings)
100+
if err != nil {
101+
return fmt.Errorf("unable to marshal table mappings: %w", err)
102+
}
103+
104+
stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3)
105+
ON CONFLICT (flow_name, version) DO UPDATE SET table_mapping = EXCLUDED.table_mapping`
106+
version := 1
107+
_, err = h.pool.Exec(ctx, stmt, req.ConnectionConfigs.FlowJobName, version, tableMappingsBytes)
108+
98109
return nil
99110
}
100111

@@ -458,12 +469,18 @@ func (h *FlowRequestHandler) FlowStateChange(
458469
if err != nil {
459470
return nil, NewInternalApiError(fmt.Errorf("unable to get flow config: %w", err))
460471
}
472+
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, h.pool, req.FlowJobName)
473+
474+
if err != nil {
475+
return nil, NewInternalApiError(fmt.Errorf("unable to get table mappings: %w", err))
476+
}
461477

462478
config.Resync = true
463479
config.DoInitialSnapshot = true
464480
// validate mirror first because once the mirror is dropped, there's no going back
465481
if _, err := h.ValidateCDCMirror(ctx, &protos.CreateCDCFlowRequest{
466482
ConnectionConfigs: config,
483+
TableMappings: tableMappings,
467484
}); err != nil {
468485
return nil, NewFailedPreconditionApiError(fmt.Errorf("invalid mirror: %w", err))
469486
}

flow/connectors/clickhouse/validate.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
1616
ctx context.Context,
1717
cfg *protos.FlowConnectionConfigsCore,
1818
tableNameSchemaMapping map[string]*protos.TableSchema,
19+
tableMappings []*protos.TableMapping,
1920
) error {
2021
if internal.PeerDBOnlyClickHouseAllowed() {
2122
err := chvalidate.CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx, c.logger, c.database)
@@ -34,7 +35,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
3435
}
3536

3637
// this is for handling column exclusion, processed schema does that in a step
37-
processedMapping := internal.BuildProcessedSchemaMapping(cfg.TableMappings, tableNameSchemaMapping, c.logger)
38+
processedMapping := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, c.logger)
3839
dstTableNames := slices.Collect(maps.Keys(processedMapping))
3940

4041
// In the case of resync, we don't need to check the content or structure of the original tables;
@@ -64,7 +65,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
6465
return err
6566
}
6667

67-
for _, tableMapping := range cfg.TableMappings {
68+
for _, tableMapping := range tableMappings {
6869
dstTableName := tableMapping.DestinationTableIdentifier
6970
if _, ok := processedMapping[dstTableName]; !ok {
7071
// if destination table is not a key, that means source table was not a key in the original schema mapping(?)

flow/internal/flow_configuration_helpers.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,49 @@ func FetchConfigFromDB(ctx context.Context, catalogPool shared.CatalogPool, flow
4242

4343
return &cfgFromDB, nil
4444
}
45+
46+
func TableMappingsToBytes(tableMappings []*protos.TableMapping) ([][]byte, error) {
47+
tableMappingsBytes := [][]byte{}
48+
for _, tableMapping := range tableMappings {
49+
tableMappingBytes, err := proto.Marshal(tableMapping)
50+
if err != nil {
51+
return nil, fmt.Errorf("failed to marshal table mapping to migrate to catalog: %w", err)
52+
}
53+
tableMappingsBytes = append(tableMappingsBytes, tableMappingBytes)
54+
}
55+
return tableMappingsBytes, nil
56+
}
57+
58+
func FetchTableMappingsFromDB(ctx context.Context, flowJobName string, version uint32) ([]*protos.TableMapping, error) {
59+
pool, err := GetCatalogConnectionPoolFromEnv(ctx)
60+
if err != nil {
61+
return nil, err
62+
}
63+
rows, err := pool.Query(ctx,
64+
"select table_mapping from table_mappings where flow_name = $1 AND version = $2",
65+
flowJobName, version,
66+
)
67+
if err != nil {
68+
return nil, err
69+
}
70+
71+
var tableMappingsBytes [][]byte
72+
var tableMappings []*protos.TableMapping = []*protos.TableMapping{}
73+
74+
err = rows.Scan([]any{&tableMappingsBytes})
75+
if err != nil {
76+
return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err)
77+
}
78+
79+
for _, tableMappingBytes := range tableMappingsBytes {
80+
81+
var tableMapping *protos.TableMapping
82+
if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil {
83+
return nil, err
84+
}
85+
86+
tableMappings = append(tableMappings, tableMapping)
87+
}
88+
89+
return tableMappings, nil
90+
}

protos/route.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ package peerdb_route;
1212
message CreateCDCFlowRequest {
1313
peerdb_flow.FlowConnectionConfigs connection_configs = 1;
1414
bool attach_to_existing = 2;
15+
repeated peerdb_flow.TableMapping table_mappings = 3;
1516
}
1617

1718
message CreateCDCFlowResponse { string workflow_id = 1; }

0 commit comments

Comments
 (0)