1- package main
1+ package sync
22
33import (
44 "context"
5- "database/sql"
65 "fmt"
76 "testing"
87 "time"
@@ -13,8 +12,6 @@ import (
1312 "github.com/testcontainers/testcontainers-go"
1413 "github.com/testcontainers/testcontainers-go/modules/postgres"
1514 "github.com/testcontainers/testcontainers-go/wait"
16-
17- "github.com/cybertec-postgresql/etcd_fdw/internal/sync"
1815)
1916
2017func setupPostgreSQLContainer (ctx context.Context , t * testing.T ) (* pgxpool.Pool , testcontainers.Container ) {
@@ -40,7 +37,7 @@ func setupPostgreSQLContainer(ctx context.Context, t *testing.T) (*pgxpool.Pool,
4037 CREATE TABLE etcd (
4138 ts timestamp with time zone NOT NULL DEFAULT now(),
4239 key text NOT NULL,
43- value text,
40+ value text NOT NULL ,
4441 revision bigint NOT NULL,
4542 tombstone boolean NOT NULL DEFAULT false,
4643 PRIMARY KEY(key, revision)
@@ -53,7 +50,7 @@ func setupPostgreSQLContainer(ctx context.Context, t *testing.T) (*pgxpool.Pool,
5350 return pool , pgContainer
5451}
5552
56- func setupEtcdContainer (ctx context.Context , t * testing.T ) (* sync. EtcdClient , testcontainers.Container ) {
53+ func setupEtcdContainer (ctx context.Context , t * testing.T ) (* EtcdClient , testcontainers.Container ) {
5754 etcdContainer , err := testcontainers .GenericContainer (ctx , testcontainers.GenericContainerRequest {
5855 ContainerRequest : testcontainers.ContainerRequest {
5956 Image : "quay.io/coreos/etcd:v3.5.9" ,
@@ -76,13 +73,13 @@ func setupEtcdContainer(ctx context.Context, t *testing.T) (*sync.EtcdClient, te
7673 require .NoError (t , err )
7774
7875 dsn := "etcd://" + endpoint + "/test"
79- etcdClient , err := sync . NewEtcdClient (dsn )
76+ etcdClient , err := NewEtcdClient (dsn )
8077 require .NoError (t , err )
8178
8279 return etcdClient , etcdContainer
8380}
8481
85- func setupTestContainers (t * testing.T ) (* pgxpool.Pool , * sync. EtcdClient , func ()) {
82+ func setupTestContainers (t * testing.T ) (* pgxpool.Pool , * EtcdClient , func ()) {
8683 ctx := context .Background ()
8784
8885 pool , pgContainer := setupPostgreSQLContainer (ctx , t )
@@ -117,18 +114,18 @@ func TestPollingMechanism(t *testing.T) {
117114 require .NoError (t , err )
118115
119116 // Test GetPendingRecords function
120- pendingRecords , err := sync . GetPendingRecords (ctx , pool )
117+ pendingRecords , err := GetPendingRecords (ctx , pool )
121118 require .NoError (t , err )
122119 assert .Len (t , pendingRecords , 1 )
123120 assert .Equal (t , "test/polling/key1" , pendingRecords [0 ].Key )
124121 assert .Equal (t , "value1" , pendingRecords [0 ].Value )
125122
126123 // Test UpdateRevision function
127- err = sync . UpdateRevision (ctx , pool , "test/polling/key1" , 123 )
124+ err = UpdateRevision (ctx , pool , "test/polling/key1" , 123 )
128125 require .NoError (t , err )
129126
130127 // Verify record was updated
131- pendingAfterUpdate , err := sync . GetPendingRecords (ctx , pool )
128+ pendingAfterUpdate , err := GetPendingRecords (ctx , pool )
132129 require .NoError (t , err )
133130 assert .Len (t , pendingAfterUpdate , 0 , "No pending records should remain after update" )
134131
@@ -154,7 +151,7 @@ func TestBulkInsert(t *testing.T) {
154151 defer cancel ()
155152
156153 // Prepare test records
157- records := []sync. KeyValueRecord {
154+ records := []KeyValueRecord {
158155 {
159156 Key : "test/bulk/key1" ,
160157 Value : ("value1" ),
@@ -179,7 +176,7 @@ func TestBulkInsert(t *testing.T) {
179176 }
180177
181178 // Test BulkInsert function
182- err := sync . BulkInsert (ctx , pool , records )
179+ err := BulkInsert (ctx , pool , records )
183180 require .NoError (t , err )
184181
185182 // Verify records were inserted correctly
@@ -189,17 +186,17 @@ func TestBulkInsert(t *testing.T) {
189186 assert .Equal (t , 3 , count )
190187
191188 // Verify specific record details
192- var key , value sql. NullString
189+ var key , value string
193190 var revision int64
194191 var tombstone bool
195192 err = pool .QueryRow (ctx , `
196193 SELECT key, value, revision, tombstone
197194 FROM etcd WHERE key = 'test/bulk/key1'
198195 ` ).Scan (& key , & value , & revision , & tombstone )
199196 require .NoError (t , err )
200- assert .Equal (t , "test/bulk/key1" , key . String )
201- assert .True (t , value . Valid )
202- assert .Equal (t , "value1" , value . String )
197+ assert .Equal (t , "test/bulk/key1" , key )
198+ assert .NotEmpty (t , value )
199+ assert .Equal (t , "value1" , value )
203200 assert .Equal (t , int64 (100 ), revision )
204201 assert .False (t , tombstone )
205202
@@ -209,8 +206,8 @@ func TestBulkInsert(t *testing.T) {
209206 FROM etcd WHERE key = 'test/bulk/key3'
210207 ` ).Scan (& key , & value , & revision , & tombstone )
211208 require .NoError (t , err )
212- assert .Equal (t , "test/bulk/key3" , key . String )
213- assert .False (t , value . Valid ) // NULL value
209+ assert .Equal (t , "test/bulk/key3" , key )
210+ assert .Empty (t , value ) // NULL value
214211 assert .Equal (t , int64 (102 ), revision )
215212 assert .True (t , tombstone )
216213}
@@ -227,23 +224,23 @@ func TestInsertPendingRecord(t *testing.T) {
227224 defer cancel ()
228225
229226 // Test inserting a new pending record
230- err := sync . InsertPendingRecord (ctx , pool , "test/pending/key1" , ("value1" ), false )
227+ err := InsertPendingRecord (ctx , pool , "test/pending/key1" , ("value1" ), false )
231228 require .NoError (t , err )
232229
233230 // Verify record was inserted with revision = -1
234231 var revision int64
235- var value sql. NullString
232+ var value string
236233 err = pool .QueryRow (ctx , `
237234 SELECT revision, value FROM etcd
238235 WHERE key = 'test/pending/key1'
239236 ` ).Scan (& revision , & value )
240237 require .NoError (t , err )
241238 assert .Equal (t , int64 (- 1 ), revision )
242- assert .True (t , value . Valid )
243- assert .Equal (t , "value1" , value . String )
239+ assert .NotEmpty (t , value )
240+ assert .Equal (t , "value1" , value )
244241
245242 // Test inserting second record with same key (should create new record with different timestamp)
246- err = sync . InsertPendingRecord (ctx , pool , "test/pending/key1" , ("updated_value" ), false )
243+ err = InsertPendingRecord (ctx , pool , "test/pending/key1" , ("updated_value" ), false )
247244 require .NoError (t , err )
248245
249246 // Verify both records exist (different timestamps, both with revision = -1)
@@ -256,7 +253,7 @@ func TestInsertPendingRecord(t *testing.T) {
256253 assert .Equal (t , 1 , count , "Should have 1 pending records for the same key with latest value" )
257254
258255 // Test inserting tombstone record
259- err = sync . InsertPendingRecord (ctx , pool , "test/pending/key2" , "" , true )
256+ err = InsertPendingRecord (ctx , pool , "test/pending/key2" , "" , true )
260257 require .NoError (t , err )
261258
262259 // Verify tombstone record
@@ -267,7 +264,7 @@ func TestInsertPendingRecord(t *testing.T) {
267264 ` ).Scan (& revision , & value , & tombstone )
268265 require .NoError (t , err )
269266 assert .Equal (t , int64 (- 1 ), revision )
270- assert .False (t , value . Valid ) // NULL value
267+ assert .Empty (t , value )
271268 assert .True (t , tombstone )
272269}
273270
@@ -283,7 +280,7 @@ func TestGetLatestRevision(t *testing.T) {
283280 defer cancel ()
284281
285282 // Test with empty table
286- latestRevision , err := sync . GetLatestRevision (ctx , pool )
283+ latestRevision , err := GetLatestRevision (ctx , pool )
287284 require .NoError (t , err )
288285 assert .Equal (t , int64 (0 ), latestRevision )
289286
@@ -298,7 +295,7 @@ func TestGetLatestRevision(t *testing.T) {
298295 require .NoError (t , err )
299296
300297 // Test latest revision (should ignore -1 pending records)
301- latestRevision , err = sync . GetLatestRevision (ctx , pool )
298+ latestRevision , err = GetLatestRevision (ctx , pool )
302299 require .NoError (t , err )
303300 assert .Equal (t , int64 (150 ), latestRevision )
304301}
@@ -320,13 +317,13 @@ func TestPendingRecordFiltering(t *testing.T) {
320317 ('test/filter/synced1', 'value1', 100, false),
321318 ('test/filter/pending1', 'value2', -1, false),
322319 ('test/filter/synced2', 'value3', 200, false),
323- ('test/filter/pending2', NULL , -1, true),
320+ ('test/filter/pending2', '' , -1, true),
324321 ('test/filter/pending3', 'value4', -1, false)
325322 ` )
326323 require .NoError (t , err )
327324
328325 // Test GetPendingRecords only returns revision = -1
329- pendingRecords , err := sync . GetPendingRecords (ctx , pool )
326+ pendingRecords , err := GetPendingRecords (ctx , pool )
330327 require .NoError (t , err )
331328 assert .Len (t , pendingRecords , 3 )
332329
@@ -361,22 +358,22 @@ func TestConflictResolution(t *testing.T) {
361358 defer cancel ()
362359
363360 // Insert a pending record
364- err := sync . InsertPendingRecord (ctx , pool , "test/conflict/key1" , "pending_value" , false )
361+ err := InsertPendingRecord (ctx , pool , "test/conflict/key1" , "pending_value" , false )
365362 require .NoError (t , err )
366363
367364 // Verify it's pending
368- pendingRecords , err := sync . GetPendingRecords (ctx , pool )
365+ pendingRecords , err := GetPendingRecords (ctx , pool )
369366 require .NoError (t , err )
370367 assert .Len (t , pendingRecords , 1 )
371368 assert .Equal (t , "test/conflict/key1" , pendingRecords [0 ].Key )
372369 assert .Equal (t , int64 (- 1 ), pendingRecords [0 ].Revision )
373370
374371 // Simulate etcd sync by updating revision
375- err = sync . UpdateRevision (ctx , pool , "test/conflict/key1" , 300 )
372+ err = UpdateRevision (ctx , pool , "test/conflict/key1" , 300 )
376373 require .NoError (t , err )
377374
378375 // Verify record is no longer pending
379- pendingAfterUpdate , err := sync . GetPendingRecords (ctx , pool )
376+ pendingAfterUpdate , err := GetPendingRecords (ctx , pool )
380377 require .NoError (t , err )
381378 assert .Len (t , pendingAfterUpdate , 0 )
382379
@@ -390,7 +387,7 @@ func TestConflictResolution(t *testing.T) {
390387 assert .Equal (t , int64 (300 ), revision )
391388
392389 // Test updating non-existent pending record (should fail gracefully)
393- err = sync . UpdateRevision (ctx , pool , "test/conflict/nonexistent" , 400 )
390+ err = UpdateRevision (ctx , pool , "test/conflict/nonexistent" , 400 )
394391 assert .Error (t , err )
395392 assert .Contains (t , err .Error (), "no pending record found" )
396393}
@@ -410,10 +407,10 @@ func TestPerformanceOpsPerSecond(t *testing.T) {
410407 recordCount := 1000
411408 start := time .Now ()
412409
413- records := make ([]sync. KeyValueRecord , recordCount )
410+ records := make ([]KeyValueRecord , recordCount )
414411 for i := 0 ; i < recordCount ; i ++ {
415412 value := fmt .Sprintf ("test_value_%d" , i )
416- records [i ] = sync. KeyValueRecord {
413+ records [i ] = KeyValueRecord {
417414 Key : fmt .Sprintf ("test/perf/key%d" , i ),
418415 Value : value ,
419416 Revision : int64 (i + 1 ),
@@ -422,7 +419,7 @@ func TestPerformanceOpsPerSecond(t *testing.T) {
422419 }
423420 }
424421
425- err := sync . BulkInsert (ctx , pool , records )
422+ err := BulkInsert (ctx , pool , records )
426423 require .NoError (t , err )
427424
428425 elapsed := time .Since (start )
@@ -453,11 +450,11 @@ func TestPerformanceSyncLatency(t *testing.T) {
453450 // Insert pending record
454451 key := fmt .Sprintf ("test/latency/key%d" , i )
455452 value := fmt .Sprintf ("test_value_%d" , i )
456- err := sync . InsertPendingRecord (ctx , pool , key , value , false )
453+ err := InsertPendingRecord (ctx , pool , key , value , false )
457454 require .NoError (t , err )
458455
459456 // Update revision (simulating sync completion)
460- err = sync . UpdateRevision (ctx , pool , key , int64 (i + 1 ))
457+ err = UpdateRevision (ctx , pool , key , int64 (i + 1 ))
461458 require .NoError (t , err )
462459
463460 latency := time .Since (start )
0 commit comments