@@ -38,76 +38,140 @@ import (
3838 "github.com/shopspring/decimal"
3939)
4040
41- type readTestCase struct {
42- name string
43- notNullOnly bool
44- snapshot bool
45- cdc bool
46- opDelete bool
47- }
41+ var (
42+ slotName = "conduitslot1"
43+ publicationName = "conduitpub1"
44+ )
4845
49- func TestSource_ReadN (t * testing.T ) {
50- testCases := []readTestCase {
51- {
52- name : "snapshot not only only" ,
53- notNullOnly : true ,
54- snapshot : true ,
55- },
46+ func TestSource_ReadN_Snapshot (t * testing.T ) {
47+ testCases := []struct {
48+ name string
49+ notNullOnly bool
50+ }{
5651 {
57- name : "snapshot with nullable values " ,
52+ name : "with null columns " ,
5853 notNullOnly : false ,
59- snapshot : true ,
6054 },
6155 {
62- name : "cdc not only only" ,
56+ name : "not only only" ,
6357 notNullOnly : true ,
64- cdc : true ,
6558 },
59+ }
60+
61+ for _ , tc := range testCases {
62+ t .Run (tc .name , func (t * testing.T ) {
63+ is := is .New (t )
64+ ctx := test .Context (t )
65+ conn := test .ConnectSimple (ctx , t , test .RepmgrConnString )
66+
67+ tableName := createTableWithManyTypes (ctx , t )
68+ insertRow (ctx , is , conn , tableName , 1 , tc .notNullOnly )
69+
70+ s := openSource (ctx , is , tableName )
71+ t .Cleanup (func () {
72+ is .NoErr (logrepl .Cleanup (context .Background (), logrepl.CleanupConfig {
73+ URL : test .RepmgrConnString ,
74+ SlotName : slotName ,
75+ PublicationName : publicationName ,
76+ }))
77+ is .NoErr (s .Teardown (ctx ))
78+ })
79+
80+ // Read, ack, and assert the snapshot record is OK
81+ rec := readAndAck (ctx , is , s )
82+ assertRecordOK (is , tableName , rec , 1 , tc .notNullOnly )
83+ })
84+ }
85+ }
86+
87+ func TestSource_ReadN_CDC (t * testing.T ) {
88+ testCases := []struct {
89+ name string
90+ notNullOnly bool
91+ }{
6692 {
67- name : "cdc with nullable values " ,
93+ name : "with null columns " ,
6894 notNullOnly : false ,
69- cdc : true ,
7095 },
71-
7296 {
73- name : "delete cdc not only only" ,
97+ name : "not only only" ,
7498 notNullOnly : true ,
75- cdc : true ,
76- opDelete : true ,
77- },
78- {
79- name : "delete cdc nullable" ,
80- notNullOnly : false ,
81- cdc : true ,
82- opDelete : true ,
8399 },
84100 }
85101
86102 for _ , tc := range testCases {
87103 t .Run (tc .name , func (t * testing.T ) {
88- runReadTest (t , tc )
104+ is := is .New (t )
105+ ctx := test .Context (t )
106+ conn := test .ConnectSimple (ctx , t , test .RepmgrConnString )
107+
108+ tableName := createTableWithManyTypes (ctx , t )
109+
110+ s := openSource (ctx , is , tableName )
111+ t .Cleanup (func () {
112+ is .NoErr (logrepl .Cleanup (context .Background (), logrepl.CleanupConfig {
113+ URL : test .RepmgrConnString ,
114+ SlotName : slotName ,
115+ PublicationName : publicationName ,
116+ }))
117+ is .NoErr (s .Teardown (ctx ))
118+ })
119+
120+ insertRow (ctx , is , conn , tableName , 1 , tc .notNullOnly )
121+ // Read, ack, and assert the CDC record is OK
122+ rec := readAndAck (ctx , is , s )
123+ assertRecordOK (is , tableName , rec , 1 , tc .notNullOnly )
89124 })
90125 }
91126}
92127
93- func runReadTest (t * testing.T , tc readTestCase ) {
94- if tc .opDelete {
95- t .Skip ("Skipping delete test, see https://github.com/ConduitIO/conduit-connector-postgres/issues/301" )
128+ func TestSource_ReadN_Delete (t * testing.T ) {
129+ t .Skip ("Skipping until this issue is resolved: https://github.com/ConduitIO/conduit-connector-postgres/issues/301" )
130+ testCases := []struct {
131+ name string
132+ notNullOnly bool
133+ }{
134+ {
135+ name : "with null columns" ,
136+ notNullOnly : false ,
137+ },
138+ {
139+ name : "not only only" ,
140+ notNullOnly : true ,
141+ },
96142 }
97143
98- is := is .New (t )
99- ctx := test .Context (t )
100- conn := test .ConnectSimple (ctx , t , test .RepmgrConnString )
101-
102- tableName := createTableWithManyTypes (ctx , t )
103-
104- if tc .snapshot {
105- insertRow (ctx , is , conn , tableName , 1 , tc .notNullOnly )
144+ for _ , tc := range testCases {
145+ t .Run (tc .name , func (t * testing.T ) {
146+ is := is .New (t )
147+ ctx := test .Context (t )
148+ conn := test .ConnectSimple (ctx , t , test .RepmgrConnString )
149+
150+ tableName := createTableWithManyTypes (ctx , t )
151+
152+ s := openSource (ctx , is , tableName )
153+ t .Cleanup (func () {
154+ is .NoErr (logrepl .Cleanup (context .Background (), logrepl.CleanupConfig {
155+ URL : test .RepmgrConnString ,
156+ SlotName : slotName ,
157+ PublicationName : publicationName ,
158+ }))
159+ is .NoErr (s .Teardown (ctx ))
160+ })
161+
162+ insertRow (ctx , is , conn , tableName , 1 , tc .notNullOnly )
163+ // Read, ack, and assert the CDC record is OK
164+ cdcRec := readAndAck (ctx , is , s )
165+ assertRecordOK (is , tableName , cdcRec , 1 , tc .notNullOnly )
166+
167+ deleteRow (ctx , is , conn , tableName , 1 )
168+ deleteRec := readAndAck (ctx , is , s )
169+ is .Equal (opencdc .OperationDelete , deleteRec .Operation )
170+ })
106171 }
172+ }
107173
108- slotName := "conduitslot1"
109- publicationName := "conduitpub1"
110-
174+ func openSource (ctx context.Context , is * is.I , tableName string ) sdk.Source {
111175 s := NewSource ()
112176 err := sdk .Util .ParseConfig (
113177 ctx ,
@@ -126,34 +190,8 @@ func runReadTest(t *testing.T, tc readTestCase) {
126190
127191 err = s .Open (ctx , nil )
128192 is .NoErr (err )
129- t .Cleanup (func () {
130- is .NoErr (logrepl .Cleanup (context .Background (), logrepl.CleanupConfig {
131- URL : test .RepmgrConnString ,
132- SlotName : slotName ,
133- PublicationName : publicationName ,
134- }))
135- is .NoErr (s .Teardown (ctx ))
136- })
137193
138- if tc .snapshot {
139- // Read, ack, and assert the snapshot record is OK
140- rec := readAndAck (ctx , is , s )
141- assertRecordOK (is , tableName , rec , 1 , tc .notNullOnly )
142- }
143-
144- if tc .cdc {
145- insertRow (ctx , is , conn , tableName , 1 , tc .notNullOnly )
146-
147- // Read, ack, and verify the CDC record
148- rec := readAndAck (ctx , is , s )
149- assertRecordOK (is , tableName , rec , 1 , tc .notNullOnly )
150- }
151-
152- if tc .opDelete {
153- deleteRow (ctx , is , conn , tableName , 1 )
154- rec := readAndAck (ctx , is , s )
155- is .Equal (opencdc .OperationDelete , rec .Operation )
156- }
194+ return s
157195}
158196
159197func readAndAck (ctx context.Context , is * is.I , s sdk.Source ) opencdc.Record {
0 commit comments