Skip to content

Commit 26b9a03

Browse files
authored
PEERDB_CLICKHOUSE_INITIAL_LOAD_ALLOW_NON_EMPTY_TABLES (#3157)
closes #3156
1 parent 1d464df commit 26b9a03

File tree

3 files changed

+21
-3
lines changed

3 files changed

+21
-3
lines changed

flow/connectors/clickhouse/validate.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,15 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
4141
return err
4242
}
4343

44+
initialLoadAllowNonEmptyTables, err := internal.PeerDBClickHouseInitialLoadAllowNonEmptyTables(ctx, cfg.Env)
45+
if err != nil {
46+
return err
47+
}
48+
4449
if !(cfg.Resync || sourceSchemaAsDestinationColumn) {
4550
if err := chvalidate.CheckIfTablesEmptyAndEngine(ctx, c.logger, c.database,
46-
dstTableNames, cfg.DoInitialSnapshot, internal.PeerDBOnlyClickHouseAllowed()); err != nil {
51+
dstTableNames, cfg.DoInitialSnapshot, internal.PeerDBOnlyClickHouseAllowed(), initialLoadAllowNonEmptyTables,
52+
); err != nil {
4753
return err
4854
}
4955
}

flow/internal/dynamicconf.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
306306
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
307307
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
308308
},
309+
{
310+
Name: "PEERDB_CLICKHOUSE_INITIAL_LOAD_ALLOW_NON_EMPTY_TABLES",
311+
Description: "Disables validation raising error if destination table of initial load is not empty",
312+
DefaultValue: "false",
313+
ValueType: protos.DynconfValueType_BOOL,
314+
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
315+
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
316+
},
309317
{
310318
Name: "PEERDB_SKIP_SNAPSHOT_EXPORT",
311319
Description: "This avoids initial load failing due to connectivity drops, but risks data consistency unless precautions are taken",
@@ -625,6 +633,10 @@ func PeerDBClickHouseInitialLoadPartsPerPartition(ctx context.Context, env map[s
625633
return dynamicConfUnsigned[uint64](ctx, env, "PEERDB_CLICKHOUSE_INITIAL_LOAD_PARTS_PER_PARTITION")
626634
}
627635

636+
func PeerDBClickHouseInitialLoadAllowNonEmptyTables(ctx context.Context, env map[string]string) (bool, error) {
637+
return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_INITIAL_LOAD_ALLOW_NON_EMPTY_TABLES")
638+
}
639+
628640
func PeerDBSkipSnapshotExport(ctx context.Context, env map[string]string) (bool, error) {
629641
return dynamicConfBool(ctx, env, "PEERDB_SKIP_SNAPSHOT_EXPORT")
630642
}

flow/shared/clickhouse/validation.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx context.Context, logger
3131
}
3232

3333
func CheckIfTablesEmptyAndEngine(ctx context.Context, logger log.Logger, conn clickhouse.Conn,
34-
tables []string, initialSnapshotEnabled bool, checkForCloudSMT bool,
34+
tables []string, initialSnapshotEnabled bool, checkForCloudSMT bool, allowNonEmpty bool,
3535
) error {
3636
queryTables := make([]string, 0, len(tables))
3737
for _, table := range tables {
@@ -51,7 +51,7 @@ func CheckIfTablesEmptyAndEngine(ctx context.Context, logger log.Logger, conn cl
5151
if err := rows.Scan(&tableName, &engine, &totalRows); err != nil {
5252
return fmt.Errorf("failed to scan information for tables: %w", err)
5353
}
54-
if totalRows != 0 && initialSnapshotEnabled {
54+
if !allowNonEmpty && totalRows != 0 && initialSnapshotEnabled {
5555
return fmt.Errorf("table %s exists and is not empty", tableName)
5656
}
5757
if !slices.Contains(acceptableTableEngines, strings.TrimPrefix(engine, "Shared")) {

0 commit comments

Comments
 (0)