44 "context"
55 "net"
66 "net/netip"
7- "sync"
7+ "sync/atomic "
88 "time"
99
1010 "github.com/cybertec-postgresql/vip-manager/vipconfig"
@@ -25,10 +25,9 @@ var log *zap.SugaredLogger = zap.L().Sugar()
2525type IPManager struct {
2626 configurer ipConfigurer
2727
28- states <- chan bool
29- currentState bool
30- stateLock sync.Mutex
31- recheck * sync.Cond
28+ states <- chan bool
29+ shouldSetIPUp atomic.Bool
30+ recheckChan chan struct {}
3231}
3332
3433func getMask (vip netip.Addr , mask int ) net.IPMask {
@@ -69,7 +68,7 @@ func NewIPManager(conf *vipconfig.Config, states <-chan bool) (m *IPManager, err
6968 states : states ,
7069 }
7170 log = conf .Logger .Sugar ()
72- m .recheck = sync . NewCond ( & m . stateLock )
71+ m .recheckChan = make ( chan struct {} )
7372 switch conf .HostingType {
7473 case "hetzner" :
7574 m .configurer , err = newHetznerConfigurer (ipConf , conf .Verbose )
@@ -86,71 +85,45 @@ func NewIPManager(conf *vipconfig.Config, states <-chan bool) (m *IPManager, err
8685
8786func (m * IPManager ) applyLoop (ctx context.Context ) {
8887 strUpDown := map [bool ]string {true : "up" , false : "down" }
89- timeout := 0
9088 for {
91- // Check if we should exit
89+ isIPUp := m .configurer .queryAddress ()
90+ shouldSetIPUp := m .shouldSetIPUp .Load ()
91+ log .Infof ("IP address %s is %s, must be %s" ,
92+ m .configurer .getCIDR (),
93+ strUpDown [isIPUp ],
94+ strUpDown [shouldSetIPUp ])
95+ if isIPUp != shouldSetIPUp {
96+ var isOk bool
97+ if shouldSetIPUp {
98+ isOk = m .configurer .configureAddress ()
99+ } else {
100+ isOk = m .configurer .deconfigureAddress ()
101+ }
102+ if ! isOk {
103+ log .Error ("Failed to configure virtual ip for this machine" )
104+ }
105+ }
92106 select {
93107 case <- ctx .Done ():
94- m .configurer .deconfigureAddress ()
95108 return
96- case <- time .After (time .Duration (timeout ) * time .Second ):
97- actualState := m .configurer .queryAddress ()
98- m .stateLock .Lock ()
99- desiredState := m .currentState
100- log .Infof ("IP address %s state is %s, must be %s" ,
101- m .configurer .getCIDR (),
102- strUpDown [actualState ],
103- strUpDown [desiredState ])
104- if actualState != desiredState {
105- m .stateLock .Unlock ()
106- var configureState bool
107- if desiredState {
108- configureState = m .configurer .configureAddress ()
109- } else {
110- configureState = m .configurer .deconfigureAddress ()
111- }
112- if ! configureState {
113- log .Error ("Error while acquiring virtual ip for this machine" )
114- //Sleep a little bit to avoid busy waiting due to the for loop.
115- timeout = 10
116- } else {
117- timeout = 0
118- }
119- } else {
120- // Wait for notification
121- m .recheck .Wait ()
122- // Want to query actual state anyway, so unlock
123- m .stateLock .Unlock ()
124- }
109+ case <- m .recheckChan : // signal to recheck
110+ case <- time .After (time .Duration (10 ) * time .Second ): // recheck every 10 seconds
125111 }
126112 }
127113}
128114
129115// SyncStates implements states synchronization
130116func (m * IPManager ) SyncStates (ctx context.Context , states <- chan bool ) {
131- ticker := time .NewTicker (10 * time .Second )
132-
133- var wg sync.WaitGroup
134- wg .Add (1 )
135- go func () {
136- m .applyLoop (ctx )
137- wg .Done ()
138- }()
139-
117+ go m .applyLoop (ctx )
140118 for {
141119 select {
142120 case newState := <- states :
143- m .stateLock .Lock ()
144- if m .currentState != newState {
145- m .currentState = newState
146- m .recheck .Broadcast ()
121+ if m .shouldSetIPUp .Load () != newState {
122+ m .shouldSetIPUp .Store (newState )
123+ m .recheckChan <- struct {}{}
147124 }
148- m .stateLock .Unlock ()
149- case <- ticker .C :
150- m .recheck .Broadcast ()
151125 case <- ctx .Done ():
152- m .recheck .Broadcast ()
153- wg .Wait ()
126+ m .configurer .deconfigureAddress ()
154127 m .configurer .cleanupArp ()
155128 return
156129 }
0 commit comments