@@ -14,8 +14,7 @@ import (
1414 "github.com/testcontainers/testcontainers-go/modules/postgres"
1515 "github.com/testcontainers/testcontainers-go/wait"
1616
17- "github.com/cybertec-postgresql/etcd_fdw/internal/db"
18- "github.com/cybertec-postgresql/etcd_fdw/internal/etcd"
17+ "github.com/cybertec-postgresql/etcd_fdw/internal/sync"
1918)
2019
2120func setupPostgreSQLContainer (ctx context.Context , t * testing.T ) (* pgxpool.Pool , testcontainers.Container ) {
@@ -54,7 +53,7 @@ func setupPostgreSQLContainer(ctx context.Context, t *testing.T) (*pgxpool.Pool,
5453 return pool , pgContainer
5554}
5655
57- func setupEtcdContainer (ctx context.Context , t * testing.T ) (* etcd .EtcdClient , testcontainers.Container ) {
56+ func setupEtcdContainer (ctx context.Context , t * testing.T ) (* sync .EtcdClient , testcontainers.Container ) {
5857 etcdContainer , err := testcontainers .GenericContainer (ctx , testcontainers.GenericContainerRequest {
5958 ContainerRequest : testcontainers.ContainerRequest {
6059 Image : "quay.io/coreos/etcd:v3.5.9" ,
@@ -77,13 +76,13 @@ func setupEtcdContainer(ctx context.Context, t *testing.T) (*etcd.EtcdClient, te
7776 require .NoError (t , err )
7877
7978 dsn := "etcd://" + endpoint + "/test"
80- etcdClient , err := etcd .NewEtcdClient (dsn )
79+ etcdClient , err := sync .NewEtcdClient (dsn )
8180 require .NoError (t , err )
8281
8382 return etcdClient , etcdContainer
8483}
8584
86- func setupTestContainers (t * testing.T ) (* pgxpool.Pool , * etcd .EtcdClient , func ()) {
85+ func setupTestContainers (t * testing.T ) (* pgxpool.Pool , * sync .EtcdClient , func ()) {
8786 ctx := context .Background ()
8887
8988 pool , pgContainer := setupPostgreSQLContainer (ctx , t )
@@ -118,18 +117,18 @@ func TestPollingMechanism(t *testing.T) {
118117 require .NoError (t , err )
119118
120119 // Test GetPendingRecords function
121- pendingRecords , err := db .GetPendingRecords (ctx , pool )
120+ pendingRecords , err := sync .GetPendingRecords (ctx , pool )
122121 require .NoError (t , err )
123122 assert .Len (t , pendingRecords , 1 )
124123 assert .Equal (t , "test/polling/key1" , pendingRecords [0 ].Key )
125124 assert .Equal (t , "value1" , pendingRecords [0 ].Value )
126125
127126 // Test UpdateRevision function
128- err = db .UpdateRevision (ctx , pool , "test/polling/key1" , 123 )
127+ err = sync .UpdateRevision (ctx , pool , "test/polling/key1" , 123 )
129128 require .NoError (t , err )
130129
131130 // Verify record was updated
132- pendingAfterUpdate , err := db .GetPendingRecords (ctx , pool )
131+ pendingAfterUpdate , err := sync .GetPendingRecords (ctx , pool )
133132 require .NoError (t , err )
134133 assert .Len (t , pendingAfterUpdate , 0 , "No pending records should remain after update" )
135134
@@ -155,7 +154,7 @@ func TestBulkInsert(t *testing.T) {
155154 defer cancel ()
156155
157156 // Prepare test records
158- records := []db .KeyValueRecord {
157+ records := []sync .KeyValueRecord {
159158 {
160159 Key : "test/bulk/key1" ,
161160 Value : ("value1" ),
@@ -180,7 +179,7 @@ func TestBulkInsert(t *testing.T) {
180179 }
181180
182181 // Test BulkInsert function
183- err := db .BulkInsert (ctx , pool , records )
182+ err := sync .BulkInsert (ctx , pool , records )
184183 require .NoError (t , err )
185184
186185 // Verify records were inserted correctly
@@ -228,7 +227,7 @@ func TestInsertPendingRecord(t *testing.T) {
228227 defer cancel ()
229228
230229 // Test inserting a new pending record
231- err := db .InsertPendingRecord (ctx , pool , "test/pending/key1" , ("value1" ), false )
230+ err := sync .InsertPendingRecord (ctx , pool , "test/pending/key1" , ("value1" ), false )
232231 require .NoError (t , err )
233232
234233 // Verify record was inserted with revision = -1
@@ -244,7 +243,7 @@ func TestInsertPendingRecord(t *testing.T) {
244243 assert .Equal (t , "value1" , value .String )
245244
246245 // Test inserting second record with same key (should create new record with different timestamp)
247- err = db .InsertPendingRecord (ctx , pool , "test/pending/key1" , ("updated_value" ), false )
246+ err = sync .InsertPendingRecord (ctx , pool , "test/pending/key1" , ("updated_value" ), false )
248247 require .NoError (t , err )
249248
250249 // Verify both records exist (different timestamps, both with revision = -1)
@@ -257,7 +256,7 @@ func TestInsertPendingRecord(t *testing.T) {
257256 assert .Equal (t , 1 , count , "Should have 1 pending records for the same key with latest value" )
258257
259258 // Test inserting tombstone record
260- err = db .InsertPendingRecord (ctx , pool , "test/pending/key2" , "" , true )
259+ err = sync .InsertPendingRecord (ctx , pool , "test/pending/key2" , "" , true )
261260 require .NoError (t , err )
262261
263262 // Verify tombstone record
@@ -284,7 +283,7 @@ func TestGetLatestRevision(t *testing.T) {
284283 defer cancel ()
285284
286285 // Test with empty table
287- latestRevision , err := db .GetLatestRevision (ctx , pool )
286+ latestRevision , err := sync .GetLatestRevision (ctx , pool )
288287 require .NoError (t , err )
289288 assert .Equal (t , int64 (0 ), latestRevision )
290289
@@ -299,7 +298,7 @@ func TestGetLatestRevision(t *testing.T) {
299298 require .NoError (t , err )
300299
301300 // Test latest revision (should ignore -1 pending records)
302- latestRevision , err = db .GetLatestRevision (ctx , pool )
301+ latestRevision , err = sync .GetLatestRevision (ctx , pool )
303302 require .NoError (t , err )
304303 assert .Equal (t , int64 (150 ), latestRevision )
305304}
@@ -327,7 +326,7 @@ func TestPendingRecordFiltering(t *testing.T) {
327326 require .NoError (t , err )
328327
329328 // Test GetPendingRecords only returns revision = -1
330- pendingRecords , err := db .GetPendingRecords (ctx , pool )
329+ pendingRecords , err := sync .GetPendingRecords (ctx , pool )
331330 require .NoError (t , err )
332331 assert .Len (t , pendingRecords , 3 )
333332
@@ -345,7 +344,7 @@ func TestPendingRecordFiltering(t *testing.T) {
345344 for _ , record := range pendingRecords {
346345 if record .Key == "test/filter/pending2" {
347346 assert .True (t , record .Tombstone )
348- assert .Nil (t , record .Value )
347+ assert .Equal (t , "" , record .Value ) // Empty string for tombstones
349348 }
350349 }
351350}
@@ -362,22 +361,22 @@ func TestConflictResolution(t *testing.T) {
362361 defer cancel ()
363362
364363 // Insert a pending record
365- err := db .InsertPendingRecord (ctx , pool , "test/conflict/key1" , "pending_value" , false )
364+ err := sync .InsertPendingRecord (ctx , pool , "test/conflict/key1" , "pending_value" , false )
366365 require .NoError (t , err )
367366
368367 // Verify it's pending
369- pendingRecords , err := db .GetPendingRecords (ctx , pool )
368+ pendingRecords , err := sync .GetPendingRecords (ctx , pool )
370369 require .NoError (t , err )
371370 assert .Len (t , pendingRecords , 1 )
372371 assert .Equal (t , "test/conflict/key1" , pendingRecords [0 ].Key )
373372 assert .Equal (t , int64 (- 1 ), pendingRecords [0 ].Revision )
374373
375374 // Simulate etcd sync by updating revision
376- err = db .UpdateRevision (ctx , pool , "test/conflict/key1" , 300 )
375+ err = sync .UpdateRevision (ctx , pool , "test/conflict/key1" , 300 )
377376 require .NoError (t , err )
378377
379378 // Verify record is no longer pending
380- pendingAfterUpdate , err := db .GetPendingRecords (ctx , pool )
379+ pendingAfterUpdate , err := sync .GetPendingRecords (ctx , pool )
381380 require .NoError (t , err )
382381 assert .Len (t , pendingAfterUpdate , 0 )
383382
@@ -391,7 +390,7 @@ func TestConflictResolution(t *testing.T) {
391390 assert .Equal (t , int64 (300 ), revision )
392391
393392 // Test updating non-existent pending record (should fail gracefully)
394- err = db .UpdateRevision (ctx , pool , "test/conflict/nonexistent" , 400 )
393+ err = sync .UpdateRevision (ctx , pool , "test/conflict/nonexistent" , 400 )
395394 assert .Error (t , err )
396395 assert .Contains (t , err .Error (), "no pending record found" )
397396}
@@ -411,10 +410,10 @@ func TestPerformanceOpsPerSecond(t *testing.T) {
411410 recordCount := 1000
412411 start := time .Now ()
413412
414- records := make ([]db .KeyValueRecord , recordCount )
413+ records := make ([]sync .KeyValueRecord , recordCount )
415414 for i := 0 ; i < recordCount ; i ++ {
416415 value := fmt .Sprintf ("test_value_%d" , i )
417- records [i ] = db .KeyValueRecord {
416+ records [i ] = sync .KeyValueRecord {
418417 Key : fmt .Sprintf ("test/perf/key%d" , i ),
419418 Value : value ,
420419 Revision : int64 (i + 1 ),
@@ -423,7 +422,7 @@ func TestPerformanceOpsPerSecond(t *testing.T) {
423422 }
424423 }
425424
426- err := db .BulkInsert (ctx , pool , records )
425+ err := sync .BulkInsert (ctx , pool , records )
427426 require .NoError (t , err )
428427
429428 elapsed := time .Since (start )
@@ -454,11 +453,11 @@ func TestPerformanceSyncLatency(t *testing.T) {
454453 // Insert pending record
455454 key := fmt .Sprintf ("test/latency/key%d" , i )
456455 value := fmt .Sprintf ("test_value_%d" , i )
457- err := db .InsertPendingRecord (ctx , pool , key , value , false )
456+ err := sync .InsertPendingRecord (ctx , pool , key , value , false )
458457 require .NoError (t , err )
459458
460459 // Update revision (simulating sync completion)
461- err = db .UpdateRevision (ctx , pool , key , int64 (i + 1 ))
460+ err = sync .UpdateRevision (ctx , pool , key , int64 (i + 1 ))
462461 require .NoError (t , err )
463462
464463 latency := time .Since (start )
0 commit comments