Skip to content

Commit 5040b70

Browse files
authored
Move shared validations into their own module (#3547)
This allows importing just the validation code with minimal dependencies, without tracking unrelated dependency updates in PeerDB. The connector-specific code remaining in shared/ was fairly small, so it was moved elsewhere. The ClickHouse (CH) type conversion was supposed to be shared with other services in the future, but it pulls in QValue; we can later decide either to move all of it to shared-light if we want to expose QValue, or to move just the type conversions and give them their own small data contracts.
1 parent 344b21d commit 5040b70

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+329
-117
lines changed

flow/connectors/clickhouse/avro_sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ import (
1616
"github.com/PeerDB-io/peerdb/flow/internal"
1717
"github.com/PeerDB-io/peerdb/flow/model"
1818
"github.com/PeerDB-io/peerdb/flow/model/qvalue"
19+
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
1920
"github.com/PeerDB-io/peerdb/flow/shared"
20-
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/shared/clickhouse"
2121
"github.com/PeerDB-io/peerdb/flow/shared/exceptions"
2222
"github.com/PeerDB-io/peerdb/flow/shared/types"
2323
)

flow/connectors/clickhouse/cdc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import (
1414
"github.com/PeerDB-io/peerdb/flow/internal"
1515
"github.com/PeerDB-io/peerdb/flow/model"
1616
"github.com/PeerDB-io/peerdb/flow/model/qvalue"
17+
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
1718
"github.com/PeerDB-io/peerdb/flow/shared"
18-
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/shared/clickhouse"
1919
"github.com/PeerDB-io/peerdb/flow/shared/types"
2020
)
2121

@@ -118,7 +118,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
118118
req.Records.GetRecords(), tableNameRowsMapping, syncBatchID, unboundedNumericAsString,
119119
protos.DBType_CLICKHOUSE,
120120
)
121-
numericTruncator := model.NewStreamNumericTruncator(req.TableMappings, peerdb_clickhouse.NumericDestinationTypes)
121+
numericTruncator := model.NewStreamNumericTruncator(req.TableMappings, NumericDestinationTypes)
122122
stream, err := utils.RecordsToRawTableStream(streamReq, numericTruncator)
123123
if err != nil {
124124
return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)

flow/connectors/clickhouse/clickhouse.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import (
2222
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
2323
"github.com/PeerDB-io/peerdb/flow/generated/protos"
2424
"github.com/PeerDB-io/peerdb/flow/internal"
25+
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
2526
"github.com/PeerDB-io/peerdb/flow/shared"
26-
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/shared/clickhouse"
2727
"github.com/PeerDB-io/peerdb/flow/shared/types"
2828
)
2929

flow/connectors/clickhouse/normalize.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ import (
1919
"github.com/PeerDB-io/peerdb/flow/internal"
2020
"github.com/PeerDB-io/peerdb/flow/model"
2121
"github.com/PeerDB-io/peerdb/flow/model/qvalue"
22+
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
2223
"github.com/PeerDB-io/peerdb/flow/shared"
23-
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/shared/clickhouse"
2424
"github.com/PeerDB-io/peerdb/flow/shared/types"
2525
)
2626

flow/connectors/clickhouse/normalize_query.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1111
"github.com/PeerDB-io/peerdb/flow/internal"
1212
"github.com/PeerDB-io/peerdb/flow/model/qvalue"
13-
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/shared/clickhouse"
13+
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
1414
"github.com/PeerDB-io/peerdb/flow/shared/types"
1515
)
1616

flow/connectors/clickhouse/type_conversion.go

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,69 @@ package connclickhouse
22

33
import (
44
"github.com/PeerDB-io/peerdb/flow/generated/protos"
5-
"github.com/PeerDB-io/peerdb/flow/shared/clickhouse"
65
"github.com/PeerDB-io/peerdb/flow/shared/types"
76
)
87

8+
/*
9+
This file handles the mapping for ClickHouse destination types and
10+
their corresponding TypeConversion implementations. A TypeConversion
11+
object contains two functions: one for schema conversion (QField) and
12+
one for value conversion (QValue). This allows the avro writer to
13+
stage the schema/data in the converted type format so that it is
14+
successfully uploaded to the desired destination type in ClickHouse.
15+
16+
To add a type conversion:
17+
18+
(1) In flow/model/shared/type_converter.go:
19+
- implement a SchemaConversionFn interface to convert the QField type
20+
- implement a ValueConversionFn interface to convert the QValue data
21+
22+
(2) Add the new conversion to the `SupportedDestinationTypes` map here
23+
(if destination type doesn't exist, create a new map entry for it).
24+
*/
25+
var SupportedDestinationTypes = map[string][]types.TypeConversion{
26+
"String": {
27+
types.NewTypeConversion(
28+
types.NumericToStringSchemaConversion,
29+
types.NumericToStringValueConversion,
30+
),
31+
},
32+
"Int256": {
33+
types.NewTypeConversion(
34+
types.NumericToInt256SchemaConversion,
35+
types.NumericToInt256ValueConversion,
36+
),
37+
},
38+
"UInt256": {
39+
types.NewTypeConversion(
40+
types.NumericToUInt256SchemaConversion,
41+
types.NumericToUInt256ValueConversion,
42+
),
43+
},
44+
}
45+
46+
var NumericDestinationTypes = map[string]struct{}{
47+
"String": {},
48+
"Int256": {},
49+
"UInt256": {},
50+
}
51+
52+
// returns the full list of supported type conversions. The keys are
53+
// QValueKind to allow the implementation to be source-connector agnostic.
54+
func ListSupportedTypeConversions() map[types.QValueKind][]string {
55+
typeConversions := make(map[types.QValueKind][]string)
56+
57+
for dstType, l := range SupportedDestinationTypes {
58+
for _, conversion := range l {
59+
typeConversions[conversion.FromKind()] = append(typeConversions[conversion.FromKind()], dstType)
60+
}
61+
}
62+
return typeConversions
63+
}
64+
965
func GetColumnsTypeConversion() (*protos.ColumnsTypeConversionResponse, error) {
1066
res := make([]*protos.ColumnsTypeConversion, 0)
11-
for qkind, destTypes := range clickhouse.ListSupportedTypeConversions() {
67+
for qkind, destTypes := range ListSupportedTypeConversions() {
1268
res = append(res, &protos.ColumnsTypeConversion{
1369
Qkind: string(qkind),
1470
DestinationTypes: destTypes,
@@ -32,7 +88,7 @@ func findTypeConversions(schema types.QRecordSchema, columns []*protos.ColumnSet
3288
if !exist {
3389
continue
3490
}
35-
conversions, exist := clickhouse.SupportedDestinationTypes[col.DestinationType]
91+
conversions, exist := SupportedDestinationTypes[col.DestinationType]
3692
if !exist {
3793
continue
3894
}

flow/connectors/clickhouse/validate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99

1010
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1111
"github.com/PeerDB-io/peerdb/flow/internal"
12-
chvalidate "github.com/PeerDB-io/peerdb/flow/shared/clickhouse"
12+
chvalidate "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
1313
)
1414

1515
func (c *ClickHouseConnector) ValidateMirrorDestination(

flow/connectors/mongo/mongo.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
2222
"github.com/PeerDB-io/peerdb/flow/generated/protos"
2323
"github.com/PeerDB-io/peerdb/flow/internal"
24+
peerdb_mongo "github.com/PeerDB-io/peerdb/flow/pkg/mongo"
2425
"github.com/PeerDB-io/peerdb/flow/shared"
25-
peerdb_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo"
2626
)
2727

2828
const (

flow/connectors/mongo/qrep.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"github.com/PeerDB-io/peerdb/flow/model"
1818
"github.com/PeerDB-io/peerdb/flow/otel_metrics"
1919
"github.com/PeerDB-io/peerdb/flow/shared"
20-
shared_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo"
2120
"github.com/PeerDB-io/peerdb/flow/shared/types"
2221
)
2322

@@ -28,7 +27,7 @@ func (c *MongoConnector) GetQRepPartitions(
2827
) ([]*protos.QRepPartition, error) {
2928
fullTablePartition := []*protos.QRepPartition{
3029
{
31-
PartitionId: shared_mongo.MongoFullTablePartitionId,
30+
PartitionId: utils.FullTablePartitionID,
3231
Range: nil,
3332
FullTablePartition: true,
3433
},

flow/connectors/mongo/schema.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"fmt"
66

77
"github.com/PeerDB-io/peerdb/flow/generated/protos"
8-
shared_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo"
8+
shared_mongo "github.com/PeerDB-io/peerdb/flow/pkg/mongo"
99
)
1010

1111
func (c *MongoConnector) GetAllTables(ctx context.Context) (*protos.AllTablesResponse, error) {

0 commit comments

Comments
 (0)