Skip to content

Commit 92d98b0

Browse files
authored
Merge pull request #90 from caesarxuchao/quick-return
Quick return
2 parents 67e3548 + 2119003 commit 92d98b0

File tree

3 files changed

+65
-41
lines changed

3 files changed

+65
-41
lines changed

pkg/agent/agentclient/client.go

Lines changed: 59 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,54 @@ import (
2727
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
2828
)
2929

30+
// connContext tracks a connection from agent to node network.
31+
type connContext struct {
32+
conn net.Conn
33+
cleanFunc func()
34+
dataCh chan []byte
35+
cleanOnce sync.Once
36+
}
37+
38+
func (c *connContext) cleanup() {
39+
c.cleanOnce.Do(c.cleanFunc)
40+
}
41+
42+
type connectionManager struct {
43+
mu sync.RWMutex
44+
connections map[int64]*connContext
45+
}
46+
47+
func (cm *connectionManager) Add(connID int64, ctx *connContext) {
48+
cm.mu.Lock()
49+
defer cm.mu.Unlock()
50+
cm.connections[connID] = ctx
51+
}
52+
53+
func (cm *connectionManager) Get(connID int64) (*connContext, bool) {
54+
cm.mu.RLock()
55+
defer cm.mu.RUnlock()
56+
ctx, ok := cm.connections[connID]
57+
return ctx, ok
58+
}
59+
60+
func (cm *connectionManager) Delete(connID int64) {
61+
cm.mu.Lock()
62+
defer cm.mu.Unlock()
63+
delete(cm.connections, connID)
64+
}
65+
66+
func newConnectionManager() *connectionManager {
67+
return &connectionManager{
68+
connections: make(map[int64]*connContext),
69+
}
70+
}
71+
3072
// AgentClient runs on the node network side. It connects to proxy server and establishes
3173
// a stream connection from which it sends and receives network traffic.
3274
type AgentClient struct {
33-
nextConnID int64
34-
connContext map[int64]*connContext
75+
nextConnID int64
76+
77+
connManager *connectionManager
3578

3679
stream *RedialableAgentClient
3780
stopCh <-chan struct{}
@@ -47,7 +90,7 @@ func newAgentClient(address, agentID string, cs *ClientSet, opts ...grpc.DialOpt
4790

4891
func newAgentClientWithRedialableAgentClient(rac *RedialableAgentClient) *AgentClient {
4992
return &AgentClient{
50-
connContext: make(map[int64]*connContext),
93+
connManager: newConnectionManager(),
5194
stream: rac,
5295
stopCh: rac.stopCh,
5396
}
@@ -62,18 +105,6 @@ func (c *AgentClient) Close() {
62105
c.stream.Close()
63106
}
64107

65-
// connContext tracks a connection from agent to node network.
66-
type connContext struct {
67-
conn net.Conn
68-
cleanFunc func()
69-
dataCh chan []byte
70-
cleanOnce sync.Once
71-
}
72-
73-
func (c *connContext) cleanup() {
74-
c.cleanOnce.Do(c.cleanFunc)
75-
}
76-
77108
// Connect connects to proxy server to establish a gRPC stream,
78109
// on which the proxied traffic is multiplexed through the stream
79110
// and piped to the local connection. It register itself as a
@@ -146,7 +177,7 @@ func (a *AgentClient) Serve() {
146177

147178
connID := atomic.AddInt64(&a.nextConnID, 1)
148179
dataCh := make(chan []byte, 5)
149-
a.connContext[connID] = &connContext{
180+
ctx := &connContext{
150181
conn: conn,
151182
dataCh: dataCh,
152183
cleanFunc: func() {
@@ -167,24 +198,26 @@ func (a *AgentClient) Serve() {
167198
}
168199

169200
close(dataCh)
170-
delete(a.connContext, connID)
201+
a.connManager.Delete(connID)
171202
},
172203
}
204+
a.connManager.Add(connID, ctx)
173205

174206
resp.GetDialResponse().ConnectID = connID
175207
if err := a.stream.RetrySend(resp); err != nil {
176208
klog.Warningf("stream send error: %v", err)
177209
continue
178210
}
179211

180-
go a.remoteToProxy(conn, connID)
181-
go a.proxyToRemote(conn, connID)
212+
go a.remoteToProxy(connID, ctx)
213+
go a.proxyToRemote(connID, ctx)
182214

183215
case client.PacketType_DATA:
184216
data := pkt.GetData()
185217
klog.Infof("received DATA(id=%d)", data.ConnectID)
186218

187-
if ctx, ok := a.connContext[data.ConnectID]; ok {
219+
ctx, ok := a.connManager.Get(data.ConnectID)
220+
if ok {
188221
ctx.dataCh <- data.Data
189222
}
190223

@@ -194,7 +227,8 @@ func (a *AgentClient) Serve() {
194227

195228
klog.Infof("received CLOSE_REQ(id=%d)", connID)
196229

197-
if ctx, ok := a.connContext[connID]; ok {
230+
ctx, ok := a.connManager.Get(connID)
231+
if ok {
198232
ctx.cleanup()
199233
} else {
200234
resp := &client.Packet{
@@ -215,12 +249,7 @@ func (a *AgentClient) Serve() {
215249
}
216250
}
217251

218-
func (a *AgentClient) remoteToProxy(conn net.Conn, connID int64) {
219-
ctx := a.connContext[connID]
220-
if ctx == nil {
221-
return
222-
}
223-
252+
func (a *AgentClient) remoteToProxy(connID int64, ctx *connContext) {
224253
defer ctx.cleanup()
225254

226255
var buf [1 << 12]byte
@@ -229,7 +258,7 @@ func (a *AgentClient) remoteToProxy(conn net.Conn, connID int64) {
229258
}
230259

231260
for {
232-
n, err := conn.Read(buf[:])
261+
n, err := ctx.conn.Read(buf[:])
233262
klog.Infof("received %d bytes from remote for connID[%d]", n, connID)
234263

235264
if err == io.EOF {
@@ -251,18 +280,13 @@ func (a *AgentClient) remoteToProxy(conn net.Conn, connID int64) {
251280
}
252281
}
253282

254-
func (a *AgentClient) proxyToRemote(conn net.Conn, connID int64) {
255-
ctx := a.connContext[connID]
256-
if ctx == nil {
257-
return
258-
}
259-
283+
func (a *AgentClient) proxyToRemote(connID int64, ctx *connContext) {
260284
defer ctx.cleanup()
261285

262286
for d := range ctx.dataCh {
263287
pos := 0
264288
for {
265-
n, err := conn.Write(d[pos:])
289+
n, err := ctx.conn.Write(d[pos:])
266290
if err == nil {
267291
klog.Infof("[connID: %d] write last %d data to remote", connID, n)
268292
break

pkg/agent/agentclient/client_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ import (
1111

1212
"google.golang.org/grpc"
1313
"k8s.io/klog"
14-
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
1514
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
15+
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
1616
)
1717

1818
func TestServeData_HTTP(t *testing.T) {
1919
var err error
2020
var stream agent.AgentService_ConnectClient
2121
stopCh := make(chan struct{})
2222
testClient := &AgentClient{
23-
connContext: make(map[int64]*connContext),
23+
connManager: newConnectionManager(),
2424
stopCh: stopCh,
2525
}
2626
testClient.stream, stream = pipe2()
@@ -107,7 +107,7 @@ func TestServeData_HTTP(t *testing.T) {
107107
}
108108

109109
// Verify internal state is consistent
110-
if _, ok := testClient.connContext[connID]; ok {
110+
if _, ok := testClient.connManager.Get(connID); ok {
111111
t.Error("client.connContext not released")
112112
}
113113
}
@@ -116,7 +116,7 @@ func TestClose_Client(t *testing.T) {
116116
var stream agent.AgentService_ConnectClient
117117
stopCh := make(chan struct{})
118118
testClient := &AgentClient{
119-
connContext: make(map[int64]*connContext),
119+
connManager: newConnectionManager(),
120120
stopCh: stopCh,
121121
}
122122
testClient.stream, stream = pipe2()
@@ -171,7 +171,7 @@ func TestClose_Client(t *testing.T) {
171171
}
172172

173173
// Verify internal state is consistent
174-
if _, ok := testClient.connContext[connID]; ok {
174+
if _, ok := testClient.connManager.Get(connID); ok {
175175
t.Error("client.connContext not released")
176176
}
177177

pkg/agent/agentserver/tunnel.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
130130
err = backend.Send(packet)
131131
if err != nil {
132132
klog.Errorf("error sending packet %v", err)
133-
continue
133+
break
134134
}
135135
klog.Infof("Forwarding %d (total %d) bytes of DATA on tunnel for agentID %s, connID %d", n, acc, connection.agentID, connection.connectID)
136136
}

0 commit comments

Comments
 (0)