@@ -33,18 +33,21 @@ import (
3333 "github.com/conduitio/conduit-connector-sdk/schema"
3434 "github.com/google/go-cmp/cmp"
3535 "github.com/google/uuid"
36+ "github.com/jackc/pgx/v5"
3637 "github.com/matryer/is"
3738 "github.com/shopspring/decimal"
3839)
3940
41+ // todo test updates and deletes, for not null and nullable values
4042func TestSource_ReadN_Snapshot_CDC (t * testing.T ) {
4143 is := is .New (t )
4244 ctx := test .Context (t )
45+ notNullOnly := false
46+ conn := test .ConnectSimple (ctx , t , test .RepmgrConnString )
4347
4448 tableName := createTableWithManyTypes (ctx , t )
4549 // Write the snapshot data
46- insertRow (ctx , t , tableName , 1 , false )
47- insertRow (ctx , t , tableName , 2 , true )
50+ insertRow (ctx , is , conn , tableName , 1 , notNullOnly )
4851
4952 slotName := "conduitslot1"
5053 publicationName := "conduitpub1"
@@ -76,37 +79,31 @@ func TestSource_ReadN_Snapshot_CDC(t *testing.T) {
7679 is .NoErr (s .Teardown (ctx ))
7780 })
7881
79- // Read and ack the 2 snapshots records
80- recs , err := s .ReadN (ctx , 2 )
81- is .NoErr (err )
82- is .Equal (2 , len (recs ))
83-
84- // Verify snapshot record no. 1
85- err = s .Ack (ctx , recs [0 ].Position )
86- is .NoErr (err )
87- assertRecordOK (is , tableName , recs [0 ], 1 , false )
88- // Verify snapshot record no. 2
89- err = s .Ack (ctx , recs [1 ].Position )
90- is .NoErr (err )
91- assertRecordOK (is , tableName , recs [1 ], 2 , true )
82+ // Read, ack, and assert the snapshot record is OK
83+ rec := readAndAck (ctx , is , s )
84+ assertRecordOK (is , tableName , rec , 1 , notNullOnly )
9285
9386 // Write the CDC data
94- insertRow (ctx , t , tableName , 3 , true )
95- insertRow (ctx , t , tableName , 4 , false )
87+ insertRow (ctx , is , conn , tableName , 2 , notNullOnly )
9688
97- // Read, ack, and verify CDC record no. 1
98- recs , err = s .ReadN (ctx , 1 )
89+ // Read, ack, and verify the CDC record
90+ rec = readAndAck (ctx , is , s )
91+ assertRecordOK (is , tableName , rec , 2 , notNullOnly )
92+
93+ deleteRow (ctx , is , conn , tableName , 2 )
94+ rec = readAndAck (ctx , is , s )
95+ is .Equal (opencdc .OperationDelete , rec .Operation )
96+ }
97+
98+ func readAndAck (ctx context.Context , is * is.I , s sdk.Source ) opencdc.Record {
99+ recs , err := s .ReadN (ctx , 1 )
99100 is .NoErr (err )
100101 is .Equal (1 , len (recs ))
101- // record no. 3 has the NOT NULL columns only
102- assertRecordOK (is , tableName , recs [0 ], 3 , true )
103102
104- // Read, ack, and verify CDC record no. 1
105- recs , err = s .ReadN (ctx , 1 )
103+ err = s .Ack (ctx , recs [0 ].Position )
106104 is .NoErr (err )
107- is .Equal (1 , len (recs ))
108- // record no. 3 has the NOT NULL columns only
109- assertRecordOK (is , tableName , recs [0 ], 4 , false )
105+
106+ return recs [0 ]
110107}
111108
112109func createTableWithManyTypes (ctx context.Context , t * testing.T ) string {
@@ -173,10 +170,7 @@ func createTableWithManyTypes(ctx context.Context, t *testing.T) string {
173170
174171// insertRow inserts a row using the values provided by generatePayloadData.
175172// if notNullOnly is true, only NOT NULL columns are inserted.
176- func insertRow (ctx context.Context , t * testing.T , table string , rowNumber int , notNullOnly bool ) {
177- is := is .New (t )
178- conn := test .ConnectSimple (ctx , t , test .RepmgrConnString )
179-
173+ func insertRow (ctx context.Context , is * is.I , conn * pgx.Conn , table string , rowNumber int , notNullOnly bool ) {
180174 rec := generatePayloadData (rowNumber , false )
181175
182176 var columns []string
@@ -213,6 +207,17 @@ func insertRow(ctx context.Context, t *testing.T, table string, rowNumber int, n
213207 is .NoErr (err )
214208}
215209
210+ func deleteRow (ctx context.Context , is * is.I , conn * pgx.Conn , table string , rowNumber int ) {
211+ query , args , err := squirrel .Delete (internal .WrapSQLIdent (table )).
212+ Where (squirrel.Eq {"id" : rowNumber }).
213+ PlaceholderFormat (squirrel .Dollar ).
214+ ToSql ()
215+ is .NoErr (err )
216+
217+ _ , err = conn .Exec (ctx , query , args ... )
218+ is .NoErr (err )
219+ }
220+
216221func generatePayloadData (id int , notNullOnly bool ) opencdc.StructuredData {
217222 // Add a time zone offset
218223 rowTS := assert (time .Parse (time .RFC3339 , fmt .Sprintf ("2022-01-21T17:04:05+%02d:00" , id )))
0 commit comments