Skip to content

Commit d8cf4cc

Browse files
committed
[*] move prefix to EtcdClient
1 parent da911ef commit d8cf4cc

File tree

3 files changed

+31
-33
lines changed

3 files changed

+31
-33
lines changed

cmd/etcd_fdw/main.go

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,19 @@ func SetupLogging(logLevel string) error {
9090
return nil
9191
}
9292

93+
// SetupCloseHandler creates a 'listener' on a new goroutine which will notify the
94+
// program if it receives an interrupt from the OS. We then handle this by calling
95+
// our clean up procedure and exiting the program.
96+
func SetupCloseHandler(cancel context.CancelFunc) {
97+
c := make(chan os.Signal, 2)
98+
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
99+
go func() {
100+
<-c
101+
logrus.Debug("SetupCloseHandler received an interrupt from OS. Closing session...")
102+
cancel()
103+
}()
104+
}
105+
93106
func main() {
94107
// Quick check for version flags before full parsing
95108
for _, arg := range os.Args[1:] {
@@ -117,16 +130,7 @@ func main() {
117130
// Setup graceful shutdown
118131
ctx, cancel := context.WithCancel(context.Background())
119132
defer cancel()
120-
121-
// Setup signal handling for graceful shutdown
122-
sigChan := make(chan os.Signal, 1)
123-
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
124-
125-
go func() {
126-
sig := <-sigChan
127-
logrus.WithField("signal", sig).Info("Received shutdown signal, initiating graceful shutdown...")
128-
cancel()
129-
}()
133+
SetupCloseHandler(cancel)
130134

131135
// Connect to PostgreSQL with retry logic
132136
pgPool, err := sync.NewWithRetry(ctx, config.PostgresDSN)
@@ -136,18 +140,11 @@ func main() {
136140
defer pgPool.Close()
137141

138142
// Connect to etcd with retry logic
139-
var etcdClient *sync.EtcdClient
140-
if config.EtcdDSN != "" {
141-
var err error
142-
etcdClient, err = sync.NewEtcdClientWithRetry(ctx, config.EtcdDSN)
143-
if err != nil {
144-
logrus.WithError(err).Fatal("Failed to connect to etcd after retries")
145-
}
146-
defer etcdClient.Close()
143+
etcdClient, err := sync.NewEtcdClientWithRetry(ctx, config.EtcdDSN)
144+
if err != nil {
145+
logrus.WithError(err).Fatal("Failed to connect to etcd after retries")
147146
}
148-
149-
// Get prefix from etcd DSN
150-
prefix := sync.GetPrefix(config.EtcdDSN)
147+
defer etcdClient.Close()
151148

152149
// Parse polling interval
153150
pollingInterval, err := time.ParseDuration(config.PollingInterval)
@@ -156,7 +153,7 @@ func main() {
156153
}
157154

158155
// Create and start sync service
159-
syncService := sync.NewService(pgPool, etcdClient, prefix, pollingInterval)
156+
syncService := sync.NewService(pgPool, etcdClient, pollingInterval)
160157
if err := syncService.Start(ctx); err != nil && ctx.Err() == nil {
161158
logrus.WithError(err).Fatal("Synchronization failed")
162159
}

internal/sync/etcd.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
// EtcdClient handles all etcd operations for PostgreSQL synchronization
1717
type EtcdClient struct {
1818
*clientv3.Client
19+
prefix string
1920
}
2021

2122
// NewEtcdClient creates a new etcd client with DSN parsing
@@ -34,6 +35,7 @@ func NewEtcdClient(dsn string) (*EtcdClient, error) {
3435

3536
return &EtcdClient{
3637
Client: client,
38+
prefix: getPrefix(dsn),
3739
}, nil
3840
}
3941

@@ -46,15 +48,15 @@ func (c *EtcdClient) Close() error {
4648
}
4749

4850
// WatchPrefix sets up a watch for all keys with the given prefix
49-
func (c *EtcdClient) WatchPrefix(ctx context.Context, prefix string, startRevision int64) clientv3.WatchChan {
51+
func (c *EtcdClient) WatchPrefix(ctx context.Context, startRevision int64) clientv3.WatchChan {
5052
opts := []clientv3.OpOption{clientv3.WithPrefix()}
5153
if startRevision > 0 {
5254
opts = append(opts, clientv3.WithRev(startRevision+1))
5355
}
5456

55-
watchChan := c.Client.Watch(ctx, prefix, opts...)
57+
watchChan := c.Client.Watch(ctx, c.prefix, opts...)
5658
logrus.WithFields(logrus.Fields{
57-
"prefix": prefix,
59+
"prefix": c.prefix,
5860
"revision": startRevision,
5961
}).Info("Started etcd watch")
6062

@@ -173,7 +175,7 @@ func NewEtcdClientWithRetry(ctx context.Context, dsn string) (*EtcdClient, error
173175
}
174176

175177
// WatchWithRecovery wraps the etcd watch functionality with automatic recovery
176-
func (c *EtcdClient) WatchWithRecovery(ctx context.Context, prefix string, startRevision int64) <-chan clientv3.WatchResponse {
178+
func (c *EtcdClient) WatchWithRecovery(ctx context.Context, startRevision int64) <-chan clientv3.WatchResponse {
177179
watchChan := make(chan clientv3.WatchResponse)
178180

179181
go func() {
@@ -187,7 +189,7 @@ func (c *EtcdClient) WatchWithRecovery(ctx context.Context, prefix string, start
187189
return
188190
default:
189191
// Attempt to establish watch
190-
innerWatchChan := c.WatchPrefix(ctx, prefix, currentRevision)
192+
innerWatchChan := c.WatchPrefix(ctx, currentRevision)
191193

192194
for {
193195
select {
@@ -249,7 +251,7 @@ func RetryEtcdOperation(ctx context.Context, operation func() error, operationNa
249251
// parseEtcdDSN parses etcd DSN format: etcd://[user:password@]host1:port1[,host2:port2]/[prefix]?param=value
250252
func parseEtcdDSN(dsn string) (*clientv3.Config, error) {
251253
if dsn == "" {
252-
return nil, fmt.Errorf("etcd DSN is required")
254+
return &clientv3.Config{}, nil // Default config
253255
}
254256

255257
// Parse the DSN if provided
@@ -321,8 +323,8 @@ func parseEtcdDSN(dsn string) (*clientv3.Config, error) {
321323
return config, nil
322324
}
323325

324-
// GetPrefix extracts the prefix from the etcd DSN path
325-
func GetPrefix(dsn string) string {
326+
// getPrefix extracts the prefix from the etcd DSN path
327+
func getPrefix(dsn string) string {
326328
if dsn == "" || !strings.HasPrefix(dsn, "etcd://") {
327329
return "/"
328330
}

internal/sync/sync.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,10 @@ type Service struct {
2121
}
2222

2323
// NewService creates a new synchronization service
24-
func NewService(pgPool PgxIface, etcdClient *EtcdClient, prefix string, pollingInterval time.Duration) *Service {
24+
func NewService(pgPool PgxIface, etcdClient *EtcdClient, pollingInterval time.Duration) *Service {
2525
return &Service{
2626
pgPool: pgPool,
2727
etcdClient: etcdClient,
28-
prefix: prefix,
2928
pollingInterval: pollingInterval,
3029
}
3130
}
@@ -109,7 +108,7 @@ func (s *Service) syncEtcdToPostgreSQL(ctx context.Context) error {
109108
}
110109

111110
// Start watching from the next revision with automatic recovery
112-
watchChan := s.etcdClient.WatchWithRecovery(ctx, s.prefix, latestRevision)
111+
watchChan := s.etcdClient.WatchWithRecovery(ctx, latestRevision)
113112

114113
for {
115114
select {

0 commit comments

Comments
 (0)