|
| 1 | +/* |
| 2 | +Copyright 2020 The Kubernetes Authors. |
| 3 | +
|
| 4 | +Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | +you may not use this file except in compliance with the License. |
| 6 | +You may obtain a copy of the License at |
| 7 | +
|
| 8 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | +
|
| 10 | +Unless required by applicable law or agreed to in writing, software |
| 11 | +distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | +See the License for the specific language governing permissions and |
| 14 | +limitations under the License. |
| 15 | +*/ |
| 16 | + |
| 17 | +package agentserver |
| 18 | + |
| 19 | +import ( |
| 20 | + "math/rand" |
| 21 | + "sync" |
| 22 | + "time" |
| 23 | + |
| 24 | + "k8s.io/klog" |
| 25 | + "sigs.k8s.io/apiserver-network-proxy/proto/agent" |
| 26 | +) |
| 27 | + |
| 28 | +// BackendManager is an interface to manage backend connections, i.e., |
| 29 | +// connection to the proxy agents. |
| 30 | +type BackendManager interface { |
| 31 | + // Backend returns a single backend. |
| 32 | + Backend() (agent.AgentService_ConnectServer, error) |
| 33 | + // AddBackend adds a backend. |
| 34 | + AddBackend(agentID string, conn agent.AgentService_ConnectServer) |
| 35 | + // RemoveBackend removes a backend. |
| 36 | + RemoveBackend(agentID string, conn agent.AgentService_ConnectServer) |
| 37 | +} |
| 38 | + |
| 39 | +var _ BackendManager = &DefaultBackendManager{} |
| 40 | + |
| 41 | +// DefaultBackendManager is the default backend manager. |
| 42 | +type DefaultBackendManager struct { |
| 43 | + mu sync.RWMutex //protects the following |
| 44 | + // A map between agentID and its grpc connections. |
| 45 | + // For a given agent, ProxyServer prefers backends[agentID][0] to send |
| 46 | + // traffic, because backends[agentID][1:] are more likely to be closed |
| 47 | + // by the agent to deduplicate connections to the same server. |
| 48 | + backends map[string][]agent.AgentService_ConnectServer |
| 49 | + // agentID is tracked in this slice to enable randomly picking an |
| 50 | + // agentID in the Backend() method. There is no reliable way to |
| 51 | + // randomly pick a key from a map (in this case, the backends) in |
| 52 | + // Golang. |
| 53 | + agentIDs []string |
| 54 | + random *rand.Rand |
| 55 | +} |
| 56 | + |
| 57 | +// NewDefaultBackendManager returns a DefaultBackendManager. |
| 58 | +func NewDefaultBackendManager() *DefaultBackendManager { |
| 59 | + return &DefaultBackendManager{ |
| 60 | + backends: make(map[string][]agent.AgentService_ConnectServer), |
| 61 | + random: rand.New(rand.NewSource(time.Now().UnixNano())), |
| 62 | + } |
| 63 | +} |
| 64 | + |
| 65 | +// AddBackend adds a backend. |
| 66 | +func (s *DefaultBackendManager) AddBackend(agentID string, conn agent.AgentService_ConnectServer) { |
| 67 | + klog.Infof("register Backend %v for agentID %s", conn, agentID) |
| 68 | + s.mu.Lock() |
| 69 | + defer s.mu.Unlock() |
| 70 | + _, ok := s.backends[agentID] |
| 71 | + if ok { |
| 72 | + for _, v := range s.backends[agentID] { |
| 73 | + if v == conn { |
| 74 | + klog.Warningf("this should not happen. Adding existing connection %v for agentID %s", conn, agentID) |
| 75 | + return |
| 76 | + } |
| 77 | + } |
| 78 | + s.backends[agentID] = append(s.backends[agentID], conn) |
| 79 | + return |
| 80 | + } |
| 81 | + s.backends[agentID] = []agent.AgentService_ConnectServer{conn} |
| 82 | + s.agentIDs = append(s.agentIDs, agentID) |
| 83 | +} |
| 84 | + |
| 85 | +// RemoveBackend removes a backend. |
| 86 | +func (s *DefaultBackendManager) RemoveBackend(agentID string, conn agent.AgentService_ConnectServer) { |
| 87 | + klog.Infof("remove Backend %v for agentID %s", conn, agentID) |
| 88 | + s.mu.Lock() |
| 89 | + defer s.mu.Unlock() |
| 90 | + backends, ok := s.backends[agentID] |
| 91 | + if !ok { |
| 92 | + klog.Warningf("can't find agentID %s in the backends", agentID) |
| 93 | + return |
| 94 | + } |
| 95 | + var found bool |
| 96 | + for i, c := range backends { |
| 97 | + if c == conn { |
| 98 | + s.backends[agentID] = append(s.backends[agentID][:i], s.backends[agentID][i+1:]...) |
| 99 | + if i == 0 && len(s.backends) != 0 { |
| 100 | + klog.Warningf("this should not happen. Removed connection %v that is not the first connection, remaining connections are %v", conn, s.backends[agentID]) |
| 101 | + } |
| 102 | + found = true |
| 103 | + } |
| 104 | + } |
| 105 | + if len(s.backends[agentID]) == 0 { |
| 106 | + delete(s.backends, agentID) |
| 107 | + for i := range s.agentIDs { |
| 108 | + if s.agentIDs[i] == agentID { |
| 109 | + s.agentIDs[i] = s.agentIDs[len(s.agentIDs)-1] |
| 110 | + s.agentIDs = s.agentIDs[:len(s.agentIDs)-1] |
| 111 | + break |
| 112 | + } |
| 113 | + } |
| 114 | + } |
| 115 | + if !found { |
| 116 | + klog.Errorf("can't find conn %v for agentID %s in the backends", conn, agentID) |
| 117 | + } |
| 118 | +} |
| 119 | + |
| 120 | +// ErrNotFound indicates that no backend can be found. |
| 121 | +type ErrNotFound struct{} |
| 122 | + |
| 123 | +// Error returns the error message. |
| 124 | +func (e *ErrNotFound) Error() string { |
| 125 | + return "No backend available" |
| 126 | +} |
| 127 | + |
| 128 | +// Backend returns a random backend. |
| 129 | +func (s *DefaultBackendManager) Backend() (agent.AgentService_ConnectServer, error) { |
| 130 | + s.mu.RLock() |
| 131 | + defer s.mu.RUnlock() |
| 132 | + if len(s.backends) == 0 { |
| 133 | + return nil, &ErrNotFound{} |
| 134 | + } |
| 135 | + agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))] |
| 136 | + // always return the first connection to an agent, because the agent |
| 137 | + // will close later connections if there are multiple. |
| 138 | + return s.backends[agentID][0], nil |
| 139 | +} |
0 commit comments