@@ -491,6 +491,47 @@ func insertTestRows(ctx context.Context, is *is.I, pool *pgxpool.Pool, table str
491491 }
492492}
493493
494+ func verifyOpenCDCRecords (is * is.I , got []opencdc.Record , tableName string , fromID , toID int ) {
495+ is .Helper ()
496+
497+ // Build the expected records slice
498+ var want []opencdc.Record
499+
500+ for i := fromID ; i <= toID ; i ++ {
501+ id := int64 (i + 10 )
502+ record := opencdc.Record {
503+ Operation : opencdc .OperationCreate ,
504+ Key : opencdc.StructuredData {
505+ "id" : id ,
506+ },
507+ Payload : opencdc.Change {
508+ After : opencdc.StructuredData {
509+ "id" : id ,
510+ "key" : []uint8 (fmt .Sprintf ("%d" , id )),
511+ "column1" : fmt .Sprintf ("test-%d" , i ),
512+ "column2" : int32 (i ) * 100 , //nolint:gosec // fine, we know the value is small enough
513+ "column3" : false ,
514+ "column4" : big .NewRat (123 , 10 ),
515+ // UppercaseColumn1 is a Postgres interger (4 bytes)
516+ "UppercaseColumn1" : int32 (id ),
517+ },
518+ },
519+ Metadata : opencdc.Metadata {
520+ opencdc .MetadataCollection : tableName ,
521+ },
522+ }
523+
524+ want = append (want , record )
525+ }
526+
527+ cmpOpts := []cmp.Option {
528+ cmpopts .IgnoreUnexported (opencdc.Record {}),
529+ cmpopts .IgnoreFields (opencdc.Record {}, "Position" , "Metadata" ),
530+ test .BigRatComparer ,
531+ }
532+ is .Equal ("" , cmp .Diff (want , got , cmpOpts ... )) // mismatch (-want +got)
533+ }
534+
494535func TestCDCIterator_NextN (t * testing.T ) {
495536 ctx := test .Context (t )
496537 pool := test .ConnectPool (ctx , t , test .RepmgrConnString )
@@ -601,47 +642,6 @@ func TestCDCIterator_NextN(t *testing.T) {
601642 })
602643}
603644
604- func verifyOpenCDCRecords (is * is.I , got []opencdc.Record , tableName string , fromID , toID int ) {
605- is .Helper ()
606-
607- // Build the expected records slice
608- var want []opencdc.Record
609-
610- for i := fromID ; i <= toID ; i ++ {
611- id := int64 (i + 10 )
612- record := opencdc.Record {
613- Operation : opencdc .OperationCreate ,
614- Key : opencdc.StructuredData {
615- "id" : id ,
616- },
617- Payload : opencdc.Change {
618- After : opencdc.StructuredData {
619- "id" : id ,
620- "key" : []uint8 (fmt .Sprintf ("%d" , id )),
621- "column1" : fmt .Sprintf ("test-%d" , i ),
622- "column2" : int32 (i ) * 100 , //nolint:gosec // fine, we know the value is small enough
623- "column3" : false ,
624- "column4" : big .NewRat (123 , 10 ),
625- // UppercaseColumn1 is a Postgres interger (4 bytes)
626- "UppercaseColumn1" : int32 (id ),
627- },
628- },
629- Metadata : opencdc.Metadata {
630- opencdc .MetadataCollection : tableName ,
631- },
632- }
633-
634- want = append (want , record )
635- }
636-
637- cmpOpts := []cmp.Option {
638- cmpopts .IgnoreUnexported (opencdc.Record {}),
639- cmpopts .IgnoreFields (opencdc.Record {}, "Position" , "Metadata" ),
640- test .BigRatComparer ,
641- }
642- is .Equal ("" , cmp .Diff (want , got , cmpOpts ... )) // mismatch (-want +got)
643- }
644-
645645func testCDCIterator (ctx context.Context , t * testing.T , pool * pgxpool.Pool , table string , start bool ) * CDCIterator {
646646 is := is .New (t )
647647 config := CDCConfig {
0 commit comments