
Commit d7187f5

mongo: allow write to JSON in CH when FF is enabled (#3119)
Currently QValueJson gets converted to String in ClickHouse. For Mongo we want to be able to write `_full_document` as the JSON type. This PR defaults to JSON in ClickHouse when all of the following are true:
1) the schema kind is QValueJson
2) the ClickHouse version supports JSON (JSON was first introduced in 25.3)
3) JSON is enabled by feature flag (defaults to false)

Test:
- e2e test
- tested locally, with the FF on (written as JSON) and off (written as String)
1 parent c95a2a8 commit d7187f5
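For context, here is a minimal sketch of the gating logic described above. It is not the PR's actual implementation of qvalue.ShouldUseNativeJSONType (whose body is not part of this diff): the PEERDB_CLICKHOUSE_ENABLE_JSON setting name, the 25.3 minimum, and chproto.CheckMinVersion are taken from the changes below, while reading the flag straight from the env map (ignoring the ctx-based settings lookup) and the lowercase helper name are simplifying assumptions.

// Sketch only: illustrates the JSON gating described in the commit message.
package main

import (
	"fmt"

	chproto "github.com/ClickHouse/clickhouse-go/v2/lib/proto"
)

// jsonMinVersion is the first ClickHouse release with the JSON type, per the commit message.
var jsonMinVersion = chproto.Version{Major: 25, Minor: 3}

// shouldUseNativeJSONType returns true only when the feature flag is on and the
// server is new enough; otherwise QValueJson columns keep mapping to String.
func shouldUseNativeJSONType(env map[string]string, chVersion *chproto.Version) bool {
	if env["PEERDB_CLICKHOUSE_ENABLE_JSON"] != "true" { // flag defaults to false
		return false
	}
	if chVersion == nil { // callers without a ClickHouse version (e.g. Snowflake) pass nil
		return false
	}
	return chproto.CheckMinVersion(jsonMinVersion, *chVersion)
}

func main() {
	v := chproto.Version{Major: 25, Minor: 6, Patch: 1}
	env := map[string]string{"PEERDB_CLICKHOUSE_ENABLE_JSON": "true"}
	colType := "String"
	if shouldUseNativeJSONType(env, &v) {
		colType = "JSON"
	}
	fmt.Println("QValueJson maps to", colType) // QValueJson maps to JSON
}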

File tree: 12 files changed, +159 −22 lines


flow/connectors/clickhouse/cdc.go

Lines changed: 1 addition & 1 deletion
@@ -147,7 +147,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
 	for _, addedColumn := range schemaDelta.AddedColumns {
 		qvKind := types.QValueKind(addedColumn.Type)
 		clickHouseColType, err := qvalue.ToDWHColumnType(
-			ctx, qvKind, env, protos.DBType_CLICKHOUSE, addedColumn, schemaDelta.NullableEnabled,
+			ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled,
 		)
 		if err != nil {
 			return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err)

flow/connectors/clickhouse/clickhouse.go

Lines changed: 13 additions & 5 deletions
@@ -33,6 +33,7 @@ type ClickHouseConnector struct {
 	logger log.Logger
 	config *protos.ClickhouseConfig
 	credsProvider *utils.ClickHouseS3Credentials
+	chVersion *chproto.Version
 }

 func NewClickHouseConnector(
@@ -95,6 +96,10 @@ func NewClickHouseConnector(
 		return nil, err
 	}

+	clickHouseVersion, err := database.ServerVersion()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get ClickHouse version: %w", err)
+	}
 	connector := &ClickHouseConnector{
 		database: database,
 		PostgresMetadata: pgMetadata,
@@ -104,15 +109,12 @@ func NewClickHouseConnector(
 			Provider: credentialsProvider,
 			BucketPath: awsBucketPath,
 		},
+		chVersion: &clickHouseVersion.Version,
 	}

 	if credentials.AWS.SessionToken != "" {
 		// 24.3.1 is minimum version of ClickHouse that actually supports session token
 		// https://github.com/ClickHouse/ClickHouse/issues/61230
-		clickHouseVersion, err := database.ServerVersion()
-		if err != nil {
-			return nil, fmt.Errorf("failed to get ClickHouse version: %w", err)
-		}
 		if !chproto.CheckMinVersion(
 			chproto.Version{Major: 24, Minor: 3, Patch: 1},
 			clickHouseVersion.Version,
@@ -361,11 +363,15 @@ func (c *ClickHouseConnector) processTableComparison(dstTableName string, srcSch
 }

 func (c *ClickHouseConnector) GetVersion(ctx context.Context) (string, error) {
+	if c.chVersion != nil {
+		return c.chVersion.String(), nil
+	}
+
 	clickhouseVersion, err := c.database.ServerVersion()
 	if err != nil {
 		return "", fmt.Errorf("failed to get ClickHouse version: %w", err)
 	}
-	c.logger.Info("[clickhouse] version", slog.Any("version", clickhouseVersion.DisplayName))
+	c.logger.Info("[clickhouse] version", slog.String("version", clickhouseVersion.DisplayName))
 	return clickhouseVersion.Version.String(), nil
 }

@@ -424,6 +430,8 @@ func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType
 			qkind = types.QValueKindArrayUUID
 		case "Array(DateTime64(6))":
 			qkind = types.QValueKindArrayTimestamp
+		case "JSON":
+			qkind = types.QValueKindJSON
 		default:
 			if strings.Contains(column.DatabaseTypeName(), "Decimal") {
 				if strings.HasPrefix(column.DatabaseTypeName(), "Array(") {

flow/connectors/clickhouse/normalize.go

Lines changed: 5 additions & 1 deletion
@@ -10,6 +10,7 @@ import (
 	"time"

 	"github.com/ClickHouse/clickhouse-go/v2"
+	chproto "github.com/ClickHouse/clickhouse-go/v2/lib/proto"
 	"golang.org/x/sync/errgroup"

 	"github.com/PeerDB-io/peerdb/flow/generated/protos"
@@ -62,6 +63,7 @@ func (c *ClickHouseConnector) SetupNormalizedTable(
 		config,
 		destinationTableIdentifier,
 		sourceTableSchema,
+		c.chVersion,
 	)
 	if err != nil {
 		return false, fmt.Errorf("error while generating create table sql for destination ClickHouse table: %w", err)
@@ -85,6 +87,7 @@ func generateCreateTableSQLForNormalizedTable(
 	config *protos.SetupNormalizedTableBatchInput,
 	tableIdentifier string,
 	tableSchema *protos.TableSchema,
+	chVersion *chproto.Version,
 ) (string, error) {
 	var tableMapping *protos.TableMapping
 	for _, tm := range config.TableMappings {
@@ -131,7 +134,7 @@ func generateCreateTableSQLForNormalizedTable(
 		if clickHouseType == "" {
 			var err error
 			clickHouseType, err = qvalue.ToDWHColumnType(
-				ctx, colType, config.Env, protos.DBType_CLICKHOUSE, column, tableSchema.NullableEnabled || columnNullableEnabled,
+				ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column, tableSchema.NullableEnabled || columnNullableEnabled,
 			)
 			if err != nil {
 				return "", fmt.Errorf("error while converting column type to ClickHouse type: %w", err)
@@ -407,6 +410,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
 		sourceSchemaAsDestinationColumn,
 		req.Env,
 		rawTbl,
+		c.chVersion,
 	)
 	insertIntoSelectQuery, err := queryGenerator.BuildQuery(ctx)
 	if err != nil {

flow/connectors/clickhouse/normalize_query.go

Lines changed: 22 additions & 3 deletions
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"strings"

+	chproto "github.com/ClickHouse/clickhouse-go/v2/lib/proto"
+
 	"github.com/PeerDB-io/peerdb/flow/generated/protos"
 	"github.com/PeerDB-io/peerdb/flow/internal"
 	"github.com/PeerDB-io/peerdb/flow/model/qvalue"
@@ -13,16 +15,17 @@
 )

 type NormalizeQueryGenerator struct {
-	tableNameSchemaMapping map[string]*protos.TableSchema
 	env map[string]string
+	tableNameSchemaMapping map[string]*protos.TableSchema
+	chVersion *chproto.Version
 	Query string
 	TableName string
 	rawTableName string
 	tableMappings []*protos.TableMapping
 	Part uint64
-	syncBatchID int64
 	batchIDToLoadForTable int64
 	numParts uint64
+	syncBatchID int64
 	enablePrimaryUpdate bool
 	sourceSchemaAsDestinationColumn bool
 }
@@ -40,6 +43,7 @@ func NewNormalizeQueryGenerator(
 	sourceSchemaAsDestinationColumn bool,
 	env map[string]string,
 	rawTableName string,
+	chVersion *chproto.Version,
 ) *NormalizeQueryGenerator {
 	return &NormalizeQueryGenerator{
 		TableName: tableName,
@@ -53,6 +57,7 @@ func NewNormalizeQueryGenerator(
 		sourceSchemaAsDestinationColumn: sourceSchemaAsDestinationColumn,
 		env: env,
 		rawTableName: rawTableName,
+		chVersion: chVersion,
 	}
 }

@@ -110,7 +115,7 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error
 		if clickHouseType == "" {
 			var err error
 			clickHouseType, err = qvalue.ToDWHColumnType(
-				ctx, colType, t.env, protos.DBType_CLICKHOUSE, column, schema.NullableEnabled || columnNullableEnabled,
+				ctx, colType, t.env, protos.DBType_CLICKHOUSE, t.chVersion, column, schema.NullableEnabled || columnNullableEnabled,
 			)
 			if err != nil {
 				return "", fmt.Errorf("error while converting column type to clickhouse type: %w", err)
@@ -174,6 +179,20 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error
 				peerdb_clickhouse.QuoteIdentifier(dstColName),
 			)
 		}
+	case "JSON", "Nullable(JSON)":
+		fmt.Fprintf(&projection,
+			"JSONExtractString(_peerdb_data, %s) AS %s,",
+			peerdb_clickhouse.QuoteLiteral(colName),
+			peerdb_clickhouse.QuoteIdentifier(dstColName),
+		)
+		if t.enablePrimaryUpdate {
+			fmt.Fprintf(&projectionUpdate,
+				"JSONExtractString(_peerdb_match_data, %s) AS %s,",
+				peerdb_clickhouse.QuoteLiteral(colName),
+				peerdb_clickhouse.QuoteIdentifier(dstColName),
+			)
+		}
+
 	default:
 		projLen := projection.Len()
 		if colType == types.QValueKindBytes {

flow/connectors/clickhouse/normalize_test.go

Lines changed: 4 additions & 0 deletions
@@ -189,6 +189,7 @@ func TestBuildQuery_Basic(t *testing.T) {
 		sourceSchemaAsDestinationColumn,
 		env,
 		rawTableName,
+		nil,
 	)

 	query, err := g.BuildQuery(ctx)
@@ -243,6 +244,7 @@ func TestBuildQuery_WithPrimaryUpdate(t *testing.T) {
 		sourceSchemaAsDestinationColumn,
 		env,
 		rawTableName,
+		nil,
 	)

 	query, err := g.BuildQuery(ctx)
@@ -294,6 +296,7 @@ func TestBuildQuery_WithSourceSchemaAsDestinationColumn(t *testing.T) {
 		sourceSchemaAsDestinationColumn,
 		env,
 		rawTableName,
+		nil,
 	)

 	query, err := g.BuildQuery(ctx)
@@ -342,6 +345,7 @@ func TestBuildQuery_WithNumParts(t *testing.T) {
 		sourceSchemaAsDestinationColumn,
 		env,
 		rawTableName,
+		nil,
 	)

 	query, err := g.BuildQuery(ctx)

flow/connectors/clickhouse/qrep_avro_sync.go

Lines changed: 12 additions & 2 deletions
@@ -14,6 +14,7 @@ import (
 	"github.com/PeerDB-io/peerdb/flow/generated/protos"
 	"github.com/PeerDB-io/peerdb/flow/internal"
 	"github.com/PeerDB-io/peerdb/flow/model"
+	"github.com/PeerDB-io/peerdb/flow/model/qvalue"
 	"github.com/PeerDB-io/peerdb/flow/shared"
 	peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/shared/clickhouse"
 	"github.com/PeerDB-io/peerdb/flow/shared/exceptions"
@@ -263,7 +264,8 @@ func (s *ClickHouseAvroSyncMethod) pushS3DataToClickHouse(

 	selectedColumnNames := make([]string, 0, len(schema.Fields))
 	insertedColumnNames := make([]string, 0, len(schema.Fields))
-	for _, colName := range schema.GetColumnNames() {
+	for _, field := range schema.Fields {
+		colName := field.Name
 		for _, excludedColumn := range config.Exclude {
 			if colName == excludedColumn {
 				continue
@@ -276,7 +278,15 @@ func (s *ClickHouseAvroSyncMethod) pushS3DataToClickHouse(
 				slog.String("avroFieldName", avroColName))
 			return fmt.Errorf("destination column %s not found in avro schema", colName)
 		}
-		selectedColumnNames = append(selectedColumnNames, peerdb_clickhouse.QuoteIdentifier(avroColName))
+
+		avroColName = peerdb_clickhouse.QuoteIdentifier(avroColName)
+
+		if field.Type == types.QValueKindJSON &&
+			qvalue.ShouldUseNativeJSONType(ctx, config.Env, s.ClickHouseConnector.chVersion) {
+			avroColName = fmt.Sprintf("JSONExtractString(%s)", avroColName)
+		}
+
+		selectedColumnNames = append(selectedColumnNames, avroColName)
 		insertedColumnNames = append(insertedColumnNames, peerdb_clickhouse.QuoteIdentifier(colName))
 	}
 	if sourceSchemaAsDestinationColumn {

flow/connectors/snowflake/merge_stmt_generator.go

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(ctx context.Context, env map[stri
 	for _, column := range columns {
 		genericColumnType := column.Type
 		qvKind := types.QValueKind(genericColumnType)
-		sfType, err := qvalue.ToDWHColumnType(ctx, qvKind, env, protos.DBType_SNOWFLAKE, column, normalizedTableSchema.NullableEnabled)
+		sfType, err := qvalue.ToDWHColumnType(ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, column, normalizedTableSchema.NullableEnabled)
 		if err != nil {
 			return "", fmt.Errorf("failed to convert column type %s to snowflake type: %w", genericColumnType, err)
 		}

flow/connectors/snowflake/snowflake.go

Lines changed: 2 additions & 2 deletions
@@ -362,7 +362,7 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas(
 	for _, addedColumn := range schemaDelta.AddedColumns {
 		qvKind := types.QValueKind(addedColumn.Type)
 		sfColtype, err := qvalue.ToDWHColumnType(
-			ctx, qvKind, env, protos.DBType_SNOWFLAKE, addedColumn, schemaDelta.NullableEnabled,
+			ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, addedColumn, schemaDelta.NullableEnabled,
 		)
 		if err != nil {
 			return fmt.Errorf("failed to convert column type %s to snowflake type: %w",
@@ -661,7 +661,7 @@ func generateCreateTableSQLForNormalizedTable(
 		normalizedColName := SnowflakeIdentifierNormalize(column.Name)
 		qvKind := types.QValueKind(genericColumnType)
 		sfColType, err := qvalue.ToDWHColumnType(
-			ctx, qvKind, config.Env, protos.DBType_SNOWFLAKE, column, tableSchema.NullableEnabled,
+			ctx, qvKind, config.Env, protos.DBType_SNOWFLAKE, nil, column, tableSchema.NullableEnabled,
 		)
 		if err != nil {
 			slog.Warn(fmt.Sprintf("failed to convert column type %s to snowflake type", genericColumnType),

flow/e2e/clickhouse/clickhouse.go

Lines changed: 17 additions & 0 deletions
@@ -10,6 +10,7 @@ import (
 	"time"

 	"github.com/ClickHouse/clickhouse-go/v2"
+	"github.com/ClickHouse/clickhouse-go/v2/lib/chcol"
 	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
 	"github.com/google/uuid"
 	"github.com/jackc/pgx/v5"
@@ -303,6 +304,22 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch
 			qrow = append(qrow, types.QValueUUID{Val: *v})
 		case *[]uuid.UUID:
 			qrow = append(qrow, types.QValueArrayUUID{Val: *v})
+		case **chcol.JSON:
+			if *v == nil {
+				qrow = append(qrow, types.QValueNull(types.QValueKindJSON))
+			} else {
+				jsonb, err := (**v).MarshalJSON()
+				if err != nil {
+					return nil, err
+				}
+				qrow = append(qrow, types.QValueJSON{Val: string(jsonb)})
+			}
+		case *chcol.JSON:
+			jsonb, err := (*v).MarshalJSON()
+			if err != nil {
+				return nil, err
+			}
+			qrow = append(qrow, types.QValueJSON{Val: string(jsonb)})
 		default:
 			return nil, fmt.Errorf("cannot convert %T to types", v)
 		}

flow/e2e/mongo/mongo_test.go

Lines changed: 54 additions & 7 deletions
@@ -165,7 +165,7 @@ func (s MongoClickhouseSuite) Test_Inconsistent_Schema() {
 	e2e.RequireEnvCanceled(t, env)
 }

-func (s MongoClickhouseSuite) Test_Update_Replace_Delete_Events() {
+func (s MongoClickhouseSuite) Test_CDC() {
 	t := s.T()

 	srcDatabase := GetTestDatabase(s.Suffix())
@@ -183,16 +183,15 @@
 	adminClient := s.Source().(*MongoSource).AdminClient()
 	collection := adminClient.Database(srcDatabase).Collection(srcTable)

-	insertRes, err := collection.InsertOne(t.Context(), bson.D{bson.E{Key: "key", Value: 1}}, options.InsertOne())
-	require.NoError(t, err)
-	require.True(t, insertRes.Acknowledged)
-
 	tc := e2e.NewTemporalClient(t)
 	env := e2e.ExecutePeerflow(t.Context(), tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
-	e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load", srcTable, dstTable, "_id,_full_document")
-
 	e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig)

+	insertRes, err := collection.InsertOne(t.Context(), bson.D{bson.E{Key: "key", Value: 1}}, options.InsertOne())
+	require.NoError(t, err)
+	require.True(t, insertRes.Acknowledged)
+	e2e.EnvWaitForEqualTablesWithNames(env, s, "insert event", srcTable, dstTable, "_id,_full_document")
+
 	updateRes, err := collection.UpdateOne(
 		t.Context(),
 		bson.D{bson.E{Key: "key", Value: 1}},
@@ -355,3 +354,51 @@
 	env.Cancel(t.Context())
 	e2e.RequireEnvCanceled(t, env)
 }
+
+func (s MongoClickhouseSuite) Test_Enable_Json() {
+	t := s.T()
+	srcDatabase := GetTestDatabase(s.Suffix())
+	srcTable := "test_full_document_json"
+	dstTable := "test_full_document_json_dst"
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName: e2e.AddSuffix(s, srcTable),
+		TableMappings: e2e.TableMappings(s, srcTable, dstTable),
+		Destination: s.Peer().Name,
+	}
+	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
+	flowConnConfig.DoInitialSnapshot = true
+	flowConnConfig.Env = map[string]string{"PEERDB_CLICKHOUSE_ENABLE_JSON": "true"}
+
+	adminClient := s.Source().(*MongoSource).AdminClient()
+	collection := adminClient.Database(srcDatabase).Collection(srcTable)
+
+	res, err := collection.InsertOne(t.Context(), bson.D{bson.E{Key: "key", Value: "val"}}, options.InsertOne())
+	require.NoError(t, err)
+	require.True(t, res.Acknowledged)
+
+	tc := e2e.NewTemporalClient(t)
+	env := e2e.ExecutePeerflow(t.Context(), tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
+	e2e.EnvWaitForEqualTablesWithNames(env, s, "initial load", srcTable, dstTable, "_id,_full_document")
+
+	e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig)
+
+	insertRes, err := collection.InsertOne(t.Context(), bson.D{bson.E{Key: "key2", Value: "val2"}}, options.InsertOne())
+	require.NoError(t, err)
+	require.True(t, insertRes.Acknowledged)
+	e2e.EnvWaitForEqualTablesWithNames(env, s, "insert event", srcTable, dstTable, "_id,_full_document")
+	oid := bson.D{bson.E{Key: "_id", Value: res.InsertedID}}
+
+	replaceRes, err := collection.ReplaceOne(t.Context(), oid, bson.D{bson.E{Key: "key2", Value: "val2"}}, options.Replace())
+	require.NoError(t, err)
+	require.Equal(t, int64(1), replaceRes.ModifiedCount)
+	e2e.EnvWaitForEqualTablesWithNames(env, s, "replace event", srcTable, dstTable, "_id,_full_document")
+
+	deleteRes, err := collection.DeleteOne(t.Context(), oid, options.DeleteOne())
+	require.NoError(t, err)
+	require.Equal(t, int64(1), deleteRes.DeletedCount)
+	e2e.EnvWaitForEqualTablesWithNames(env, s, "delete event", srcTable, dstTable, "_id,_full_document")
+
+	env.Cancel(t.Context())
+	e2e.RequireEnvCanceled(t, env)
+}
