@@ -31,6 +31,7 @@ import (
3131 "github.com/conduitio/conduit-connector-sdk/schema"
3232 "github.com/google/go-cmp/cmp"
3333 "github.com/matryer/is"
34+ "github.com/shopspring/decimal"
3435)
3536
3637func TestSource_ReadN_Snapshot_CDC (t * testing.T ) {
@@ -253,7 +254,7 @@ func insertRowNotNullColumnsOnly(ctx context.Context, t *testing.T, table string
253254 %d,
254255 %d,
255256 %d,
256- %f ,
257+ %s ,
257258 '%s',
258259 '%s'::timestamp,
259260 '%s'::timestamptz,
@@ -272,7 +273,7 @@ func insertRowNotNullColumnsOnly(ctx context.Context, t *testing.T, table string
272273 rec ["col_int2_not_null" ],
273274 rec ["col_int4_not_null" ],
274275 rec ["col_int8_not_null" ],
275- rec ["col_numeric_not_null" ],
276+ decimalString ( rec ["col_numeric_not_null" ]) ,
276277 rec ["col_text_not_null" ],
277278 rec ["col_timestamp_not_null" ],
278279 rec ["col_timestamptz_not_null" ],
@@ -321,7 +322,7 @@ func insertRowAllColumns(ctx context.Context, t *testing.T, table string, rowNum
321322 %d, %d,
322323 %d, %d,
323324 %d, %d,
324- %f , %f ,
325+ %s , %s ,
325326 '%s', '%s',
326327 '%s'::timestamp, '%s'::timestamp,
327328 '%s'::timestamptz, '%s'::timestamptz,
@@ -340,7 +341,7 @@ func insertRowAllColumns(ctx context.Context, t *testing.T, table string, rowNum
340341 rec ["col_int2" ], rec ["col_int2_not_null" ],
341342 rec ["col_int4" ], rec ["col_int4_not_null" ],
342343 rec ["col_int8" ], rec ["col_int8_not_null" ],
343- rec ["col_numeric" ], rec ["col_numeric_not_null" ],
344+ decimalString ( rec ["col_numeric" ]), decimalString ( rec ["col_numeric_not_null" ]) ,
344345 rec ["col_text" ], rec ["col_text_not_null" ],
345346 rec ["col_timestamp" ], rec ["col_timestamp_not_null" ],
346347 rec ["col_timestamptz" ], rec ["col_timestamptz_not_null" ],
@@ -354,6 +355,10 @@ func insertRowAllColumns(ctx context.Context, t *testing.T, table string, rowNum
354355 is .NoErr (err )
355356}
356357
358+ func decimalString (v interface {}) string {
359+ return decimal .NewFromBigRat (v .(* big.Rat ), 2 ).String ()
360+ }
361+
357362// expectedData creates an opencdc.StructuredData with expected keys and values
358363// based on the ID and the columns (NOT NULL columns only or all columns).
359364func expectedData (id int , notNullOnly bool ) opencdc.StructuredData {
@@ -385,10 +390,6 @@ func normalizeNullValue(key string, value interface{}) interface{} {
385390func normalizeNotNullValue (key string , value interface {}) interface {} {
386391 normalized := value
387392 switch {
388- case strings .HasPrefix (key , "col_numeric" ):
389- val := new (big.Rat )
390- val .SetString (fmt .Sprintf ("%v" , value ))
391- normalized = val
392393 case strings .HasPrefix (key , "col_date" ):
393394 val := assert (time .Parse ("2006-01-02" , value .(string )))
394395 normalized = val
@@ -407,6 +408,8 @@ func generatePayloadData(id int, notNullOnly bool) opencdc.StructuredData {
407408 rowUUID := fmt .Sprintf ("a74a9875-978e-4832-b1b8-6b0f8793a%03d" , id )
408409
409410 idInt64 := int64 (id )
411+ numericVal := big .NewRat (int64 (100 + id ), 10 )
412+
410413 rec := opencdc.StructuredData {
411414 "id" : id ,
412415 "col_bytea" : []uint8 (fmt .Sprintf ("col_bytea_%v" , id )),
@@ -425,8 +428,8 @@ func generatePayloadData(id int, notNullOnly bool) opencdc.StructuredData {
425428 "col_int4_not_null" : id ,
426429 "col_int8" : idInt64 ,
427430 "col_int8_not_null" : idInt64 ,
428- "col_numeric" : float64 ( 100 + id ) / 10 ,
429- "col_numeric_not_null" : float64 ( 100 + id ) / 10 ,
431+ "col_numeric" : numericVal ,
432+ "col_numeric_not_null" : numericVal ,
430433 "col_text" : fmt .Sprintf ("bar-%v" , id ),
431434 "col_text_not_null" : fmt .Sprintf ("bar-%v" , id ),
432435 "col_timestamp" : rowTS .Format (time .RFC3339 ),
0 commit comments