Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 10 additions & 60 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,70 +30,20 @@ jobs:
uses: actions/setup-go@v6
with:
go-version: '1.25'
cache-dependency-path: 'go.sum'

- name: Cache Go modules
uses: actions/cache@v4
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go-1.25-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-1.25-
${{ runner.os }}-go-

- name: Download dependencies
run: go mod download
- name: Build
run: go build ./cmd/etcd_fdw

- name: Verify dependencies
run: go mod verify
- name: GolangCI-Lint
uses: golangci/golangci-lint-action@v8
with:
version: latest

- name: Run tests
run: go test -v -race -coverprofile=coverage.out ./...
# Note: Integration tests run via testcontainers (no separate services needed)

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
with:
file: ./coverage.out
flags: unittests
name: codecov-umbrella

build:
name: Build
runs-on: ubuntu-latest
needs: test

steps:
- name: Check out code
uses: actions/checkout@v5

- name: Set up Go
uses: actions/setup-go@v6
with:
go-version: '1.25'

- name: Build binary
run: go build -v ./cmd/etcd_fdw

- name: Test binary execution
run: ./etcd_fdw --version

lint:
name: Lint
runs-on: ubuntu-latest

steps:
- name: Check out code
uses: actions/checkout@v5

- name: Set up Go
uses: actions/setup-go@v6
with:
go-version: '1.25'

- name: Run golangci-lint
uses: golangci/golangci-lint-action@v8
- name: Coveralls
uses: coverallsapp/github-action@v2
with:
version: latest
args: --timeout=5m
file: coverage.out
61 changes: 24 additions & 37 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,43 +1,30 @@
version: 2

run:
timeout: 5m

version: "2"
linters:
enable:
- errcheck
- govet
- ineffassign
- staticcheck
- unused
- goconst
- gocyclo
- misspell
- dupl
- unconvert
- whitespace
- revive

linters-settings:
gocyclo:
min-complexity: 15
dupl:
threshold: 100
goconst:
min-len: 2
min-occurrences: 2
misspell:
locale: US
revive:
settings:
gocyclo:
min-complexity: 15
exclusions:
generated: lax
presets:
- comments
- common-false-positives
- legacy
- std-error-handling
rules:
- name: exported
disabled: true

issues:
exclude-rules:
- path: _test\.go
linters:
- goconst
- path: cmd/
linters:
- gocyclo
- path: (.+)\.go$
text: SA5008 # ignore staticcheck for go-flags
paths:
- third_party$
- builtin$
- examples$
formatters:
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
17 changes: 4 additions & 13 deletions cmd/etcd_fdw/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
package main

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -125,12 +124,8 @@ func TestCLIParsing(t *testing.T) {
// TestCLIEnvironmentVariables tests that CLI can read from environment variables
func TestCLIEnvironmentVariables(t *testing.T) {
// Set environment variables
os.Setenv("ETCD_FDW_POSTGRES_DSN", "postgres://env:pass@localhost:5432/envdb")
os.Setenv("ETCD_FDW_ETCD_DSN", "etcd://localhost:2379,localhost:2380/")
defer func() {
os.Unsetenv("ETCD_FDW_POSTGRES_DSN")
os.Unsetenv("ETCD_FDW_ETCD_DSN")
}()
t.Setenv("ETCD_FDW_POSTGRES_DSN", "postgres://env:pass@localhost:5432/envdb")
t.Setenv("ETCD_FDW_ETCD_DSN", "etcd://localhost:2379,localhost:2380/")

// This will fail because ParseCLI function doesn't exist yet
config, err := ParseCLI([]string{})
Expand All @@ -144,12 +139,8 @@ func TestCLIEnvironmentVariables(t *testing.T) {
// TestCLIFlagPrecedence tests that command-line flags override environment variables
func TestCLIFlagPrecedence(t *testing.T) {
// Set environment variables
os.Setenv("ETCD_FDW_POSTGRES_DSN", "postgres://env:pass@localhost:5432/envdb")
os.Setenv("ETCD_FDW_ETCD_DSN", "etcd://localhost:2379/")
defer func() {
os.Unsetenv("ETCD_FDW_POSTGRES_DSN")
os.Unsetenv("ETCD_FDW_ETCD_DSN")
}()
t.Setenv("ETCD_FDW_POSTGRES_DSN", "postgres://env:pass@localhost:5432/envdb")
t.Setenv("ETCD_FDW_ETCD_DSN", "etcd://localhost:2379/")

// Command-line flags should override environment
args := []string{
Expand Down
2 changes: 1 addition & 1 deletion cmd/etcd_fdw/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func main() {
if err != nil {
logrus.WithError(err).Fatal("Failed to connect to etcd after retries")
}
defer etcdClient.Close()
defer func() { _ = etcdClient.Close() }()

// Parse polling interval
pollingInterval, err := time.ParseDuration(config.PollingInterval)
Expand Down
2 changes: 2 additions & 0 deletions internal/log/formatter.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package log provides a custom logrus formatter
package log

import (
Expand All @@ -11,6 +12,7 @@ import (
"github.com/sirupsen/logrus"
)

// NewFormatter creates a new custom logrus formatter
func NewFormatter(noColors bool) *Formatter {
return &Formatter{
HideKeys: false,
Expand Down
6 changes: 3 additions & 3 deletions internal/migrations/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestMigrationWithRealDatabase(t *testing.T) {
ctx := context.Background()
conn, err := pgx.Connect(ctx, dsn)
require.NoError(t, err, "Should connect to test database")
defer conn.Close(ctx)
defer func() { _ = conn.Close(ctx) }()

// Apply migrations
err = Apply(ctx, conn) // Use the Apply function instead of migrator method
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestMigrationFunctions(t *testing.T) {
ctx := context.Background()
conn, err := pgx.Connect(ctx, dsn)
require.NoError(t, err, "Should connect to test database")
defer conn.Close(ctx)
defer func() { _ = conn.Close(ctx) }()

// Apply migrations first
err = Apply(ctx, conn)
Expand Down Expand Up @@ -163,7 +163,7 @@ func getTestDSN(t *testing.T) string {

// Cleanup container when test ends
t.Cleanup(func() {
pgContainer.Terminate(ctx)
_ = pgContainer.Terminate(ctx)
})

// Get connection string
Expand Down
61 changes: 4 additions & 57 deletions internal/sync/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *EtcdClient) WatchPrefix(ctx context.Context, startRevision int64) clien
opts = append(opts, clientv3.WithRev(startRevision+1))
}

watchChan := c.Client.Watch(ctx, c.prefix, opts...)
watchChan := c.Watch(ctx, c.prefix, opts...)
logrus.WithFields(logrus.Fields{
"prefix": c.prefix,
"revision": startRevision,
Expand All @@ -65,7 +65,7 @@ func (c *EtcdClient) WatchPrefix(ctx context.Context, startRevision int64) clien

// GetAllKeys retrieves all key-value pairs with the given prefix for initial sync
func (c *EtcdClient) GetAllKeys(ctx context.Context, prefix string) ([]KeyValueRecord, error) {
resp, err := c.Client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
resp, err := c.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, fmt.Errorf("failed to get all keys: %w", err)
}
Expand All @@ -90,59 +90,6 @@ func (c *EtcdClient) GetAllKeys(ctx context.Context, prefix string) ([]KeyValueR
return pairs, nil
}

// Put stores a key-value pair in etcd
func (c *EtcdClient) Put(ctx context.Context, key, value string) (*clientv3.PutResponse, error) {
resp, err := c.Client.Put(ctx, key, value)
if err != nil {
return nil, fmt.Errorf("failed to put key %s: %w", key, err)
}

logrus.WithFields(logrus.Fields{
"key": key,
"revision": resp.Header.Revision,
}).Debug("Put key to etcd")

return resp, nil
}

// Delete removes a key from etcd
func (c *EtcdClient) Delete(ctx context.Context, key string) (*clientv3.DeleteResponse, error) {
resp, err := c.Client.Delete(ctx, key)
if err != nil {
return nil, fmt.Errorf("failed to delete key %s: %w", key, err)
}

logrus.WithFields(logrus.Fields{
"key": key,
"revision": resp.Header.Revision,
"deleted": resp.Deleted,
}).Debug("Deleted key from etcd")

return resp, nil
}

// Get retrieves a single key from etcd
func (c *EtcdClient) Get(ctx context.Context, key string) (*KeyValueRecord, error) {
resp, err := c.Client.Get(ctx, key)
if err != nil {
return nil, fmt.Errorf("failed to get key %s: %w", key, err)
}

if len(resp.Kvs) == 0 {
return nil, nil // Key not found
}

kv := resp.Kvs[0]
value := string(kv.Value)

return &KeyValueRecord{
Key: string(kv.Key),
Value: value,
Revision: kv.ModRevision,
Tombstone: false,
}, nil
}

// NewEtcdClientWithRetry creates a new etcd client with retry logic
func NewEtcdClientWithRetry(ctx context.Context, dsn string) (*EtcdClient, error) {
config := DefaultRetryConfig()
Expand All @@ -158,7 +105,7 @@ func NewEtcdClientWithRetry(ctx context.Context, dsn string) (*EtcdClient, error
// Test the connection
if _, testErr := client.Get(ctx, "healthcheck"); testErr != nil {
if client != nil {
client.Close()
_ = client.Close()
}
return testErr
}
Expand Down Expand Up @@ -243,7 +190,7 @@ func (c *EtcdClient) WatchWithRecovery(ctx context.Context, startRevision int64)
}

// RetryEtcdOperation retries an etcd operation with exponential backoff
func RetryEtcdOperation(ctx context.Context, operation func() error, operationName string) error {
func RetryEtcdOperation(ctx context.Context, operation func() error) error {
config := DefaultRetryConfig()
return RetryWithBackoff(ctx, config, operation)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/sync/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func setupTestContainers(t *testing.T) (*pgxpool.Pool, *EtcdClient, func()) {

cleanup := func() {
pool.Close()
etcdClient.Close()
pgContainer.Terminate(ctx)
etcdContainer.Terminate(ctx)
_ = etcdClient.Close()
_ = pgContainer.Terminate(ctx)
_ = etcdContainer.Terminate(ctx)
}

return pool, etcdClient, cleanup
Expand Down
6 changes: 0 additions & 6 deletions internal/sync/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,6 @@ func NewWithRetry(ctx context.Context, databaseURL string, callbacks ...func(*pg
return pool, nil
}

// RetryOperation retries a database operation with exponential backoff
func RetryOperation(ctx context.Context, operation func() error, operationName string) error {
config := DefaultRetryConfig()
return RetryWithBackoff(ctx, config, operation)
}

// InsertPendingRecord inserts a new record with revision -1 (pending sync to etcd)
func InsertPendingRecord(ctx context.Context, pool PgxIface, key string, value string, tombstone bool) error {
query := `
Expand Down
8 changes: 2 additions & 6 deletions internal/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
)

const InvalidRevision = -1

// Service orchestrates bidirectional synchronization between etcd and PostgreSQL
type Service struct {
pgPool PgxIface
Expand Down Expand Up @@ -140,8 +138,6 @@ func (s *Service) syncEtcdToPostgreSQL(ctx context.Context) error {
if err != nil {
logrus.WithError(err).WithField("key", string(event.Kv.Key)).Error("Failed to process etcd event after retries")
// Continue processing other events rather than failing entirely
} else {
latestRevision = event.Kv.ModRevision
}
}
}
Expand Down Expand Up @@ -261,7 +257,7 @@ func (s *Service) processPendingRecord(ctx context.Context, record KeyValueRecor
}
newRevision = resp.Header.Revision
return nil
}, "etcd_delete")
})

if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
Expand All @@ -284,7 +280,7 @@ func (s *Service) processPendingRecord(ctx context.Context, record KeyValueRecor
}
newRevision = resp.Header.Revision
return nil
}, "etcd_put")
})

if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
Expand Down
Loading