@@ -42,7 +42,7 @@ func TestSource_ReadN_Snapshot_CDC(t *testing.T) {
4242 ctx := test .Context (t )
4343
4444 tableName := createTableWithManyTypes (ctx , t )
45- // Snapshot data
45+ // Write the snapshot data
4646 insertRow (ctx , t , tableName , 1 , false )
4747 insertRow (ctx , t , tableName , 2 , true )
4848
@@ -77,38 +77,36 @@ func TestSource_ReadN_Snapshot_CDC(t *testing.T) {
7777 })
7878
7979 // Read and ack the 2 snapshots records
80- snapshotRecs , err := s .ReadN (ctx , 2 )
80+ recs , err := s .ReadN (ctx , 2 )
8181 is .NoErr (err )
82- is .Equal (2 , len (snapshotRecs ))
82+ is .Equal (2 , len (recs ))
8383
84- err = s .Ack (ctx , snapshotRecs [0 ].Position )
84+ // Verify snapshot record no. 1
85+ err = s .Ack (ctx , recs [0 ].Position )
8586 is .NoErr (err )
86- err = s .Ack (ctx , snapshotRecs [1 ].Position )
87+ assertRecordOK (is , tableName , recs [0 ], 1 , false )
88+ // Verify snapshot record no. 2
89+ err = s .Ack (ctx , recs [1 ].Position )
8790 is .NoErr (err )
91+ assertRecordOK (is , tableName , recs [1 ], 2 , true )
8892
89- // Verify snapshot records
90- assertRecordOK (is , tableName , snapshotRecs [0 ], 1 , false )
91- assertRecordOK (is , tableName , snapshotRecs [1 ], 2 , true )
92-
93- // CDC data
93+ // Write the CDC data
9494 insertRow (ctx , t , tableName , 3 , true )
9595 insertRow (ctx , t , tableName , 4 , false )
9696
97- // Read, ack and verify the first CDC record
98- cdcRecs , err := s .ReadN (ctx , 1 )
99- is .NoErr (err )
100- is .Equal (1 , len (cdcRecs ))
101- err = s .Ack (ctx , cdcRecs [0 ].Position )
97+ // Read, ack, and verify CDC record no. 1
98+ recs , err = s .ReadN (ctx , 1 )
10299 is .NoErr (err )
103- assertRecordOK (is , tableName , cdcRecs [0 ], 3 , true )
100+ is .Equal (1 , len (recs ))
101+ // record no. 3 has the NOT NULL columns only
102+ assertRecordOK (is , tableName , recs [0 ], 3 , true )
104103
105- // Read, ack and verify the second CDC record
106- cdcRecs , err = s .ReadN (ctx , 1 )
107- is .NoErr (err )
108- is .Equal (1 , len (cdcRecs ))
109- err = s .Ack (ctx , cdcRecs [0 ].Position )
104+ // Read, ack, and verify CDC record no. 1
105+ recs , err = s .ReadN (ctx , 1 )
110106 is .NoErr (err )
111- assertRecordOK (is , tableName , cdcRecs [0 ], 4 , false )
107+ is .Equal (1 , len (recs ))
108+ // record no. 3 has the NOT NULL columns only
109+ assertRecordOK (is , tableName , recs [0 ], 4 , false )
112110}
113111
114112func createTableWithManyTypes (ctx context.Context , t * testing.T ) string {
@@ -123,8 +121,8 @@ func createTableWithManyTypes(ctx context.Context, t *testing.T) string {
123121 id integer PRIMARY KEY,
124122 col_bytea bytea,
125123 col_bytea_not_null bytea NOT NULL,
126- col_varchar varchar(10 ),
127- col_varchar_not_null varchar(10 ) NOT NULL,
124+ col_varchar varchar(30 ),
125+ col_varchar_not_null varchar(30 ) NOT NULL,
128126 col_date date,
129127 col_date_not_null date NOT NULL,
130128 col_float4 float4,
@@ -195,6 +193,7 @@ func insertRow(ctx context.Context, t *testing.T, table string, rowNumber int, n
195193 }
196194
197195 columns = append (columns , key )
196+ // col_numeric is a big.Rat, so we convert it to a string
198197 if strings .HasPrefix (key , "col_numeric" ) {
199198 values = append (values , decimalString (value ))
200199 } else {
@@ -228,10 +227,10 @@ func generatePayloadData(id int, notNullOnly bool) opencdc.StructuredData {
228227
229228 "col_bytea" : []uint8 (fmt .Sprintf ("col_bytea_%v" , id )),
230229 "col_bytea_not_null" : []uint8 (fmt .Sprintf ("col_bytea_not_null_%v" , id )),
231- "col_varchar" : fmt .Sprintf ("foo- %v" , id ),
232- "col_varchar_not_null" : fmt .Sprintf ("foo- %v" , id ),
233- "col_text" : fmt .Sprintf ("bar- %v" , id ),
234- "col_text_not_null" : fmt .Sprintf ("bar- %v" , id ),
230+ "col_varchar" : fmt .Sprintf ("col_varchar_ %v" , id ),
231+ "col_varchar_not_null" : fmt .Sprintf ("col_varchar_not_null_ %v" , id ),
232+ "col_text" : fmt .Sprintf ("col_text_ %v" , id ),
233+ "col_text_not_null" : fmt .Sprintf ("col_text_not_null_ %v" , id ),
235234
236235 "col_uuid" : rowUUID ,
237236 "col_uuid_not_null" : rowUUID ,
@@ -273,7 +272,7 @@ func generatePayloadData(id int, notNullOnly bool) opencdc.StructuredData {
273272 "col_timestamptz_not_null" : rowTS ,
274273
275274 "col_bool" : id % 2 == 0 ,
276- "col_bool_not_null" : id % 2 == 0 ,
275+ "col_bool_not_null" : id % 2 == 1 ,
277276 }
278277
279278 if notNullOnly {
@@ -337,18 +336,19 @@ func assertPayloadOK(is *is.I, record opencdc.Record, rowNum int, notNullOnly bo
337336
338337 want := expectedData (rowNum , notNullOnly )
339338
340- is .Equal ("" , cmp .Diff (want , got , test .BigRatComparer )) // -want, +got
339+ is .Equal ("" , cmp .Diff (want , got , test .BigRatComparer )) // expected different payload ( -want, +got)
341340}
342341
343342// expectedData creates an opencdc.StructuredData with expected keys and values
344343// based on the ID and the columns (NOT NULL columns only or all columns).
345- // It also converts values that are written into the test table as one type
346- // but read as another (e.g., we use UUID objects when inserting test data,
347- // but they are read as strings).
344+ // Its output is different generatePayloadData, because certain values are written
345+ // into the test table as one type, but read as another (e.g., we use UUID objects
346+ // when inserting test data, but they are read as strings).
348347func expectedData (id int , notNullOnly bool ) opencdc.StructuredData {
349348 rec := generatePayloadData (id , notNullOnly )
350349
351350 for key , value := range rec {
351+ // UUID are written as byte arrays but read as strings.
352352 if strings .HasPrefix (key , "col_uuid" ) {
353353 if value == nil {
354354 rec [key ] = ""
0 commit comments