Skip to content

Commit 3fe57a7

Browse files
committed
Use atomics for HEALTHZ command
The HEALTHZ command is used to detect if there a leader or follower are ready for commands. All that it cares about is if the leader has it's data fully loaded in memory, and that a follower is caught up to the leader. Previously there was a read lock around the 'caught up' flags, which could potentially be blocked by other operations such as SET and INTERSECTS. This commit removes the shared read lock when running the HEALTHZ command. It now uses atomics bit flags for the 'caught up' flags. The result is that the HEALTHZ command should have little-to-no noticable contention, always appearing instant to the user.
1 parent fbf767a commit 3fe57a7

File tree

5 files changed

+51
-29
lines changed

5 files changed

+51
-29
lines changed

internal/server/follow.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -203,13 +203,37 @@ func (s *Server) followDoLeaderAuth(conn *RESPConn, auth string) error {
203203
return nil
204204
}
205205

206+
// bit flags for the fcupflags server field
207+
const (
208+
bitCaughtUpOnce int32 = 1 // follower caught up at least once in the past
209+
bitCaughtUp int32 = 2 // follower is fully caught up to leader
210+
)
211+
212+
func (s *Server) setCaughtUp(caughtUp bool) {
213+
var flags int32
214+
if caughtUp {
215+
flags = bitCaughtUp | bitCaughtUpOnce
216+
} else {
217+
flags = s.fcupflags.Load() & bitCaughtUpOnce
218+
}
219+
s.fcupflags.Store(flags)
220+
}
221+
222+
func (s *Server) caughtUp() bool {
223+
return (s.fcupflags.Load() & bitCaughtUp) == bitCaughtUp
224+
}
225+
226+
func (s *Server) caughtUpOnce() bool {
227+
return (s.fcupflags.Load() & bitCaughtUpOnce) == bitCaughtUpOnce
228+
}
229+
206230
func (s *Server) followStep(host string, port int, followc int) error {
207231
if int(s.followc.Load()) != followc {
208232
return errNoLongerFollowing
209233
}
210234
s.mu.Lock()
211235
s.faofsz = 0
212-
s.fcup = false
236+
s.setCaughtUp(false)
213237
auth := s.config.leaderAuth()
214238
s.mu.Unlock()
215239
addr := fmt.Sprintf("%s:%d", host, port)
@@ -305,10 +329,7 @@ func (s *Server) followStep(host string, port int, followc int) error {
305329

306330
caughtUp := pos >= aofSize
307331
if caughtUp {
308-
s.mu.Lock()
309-
s.fcup = true
310-
s.fcuponce = true
311-
s.mu.Unlock()
332+
s.setCaughtUp(true)
312333
log.Info("caught up")
313334
}
314335

@@ -339,8 +360,7 @@ func (s *Server) followStep(host string, port int, followc int) error {
339360
caughtUp = true
340361
s.mu.Lock()
341362
s.flushAOF(false)
342-
s.fcup = true
343-
s.fcuponce = true
363+
s.setCaughtUp(true)
344364
s.mu.Unlock()
345365
log.Info("caught up")
346366
}

internal/server/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (s *Server) Collect(ch chan<- prometheus.Metric) {
111111
if s.config.followHost() != "" {
112112
replLbls = []string{"follower",
113113
fmt.Sprintf("%s:%d", s.config.followHost(), s.config.followPort()),
114-
fmt.Sprintf("%t", s.fcup), fmt.Sprintf("%t", s.fcuponce)}
114+
fmt.Sprintf("%t", s.caughtUp()), fmt.Sprintf("%t", s.caughtUpOnce())}
115115
}
116116
ch <- prometheus.MustNewConstMetric(
117117
metricDescriptions["replication"],

internal/server/scripts.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -743,7 +743,7 @@ func (s *Server) luaTile38AtomicRW(msg *Message) (resp.Value, error) {
743743
case "get", "keys", "scan", "nearby", "within", "intersects", "hooks", "search",
744744
"ttl", "bounds", "server", "info", "type", "jget", "fget", "exists", "fexists", "test":
745745
// read operations
746-
if s.config.followHost() != "" && !s.fcuponce {
746+
if s.config.followHost() != "" && !s.caughtUpOnce() {
747747
return resp.NullValue(), errCatchingUp
748748
}
749749
}
@@ -796,7 +796,7 @@ func (s *Server) luaTile38AtomicRO(msg *Message) (resp.Value, error) {
796796
case "get", "keys", "scan", "nearby", "within", "intersects", "hooks", "search",
797797
"ttl", "bounds", "server", "info", "type", "jget", "fget", "exists", "fexists", "test":
798798
// read operations
799-
if s.config.followHost() != "" && !s.fcuponce {
799+
if s.config.followHost() != "" && !s.caughtUpOnce() {
800800
return resp.NullValue(), errCatchingUp
801801
}
802802
}
@@ -849,7 +849,7 @@ func (s *Server) luaTile38NonAtomic(msg *Message) (resp.Value, error) {
849849
// read operations
850850
s.mu.RLock()
851851
defer s.mu.RUnlock()
852-
if s.config.followHost() != "" && !s.fcuponce {
852+
if s.config.followHost() != "" && !s.caughtUpOnce() {
853853
return resp.NullValue(), errCatchingUp
854854
}
855855
}

internal/server/server.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -131,16 +131,15 @@ type Server struct {
131131
hookExpires *btree.BTree // queue of all hooks marked for expiration
132132

133133
// followers (external aof readers)
134-
follows map[*bytes.Buffer]bool
135-
fcond *sync.Cond
136-
lstack []*commandDetails
137-
lives map[*liveBuffer]bool
138-
lcond *sync.Cond // live geofence signal
139-
faofsz int // last reported aofsize
140-
fcup bool // follow caught up
141-
fcuponce bool // follow caught up once
142-
aofconnM map[net.Conn]io.Closer
143-
pubq pubQueue
134+
follows map[*bytes.Buffer]bool
135+
fcond *sync.Cond
136+
lstack []*commandDetails
137+
lives map[*liveBuffer]bool
138+
lcond *sync.Cond // live geofence signal
139+
faofsz int // last reported aofsize
140+
fcupflags atomic.Int32 // follow caught up (caughtUp and caughtUpOnce)
141+
aofconnM map[net.Conn]io.Closer
142+
pubq pubQueue
144143

145144
// lua scripts
146145
luascripts *lScriptMap
@@ -1029,12 +1028,16 @@ func (s *Server) handleInputCommand(client *Client, msg *Message) error {
10291028
}
10301029
case "get", "keys", "scan", "nearby", "within", "intersects", "hooks",
10311030
"chans", "search", "ttl", "bounds", "server", "info", "type", "jget",
1032-
"evalro", "evalrosha", "healthz", "role", "fget", "exists", "fexists":
1031+
"evalro", "evalrosha", "role", "fget", "exists", "fexists":
10331032
// read operations
1034-
10351033
s.mu.RLock()
10361034
defer s.mu.RUnlock()
1037-
if s.config.followHost() != "" && !s.fcuponce {
1035+
// fallthrough to perform a "catching up to leader" check
1036+
fallthrough
1037+
case "healthz":
1038+
// healthz operation does not require a read lock. It only needs
1039+
// a caughtup once check to leader, which is atomic.
1040+
if s.config.followHost() != "" && !s.caughtUpOnce() {
10381041
return writeErr("catching up to leader")
10391042
}
10401043
case "follow", "slaveof", "replconf", "readonly", "config":

internal/server/stats.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,7 @@ func (s *Server) cmdHEALTHZ(msg *Message) (resp.Value, error) {
109109
// >> Operation
110110

111111
if s.config.followHost() != "" {
112-
m := make(map[string]interface{})
113-
s.basicStats(m)
114-
if fmt.Sprintf("%v", m["caught_up"]) != "true" {
112+
if !s.caughtUp() {
115113
return retrerr(errors.New("not caught up"))
116114
}
117115
}
@@ -168,8 +166,9 @@ func (s *Server) basicStats(m map[string]interface{}) {
168166
if s.config.followHost() != "" {
169167
m["following"] = fmt.Sprintf("%s:%d", s.config.followHost(),
170168
s.config.followPort())
171-
m["caught_up"] = s.fcup
172-
m["caught_up_once"] = s.fcuponce
169+
170+
m["caught_up"] = s.caughtUp()
171+
m["caught_up_once"] = s.caughtUpOnce()
173172
}
174173
m["http_transport"] = s.http
175174
m["pid"] = os.Getpid()

0 commit comments

Comments
 (0)