Skip to content

Commit 928570e

Browse files
Validate mirror: no need to get source schemas for resync (#3325)
We get source schemas for validating destination tables' schemas, which in the case of resync we do not do. Hence no need to fetch schemas from source in the case of resync
1 parent 4480e95 commit 928570e

File tree

2 files changed

+26
-18
lines changed

2 files changed

+26
-18
lines changed

flow/cmd/validate_mirror.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/PeerDB-io/peerdb/flow/connectors"
1313
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1414
"github.com/PeerDB-io/peerdb/flow/internal"
15+
"github.com/PeerDB-io/peerdb/flow/shared"
1516
"github.com/PeerDB-io/peerdb/flow/shared/exceptions"
1617
)
1718

@@ -20,6 +21,7 @@ var CustomColumnTypeRegex = regexp.MustCompile(`^$|^[a-zA-Z][a-zA-Z0-9(),]*$`)
2021
func (h *FlowRequestHandler) ValidateCDCMirror(
2122
ctx context.Context, req *protos.CreateCDCFlowRequest,
2223
) (*protos.ValidateCDCMirrorResponse, error) {
24+
ctx = context.WithValue(ctx, shared.FlowNameKey, req.ConnectionConfigs.FlowJobName)
2325
underMaintenance, err := internal.PeerDBMaintenanceModeEnabled(ctx, nil)
2426
if err != nil {
2527
slog.Error("unable to check maintenance mode", slog.Any("error", err))
@@ -82,13 +84,16 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
8284
}
8385
defer connectors.CloseConnector(ctx, dstConn)
8486

85-
res, err := srcConn.GetTableSchema(ctx, req.ConnectionConfigs.Env, req.ConnectionConfigs.Version,
86-
req.ConnectionConfigs.System, req.ConnectionConfigs.TableMappings)
87-
if err != nil {
88-
return nil, fmt.Errorf("failed to get source table schema: %w", err)
87+
var tableSchemaMap map[string]*protos.TableSchema
88+
if !req.ConnectionConfigs.Resync {
89+
var getTableSchemaError error
90+
tableSchemaMap, getTableSchemaError = srcConn.GetTableSchema(ctx, req.ConnectionConfigs.Env, req.ConnectionConfigs.Version,
91+
req.ConnectionConfigs.System, req.ConnectionConfigs.TableMappings)
92+
if getTableSchemaError != nil {
93+
return nil, fmt.Errorf("failed to get source table schema: %w", getTableSchemaError)
94+
}
8995
}
90-
91-
if err := dstConn.ValidateMirrorDestination(ctx, req.ConnectionConfigs, res); err != nil {
96+
if err := dstConn.ValidateMirrorDestination(ctx, req.ConnectionConfigs, tableSchemaMap); err != nil {
9297
return nil, err
9398
}
9499

flow/connectors/clickhouse/validate.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
2424
}
2525
}
2626

27-
peerDBColumns := []string{signColName, versionColName}
28-
if cfg.SyncedAtColName != "" {
29-
peerDBColumns = append(peerDBColumns, strings.ToLower(cfg.SyncedAtColName))
27+
if cfg.Resync {
28+
return nil // no need to validate schema for resync, as we will create or replace the tables
3029
}
30+
3131
// this is for handling column exclusion, processed schema does that in a step
3232
processedMapping := internal.BuildProcessedSchemaMapping(cfg.TableMappings, tableNameSchemaMapping, c.logger)
3333
dstTableNames := slices.Collect(maps.Keys(processedMapping))
@@ -46,7 +46,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
4646
return err
4747
}
4848

49-
if !(cfg.Resync || sourceSchemaAsDestinationColumn) {
49+
if !sourceSchemaAsDestinationColumn {
5050
if err := chvalidate.CheckIfTablesEmptyAndEngine(ctx, c.logger, c.database,
5151
dstTableNames, cfg.DoInitialSnapshot, internal.PeerDBOnlyClickHouseAllowed(), initialLoadAllowNonEmptyTables,
5252
); err != nil {
@@ -59,6 +59,11 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
5959
return err
6060
}
6161

62+
peerDBColumns := []string{signColName, versionColName}
63+
if cfg.SyncedAtColName != "" {
64+
peerDBColumns = append(peerDBColumns, strings.ToLower(cfg.SyncedAtColName))
65+
}
66+
6267
for _, tableMapping := range cfg.TableMappings {
6368
dstTableName := tableMapping.DestinationTableIdentifier
6469
if _, ok := processedMapping[dstTableName]; !ok {
@@ -70,14 +75,12 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
7075
continue
7176
}
7277

73-
if !cfg.Resync {
74-
// for resync, we don't need to check the content or structure of the original tables;
75-
// they'll anyways get swapped out with the _resync tables which we CREATE OR REPLACE
76-
if err := c.processTableComparison(dstTableName, processedMapping[dstTableName],
77-
chTableColumnsMapping[dstTableName], peerDBColumns, tableMapping,
78-
); err != nil {
79-
return err
80-
}
78+
// for resync, we don't need to check the content or structure of the original tables;
79+
// they'll anyways get swapped out with the _resync tables which we CREATE OR REPLACE
80+
if err := c.processTableComparison(dstTableName, processedMapping[dstTableName],
81+
chTableColumnsMapping[dstTableName], peerDBColumns, tableMapping,
82+
); err != nil {
83+
return err
8184
}
8285
}
8386
return nil

0 commit comments

Comments
 (0)