Skip to content

Commit 1c6cd16

Browse files
authored
Revert TIME64 PRs again (#3923)
Revert #3876 and #3911. In CHC 25.12, enable_time_time64_type is read-only and can't be set. In CHC 26.2, it appears that it can be set. It is unclear whether we should simply check if the setting is already enabled and avoid setting it ourselves, or whether there are cases where we will need to enable it ourselves without tripping on versions where it is read-only.
1 parent 1aa232e commit 1c6cd16

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

41 files changed

+82
-465
lines changed

.github/workflows/flow.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ jobs:
248248
if [ "${{ matrix.db-version.ch }}" = "lts" ]; then
249249
echo "ch_version=v25.8.11.66-lts" >> $GITHUB_OUTPUT
250250
elif [ "${{ matrix.db-version.ch }}" = "stable" ]; then
251-
echo "ch_version=v25.12.4.35-stable" >> $GITHUB_OUTPUT
251+
echo "ch_version=v25.11.1.558-stable" >> $GITHUB_OUTPUT
252252
elif [ "${{ matrix.db-version.ch }}" = "latest" ]; then
253253
# note: latest tag does not always reflect the latest version (could be an update on an lts),
254254
# but that is okay as we are only using it to invalidate the cache.

flow/activities/flowable_core.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
242242
defer dstClose(ctx)
243243

244244
syncState.Store(shared.Ptr("updating schema"))
245-
if err := dstConn.ReplayTableSchemaDeltas(
246-
ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas, config.Version,
247-
); err != nil {
245+
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas); err != nil {
248246
return nil, fmt.Errorf("failed to sync schema: %w", err)
249247
}
250248

flow/cmd/handler.go

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -61,34 +61,6 @@ func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (in
6161
return id.Int32, nil
6262
}
6363

64-
func (h *FlowRequestHandler) determineInternalVersion(
65-
ctx context.Context,
66-
env map[string]string,
67-
destPeerName string,
68-
) (uint32, error) {
69-
internalVersion, err := internal.PeerDBForceInternalVersion(ctx, env)
70-
if err != nil {
71-
return 0, fmt.Errorf("failed to get internal version: %w", err)
72-
}
73-
74-
conn, connClose, err := connectors.GetByNameAs[connectors.InternalVersionAwareConnector](ctx, env, h.pool, destPeerName)
75-
if err != nil {
76-
// Connector isn't aware of internal version, so just use the default version
77-
if errors.Is(err, errors.ErrUnsupported) {
78-
return internalVersion, nil
79-
}
80-
return 0, fmt.Errorf("failed to get destination connector: %w", err)
81-
}
82-
defer connClose(ctx)
83-
84-
maxSupportedInternalVersion := conn.GetMaxSupportedInternalVersion()
85-
if maxSupportedInternalVersion < internalVersion {
86-
return maxSupportedInternalVersion, nil
87-
}
88-
89-
return internalVersion, nil
90-
}
91-
9264
func (h *FlowRequestHandler) cdcJobEntryExists(ctx context.Context, flowJobName string) (bool, error) {
9365
var exists bool
9466
err := h.pool.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM flows WHERE name = $1)`, flowJobName).Scan(&exists)
@@ -173,9 +145,9 @@ func (h *FlowRequestHandler) CreateCDCFlow(
173145
if cfg == nil {
174146
return nil, NewInvalidArgumentApiError(errors.New("connection configs cannot be nil"))
175147
}
176-
internalVersion, err := h.determineInternalVersion(ctx, cfg.Env, cfg.DestinationName)
148+
internalVersion, err := internal.PeerDBForceInternalVersion(ctx, req.ConnectionConfigs.Env)
177149
if err != nil {
178-
return nil, NewInternalApiError(err)
150+
return nil, NewInternalApiError(fmt.Errorf("failed to get internal version: %w", err))
179151
}
180152
cfg.Version = internalVersion
181153

@@ -255,9 +227,9 @@ func (h *FlowRequestHandler) CreateQRepFlow(
255227
ctx context.Context, req *protos.CreateQRepFlowRequest,
256228
) (*protos.CreateQRepFlowResponse, APIError) {
257229
cfg := req.QrepConfig
258-
internalVersion, err := h.determineInternalVersion(ctx, cfg.Env, cfg.DestinationName)
230+
internalVersion, err := internal.PeerDBForceInternalVersion(ctx, req.QrepConfig.Env)
259231
if err != nil {
260-
return nil, NewInternalApiError(err)
232+
return nil, NewInternalApiError(fmt.Errorf("failed to get internal version: %w", err))
261233
}
262234
cfg.Version = internalVersion
263235

flow/connectors/bigquery/bigquery.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,6 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(
239239
flowJobName string,
240240
_ []*protos.TableMapping,
241241
schemaDeltas []*protos.TableSchemaDelta,
242-
_ uint32,
243242
) error {
244243
for _, schemaDelta := range schemaDeltas {
245244
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {

flow/connectors/bigquery/qrep.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
8787
}
8888

8989
if err := c.ReplayTableSchemaDeltas(
90-
ctx, config.Env, config.FlowJobName, nil, []*protos.TableSchemaDelta{tableSchemaDelta}, config.Version,
90+
ctx, config.Env, config.FlowJobName, nil, []*protos.TableSchemaDelta{tableSchemaDelta},
9191
); err != nil {
9292
return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
9393
}

flow/connectors/bigquery/qrep_avro_sync.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
9898
slog.String(string(shared.FlowNameKey), req.FlowJobName),
9999
slog.String("dstTableName", rawTableName))
100100

101-
if err := s.connector.ReplayTableSchemaDeltas(
102-
ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version,
103-
); err != nil {
101+
if err := s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil {
104102
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
105103
}
106104

flow/connectors/clickhouse/avro_sync.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
1515
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1616
"github.com/PeerDB-io/peerdb/flow/internal"
17-
"github.com/PeerDB-io/peerdb/flow/internal/clickhouse"
1817
"github.com/PeerDB-io/peerdb/flow/model"
1918
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
2019
"github.com/PeerDB-io/peerdb/flow/shared"
@@ -294,14 +293,11 @@ func (s *ClickHouseAvroSyncMethod) pushS3DataToClickHouseForSnapshot(
294293
}
295294
numParts = max(numParts, 1)
296295

297-
chSettings := clickhouse.NewCHSettings(s.chVersion)
298-
chSettings.Add(clickhouse.SettingThrowOnMaxPartitionsPerInsertBlock, "0")
299-
chSettings.Add(clickhouse.SettingTypeJsonSkipDuplicatedPaths, "1")
296+
chSettings := NewCHSettings(s.chVersion)
297+
chSettings.Add(SettingThrowOnMaxPartitionsPerInsertBlock, "0")
298+
chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1")
300299
if config.Version >= shared.InternalVersion_JsonEscapeDotsInKeys {
301-
chSettings.Add(clickhouse.SettingJsonTypeEscapeDotsInKeys, "1")
302-
}
303-
if config.Version >= shared.InternalVersion_ClickHouseTime64 {
304-
chSettings.Add(clickhouse.SettingEnableTimeTime64Type, "1")
300+
chSettings.Add(SettingJsonTypeEscapeDotsInKeys, "1")
305301
}
306302

307303
// Process each chunk file individually

flow/connectors/clickhouse/cdc.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
1313
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1414
"github.com/PeerDB-io/peerdb/flow/internal"
15-
chinternal "github.com/PeerDB-io/peerdb/flow/internal/clickhouse"
1615
"github.com/PeerDB-io/peerdb/flow/model"
1716
"github.com/PeerDB-io/peerdb/flow/model/qvalue"
1817
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
@@ -132,7 +131,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
132131
}
133132
warnings := numericTruncator.Warnings()
134133

135-
if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version); err != nil {
134+
if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil {
136135
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
137136
}
138137

@@ -166,7 +165,6 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
166165
flowJobName string,
167166
tableMappings []*protos.TableMapping,
168167
schemaDeltas []*protos.TableSchemaDelta,
169-
internalVersion uint32,
170168
) error {
171169
if len(schemaDeltas) == 0 {
172170
return nil
@@ -190,7 +188,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
190188
for _, addedColumn := range schemaDelta.AddedColumns {
191189
qvKind := types.QValueKind(addedColumn.Type)
192190
clickHouseColType, err := qvalue.ToDWHColumnType(
193-
ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled, internalVersion,
191+
ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled,
194192
)
195193
if err != nil {
196194
return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err)
@@ -230,8 +228,7 @@ func (c *ClickHouseConnector) RenameTables(
230228
req *protos.RenameTablesInput,
231229
) (*protos.RenameTablesOutput, error) {
232230
onCluster := c.onCluster()
233-
dropTableSQLWithCHSetting := dropTableIfExistsSQL +
234-
chinternal.NewCHSettingsString(c.chVersion, chinternal.SettingMaxTableSizeToDrop, "0")
231+
dropTableSQLWithCHSetting := dropTableIfExistsSQL + NewCHSettingsString(c.chVersion, SettingMaxTableSizeToDrop, "0")
235232
for _, renameRequest := range req.RenameTableOptions {
236233
if renameRequest.CurrentName == renameRequest.NewName {
237234
c.logger.Info("table rename is nop, probably Null table engine, skipping rename for it",
@@ -305,8 +302,7 @@ func (c *ClickHouseConnector) SyncFlowCleanup(ctx context.Context, jobName strin
305302
// delete raw table if exists
306303
rawTableIdentifier := c.GetRawTableName(jobName)
307304
onCluster := c.onCluster()
308-
dropTableSQLWithCHSetting := dropTableIfExistsSQL +
309-
chinternal.NewCHSettingsString(c.chVersion, chinternal.SettingMaxTableSizeToDrop, "0")
305+
dropTableSQLWithCHSetting := dropTableIfExistsSQL + NewCHSettingsString(c.chVersion, SettingMaxTableSizeToDrop, "0")
310306
if err := c.execWithLogging(ctx,
311307
fmt.Sprintf(dropTableSQLWithCHSetting, peerdb_clickhouse.QuoteIdentifier(rawTableIdentifier), onCluster),
312308
); err != nil {

flow/connectors/clickhouse/clickhouse.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
2424
"github.com/PeerDB-io/peerdb/flow/generated/protos"
2525
"github.com/PeerDB-io/peerdb/flow/internal"
26-
chinternal "github.com/PeerDB-io/peerdb/flow/internal/clickhouse"
2726
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
2827
"github.com/PeerDB-io/peerdb/flow/shared"
2928
"github.com/PeerDB-io/peerdb/flow/shared/types"
@@ -407,22 +406,6 @@ func (c *ClickHouseConnector) GetVersion(ctx context.Context) (string, error) {
407406
return clickhouseVersion.Version.String(), nil
408407
}
409408

410-
func (c *ClickHouseConnector) GetMaxSupportedInternalVersion() uint32 {
411-
if c.chVersion == nil {
412-
// should never get here
413-
c.logger.Warn("[clickhouse] version is not set, use the latest internal version")
414-
return shared.InternalVersion_Latest
415-
}
416-
417-
minVersionWithTime64Support, exists := chinternal.GetMinVersion(chinternal.SettingEnableTimeTime64Type)
418-
supportsTime64 := exists && clickhouseproto.CheckMinVersion(minVersionWithTime64Support, *c.chVersion)
419-
if !supportsTime64 {
420-
return shared.InternalVersion_ClickHouseTime64 - 1
421-
}
422-
423-
return shared.InternalVersion_Latest
424-
}
425-
426409
func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType) (*protos.TableSchema, error) {
427410
colFields := make([]*protos.FieldDescription, 0, len(columns))
428411
for _, column := range columns {
@@ -460,8 +443,6 @@ func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType
460443
qkind = types.QValueKindUUID
461444
case "DateTime64(6)", "Nullable(DateTime64(6))", "DateTime64(9)", "Nullable(DateTime64(9))":
462445
qkind = types.QValueKindTimestamp
463-
case "Time64(6)", "Nullable(Time64(6))":
464-
qkind = types.QValueKindTime
465446
case "Date32", "Nullable(Date32)":
466447
qkind = types.QValueKindDate
467448
case "Float32", "Nullable(Float32)":

flow/connectors/clickhouse/normalize.go

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,10 @@ import (
1717

1818
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1919
"github.com/PeerDB-io/peerdb/flow/internal"
20-
chinternal "github.com/PeerDB-io/peerdb/flow/internal/clickhouse"
2120
"github.com/PeerDB-io/peerdb/flow/model"
2221
"github.com/PeerDB-io/peerdb/flow/model/qvalue"
2322
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
2423
"github.com/PeerDB-io/peerdb/flow/pkg/common"
25-
"github.com/PeerDB-io/peerdb/flow/shared"
2624
"github.com/PeerDB-io/peerdb/flow/shared/types"
2725
)
2826

@@ -68,7 +66,6 @@ func (c *ClickHouseConnector) SetupNormalizedTable(
6866
destinationTableIdentifier,
6967
sourceTableSchema,
7068
c.chVersion,
71-
config.Version,
7269
)
7370
if err != nil {
7471
return false, fmt.Errorf("error while generating create table sql for destination ClickHouse table: %w", err)
@@ -88,7 +85,6 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
8885
tableIdentifier string,
8986
tableSchema *protos.TableSchema,
9087
chVersion *chproto.Version,
91-
internalVersion uint32,
9288
) ([]string, error) {
9389
var engine string
9490
tmEngine := protos.TableEngine_CH_ENGINE_REPLACING_MERGE_TREE
@@ -207,8 +203,7 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
207203
if clickHouseType == "" {
208204
var err error
209205
clickHouseType, err = qvalue.ToDWHColumnType(
210-
ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column,
211-
tableSchema.NullableEnabled || columnNullableEnabled, internalVersion,
206+
ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column, tableSchema.NullableEnabled || columnNullableEnabled,
212207
)
213208
if err != nil {
214209
return nil, fmt.Errorf("error while converting column type to ClickHouse type: %w", err)
@@ -268,14 +263,9 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
268263
}
269264
}
270265

271-
settings := chinternal.NewCHSettings(chVersion)
272266
if allowNullableKey {
273-
settings.Add(chinternal.SettingAllowNullableKey, "1")
267+
stmtBuilder.WriteString(NewCHSettingsString(chVersion, SettingAllowNullableKey, "1"))
274268
}
275-
if internalVersion >= shared.InternalVersion_ClickHouseTime64 {
276-
settings.Add(chinternal.SettingEnableTimeTime64Type, "1")
277-
}
278-
stmtBuilder.WriteString(settings.String())
279269

280270
if c.Config.Cluster != "" {
281271
fmt.Fprintf(&stmtBuilderDistributed, " ENGINE = Distributed(%s,%s,%s",
@@ -292,10 +282,6 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
292282
}
293283
}
294284
stmtBuilderDistributed.WriteByte(')')
295-
if internalVersion >= shared.InternalVersion_ClickHouseTime64 {
296-
stmtBuilderDistributed.WriteString(chinternal.NewCHSettingsString(
297-
chVersion, chinternal.SettingEnableTimeTime64Type, "1"))
298-
}
299285
}
300286
}
301287

0 commit comments

Comments
 (0)