Skip to content

Commit 52591d5

Browse files
authored
avoid converting json null to sql null (#3593)
1 parent 3df973f commit 52591d5

File tree

4 files changed

+100
-4
lines changed

4 files changed

+100
-4
lines changed

flow/connectors/postgres/cdc.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -323,19 +323,23 @@ func (p *PostgresCDCSource) decodeColumnData(
323323
p.logger.Error("[pg_cdc] failed to unmarshal json", slog.Any("error", err))
324324
return nil, fmt.Errorf("failed to unmarshal json: %w", err)
325325
}
326+
if parsedData == nil {
327+
// avoid confusing SQL null & JSON null by using struct{} as json null sentinel
328+
parsedData = model.JsonNull{}
329+
}
326330
return p.parseFieldFromPostgresOID(dataType, typmod, true, protos.DBType_DBTYPE_UNKNOWN,
327331
parsedData, customTypeMapping, p.internalVersion)
328332
}
329333
return types.QValueNull(types.QValueKindJSON), nil
330334
} else if dataType == pgtype.JSONArrayOID || dataType == pgtype.JSONBArrayOID {
331-
textArr := &pgtype.FlatArray[pgtype.Text]{}
332-
if err := p.typeMap.Scan(dataType, formatCode, data, textArr); err != nil {
335+
textArr := pgtype.FlatArray[pgtype.Text]{}
336+
if err := p.typeMap.Scan(dataType, formatCode, data, &textArr); err != nil {
333337
p.logger.Error("[pg_cdc] failed to scan json array", slog.Any("error", err))
334338
return nil, fmt.Errorf("failed to scan json array: %w", err)
335339
}
336340

337-
arr := make([]any, len(*textArr))
338-
for j, text := range *textArr {
341+
arr := make([]any, len(textArr))
342+
for j, text := range textArr {
339343
if text.Valid {
340344
if err := p.jsonApi.UnmarshalFromString(text.String, &arr[j]); err != nil {
341345
p.logger.Error("[pg_cdc] failed to unmarshal json array element", slog.Any("error", err))

flow/connectors/postgres/qrep_query_executor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,10 @@ func (qe *QRepQueryExecutor) mapRowToQRecord(
378378
qe.logger.Error("[pg_query_executor] failed to unmarshal json", slog.Any("error", err))
379379
return nil, fmt.Errorf("failed to unmarshal json: %w", err)
380380
}
381+
if values[i] == nil {
382+
// avoid confusing SQL null & JSON null by using struct{} as json null sentinel
383+
values[i] = model.JsonNull{}
384+
}
381385
case pgtype.JSONArrayOID, pgtype.JSONBArrayOID:
382386
var textArr pgtype.FlatArray[pgtype.Text]
383387
if err := qe.conn.TypeMap().Scan(fd.DataTypeOID, fd.Format, buf, &textArr); err != nil {

flow/e2e/clickhouse_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1388,6 +1388,88 @@ func (s ClickHouseSuite) Test_InfiniteTimestamp() {
13881388
RequireEnvCanceled(s.t, env)
13891389
}
13901390

1391+
func (s ClickHouseSuite) Test_JSON_Null() {
1392+
if _, ok := s.source.(*PostgresSource); !ok {
1393+
s.t.Skip("only applies to postgres")
1394+
}
1395+
1396+
srcTableName := "test_json_null"
1397+
srcFullName := s.attachSchemaSuffix(srcTableName)
1398+
dstTableName := "test_json_null"
1399+
1400+
require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`
1401+
CREATE TABLE IF NOT EXISTS %s (
1402+
id INT PRIMARY KEY,
1403+
j_null JSON NULL,
1404+
j_notnull JSON NOT NULL,
1405+
j_sqlnull JSON NULL,
1406+
jb_null JSONB NULL,
1407+
jb_notnull JSONB NOT NULL,
1408+
jb_sqlnull JSONB NULL
1409+
);
1410+
`, srcFullName)))
1411+
1412+
require.NoError(s.t, s.source.Exec(s.t.Context(),
1413+
fmt.Sprintf(`INSERT INTO %s (id,j_null,j_notnull,j_sqlnull,jb_null,jb_notnull,jb_sqlnull)
1414+
VALUES (1,'null'::json,'null'::json,NULL,'null'::jsonb,'null'::jsonb,NULL)`,
1415+
srcFullName)))
1416+
1417+
connectionGen := FlowConnectionGenerationConfig{
1418+
FlowJobName: s.attachSuffix("ch_json_null"),
1419+
TableNameMapping: map[string]string{srcFullName: dstTableName},
1420+
Destination: s.Peer().Name,
1421+
}
1422+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
1423+
flowConnConfig.DoInitialSnapshot = true
1424+
flowConnConfig.Env = map[string]string{"PEERDB_NULLABLE": "true"}
1425+
1426+
tc := NewTemporalClient(s.t)
1427+
env := ExecutePeerflow(s.t, tc, flowConnConfig)
1428+
SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
1429+
1430+
EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id")
1431+
1432+
require.NoError(s.t, s.source.Exec(s.t.Context(),
1433+
fmt.Sprintf(`INSERT INTO %s (id,j_null,j_notnull,j_sqlnull,jb_null,jb_notnull,jb_sqlnull)
1434+
VALUES (2,'null'::json,'null'::json,NULL,'null'::jsonb,'null'::jsonb,NULL)`,
1435+
srcFullName)))
1436+
1437+
EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id")
1438+
1439+
ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig())
1440+
require.NoError(s.t, err)
1441+
rows, err := ch.Query(s.t.Context(),
1442+
fmt.Sprintf("select id,j_null,j_notnull,j_sqlnull,jb_null,jb_notnull,jb_sqlnull from %s order by id", dstTableName))
1443+
require.NoError(s.t, err)
1444+
defer rows.Close()
1445+
numRows := 0
1446+
for rows.Next() {
1447+
numRows++
1448+
var id int32
1449+
var jNull *string
1450+
var jbNull *string
1451+
var jNotNull string
1452+
var jbNotNull string
1453+
var jSqlNull *string
1454+
var jbSqlNull *string
1455+
require.NoError(s.t, rows.Scan(&id, &jNull, &jNotNull, &jSqlNull, &jbNull, &jbNotNull, &jbSqlNull))
1456+
s.t.Log(id, jNull, jbNull, jNotNull, jbNotNull)
1457+
require.NotNil(s.t, jNull)
1458+
require.NotNil(s.t, jbNull)
1459+
require.Equal(s.t, "null", *jNull)
1460+
require.Equal(s.t, "null", jNotNull)
1461+
require.Equal(s.t, "null", *jbNull)
1462+
require.Equal(s.t, "null", jbNotNull)
1463+
require.Nil(s.t, jSqlNull)
1464+
require.Nil(s.t, jbSqlNull)
1465+
}
1466+
require.NoError(s.t, rows.Err())
1467+
require.Equal(s.t, 2, numRows)
1468+
1469+
env.Cancel(s.t.Context())
1470+
RequireEnvCanceled(s.t, env)
1471+
}
1472+
13911473
func (s ClickHouseSuite) Test_JSON_CH() {
13921474
if mySource, ok := s.source.(*MySqlSource); ok && mySource.Config.Flavor == protos.MySqlFlavor_MYSQL_MARIA {
13931475
s.t.Skip("skip maria, where JSON is not a supported data type")

flow/model/model.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ import (
1313
"github.com/PeerDB-io/peerdb/flow/shared/exceptions"
1414
)
1515

16+
type JsonNull struct{}
17+
18+
func (JsonNull) MarshalJSON() ([]byte, error) {
19+
return []byte("null"), nil
20+
}
21+
1622
type NameAndExclude struct {
1723
Exclude map[string]struct{}
1824
Name string

0 commit comments

Comments
 (0)