Skip to content

Commit f9e2eb2

Browse files
NehharShah and jgao54
authored and committed
fix time64 support
1 parent 54a053c commit f9e2eb2

File tree

20 files changed

+224
-181
lines changed

20 files changed

+224
-181
lines changed

flow/activities/flowable_core.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
320320
defer dstClose(ctx)
321321

322322
syncState.Store(shared.Ptr("updating schema"))
323-
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas, config.Version); err != nil {
323+
if err := dstConn.ReplayTableSchemaDeltas(
324+
ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas, config.Version,
325+
); err != nil {
324326
return nil, fmt.Errorf("failed to sync schema: %w", err)
325327
}
326328

flow/connectors/bigquery/qrep_avro_sync.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ func (s *QRepAvroSyncMethod) SyncRecords(
9898
slog.String(string(shared.FlowNameKey), req.FlowJobName),
9999
slog.String("dstTableName", rawTableName))
100100

101-
if err := s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version); err != nil {
101+
if err := s.connector.ReplayTableSchemaDeltas(
102+
ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version,
103+
); err != nil {
102104
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
103105
}
104106

flow/connectors/clickhouse/avro_sync.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,11 +293,14 @@ func (s *ClickHouseAvroSyncMethod) pushS3DataToClickHouseForSnapshot(
293293
}
294294
numParts = max(numParts, 1)
295295

296-
chSettings := NewCHSettings(s.chVersion)
297-
chSettings.Add(SettingThrowOnMaxPartitionsPerInsertBlock, "0")
298-
chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1")
296+
chSettings := peerdb_clickhouse.NewCHSettings(s.chVersion)
297+
chSettings.Add(peerdb_clickhouse.SettingThrowOnMaxPartitionsPerInsertBlock, "0")
298+
chSettings.Add(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths, "1")
299299
if config.Version >= shared.InternalVersion_JsonEscapeDotsInKeys {
300-
chSettings.Add(SettingJsonTypeEscapeDotsInKeys, "1")
300+
chSettings.Add(peerdb_clickhouse.SettingJsonTypeEscapeDotsInKeys, "1")
301+
}
302+
if config.Version >= shared.InternalVersion_ClickHouseTime64 {
303+
chSettings.Add(peerdb_clickhouse.SettingEnableTimeTime64Type, "1")
301304
}
302305

303306
// Process each chunk file individually

flow/connectors/clickhouse/cdc.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,8 @@ func (c *ClickHouseConnector) RenameTables(
229229
req *protos.RenameTablesInput,
230230
) (*protos.RenameTablesOutput, error) {
231231
onCluster := c.onCluster()
232-
dropTableSQLWithCHSetting := dropTableIfExistsSQL + NewCHSettingsString(c.chVersion, SettingMaxTableSizeToDrop, "0")
232+
dropTableSQLWithCHSetting := dropTableIfExistsSQL +
233+
peerdb_clickhouse.NewCHSettingsString(c.chVersion, peerdb_clickhouse.SettingMaxTableSizeToDrop, "0")
233234
for _, renameRequest := range req.RenameTableOptions {
234235
if renameRequest.CurrentName == renameRequest.NewName {
235236
c.logger.Info("table rename is nop, probably Null table engine, skipping rename for it",
@@ -303,7 +304,8 @@ func (c *ClickHouseConnector) SyncFlowCleanup(ctx context.Context, jobName strin
303304
// delete raw table if exists
304305
rawTableIdentifier := c.GetRawTableName(jobName)
305306
onCluster := c.onCluster()
306-
dropTableSQLWithCHSetting := dropTableIfExistsSQL + NewCHSettingsString(c.chVersion, SettingMaxTableSizeToDrop, "0")
307+
dropTableSQLWithCHSetting := dropTableIfExistsSQL +
308+
peerdb_clickhouse.NewCHSettingsString(c.chVersion, peerdb_clickhouse.SettingMaxTableSizeToDrop, "0")
307309
if err := c.execWithLogging(ctx,
308310
fmt.Sprintf(dropTableSQLWithCHSetting, peerdb_clickhouse.QuoteIdentifier(rawTableIdentifier), onCluster),
309311
); err != nil {

flow/connectors/clickhouse/clickhouse.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType
443443
qkind = types.QValueKindUUID
444444
case "DateTime64(6)", "Nullable(DateTime64(6))", "DateTime64(9)", "Nullable(DateTime64(9))":
445445
qkind = types.QValueKindTimestamp
446-
case "Time64(3)", "Nullable(Time64(3))":
446+
case "Time64(6)", "Nullable(Time64(6))":
447447
qkind = types.QValueKindTime
448448
case "Date32", "Nullable(Date32)":
449449
qkind = types.QValueKindDate

flow/connectors/clickhouse/normalize.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/PeerDB-io/peerdb/flow/model/qvalue"
2222
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
2323
"github.com/PeerDB-io/peerdb/flow/pkg/common"
24+
"github.com/PeerDB-io/peerdb/flow/shared"
2425
"github.com/PeerDB-io/peerdb/flow/shared/types"
2526
)
2627

@@ -205,7 +206,8 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
205206
if clickHouseType == "" {
206207
var err error
207208
clickHouseType, err = qvalue.ToDWHColumnType(
208-
ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column, tableSchema.NullableEnabled || columnNullableEnabled, internalVersion,
209+
ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column,
210+
tableSchema.NullableEnabled || columnNullableEnabled, internalVersion,
209211
)
210212
if err != nil {
211213
return nil, fmt.Errorf("error while converting column type to ClickHouse type: %w", err)
@@ -265,9 +267,14 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
265267
}
266268
}
267269

270+
settings := peerdb_clickhouse.NewCHSettings(chVersion)
268271
if allowNullableKey {
269-
stmtBuilder.WriteString(NewCHSettingsString(chVersion, SettingAllowNullableKey, "1"))
272+
settings.Add(peerdb_clickhouse.SettingAllowNullableKey, "1")
270273
}
274+
if internalVersion >= shared.InternalVersion_ClickHouseTime64 {
275+
settings.Add(peerdb_clickhouse.SettingEnableTimeTime64Type, "1")
276+
}
277+
stmtBuilder.WriteString(settings.String())
271278

272279
if c.Config.Cluster != "" {
273280
fmt.Fprintf(&stmtBuilderDistributed, " ENGINE = Distributed(%s,%s,%s",

flow/connectors/clickhouse/normalize_query.go

Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -131,17 +131,15 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error
131131
}
132132

133133
switch clickHouseType {
134-
case "Time64(3)", "Nullable(Time64(3))":
135-
// Time64 is a time-of-day type, parse from JSON string
136-
// toTime64 converts string to Time64(3), returns NULL if string is NULL or invalid
134+
case "Time64(6)", "Nullable(Time64(6))":
137135
fmt.Fprintf(&projection,
138-
"toTime64(JSONExtractString(_peerdb_data, %s),3) AS %s,",
136+
"toTime64(JSONExtractString(_peerdb_data, %s), 6) AS %s,",
139137
peerdb_clickhouse.QuoteLiteral(colName),
140138
peerdb_clickhouse.QuoteIdentifier(dstColName),
141139
)
142140
if t.enablePrimaryUpdate {
143141
fmt.Fprintf(&projectionUpdate,
144-
"toTime64(JSONExtractString(_peerdb_match_data, %s),3) AS %s,",
142+
"toTime64(JSONExtractString(_peerdb_match_data, %s), 6) AS %s,",
145143
peerdb_clickhouse.QuoteLiteral(colName),
146144
peerdb_clickhouse.QuoteIdentifier(dstColName),
147145
)
@@ -160,34 +158,17 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error
160158
)
161159
}
162160
case "DateTime64(6)", "Nullable(DateTime64(6))":
163-
if colType == types.QValueKindTime || colType == types.QValueKindTimeTZ {
164-
// parseDateTime64BestEffortOrNull for hh:mm:ss puts the year as current year
165-
// (or previous year if result would be in future) so explicitly anchor to unix epoch
166-
fmt.Fprintf(&projection,
167-
"parseDateTime64BestEffortOrNull('1970-01-01 ' || JSONExtractString(_peerdb_data, %s),6,'UTC') AS %s,",
168-
peerdb_clickhouse.QuoteLiteral(colName),
169-
peerdb_clickhouse.QuoteIdentifier(dstColName),
170-
)
171-
if t.enablePrimaryUpdate {
172-
fmt.Fprintf(&projectionUpdate,
173-
"parseDateTime64BestEffortOrNull('1970-01-01 ' || JSONExtractString(_peerdb_match_data, %s),6,'UTC') AS %s,",
174-
peerdb_clickhouse.QuoteLiteral(colName),
175-
peerdb_clickhouse.QuoteIdentifier(dstColName),
176-
)
177-
}
178-
} else {
179-
fmt.Fprintf(&projection,
180-
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, %s),6,'UTC') AS %s,",
161+
fmt.Fprintf(&projection,
162+
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, %s),6,'UTC') AS %s,",
163+
peerdb_clickhouse.QuoteLiteral(colName),
164+
peerdb_clickhouse.QuoteIdentifier(dstColName),
165+
)
166+
if t.enablePrimaryUpdate {
167+
fmt.Fprintf(&projectionUpdate,
168+
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, %s),6,'UTC') AS %s,",
181169
peerdb_clickhouse.QuoteLiteral(colName),
182170
peerdb_clickhouse.QuoteIdentifier(dstColName),
183171
)
184-
if t.enablePrimaryUpdate {
185-
fmt.Fprintf(&projectionUpdate,
186-
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, %s),6,'UTC') AS %s,",
187-
peerdb_clickhouse.QuoteLiteral(colName),
188-
peerdb_clickhouse.QuoteIdentifier(dstColName),
189-
)
190-
}
191172
}
192173
case "Array(DateTime64(6))", "Nullable(Array(DateTime64(6)))":
193174
fmt.Fprintf(&projection,
@@ -312,14 +293,17 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error
312293
t.lastNormBatchID, t.endBatchID, peerdb_clickhouse.QuoteLiteral(t.TableName))
313294
}
314295

315-
chSettings := NewCHSettings(t.chVersion)
316-
chSettings.Add(SettingThrowOnMaxPartitionsPerInsertBlock, "0")
317-
chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1")
296+
chSettings := peerdb_clickhouse.NewCHSettings(t.chVersion)
297+
chSettings.Add(peerdb_clickhouse.SettingThrowOnMaxPartitionsPerInsertBlock, "0")
298+
chSettings.Add(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths, "1")
318299
if t.cluster {
319-
chSettings.Add(SettingParallelDistributedInsertSelect, "0")
300+
chSettings.Add(peerdb_clickhouse.SettingParallelDistributedInsertSelect, "0")
320301
}
321302
if t.version >= shared.InternalVersion_JsonEscapeDotsInKeys {
322-
chSettings.Add(SettingJsonTypeEscapeDotsInKeys, "1")
303+
chSettings.Add(peerdb_clickhouse.SettingJsonTypeEscapeDotsInKeys, "1")
304+
}
305+
if t.version >= shared.InternalVersion_ClickHouseTime64 {
306+
chSettings.Add(peerdb_clickhouse.SettingEnableTimeTime64Type, "1")
323307
}
324308

325309
insertIntoSelectQuery := fmt.Sprintf("INSERT INTO %s %s %s%s",

flow/connectors/clickhouse/table_function.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,27 @@ func jsonFieldExpressionConverter(
5252
return fmt.Sprintf("CAST(%s, 'JSON')", sourceFieldIdentifier), nil
5353
}
5454

55+
func timeFieldExpressionConverter(
56+
_ context.Context,
57+
config *insertFromTableFunctionConfig,
58+
sourceFieldIdentifier string,
59+
field types.QField,
60+
) (string, error) {
61+
if field.Type != types.QValueKindTime {
62+
return sourceFieldIdentifier, nil
63+
}
64+
65+
if !qvalue.ShouldUseTime64Type(config.connector.chVersion, config.config.Version) {
66+
return fmt.Sprintf("parseDateTime64BestEffortOrNull(%s, 6, 'UTC')", sourceFieldIdentifier), nil
67+
}
68+
69+
// time is stored as string "HH:MM:SS.ffffff"
70+
return fmt.Sprintf("toTime64(%s, 6)", sourceFieldIdentifier), nil
71+
}
72+
5573
var defaultFieldExpressionConverters = []fieldExpressionConverter{
5674
jsonFieldExpressionConverter,
75+
timeFieldExpressionConverter,
5776
}
5877

5978
// buildInsertFromTableFunctionQuery builds a complete INSERT query from a table function expression
@@ -62,7 +81,7 @@ func buildInsertFromTableFunctionQuery(
6281
ctx context.Context,
6382
config *insertFromTableFunctionConfig,
6483
tableFunctionExpr string,
65-
chSettings *CHSettings,
84+
chSettings *peerdb_clickhouse.CHSettings,
6685
) (string, error) {
6786
fieldExpressionConverters := defaultFieldExpressionConverters
6887
fieldExpressionConverters = append(fieldExpressionConverters, config.fieldExpressionConverters...)
@@ -142,7 +161,7 @@ func buildInsertFromTableFunctionQueryWithPartitioning(
142161
tableFunctionExpr string,
143162
partitionIndex uint64,
144163
totalPartitions uint64,
145-
chSettings *CHSettings,
164+
chSettings *peerdb_clickhouse.CHSettings,
146165
) (string, error) {
147166
var query strings.Builder
148167

flow/connectors/clickhouse/table_function_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/stretchr/testify/require"
1010

1111
"github.com/PeerDB-io/peerdb/flow/generated/protos"
12+
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
1213
"github.com/PeerDB-io/peerdb/flow/shared/types"
1314
)
1415

@@ -33,14 +34,16 @@ func TestBuildInsertFromTableFunctionQuery(t *testing.T) {
3334
}
3435

3536
tableFunctionExpr := "s3('s3://bucket/key', 'format')"
36-
chSettings := NewCHSettings(&chproto.Version{Major: 25, Minor: 8})
37-
chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1")
37+
chSettings := peerdb_clickhouse.NewCHSettings(&chproto.Version{Major: 25, Minor: 8})
38+
chSettings.Add(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths, "1")
3839

3940
// without partitioning
4041
query, err := buildInsertFromTableFunctionQuery(ctx, config, tableFunctionExpr, chSettings)
4142
require.NoError(t, err)
42-
require.Equal(t, fmt.Sprintf("INSERT INTO `t1`(`id`,`name`) SELECT `id`,`name` FROM s3('s3://bucket/key', 'format') SETTINGS %s=%s",
43-
string(SettingTypeJsonSkipDuplicatedPaths), "1"), query)
43+
require.Equal(t,
44+
fmt.Sprintf("INSERT INTO `t1`(`id`,`name`) SELECT `id`,`name` FROM s3('s3://bucket/key', 'format') SETTINGS %s=%s",
45+
string(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths), "1"),
46+
query)
4447

4548
// with partitioning
4649
totalPartitions := uint64(8)
@@ -49,6 +52,7 @@ func TestBuildInsertFromTableFunctionQuery(t *testing.T) {
4952
require.NoError(t, err)
5053
require.Equal(t, query,
5154
"INSERT INTO `t1`(`id`,`name`) SELECT `id`,`name` FROM s3('s3://bucket/key', 'format')"+
52-
fmt.Sprintf(" WHERE cityHash64(`id`) %% 8 = %d SETTINGS %s=%s", idx, string(SettingTypeJsonSkipDuplicatedPaths), "1"))
55+
fmt.Sprintf(" WHERE cityHash64(`id`) %% 8 = %d SETTINGS %s=%s",
56+
idx, string(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths), "1"))
5357
}
5458
}

flow/connectors/mysql/qvalue_convert.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,11 @@ func qkindFromMysqlType(mytype byte, unsigned bool, charset uint16) (types.QValu
5858
return types.QValueKindInvalid, nil
5959
case mysql.MYSQL_TYPE_DATE, mysql.MYSQL_TYPE_NEWDATE:
6060
return types.QValueKindDate, nil
61-
case mysql.MYSQL_TYPE_TIMESTAMP, mysql.MYSQL_TYPE_TIME, mysql.MYSQL_TYPE_DATETIME,
62-
mysql.MYSQL_TYPE_TIMESTAMP2, mysql.MYSQL_TYPE_DATETIME2, mysql.MYSQL_TYPE_TIME2:
61+
case mysql.MYSQL_TYPE_TIMESTAMP, mysql.MYSQL_TYPE_DATETIME,
62+
mysql.MYSQL_TYPE_TIMESTAMP2, mysql.MYSQL_TYPE_DATETIME2:
6363
return types.QValueKindTimestamp, nil
64+
case mysql.MYSQL_TYPE_TIME, mysql.MYSQL_TYPE_TIME2:
65+
return types.QValueKindTime, nil
6466
case mysql.MYSQL_TYPE_YEAR:
6567
return types.QValueKindInt16, nil
6668
case mysql.MYSQL_TYPE_BIT:
@@ -309,6 +311,8 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie
309311
}
310312
return types.QValueNumeric{Val: val}, nil
311313
case types.QValueKindTimestamp:
314+
// Deprecated: we mapped MySQL time to QValueKindTimestamp before ClickHouse supported Time64
315+
// Keep code path for backwards compatibility
312316
if mytype == mysql.MYSQL_TYPE_TIME || mytype == mysql.MYSQL_TYPE_TIME2 {
313317
tm, err := processTime(unsafeString)
314318
if err != nil {
@@ -325,9 +329,8 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie
325329
}
326330
return types.QValueTimestamp{Val: val}, nil
327331
case types.QValueKindTime:
328-
// deprecated: most databases expect time to be time part of datetime
329-
// mysql it's a +/- 800 hour range to represent duration
330-
// keep codepath for backwards compat when mysql time was mapped to QValueKindTime
332+
// MySQL TIME supports range: [-838:59:59.000000, 838:59:59.999999]
333+
// ClickHouse Time64 supports range: [-999:59:59.000000, 999:59:59.999999]
331334
tm, err := processTime(unsafeString)
332335
if err != nil {
333336
return nil, err

0 commit comments

Comments (0)