Skip to content

Commit 7c9c74f

Browse files
authored
Merge pull request #58 from caesarxuchao/concurrency-test
Let the proxy server handle concurrent client requests correctly
2 parents 02dcfc2 + 645b8cd commit 7c9c74f

File tree

8 files changed

+492
-140
lines changed

8 files changed

+492
-140
lines changed
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package agentserver
18+
19+
import (
20+
"math/rand"
21+
"sync"
22+
"time"
23+
24+
"k8s.io/klog"
25+
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
26+
)
27+
28+
// BackendManager is an interface to manage backend connections, i.e.,
29+
// connection to the proxy agents.
30+
type BackendManager interface {
31+
// Backend returns a single backend.
32+
Backend() (agent.AgentService_ConnectServer, error)
33+
// AddBackend adds a backend.
34+
AddBackend(agentID string, conn agent.AgentService_ConnectServer)
35+
// RemoveBackend removes a backend.
36+
RemoveBackend(agentID string, conn agent.AgentService_ConnectServer)
37+
}
38+
39+
var _ BackendManager = &DefaultBackendManager{}
40+
41+
// DefaultBackendManager is the default backend manager.
42+
type DefaultBackendManager struct {
43+
mu sync.RWMutex //protects the following
44+
// A map between agentID and its grpc connections.
45+
// For a given agent, ProxyServer prefers backends[agentID][0] to send
46+
// traffic, because backends[agentID][1:] are more likely to be closed
47+
// by the agent to deduplicate connections to the same server.
48+
backends map[string][]agent.AgentService_ConnectServer
49+
// agentID is tracked in this slice to enable randomly picking an
50+
// agentID in the Backend() method. There is no reliable way to
51+
// randomly pick a key from a map (in this case, the backends) in
52+
// Golang.
53+
agentIDs []string
54+
random *rand.Rand
55+
}
56+
57+
// NewDefaultBackendManager returns a DefaultBackendManager.
58+
func NewDefaultBackendManager() *DefaultBackendManager {
59+
return &DefaultBackendManager{
60+
backends: make(map[string][]agent.AgentService_ConnectServer),
61+
random: rand.New(rand.NewSource(time.Now().UnixNano())),
62+
}
63+
}
64+
65+
// AddBackend adds a backend.
66+
func (s *DefaultBackendManager) AddBackend(agentID string, conn agent.AgentService_ConnectServer) {
67+
klog.Infof("register Backend %v for agentID %s", conn, agentID)
68+
s.mu.Lock()
69+
defer s.mu.Unlock()
70+
_, ok := s.backends[agentID]
71+
if ok {
72+
for _, v := range s.backends[agentID] {
73+
if v == conn {
74+
klog.Warningf("this should not happen. Adding existing connection %v for agentID %s", conn, agentID)
75+
return
76+
}
77+
}
78+
s.backends[agentID] = append(s.backends[agentID], conn)
79+
return
80+
}
81+
s.backends[agentID] = []agent.AgentService_ConnectServer{conn}
82+
s.agentIDs = append(s.agentIDs, agentID)
83+
}
84+
85+
// RemoveBackend removes a backend.
86+
func (s *DefaultBackendManager) RemoveBackend(agentID string, conn agent.AgentService_ConnectServer) {
87+
klog.Infof("remove Backend %v for agentID %s", conn, agentID)
88+
s.mu.Lock()
89+
defer s.mu.Unlock()
90+
backends, ok := s.backends[agentID]
91+
if !ok {
92+
klog.Warningf("can't find agentID %s in the backends", agentID)
93+
return
94+
}
95+
var found bool
96+
for i, c := range backends {
97+
if c == conn {
98+
s.backends[agentID] = append(s.backends[agentID][:i], s.backends[agentID][i+1:]...)
99+
if i == 0 && len(s.backends) != 0 {
100+
klog.Warningf("this should not happen. Removed connection %v that is not the first connection, remaining connections are %v", conn, s.backends[agentID])
101+
}
102+
found = true
103+
}
104+
}
105+
if len(s.backends[agentID]) == 0 {
106+
delete(s.backends, agentID)
107+
for i := range s.agentIDs {
108+
if s.agentIDs[i] == agentID {
109+
s.agentIDs[i] = s.agentIDs[len(s.agentIDs)-1]
110+
s.agentIDs = s.agentIDs[:len(s.agentIDs)-1]
111+
break
112+
}
113+
}
114+
}
115+
if !found {
116+
klog.Errorf("can't find conn %v for agentID %s in the backends", conn, agentID)
117+
}
118+
}
119+
120+
// ErrNotFound indicates that no backend can be found.
121+
type ErrNotFound struct{}
122+
123+
// Error returns the error message.
124+
func (e *ErrNotFound) Error() string {
125+
return "No backend available"
126+
}
127+
128+
// Backend returns a random backend.
129+
func (s *DefaultBackendManager) Backend() (agent.AgentService_ConnectServer, error) {
130+
s.mu.RLock()
131+
defer s.mu.RUnlock()
132+
if len(s.backends) == 0 {
133+
return nil, &ErrNotFound{}
134+
}
135+
agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))]
136+
// always return the first connection to an agent, because the agent
137+
// will close later connections if there are multiple.
138+
return s.backends[agentID][0], nil
139+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package agentserver
18+
19+
import (
20+
"reflect"
21+
"testing"
22+
23+
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
24+
)
25+
26+
type fakeAgentService_ConnectServer struct {
27+
agent.AgentService_ConnectServer
28+
}
29+
30+
func TestAddRemoveBackends(t *testing.T) {
31+
conn1 := new(fakeAgentService_ConnectServer)
32+
conn12 := new(fakeAgentService_ConnectServer)
33+
conn2 := new(fakeAgentService_ConnectServer)
34+
conn22 := new(fakeAgentService_ConnectServer)
35+
conn3 := new(fakeAgentService_ConnectServer)
36+
37+
p := NewDefaultBackendManager()
38+
39+
p.AddBackend("agent1", conn1)
40+
p.RemoveBackend("agent1", conn1)
41+
expectedBackends := make(map[string][]agent.AgentService_ConnectServer)
42+
expectedAgentIDs := []string{}
43+
if e, a := expectedBackends, p.backends; !reflect.DeepEqual(e, a) {
44+
t.Errorf("expected %v, got %v", e, a)
45+
}
46+
if e, a := expectedAgentIDs, p.agentIDs; !reflect.DeepEqual(e, a) {
47+
t.Errorf("expected %v, got %v", e, a)
48+
}
49+
50+
p = NewDefaultBackendManager()
51+
p.AddBackend("agent1", conn1)
52+
p.AddBackend("agent1", conn12)
53+
// Adding the same connection again should be a no-op.
54+
p.AddBackend("agent1", conn12)
55+
p.AddBackend("agent2", conn2)
56+
p.AddBackend("agent2", conn22)
57+
p.AddBackend("agent3", conn3)
58+
p.RemoveBackend("agent2", conn22)
59+
p.RemoveBackend("agent2", conn2)
60+
p.RemoveBackend("agent1", conn1)
61+
// This is invalid. agent1 doesn't have conn3. This should be a no-op.
62+
p.RemoveBackend("agent1", conn3)
63+
expectedBackends = map[string][]agent.AgentService_ConnectServer{
64+
"agent1": []agent.AgentService_ConnectServer{conn12},
65+
"agent3": []agent.AgentService_ConnectServer{conn3},
66+
}
67+
expectedAgentIDs = []string{"agent1", "agent3"}
68+
if e, a := expectedBackends, p.backends; !reflect.DeepEqual(e, a) {
69+
t.Errorf("expected %v, got %v", e, a)
70+
}
71+
if e, a := expectedAgentIDs, p.agentIDs; !reflect.DeepEqual(e, a) {
72+
t.Errorf("expected %v, got %v", e, a)
73+
}
74+
}

0 commit comments

Comments
 (0)