Skip to content

Commit 53580b0

Browse files
committed
implement server agent draining
Signed-off-by: Imran Pochi <[email protected]>
1 parent 7c8afba commit 53580b0

File tree

3 files changed

+49
-7
lines changed

3 files changed

+49
-7
lines changed

pkg/server/backend_manager.go

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,25 @@ type Backend struct {
4949
// cached from conn.Context()
5050
id string
5151
idents header.Identifiers
52+
53+
// draining indicates if this backend is draining and should not accept new connections
54+
draining bool
55+
// mu protects draining field
56+
mu sync.RWMutex
57+
}
58+
59+
// IsDraining returns true if the backend is draining
60+
func (b *Backend) IsDraining() bool {
61+
b.mu.RLock()
62+
defer b.mu.RUnlock()
63+
return b.draining
64+
}
65+
66+
// SetDraining marks the backend as draining
67+
func (b *Backend) SetDraining() {
68+
b.mu.Lock()
69+
defer b.mu.Unlock()
70+
b.draining = true
5271
}
5372

5473
func (b *Backend) Send(p *client.Packet) error {
@@ -346,9 +365,24 @@ func (s *DefaultBackendStorage) GetRandomBackend() (*Backend, error) {
346365
if len(s.backends) == 0 {
347366
return nil, &ErrNotFound{}
348367
}
349-
agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))]
350-
klog.V(3).InfoS("Pick agent as backend", "agentID", agentID)
351-
// always return the first connection to an agent, because the agent
352-
// will close later connections if there are multiple.
353-
return s.backends[agentID][0], nil
368+
369+
// Start at a random agent and check each agent in sequence
370+
startIdx := s.random.Intn(len(s.agentIDs))
371+
for i := 0; i < len(s.agentIDs); i++ {
372+
// Wrap around using modulo
373+
currentIdx := (startIdx + i) % len(s.agentIDs)
374+
agentID := s.agentIDs[currentIdx]
375+
// always return the first connection to an agent, because the agent
376+
// will close later connections if there are multiple.
377+
backend := s.backends[agentID][0]
378+
379+
if !backend.IsDraining() {
380+
klog.V(3).InfoS("Pick agent as backend", "agentID", agentID)
381+
return backend, nil
382+
}
383+
}
384+
385+
// All agents are draining
386+
klog.V(2).InfoS("No non-draining backends available")
387+
return nil, &ErrNotFound{}
354388
}

pkg/server/desthost_backend_manager.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,14 @@ func (dibm *DestHostBackendManager) Backend(ctx context.Context) (*Backend, erro
7979
if destHost != "" {
8080
bes, exist := dibm.backends[destHost]
8181
if exist && len(bes) > 0 {
82-
klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost)
83-
return dibm.backends[destHost][0], nil
82+
// Find a non-draining backend for this destination host
83+
for _, backend := range bes {
84+
if !backend.IsDraining() {
85+
klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost)
86+
return backend, nil
87+
}
88+
}
89+
klog.V(4).InfoS("All backends for destination host are draining", "destHost", destHost)
8490
}
8591
}
8692
return nil, &ErrNotFound{}

pkg/server/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -999,6 +999,8 @@ func (s *ProxyServer) serveRecvBackend(backend *Backend, agentID string, recvCh
999999

10001000
case client.PacketType_DRAIN:
10011001
klog.V(2).InfoS("agent is draining", "agentID", agentID)
1002+
backend.SetDraining()
1003+
klog.V(2).InfoS("marked backend as draining, will not route new requests to this agent", "agentID", agentID)
10021004
default:
10031005
klog.V(5).InfoS("Ignoring unrecognized packet from backend", "packet", pkt, "agentID", agentID)
10041006
}

0 commit comments

Comments
 (0)