Skip to content

Commit 0831ac5

Browse files
authored
Parse Time64 value without setting it during table creation (#3953)
- Adds ClickHouse Time64(6) type support for TIME/TIMETZ columns, gated by a read-only capability flag (clickhouse_time64_enabled) detected at flow creation time and persisted to the flow config protobuf. - Replaces the previous InternalVersion-based approach with a map<string, bool> flags field on FlowConnectionConfigs, FlowConnectionConfigsCore, QRepConfig, and SetupNormalizedTableBatchInput. This decouples connector-specific capabilities from the global linear version ordering. - Introduces a GetFlagsConnector interface. ClickHouse's implementation queries system.settings to check if enable_time_time64_type is already enabled on the server -- PeerDB never attempts to set this value. - Flags are immutable after flow creation: if the flag was false and the admin enables it later, only new flows will use Time64; existing flows retain their original behavior, including resync. If the flag was true and the admin disables the setting, queries that depend on Time64 will error out. Testing: e2e tests + test migration path in CHC + test non-supported CHC environments
1 parent 91f14bf commit 0831ac5

Some content is hidden

Large commits have some content hidden by default. Use the searchbox below to search for content that may be hidden.

47 files changed

+561
-87
lines changed

.github/workflows/flow.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ jobs:
248248
if [ "${{ matrix.db-version.ch }}" = "lts" ]; then
249249
echo "ch_version=v25.8.11.66-lts" >> $GITHUB_OUTPUT
250250
elif [ "${{ matrix.db-version.ch }}" = "stable" ]; then
251-
echo "ch_version=v25.11.1.558-stable" >> $GITHUB_OUTPUT
251+
echo "ch_version=v25.12.4.35-stable" >> $GITHUB_OUTPUT
252252
elif [ "${{ matrix.db-version.ch }}" = "latest" ]; then
253253
# note: latest tag does not always reflect the latest version (could be an update on an lts),
254254
# but that is okay as we are only using it to invalidate the cache.

flow/activities/flowable_core.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
242242
defer dstClose(ctx)
243243

244244
syncState.Store(shared.Ptr("updating schema"))
245-
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas); err != nil {
245+
if err := dstConn.ReplayTableSchemaDeltas(
246+
ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas, config.Flags,
247+
); err != nil {
246248
return nil, fmt.Errorf("failed to sync schema: %w", err)
247249
}
248250

@@ -285,6 +287,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
285287
TableNameSchemaMapping: tableNameSchemaMapping,
286288
Env: config.Env,
287289
Version: config.Version,
290+
Flags: config.Flags,
288291
})
289292
if err != nil {
290293
return a.Alerter.LogFlowError(ctx, flowName, fmt.Errorf("failed to push records: %w", err))
@@ -681,6 +684,7 @@ func (a *FlowableActivity) startNormalize(
681684
SyncedAtColName: config.SyncedAtColName,
682685
SyncBatchID: batchID,
683686
Version: config.Version,
687+
Flags: config.Flags,
684688
})
685689
if err != nil {
686690
return a.Alerter.LogFlowError(ctx, config.FlowJobName,

flow/cmd/handler.go

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,23 @@ func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (in
6161
return id.Int32, nil
6262
}
6363

64+
func (h *FlowRequestHandler) determineFlags(
65+
ctx context.Context,
66+
env map[string]string,
67+
destPeerName string,
68+
) ([]string, error) {
69+
conn, connClose, err := connectors.GetByNameAs[connectors.GetFlagsConnector](ctx, env, h.pool, destPeerName)
70+
if err != nil {
71+
if errors.Is(err, errors.ErrUnsupported) {
72+
return nil, nil
73+
}
74+
return nil, fmt.Errorf("failed to get destination connector: %w", err)
75+
}
76+
defer connClose(ctx)
77+
78+
return conn.GetFlags(ctx)
79+
}
80+
6481
func (h *FlowRequestHandler) cdcJobEntryExists(ctx context.Context, flowJobName string) (bool, error) {
6582
var exists bool
6683
err := h.pool.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM flows WHERE name = $1)`, flowJobName).Scan(&exists)
@@ -145,11 +162,16 @@ func (h *FlowRequestHandler) CreateCDCFlow(
145162
if cfg == nil {
146163
return nil, NewInvalidArgumentApiError(errors.New("connection configs cannot be nil"))
147164
}
148-
internalVersion, err := internal.PeerDBForceInternalVersion(ctx, req.ConnectionConfigs.Env)
149-
if err != nil {
150-
return nil, NewInternalApiError(fmt.Errorf("failed to get internal version: %w", err))
165+
if internalVersion, err := internal.PeerDBForceInternalVersion(ctx, cfg.Env); err != nil {
166+
return nil, NewInternalApiError(err)
167+
} else {
168+
cfg.Version = internalVersion
169+
}
170+
if flags, err := h.determineFlags(ctx, cfg.Env, cfg.DestinationName); err != nil {
171+
return nil, NewInternalApiError(err)
172+
} else {
173+
cfg.Flags = flags
151174
}
152-
cfg.Version = internalVersion
153175

154176
if !req.AttachToExisting {
155177
if exists, err := h.cdcJobEntryExists(ctx, cfg.FlowJobName); err != nil {
@@ -227,11 +249,16 @@ func (h *FlowRequestHandler) CreateQRepFlow(
227249
ctx context.Context, req *protos.CreateQRepFlowRequest,
228250
) (*protos.CreateQRepFlowResponse, APIError) {
229251
cfg := req.QrepConfig
230-
internalVersion, err := internal.PeerDBForceInternalVersion(ctx, req.QrepConfig.Env)
231-
if err != nil {
232-
return nil, NewInternalApiError(fmt.Errorf("failed to get internal version: %w", err))
252+
if internalVersion, err := internal.PeerDBForceInternalVersion(ctx, cfg.Env); err != nil {
253+
return nil, NewInternalApiError(err)
254+
} else {
255+
cfg.Version = internalVersion
256+
}
257+
if flags, err := h.determineFlags(ctx, cfg.Env, cfg.DestinationName); err != nil {
258+
return nil, NewInternalApiError(err)
259+
} else {
260+
cfg.Flags = flags
233261
}
234-
cfg.Version = internalVersion
235262

236263
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
237264
workflowOptions := client.StartWorkflowOptions{

flow/cmd/validate_mirror.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,31 @@ import (
66
"fmt"
77
"log/slog"
88
"regexp"
9+
"slices"
910

1011
"github.com/PeerDB-io/peerdb/flow/connectors"
1112
"github.com/PeerDB-io/peerdb/flow/generated/proto_conversions"
1213
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1314
"github.com/PeerDB-io/peerdb/flow/internal"
1415
"github.com/PeerDB-io/peerdb/flow/shared"
16+
"github.com/PeerDB-io/peerdb/flow/shared/types"
1517
)
1618

1719
var CustomColumnTypeRegex = regexp.MustCompile(`^$|^[a-zA-Z][a-zA-Z0-9(),]*$`)
1820

21+
type flagConstraint struct {
22+
ErrorMessage string
23+
AffectedTypes []types.QValueKind
24+
}
25+
26+
var FlagConstraints = map[string]flagConstraint{
27+
shared.Flag_ClickHouseTime64Enabled: {
28+
AffectedTypes: []types.QValueKind{types.QValueKindTime, types.QValueKindTimeTZ},
29+
ErrorMessage: "mirror uses time/timetz columns that require ClickHouse setting 'enable_time_time64_type';" +
30+
" re-enable it or recreate the mirror",
31+
},
32+
}
33+
1934
func (h *FlowRequestHandler) ValidateCDCMirror(
2035
ctx context.Context, req *protos.CreateCDCFlowRequest,
2136
) (*protos.ValidateCDCMirrorResponse, APIError) {
@@ -55,6 +70,12 @@ func (h *FlowRequestHandler) validateCDCMirrorImpl(
5570
}
5671
}
5772

73+
if connectionConfigs.Resync {
74+
if apiErr := h.checkFlagsCompatibility(ctx, connectionConfigs); apiErr != nil {
75+
return nil, apiErr
76+
}
77+
}
78+
5879
if connectionConfigs == nil {
5980
slog.ErrorContext(ctx, "/validatecdc connection configs is nil")
6081
return nil, NewInvalidArgumentApiError(errors.New("connection configs is nil"))
@@ -125,3 +146,53 @@ func (h *FlowRequestHandler) checkIfMirrorNameExists(ctx context.Context, mirror
125146

126147
return nameExists, nil
127148
}
149+
150+
// checkFlagsCompatibility blocks resync when a destination feature flag that was
151+
// enabled at mirror creation is now disabled and the stored schema contains
152+
// column types whose type mapping depends on that flag.
153+
func (h *FlowRequestHandler) checkFlagsCompatibility(
154+
ctx context.Context,
155+
cfg *protos.FlowConnectionConfigsCore,
156+
) APIError {
157+
newFlags, err := h.determineFlags(ctx, cfg.Env, cfg.DestinationName)
158+
if err != nil {
159+
return NewInternalApiError(fmt.Errorf("failed to determine destination flags: %w", err))
160+
}
161+
162+
schemaHasColumnTypes := func(colTypes []types.QValueKind) (bool, error) {
163+
tableNames := make([]string, 0, len(cfg.TableMappings))
164+
for _, tm := range cfg.TableMappings {
165+
tableNames = append(tableNames, tm.DestinationTableIdentifier)
166+
}
167+
schemas, err := internal.LoadTableSchemasFromCatalog(ctx, h.pool, cfg.FlowJobName, tableNames)
168+
if err != nil {
169+
return false, err
170+
}
171+
for _, schema := range schemas {
172+
for _, col := range schema.Columns {
173+
if slices.Contains(colTypes, types.QValueKind(col.Type)) {
174+
return true, nil
175+
}
176+
}
177+
}
178+
return false, nil
179+
}
180+
181+
for _, flag := range cfg.Flags {
182+
if slices.Contains(newFlags, flag) {
183+
continue
184+
}
185+
constraint, ok := FlagConstraints[flag]
186+
if !ok {
187+
continue
188+
}
189+
affected, err := schemaHasColumnTypes(constraint.AffectedTypes)
190+
if err != nil {
191+
return NewInternalApiError(fmt.Errorf("failed to check schema for flag %q: %w", flag, err))
192+
}
193+
if affected {
194+
return NewFailedPreconditionApiError(errors.New(constraint.ErrorMessage))
195+
}
196+
}
197+
return nil
198+
}

flow/connectors/bigquery/bigquery.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(
239239
flowJobName string,
240240
_ []*protos.TableMapping,
241241
schemaDeltas []*protos.TableSchemaDelta,
242+
_ []string,
242243
) error {
243244
for _, schemaDelta := range schemaDeltas {
244245
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {

flow/connectors/bigquery/qrep.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
8787
}
8888

8989
if err := c.ReplayTableSchemaDeltas(
90-
ctx, config.Env, config.FlowJobName, nil, []*protos.TableSchemaDelta{tableSchemaDelta},
90+
ctx, config.Env, config.FlowJobName, nil, []*protos.TableSchemaDelta{tableSchemaDelta}, nil,
9191
); err != nil {
9292
return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
9393
}

flow/connectors/bigquery/qrep_avro_sync.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ func (s *QRepAvroSyncMethod) SyncRecords(
9898
slog.String(string(shared.FlowNameKey), req.FlowJobName),
9999
slog.String("dstTableName", rawTableName))
100100

101-
if err := s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil {
101+
if err := s.connector.ReplayTableSchemaDeltas(
102+
ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, nil,
103+
); err != nil {
102104
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
103105
}
104106

flow/connectors/clickhouse/avro_sync.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
1515
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1616
"github.com/PeerDB-io/peerdb/flow/internal"
17+
"github.com/PeerDB-io/peerdb/flow/internal/clickhouse"
1718
"github.com/PeerDB-io/peerdb/flow/model"
1819
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
1920
"github.com/PeerDB-io/peerdb/flow/shared"
@@ -293,11 +294,11 @@ func (s *ClickHouseAvroSyncMethod) pushS3DataToClickHouseForSnapshot(
293294
}
294295
numParts = max(numParts, 1)
295296

296-
chSettings := NewCHSettings(s.chVersion)
297-
chSettings.Add(SettingThrowOnMaxPartitionsPerInsertBlock, "0")
298-
chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1")
297+
chSettings := clickhouse.NewCHSettings(s.chVersion)
298+
chSettings.Add(clickhouse.SettingThrowOnMaxPartitionsPerInsertBlock, "0")
299+
chSettings.Add(clickhouse.SettingTypeJsonSkipDuplicatedPaths, "1")
299300
if config.Version >= shared.InternalVersion_JsonEscapeDotsInKeys {
300-
chSettings.Add(SettingJsonTypeEscapeDotsInKeys, "1")
301+
chSettings.Add(clickhouse.SettingJsonTypeEscapeDotsInKeys, "1")
301302
}
302303

303304
// Process each chunk file individually

flow/connectors/clickhouse/cdc.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
1313
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1414
"github.com/PeerDB-io/peerdb/flow/internal"
15+
chinternal "github.com/PeerDB-io/peerdb/flow/internal/clickhouse"
1516
"github.com/PeerDB-io/peerdb/flow/model"
1617
"github.com/PeerDB-io/peerdb/flow/model/qvalue"
1718
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
@@ -131,7 +132,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
131132
}
132133
warnings := numericTruncator.Warnings()
133134

134-
if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil {
135+
if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Flags); err != nil {
135136
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
136137
}
137138

@@ -165,6 +166,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
165166
flowJobName string,
166167
tableMappings []*protos.TableMapping,
167168
schemaDeltas []*protos.TableSchemaDelta,
169+
flags []string,
168170
) error {
169171
if len(schemaDeltas) == 0 {
170172
return nil
@@ -188,7 +190,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
188190
for _, addedColumn := range schemaDelta.AddedColumns {
189191
qvKind := types.QValueKind(addedColumn.Type)
190192
clickHouseColType, err := qvalue.ToDWHColumnType(
191-
ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled,
193+
ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled, flags,
192194
)
193195
if err != nil {
194196
return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err)
@@ -228,7 +230,8 @@ func (c *ClickHouseConnector) RenameTables(
228230
req *protos.RenameTablesInput,
229231
) (*protos.RenameTablesOutput, error) {
230232
onCluster := c.onCluster()
231-
dropTableSQLWithCHSetting := dropTableIfExistsSQL + NewCHSettingsString(c.chVersion, SettingMaxTableSizeToDrop, "0")
233+
dropTableSQLWithCHSetting := dropTableIfExistsSQL +
234+
chinternal.NewCHSettingsString(c.chVersion, chinternal.SettingMaxTableSizeToDrop, "0")
232235
for _, renameRequest := range req.RenameTableOptions {
233236
if renameRequest.CurrentName == renameRequest.NewName {
234237
c.logger.Info("table rename is nop, probably Null table engine, skipping rename for it",
@@ -302,7 +305,8 @@ func (c *ClickHouseConnector) SyncFlowCleanup(ctx context.Context, jobName strin
302305
// delete raw table if exists
303306
rawTableIdentifier := c.GetRawTableName(jobName)
304307
onCluster := c.onCluster()
305-
dropTableSQLWithCHSetting := dropTableIfExistsSQL + NewCHSettingsString(c.chVersion, SettingMaxTableSizeToDrop, "0")
308+
dropTableSQLWithCHSetting := dropTableIfExistsSQL +
309+
chinternal.NewCHSettingsString(c.chVersion, chinternal.SettingMaxTableSizeToDrop, "0")
306310
if err := c.execWithLogging(ctx,
307311
fmt.Sprintf(dropTableSQLWithCHSetting, peerdb_clickhouse.QuoteIdentifier(rawTableIdentifier), onCluster),
308312
); err != nil {

flow/connectors/clickhouse/clickhouse.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/tls"
66
"crypto/x509"
7+
"database/sql"
78
"errors"
89
"fmt"
910
"log/slog"
@@ -406,6 +407,25 @@ func (c *ClickHouseConnector) GetVersion(ctx context.Context) (string, error) {
406407
return clickhouseVersion.Version.String(), nil
407408
}
408409

410+
func (c *ClickHouseConnector) GetFlags(ctx context.Context) ([]string, error) {
411+
var flags []string
412+
413+
var time64Setting string
414+
err := c.queryRow(ctx,
415+
"SELECT value FROM system.settings WHERE name = 'enable_time_time64_type'",
416+
).Scan(&time64Setting)
417+
if err != nil {
418+
if !errors.Is(err, sql.ErrNoRows) {
419+
return nil, fmt.Errorf("failed to query enable_time_time64_type setting: %w", err)
420+
}
421+
} else if time64Setting == "1" {
422+
c.logger.Info("[clickhouse] enable_time_time64_type is enabled")
423+
flags = append(flags, shared.Flag_ClickHouseTime64Enabled)
424+
}
425+
426+
return flags, nil
427+
}
428+
409429
func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType) (*protos.TableSchema, error) {
410430
colFields := make([]*protos.FieldDescription, 0, len(columns))
411431
for _, column := range columns {
@@ -443,6 +463,8 @@ func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType
443463
qkind = types.QValueKindUUID
444464
case "DateTime64(6)", "Nullable(DateTime64(6))", "DateTime64(9)", "Nullable(DateTime64(9))":
445465
qkind = types.QValueKindTimestamp
466+
case "Time64(6)", "Nullable(Time64(6))":
467+
qkind = types.QValueKindTime
446468
case "Date32", "Nullable(Date32)":
447469
qkind = types.QValueKindDate
448470
case "Float32", "Nullable(Float32)":

0 commit comments

Comments (0)