Skip to content

Commit 8c1234d

Browse files
ClickHouse: Segregate rename with and without softdelete (#3540)
ClickHouse connectors do not actually need the table schemas of source tables. This PR introduces a RenameTablesWithSoftDeleteConnector interface, whose RenameTables method takes in the schema map, and removes the schema map from the existing RenameTablesConnector interface. The RenameTablesWithSoftDeleteConnector implementations are Snowflake, BigQuery (BQ) and Postgres (PG); ClickHouse remains on RenameTablesConnector.
1 parent 88f222e commit 8c1234d

File tree

3 files changed

+64
-21
lines changed

3 files changed

+64
-21
lines changed

flow/activities/flowable.go

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,13 +1262,39 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
12621262
})
12631263
defer shutdown()
12641264

1265+
var renameOutput *protos.RenameTablesOutput
12651266
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
1266-
conn, err := connectors.GetByNameAs[connectors.RenameTablesConnector](ctx, nil, a.CatalogPool, config.PeerName)
1267+
renameWithSoftDeleteConn, err := connectors.GetByNameAs[connectors.RenameTablesWithSoftDeleteConnector](
1268+
ctx, nil, a.CatalogPool, config.PeerName)
12671269
if err != nil {
1268-
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err))
1270+
if err == errors.ErrUnsupported {
1271+
// Rename without soft-delete
1272+
renameConn, renameErr := connectors.GetByNameAs[connectors.RenameTablesConnector](ctx, nil, a.CatalogPool, config.PeerName)
1273+
if renameErr != nil {
1274+
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get rename connector: %w", renameErr))
1275+
}
1276+
defer connectors.CloseConnector(ctx, renameConn)
1277+
1278+
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Renaming tables for resync")
1279+
renameOutput, err = renameConn.RenameTables(ctx, config)
1280+
if err != nil {
1281+
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to rename tables: %w", err))
1282+
}
1283+
1284+
err = a.updateTableSchemaMappingForResync(ctx, config.RenameTableOptions, config.FlowJobName)
1285+
if err != nil {
1286+
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName,
1287+
fmt.Errorf("failed to update table_schema_mapping after resync: %w", err))
1288+
}
1289+
1290+
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Resync completed for all tables")
1291+
return renameOutput, nil
1292+
}
1293+
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get rename with soft-delete connector: %w", err))
12691294
}
1270-
defer connectors.CloseConnector(ctx, conn)
1295+
defer connectors.CloseConnector(ctx, renameWithSoftDeleteConn)
12711296

1297+
// Rename with soft-delete
12721298
tableNameSchemaMapping := make(map[string]*protos.TableSchema, len(config.RenameTableOptions))
12731299
for _, option := range config.RenameTableOptions {
12741300
schema, err := internal.LoadTableSchemaFromCatalog(
@@ -1282,44 +1308,56 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
12821308
}
12831309
tableNameSchemaMapping[option.CurrentName] = schema
12841310
}
1285-
1286-
renameOutput, err := conn.RenameTables(ctx, config, tableNameSchemaMapping)
1311+
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Renaming tables for resync with soft-delete")
1312+
renameOutput, err = renameWithSoftDeleteConn.RenameTables(ctx, config, tableNameSchemaMapping)
12871313
if err != nil {
12881314
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to rename tables: %w", err))
12891315
}
12901316

1317+
err = a.updateTableSchemaMappingForResync(ctx, config.RenameTableOptions, config.FlowJobName)
1318+
if err != nil {
1319+
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName,
1320+
fmt.Errorf("failed to update table_schema_mapping after resync with soft-delete: %w", err))
1321+
}
1322+
1323+
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Resync with soft-delete completed for all tables")
1324+
return renameOutput, nil
1325+
}
1326+
1327+
func (a *FlowableActivity) updateTableSchemaMappingForResync(
1328+
ctx context.Context,
1329+
renameOptions []*protos.RenameTableOption,
1330+
flowJobName string,
1331+
) error {
12911332
tx, err := a.CatalogPool.Begin(ctx)
12921333
if err != nil {
1293-
return nil, fmt.Errorf("failed to begin updating table_schema_mapping: %w", err)
1334+
return fmt.Errorf("failed to begin updating table_schema_mapping: %w", err)
12941335
}
1295-
logger := log.With(internal.LoggerFromCtx(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))
1336+
logger := log.With(internal.LoggerFromCtx(ctx), slog.String(string(shared.FlowNameKey), flowJobName))
12961337
defer shared.RollbackTx(tx, logger)
12971338

1298-
for _, option := range config.RenameTableOptions {
1339+
for _, option := range renameOptions {
12991340
if option.NewName != option.CurrentName {
13001341
if _, err := tx.Exec(
13011342
ctx,
13021343
"delete from table_schema_mapping where flow_name = $1 and table_name = $2",
1303-
config.FlowJobName,
1344+
flowJobName,
13041345
option.NewName,
13051346
); err != nil {
1306-
return nil, fmt.Errorf("failed to update table_schema_mapping: %w", err)
1347+
return fmt.Errorf("failed to delete _resync entries in table_schema_mapping: %w", err)
13071348
}
13081349
}
13091350
if _, err := tx.Exec(
13101351
ctx,
13111352
"update table_schema_mapping set table_name = $3 where flow_name = $1 and table_name = $2",
1312-
config.FlowJobName,
1353+
flowJobName,
13131354
option.CurrentName,
13141355
option.NewName,
13151356
); err != nil {
1316-
return nil, fmt.Errorf("failed to update table_schema_mapping: %w", err)
1357+
return fmt.Errorf("failed to update table_schema_mapping: %w", err)
13171358
}
13181359
}
1319-
1320-
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Resync completed for all tables")
1321-
1322-
return renameOutput, tx.Commit(ctx)
1360+
return tx.Commit(ctx)
13231361
}
13241362

13251363
func (a *FlowableActivity) DeleteMirrorStats(ctx context.Context, flowName string) error {

flow/connectors/clickhouse/cdc.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,6 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
226226
func (c *ClickHouseConnector) RenameTables(
227227
ctx context.Context,
228228
req *protos.RenameTablesInput,
229-
tableNameSchemaMapping map[string]*protos.TableSchema,
230229
) (*protos.RenameTablesOutput, error) {
231230
onCluster := c.onCluster()
232231
for _, renameRequest := range req.RenameTableOptions {

flow/connectors/core.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,12 @@ type RawTableConnector interface {
290290
type RenameTablesConnector interface {
291291
Connector
292292

293+
RenameTables(context.Context, *protos.RenameTablesInput) (*protos.RenameTablesOutput, error)
294+
}
295+
296+
type RenameTablesWithSoftDeleteConnector interface {
297+
Connector
298+
293299
RenameTables(context.Context, *protos.RenameTablesInput, map[string]*protos.TableSchema) (*protos.RenameTablesOutput, error)
294300
}
295301

@@ -593,10 +599,10 @@ var (
593599
_ QRepConsolidateConnector = &connsnowflake.SnowflakeConnector{}
594600
_ QRepConsolidateConnector = &connclickhouse.ClickHouseConnector{}
595601

596-
_ RenameTablesConnector = &connsnowflake.SnowflakeConnector{}
597-
_ RenameTablesConnector = &connbigquery.BigQueryConnector{}
598-
_ RenameTablesConnector = &connpostgres.PostgresConnector{}
599-
_ RenameTablesConnector = &connclickhouse.ClickHouseConnector{}
602+
_ RenameTablesWithSoftDeleteConnector = &connsnowflake.SnowflakeConnector{}
603+
_ RenameTablesWithSoftDeleteConnector = &connbigquery.BigQueryConnector{}
604+
_ RenameTablesWithSoftDeleteConnector = &connpostgres.PostgresConnector{}
605+
_ RenameTablesConnector = &connclickhouse.ClickHouseConnector{}
600606

601607
_ RawTableConnector = &connclickhouse.ClickHouseConnector{}
602608
_ RawTableConnector = &connbigquery.BigQueryConnector{}

0 commit comments

Comments
 (0)