Skip to content

Commit 53f6529

Browse files
authored
int256: gob.Register (#3138)
1 parent d7187f5 commit 53f6529

File tree

7 files changed

+42
-45
lines changed

7 files changed

+42
-45
lines changed

flow/connectors/clickhouse/qrep_avro_sync.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (s *ClickHouseAvroSyncMethod) pushDataToS3(
175175
partition *protos.QRepPartition,
176176
stream *model.QRecordStream,
177177
destTypeConversions map[string]types.TypeConversion,
178-
numericTruncator *model.SnapshotTableNumericTruncator,
178+
numericTruncator model.SnapshotTableNumericTruncator,
179179
) ([]utils.AvroFile, int64, error) {
180180
avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, schema, columnNameAvroFieldMap)
181181
if err != nil {
@@ -381,7 +381,7 @@ func (s *ClickHouseAvroSyncMethod) writeToAvroFile(
381381
identifierForFile string,
382382
flowJobName string,
383383
typeConversions map[string]types.TypeConversion,
384-
numericTruncator *model.SnapshotTableNumericTruncator,
384+
numericTruncator model.SnapshotTableNumericTruncator,
385385
) (utils.AvroFile, error) {
386386
stagingPath := s.credsProvider.BucketPath
387387
ocfWriter := utils.NewPeerDBOCFWriter(stream, avroSchema, ocf.ZStandard, protos.DBType_CLICKHOUSE)

flow/connectors/utils/avro_writer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (p *peerDBOCFWriter) WriteOCF(
7373
env map[string]string,
7474
w io.Writer,
7575
typeConversions map[string]types.TypeConversion,
76-
numericTruncator *model.SnapshotTableNumericTruncator,
76+
numericTruncator model.SnapshotTableNumericTruncator,
7777
) (int64, error) {
7878
ocfWriter, err := p.createOCFWriter(w)
7979
if err != nil {
@@ -96,7 +96,7 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(
9696
s3Creds AWSCredentialsProvider,
9797
avroSize *atomic.Int64,
9898
typeConversions map[string]types.TypeConversion,
99-
numericTruncator *model.SnapshotTableNumericTruncator,
99+
numericTruncator model.SnapshotTableNumericTruncator,
100100
) (AvroFile, error) {
101101
logger := internal.LoggerFromCtx(ctx)
102102
s3svc, err := CreateS3Client(ctx, s3Creds)
@@ -222,7 +222,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(
222222
env map[string]string,
223223
ocfWriter *ocf.Encoder,
224224
typeConversions map[string]types.TypeConversion,
225-
numericTruncator *model.SnapshotTableNumericTruncator,
225+
numericTruncator model.SnapshotTableNumericTruncator,
226226
) (int64, error) {
227227
logger := internal.LoggerFromCtx(ctx)
228228

flow/connectors/utils/cdc_store.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,12 @@ func init() {
8585
gob.Register(types.QValueInt16{})
8686
gob.Register(types.QValueInt32{})
8787
gob.Register(types.QValueInt64{})
88+
gob.Register(types.QValueInt256{})
8889
gob.Register(types.QValueUInt8{})
8990
gob.Register(types.QValueUInt16{})
9091
gob.Register(types.QValueUInt32{})
9192
gob.Register(types.QValueUInt64{})
93+
gob.Register(types.QValueUInt256{})
9294
gob.Register(types.QValueBoolean{})
9395
gob.Register(types.QValueQChar{})
9496
gob.Register(types.QValueString{})

flow/connectors/utils/stream.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,15 +157,15 @@ func InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.
157157

158158
func truncateNumerics(
159159
items model.Items, targetDWH protos.DBType, unboundedNumericAsString bool,
160-
numericTruncator *model.CdcTableNumericTruncator,
160+
numericTruncator model.CdcTableNumericTruncator,
161161
) model.Items {
162162
recordItems, ok := items.(model.RecordItems)
163163
if !ok {
164164
return items
165165
}
166166
hasNumerics := false
167167
for col, val := range recordItems.ColToVal {
168-
if !numericTruncator.Get(col).Skip {
168+
if numericTruncator.Get(col).Stat != nil {
169169
if val.Kind() == types.QValueKindNumeric || val.Kind() == types.QValueKindArrayNumeric {
170170
hasNumerics = true
171171
break
@@ -181,7 +181,7 @@ func truncateNumerics(
181181
newVal := val
182182

183183
columnTruncator := numericTruncator.Get(col)
184-
if !columnTruncator.Skip {
184+
if columnTruncator.Stat != nil {
185185
switch numeric := val.(type) {
186186
case types.QValueNumeric:
187187
destType := qvalue.GetNumericDestinationType(

flow/model/conversion_avro.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func (qac *QRecordAvroConverter) Convert(
5353
env map[string]string,
5454
qrecord []types.QValue,
5555
typeConversions map[string]types.TypeConversion,
56-
numericTruncator *SnapshotTableNumericTruncator,
56+
numericTruncator SnapshotTableNumericTruncator,
5757
) (map[string]any, error) {
5858
m := make(map[string]any, len(qrecord))
5959
for idx, val := range qrecord {

flow/model/numeric_truncator.go

Lines changed: 29 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,25 @@ import (
77
"github.com/PeerDB-io/peerdb/flow/shared/types"
88
)
99

10-
type StreamNumericTruncator map[string]*CdcTableNumericTruncator
10+
type CdcTableNumericTruncator struct {
11+
TruncatorsByColumn map[string]CdcColumnNumericTruncator
12+
DestinationTable string
13+
}
14+
15+
type StreamNumericTruncator map[string]CdcTableNumericTruncator
1116

1217
func NewStreamNumericTruncator(tableMappings []*protos.TableMapping, typesToSkip map[string]struct{}) StreamNumericTruncator {
13-
statsByTable := make(map[string]*CdcTableNumericTruncator, len(tableMappings))
18+
statsByTable := make(map[string]CdcTableNumericTruncator, len(tableMappings))
1419
for _, tableMapping := range tableMappings {
1520
statsByTable[tableMapping.DestinationTableIdentifier] = NewCdcTableNumericTruncator(
1621
tableMapping.DestinationTableIdentifier, tableMapping.Columns, typesToSkip)
1722
}
1823
return statsByTable
1924
}
2025

21-
func (ss StreamNumericTruncator) Get(destinationTable string) *CdcTableNumericTruncator {
26+
func (ss StreamNumericTruncator) Get(destinationTable string) CdcTableNumericTruncator {
2227
if ss == nil {
23-
return nil
28+
return CdcTableNumericTruncator{}
2429
}
2530
truncator, ok := ss[destinationTable]
2631
if !ok {
@@ -38,82 +43,72 @@ func (ss StreamNumericTruncator) Warnings() shared.QRepWarnings {
3843
return warnings
3944
}
4045

41-
type CdcTableNumericTruncator struct {
42-
TruncatorsByColumn map[string]*CdcColumnNumericTruncator
43-
DestinationTable string
44-
}
45-
4646
func NewCdcTableNumericTruncator(
4747
destinationTable string, columnSettings []*protos.ColumnSetting, typesToSkip map[string]struct{},
48-
) *CdcTableNumericTruncator {
49-
truncatorsByColumn := map[string]*CdcColumnNumericTruncator{}
48+
) CdcTableNumericTruncator {
49+
truncatorsByColumn := map[string]CdcColumnNumericTruncator{}
5050
for _, columnSetting := range columnSettings {
5151
if _, ok := typesToSkip[columnSetting.DestinationType]; ok {
5252
destinationName := columnSetting.DestinationName
5353
if destinationName == "" {
5454
destinationName = columnSetting.SourceName
5555
}
56-
truncatorsByColumn[destinationName] = &CdcColumnNumericTruncator{Skip: true}
56+
truncatorsByColumn[destinationName] = CdcColumnNumericTruncator{}
5757
}
5858
}
59-
return &CdcTableNumericTruncator{
59+
return CdcTableNumericTruncator{
6060
TruncatorsByColumn: truncatorsByColumn,
6161
DestinationTable: destinationTable,
6262
}
6363
}
6464

65-
func (ts *CdcTableNumericTruncator) Get(destinationColumn string) *CdcColumnNumericTruncator {
66-
if ts == nil {
67-
return &CdcColumnNumericTruncator{Skip: true}
65+
func (ts CdcTableNumericTruncator) Get(destinationColumn string) CdcColumnNumericTruncator {
66+
if ts.TruncatorsByColumn == nil {
67+
return CdcColumnNumericTruncator{}
6868
}
6969
stat, ok := ts.TruncatorsByColumn[destinationColumn]
7070
if !ok {
71-
stat = &CdcColumnNumericTruncator{
72-
Stat: qvalue.NewNumericStat(ts.DestinationTable, destinationColumn),
71+
numericStat := qvalue.NewNumericStat(ts.DestinationTable, destinationColumn)
72+
stat = CdcColumnNumericTruncator{
73+
Stat: &numericStat,
7374
}
7475
ts.TruncatorsByColumn[destinationColumn] = stat
7576
}
7677
return stat
7778
}
7879

79-
func (ts *CdcTableNumericTruncator) CollectWarnings(warnings *shared.QRepWarnings) {
80+
func (ts CdcTableNumericTruncator) CollectWarnings(warnings *shared.QRepWarnings) {
8081
for _, truncator := range ts.TruncatorsByColumn {
81-
if !truncator.Skip {
82+
if truncator.Stat != nil {
8283
truncator.Stat.CollectWarnings(warnings)
8384
}
8485
}
8586
}
8687

87-
//nolint:govet // semantically ordered
8888
type CdcColumnNumericTruncator struct {
89-
Skip bool
9089
Stat *qvalue.NumericStat
9190
}
9291

93-
type SnapshotTableNumericTruncator struct {
94-
stats []*qvalue.NumericStat
95-
}
92+
type SnapshotTableNumericTruncator []qvalue.NumericStat
9693

97-
func NewSnapshotTableNumericTruncator(destinationTable string, fields []types.QField) *SnapshotTableNumericTruncator {
98-
stats := make([]*qvalue.NumericStat, 0, len(fields))
94+
func NewSnapshotTableNumericTruncator(destinationTable string, fields []types.QField) SnapshotTableNumericTruncator {
95+
stats := make([]qvalue.NumericStat, 0, len(fields))
9996
for _, field := range fields {
10097
stats = append(stats, qvalue.NewNumericStat(destinationTable, field.Name))
10198
}
102-
return &SnapshotTableNumericTruncator{
103-
stats: stats,
104-
}
99+
return SnapshotTableNumericTruncator(stats)
105100
}
106101

107-
func (ts *SnapshotTableNumericTruncator) Get(idx int) *qvalue.NumericStat {
102+
func (ts SnapshotTableNumericTruncator) Get(idx int) *qvalue.NumericStat {
108103
if ts == nil {
109104
return nil
110105
}
111-
return ts.stats[idx]
106+
return &ts[idx]
112107
}
113108

114-
func (ts *SnapshotTableNumericTruncator) Warnings() shared.QRepWarnings {
109+
func (ts SnapshotTableNumericTruncator) Warnings() shared.QRepWarnings {
115110
var warnings shared.QRepWarnings
116-
for _, stat := range ts.stats {
111+
for _, stat := range ts {
117112
stat.CollectWarnings(&warnings)
118113
}
119114
return warnings

flow/model/qvalue/avro_converter.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -660,8 +660,8 @@ type NumericStat struct {
660660
BigInt256ClearedCount uint64
661661
}
662662

663-
func NewNumericStat(destinationTable, destinationColumn string) *NumericStat {
664-
return &NumericStat{
663+
func NewNumericStat(destinationTable, destinationColumn string) NumericStat {
664+
return NumericStat{
665665
DestinationTable: destinationTable,
666666
DestinationColumn: destinationColumn,
667667
}

0 commit comments

Comments
 (0)