Skip to content

Commit 4ad355b

Browse files
authored
Merge pull request #78 from caesarxuchao/add-packet-size
Fix races
2 parents d7a8816 + 2ee6177 commit 4ad355b

File tree

7 files changed

+109
-33
lines changed

7 files changed

+109
-33
lines changed

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
6060
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
6161
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
6262
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
63+
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
6364
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
6465
github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
6566
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=

pkg/agent/agentclient/client.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (c *connContext) cleanup() {
7474
c.cleanOnce.Do(c.cleanFunc)
7575
}
7676

77-
// Connect connnects to proxy server to establish a gRPC stream,
77+
// Connect connects to proxy server to establish a gRPC stream,
7878
// on which the proxied traffic is multiplexed through the stream
7979
// and piped to the local connection. It register itself as a
8080
// backend from proxy server, so proxy server will route traffic
@@ -230,7 +230,7 @@ func (a *AgentClient) remoteToProxy(conn net.Conn, connID int64) {
230230

231231
for {
232232
n, err := conn.Read(buf[:])
233-
klog.Infof("received %d bytes from proxy server", n)
233+
klog.Infof("received %d bytes from remote for connID[%d]", n, connID)
234234

235235
if err == io.EOF {
236236
klog.Info("connection EOF")
@@ -264,8 +264,10 @@ func (a *AgentClient) proxyToRemote(conn net.Conn, connID int64) {
264264
for {
265265
n, err := conn.Write(d[pos:])
266266
if err == nil {
267+
klog.Infof("[connID: %d] write last %d data to remote", connID, n)
267268
break
268269
} else if n > 0 {
270+
klog.Infof("[connID: %d] write %d data to remote with error: %v", connID, n, err)
269271
pos += n
270272
} else {
271273
klog.Errorf("conn write error: %v", err)

pkg/agent/agentserver/backend_manager.go

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,51 @@ limitations under the License.
1717
package agentserver
1818

1919
import (
20+
"context"
2021
"math/rand"
2122
"sync"
2223
"time"
2324

2425
"k8s.io/klog"
26+
client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
2527
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
2628
)
2729

30+
type Backend interface {
31+
Send(p *client.Packet) error
32+
Context() context.Context
33+
}
34+
35+
var _ Backend = &backend{}
36+
var _Backend = new(agent.AgentService_ConnectServer)
37+
38+
type backend struct {
39+
// TODO: this is a multi-writer single-reader pattern, it's tricky to
40+
// write it using channel. Let's worry about performance later.
41+
mu sync.Mutex // mu protects conn
42+
conn agent.AgentService_ConnectServer
43+
}
44+
45+
func (b *backend) Send(p *client.Packet) error {
46+
b.mu.Lock()
47+
defer b.mu.Unlock()
48+
return b.conn.Send(p)
49+
}
50+
51+
func (b *backend) Context() context.Context {
52+
// TODO: does Context require lock protection?
53+
return b.conn.Context()
54+
}
55+
56+
func newBackend(conn agent.AgentService_ConnectServer) *backend {
57+
return &backend{conn: conn}
58+
}
59+
2860
// BackendManager is an interface to manage backend connections, i.e.,
2961
// connection to the proxy agents.
3062
type BackendManager interface {
3163
// Backend returns a single backend.
32-
Backend() (agent.AgentService_ConnectServer, error)
64+
Backend() (Backend, error)
3365
// AddBackend adds a backend.
3466
AddBackend(agentID string, conn agent.AgentService_ConnectServer)
3567
// RemoveBackend removes a backend.
@@ -45,7 +77,7 @@ type DefaultBackendManager struct {
4577
// For a given agent, ProxyServer prefers backends[agentID][0] to send
4678
// traffic, because backends[agentID][1:] are more likely to be closed
4779
// by the agent to deduplicate connections to the same server.
48-
backends map[string][]agent.AgentService_ConnectServer
80+
backends map[string][]*backend
4981
// agentID is tracked in this slice to enable randomly picking an
5082
// agentID in the Backend() method. There is no reliable way to
5183
// randomly pick a key from a map (in this case, the backends) in
@@ -57,7 +89,7 @@ type DefaultBackendManager struct {
5789
// NewDefaultBackendManager returns a DefaultBackendManager.
5890
func NewDefaultBackendManager() *DefaultBackendManager {
5991
return &DefaultBackendManager{
60-
backends: make(map[string][]agent.AgentService_ConnectServer),
92+
backends: make(map[string][]*backend),
6193
random: rand.New(rand.NewSource(time.Now().UnixNano())),
6294
}
6395
}
@@ -70,15 +102,15 @@ func (s *DefaultBackendManager) AddBackend(agentID string, conn agent.AgentServi
70102
_, ok := s.backends[agentID]
71103
if ok {
72104
for _, v := range s.backends[agentID] {
73-
if v == conn {
105+
if v.conn == conn {
74106
klog.Warningf("this should not happen. Adding existing connection %v for agentID %s", conn, agentID)
75107
return
76108
}
77109
}
78-
s.backends[agentID] = append(s.backends[agentID], conn)
110+
s.backends[agentID] = append(s.backends[agentID], newBackend(conn))
79111
return
80112
}
81-
s.backends[agentID] = []agent.AgentService_ConnectServer{conn}
113+
s.backends[agentID] = []*backend{newBackend(conn)}
82114
s.agentIDs = append(s.agentIDs, agentID)
83115
}
84116

@@ -94,7 +126,7 @@ func (s *DefaultBackendManager) RemoveBackend(agentID string, conn agent.AgentSe
94126
}
95127
var found bool
96128
for i, c := range backends {
97-
if c == conn {
129+
if c.conn == conn {
98130
s.backends[agentID] = append(s.backends[agentID][:i], s.backends[agentID][i+1:]...)
99131
if i == 0 && len(s.backends) != 0 {
100132
klog.Warningf("this should not happen. Removed connection %v that is not the first connection, remaining connections are %v", conn, s.backends[agentID])
@@ -126,13 +158,14 @@ func (e *ErrNotFound) Error() string {
126158
}
127159

128160
// Backend returns a random backend.
129-
func (s *DefaultBackendManager) Backend() (agent.AgentService_ConnectServer, error) {
161+
func (s *DefaultBackendManager) Backend() (Backend, error) {
130162
s.mu.RLock()
131163
defer s.mu.RUnlock()
132164
if len(s.backends) == 0 {
133165
return nil, &ErrNotFound{}
134166
}
135167
agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))]
168+
klog.Infof("pick agentID=%s as backend", agentID)
136169
// always return the first connection to an agent, because the agent
137170
// will close later connections if there are multiple.
138171
return s.backends[agentID][0], nil

pkg/agent/agentserver/backend_manager_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestAddRemoveBackends(t *testing.T) {
3838

3939
p.AddBackend("agent1", conn1)
4040
p.RemoveBackend("agent1", conn1)
41-
expectedBackends := make(map[string][]agent.AgentService_ConnectServer)
41+
expectedBackends := make(map[string][]*backend)
4242
expectedAgentIDs := []string{}
4343
if e, a := expectedBackends, p.backends; !reflect.DeepEqual(e, a) {
4444
t.Errorf("expected %v, got %v", e, a)
@@ -60,9 +60,9 @@ func TestAddRemoveBackends(t *testing.T) {
6060
p.RemoveBackend("agent1", conn1)
6161
// This is invalid. agent1 doesn't have conn3. This should be a no-op.
6262
p.RemoveBackend("agent1", conn3)
63-
expectedBackends = map[string][]agent.AgentService_ConnectServer{
64-
"agent1": []agent.AgentService_ConnectServer{conn12},
65-
"agent3": []agent.AgentService_ConnectServer{conn3},
63+
expectedBackends = map[string][]*backend{
64+
"agent1": []*backend{newBackend(conn12)},
65+
"agent3": []*backend{newBackend(conn3)},
6666
}
6767
expectedAgentIDs = []string{"agent1", "agent3"}
6868
if e, a := expectedBackends, p.backends; !reflect.DeepEqual(e, a) {

pkg/agent/agentserver/server.go

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type ProxyClientConnection struct {
4141
HTTP net.Conn
4242
connected chan struct{}
4343
connectID int64
44+
agentID string
4445
}
4546

4647
func (c *ProxyClientConnection) send(pkt *client.Packet) error {
@@ -63,6 +64,36 @@ func (c *ProxyClientConnection) send(pkt *client.Packet) error {
6364
}
6465
}
6566

67+
func NewPendingDialManager() *PendingDialManager {
68+
return &PendingDialManager{
69+
pendingDial: make(map[int64]*ProxyClientConnection),
70+
}
71+
}
72+
73+
type PendingDialManager struct {
74+
mu sync.RWMutex
75+
pendingDial map[int64]*ProxyClientConnection
76+
}
77+
78+
func (pm *PendingDialManager) Add(random int64, clientConn *ProxyClientConnection) {
79+
pm.mu.Lock()
80+
defer pm.mu.Unlock()
81+
pm.pendingDial[random] = clientConn
82+
}
83+
84+
func (pm *PendingDialManager) Get(random int64) (*ProxyClientConnection, bool) {
85+
pm.mu.RLock()
86+
defer pm.mu.RUnlock()
87+
clientConn, ok := pm.pendingDial[random]
88+
return clientConn, ok
89+
}
90+
91+
func (pm *PendingDialManager) Remove(random int64) {
92+
pm.mu.Lock()
93+
defer pm.mu.Unlock()
94+
delete(pm.pendingDial, random)
95+
}
96+
6697
// ProxyServer
6798
type ProxyServer struct {
6899
// BackendManager manages the backends.
@@ -73,7 +104,7 @@ type ProxyServer struct {
73104
// conn = Frontend[agentID][connID]
74105
frontends map[string]map[int64]*ProxyClientConnection
75106

76-
PendingDial map[int64]*ProxyClientConnection
107+
PendingDial *PendingDialManager
77108

78109
serverID string // unique ID of this server
79110
serverCount int // Number of proxy server instances, should be 1 unless it is a HA server.
@@ -143,7 +174,7 @@ func (s *ProxyServer) getFrontend(agentID string, connID int64) (*ProxyClientCon
143174
func NewProxyServer(serverID string, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions) *ProxyServer {
144175
p := &ProxyServer{
145176
frontends: make(map[string](map[int64]*ProxyClientConnection)),
146-
PendingDial: make(map[int64]*ProxyClientConnection),
177+
PendingDial: NewPendingDialManager(),
147178
serverID: serverID,
148179
serverCount: serverCount,
149180
BackendManager: NewDefaultBackendManager(),
@@ -192,7 +223,7 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer,
192223
var firstConnID int64
193224
// The first packet should be a DIAL_REQ, we will randomly get a
194225
// backend from the BackendManger then.
195-
var backend agent.AgentService_ConnectServer
226+
var backend Backend
196227
var err error
197228

198229
for pkt := range recvCh {
@@ -211,11 +242,13 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer,
211242
if err := backend.Send(pkt); err != nil {
212243
klog.Warningf(">>> DIAL_REQ to Backend failed: %v", err)
213244
}
214-
s.PendingDial[pkt.GetDialRequest().Random] = &ProxyClientConnection{
215-
Mode: "grpc",
216-
Grpc: stream,
217-
connected: make(chan struct{}),
218-
}
245+
s.PendingDial.Add(
246+
pkt.GetDialRequest().Random,
247+
&ProxyClientConnection{
248+
Mode: "grpc",
249+
Grpc: stream,
250+
connected: make(chan struct{}),
251+
})
219252
klog.Info(">>> DIAL_REQ sent to backend") // got this. but backend didn't receive anything.
220253

221254
case client.PacketType_CLOSE_REQ:
@@ -233,7 +266,8 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer,
233266

234267
case client.PacketType_DATA:
235268
connID := pkt.GetData().ConnectID
236-
klog.Infof(">>> Received DATA(id=%d)", connID)
269+
data := pkt.GetData().Data
270+
klog.Infof(">>> Received %d bytes of DATA(id=%d)", len(data), connID)
237271
if firstConnID == 0 {
238272
firstConnID = connID
239273
} else if firstConnID != connID {
@@ -424,25 +458,26 @@ func (s *ProxyServer) serveRecvBackend(stream agent.AgentService_ConnectServer,
424458
switch pkt.Type {
425459
case client.PacketType_DIAL_RSP:
426460
resp := pkt.GetDialResponse()
427-
klog.Infof("<<< Received DIAL_RSP(rand=%d, id=%d)", resp.Random, resp.ConnectID)
461+
klog.Infof("<<< Received DIAL_RSP(rand=%d), agentID %s, connID %d)", resp.Random, agentID, resp.ConnectID)
428462

429-
if client, ok := s.PendingDial[resp.Random]; !ok {
463+
if client, ok := s.PendingDial.Get(resp.Random); !ok {
430464
klog.Warning("<<< DialResp not recognized; dropped")
431465
} else {
432466
err := client.send(pkt)
433-
delete(s.PendingDial, resp.Random)
467+
s.PendingDial.Remove(resp.Random)
434468
if err != nil {
435469
klog.Warningf("<<< DIAL_RSP send to client stream error: %v", err)
436470
} else {
437471
client.connectID = resp.ConnectID
472+
client.agentID = agentID
438473
s.addFrontend(agentID, resp.ConnectID, client)
439474
close(client.connected)
440475
}
441476
}
442477

443478
case client.PacketType_DATA:
444479
resp := pkt.GetData()
445-
klog.Infof("<<< Received DATA(id=%d)", resp.ConnectID)
480+
klog.Infof("<<< Received %d bytes of DATA from agentID %s, connID %d", len(resp.Data), agentID, resp.ConnectID)
446481
client, err := s.getFrontend(agentID, resp.ConnectID)
447482
if err != nil {
448483
klog.Warning(err)
@@ -469,7 +504,7 @@ func (s *ProxyServer) serveRecvBackend(stream agent.AgentService_ConnectServer,
469504
klog.Infof("<<< CLOSE_RSP sent to frontend")
470505
}
471506
s.removeFrontend(agentID, resp.ConnectID)
472-
klog.Infof("<<< Close streaming (id=%d)", resp.ConnectID)
507+
klog.Infof("<<< Close streaming (agentID=%s, connId=%d)", agentID, resp.ConnectID)
473508

474509
default:
475510
klog.Warningf("<<< Unrecognized packet %+v", pkt)

pkg/agent/agentserver/tunnel.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
7373
HTTP: conn,
7474
connected: connected,
7575
}
76-
t.Server.PendingDial[random] = connection
76+
t.Server.PendingDial.Add(random, connection)
7777
backend, err := t.Server.BackendManager.Backend()
7878
if err != nil {
7979
http.Error(w, fmt.Sprintf("currently no tunnels available: %v", err), http.StatusInternalServerError)
@@ -102,8 +102,13 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
102102
klog.Infof("Starting proxy to %q", r.Host)
103103
pkt := make([]byte, 1<<12)
104104

105+
connID := connection.connectID
106+
agentID := connection.agentID
107+
var acc int
108+
105109
for {
106110
n, err := bufrw.Read(pkt[:])
111+
acc += n
107112
if err == io.EOF {
108113
klog.Warningf("EOF from %v", r.Host)
109114
break
@@ -117,7 +122,7 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
117122
Type: client.PacketType_DATA,
118123
Payload: &client.Packet_Data{
119124
Data: &client.Data{
120-
ConnectID: connection.connectID,
125+
ConnectID: connID,
121126
Data: pkt[:n],
122127
},
123128
},
@@ -127,8 +132,8 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
127132
klog.Errorf("error sending packet %v", err)
128133
continue
129134
}
130-
klog.Infof("Forwarding on tunnel, packet type: %s", packet.Type)
135+
klog.Infof("Forwarding %d (total %d) bytes of DATA on tunnel for agentID %s, connID %d", n, acc, connection.agentID, connection.connectID)
131136
}
132137

133-
klog.Infof("Stopping transfer to %q", r.Host)
138+
klog.Infof("Stopping transfer to %q, agentID %s, connID %d", r.Host, agentID, connID)
134139
}

tests/concurrent_client_request_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (s *singleTimeManager) RemoveBackend(agentID string, conn agent.AgentServic
7878
delete(s.backends, agentID)
7979
}
8080

81-
func (s *singleTimeManager) Backend() (agent.AgentService_ConnectServer, error) {
81+
func (s *singleTimeManager) Backend() (agentserver.Backend, error) {
8282
s.mu.Lock()
8383
defer s.mu.Unlock()
8484
for k, v := range s.backends {

0 commit comments

Comments
 (0)