Skip to content

Commit 83e81a0

Browse files
committed
Implement Close
- When agent receives CLOSE_REQ, it closes the corresponding connection identified by ConnID - When agent's connection failed, it sends CLOSE_RESP to server to notify it.
1 parent cff3efc commit 83e81a0

File tree

8 files changed

+554
-141
lines changed

8 files changed

+554
-141
lines changed

pkg/agent/agentclient/client.go

Lines changed: 84 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,28 @@ import (
2929
)
3030

3131
type AgentClient struct {
32-
nextConnID int64
33-
conns map[int64]net.Conn
34-
dataChs map[int64]chan []byte
35-
address string
32+
nextConnID int64
33+
connContext map[int64]*connContext
34+
address string
3635

3736
stream agent.AgentService_ConnectClient
3837
}
3938

39+
type connContext struct {
40+
conn net.Conn
41+
cleanFunc func()
42+
dataCh chan []byte
43+
cleanOnce sync.Once
44+
}
45+
46+
func (c *connContext) cleanup() {
47+
c.cleanOnce.Do(c.cleanFunc)
48+
}
49+
4050
func NewAgentClient(address string) *AgentClient {
4151
a := &AgentClient{
42-
conns: make(map[int64]net.Conn),
43-
address: address,
44-
dataChs: make(map[int64]chan []byte),
52+
connContext: make(map[int64]*connContext),
53+
address: address,
4554
}
4655

4756
return a
@@ -107,17 +116,29 @@ func (a *AgentClient) Serve(stopCh <-chan struct{}) {
107116
}
108117

109118
connID := atomic.AddInt64(&a.nextConnID, 1)
110-
a.conns[connID] = conn
111-
a.dataChs[connID] = make(chan []byte, 5)
112-
var once sync.Once
113-
114-
cleanup := func() {
115-
once.Do(func() {
116-
conn.Close()
117-
close(a.dataChs[connID])
118-
delete(a.conns, connID)
119-
delete(a.dataChs, connID)
120-
})
119+
dataCh := make(chan []byte, 5)
120+
a.connContext[connID] = &connContext{
121+
conn: conn,
122+
dataCh: dataCh,
123+
cleanFunc: func() {
124+
resp := &agent.Packet{
125+
Type: agent.PacketType_CLOSE_RSP,
126+
Payload: &agent.Packet_CloseResponse{CloseResponse: &agent.CloseResponse{}},
127+
}
128+
resp.GetCloseResponse().ConnectID = connID
129+
130+
err := conn.Close()
131+
if err != nil {
132+
resp.GetCloseResponse().Error = err.Error()
133+
}
134+
135+
if err := a.stream.Send(resp); err != nil {
136+
klog.Warningf("close response send error: %v", err)
137+
}
138+
139+
close(dataCh)
140+
delete(a.connContext, connID)
141+
},
121142
}
122143

123144
resp.GetDialResponse().ConnectID = connID
@@ -126,16 +147,37 @@ func (a *AgentClient) Serve(stopCh <-chan struct{}) {
126147
continue
127148
}
128149

129-
go a.remoteToProxy(conn, connID, cleanup)
130-
go a.proxyToRemote(conn, a.dataChs[connID], cleanup)
150+
go a.remoteToProxy(conn, connID)
151+
go a.proxyToRemote(conn, connID)
131152

132153
case agent.PacketType_DATA:
133-
klog.Info("received DATA")
134154
data := pkt.GetData()
155+
klog.Infof("received DATA(id=%d)", data.ConnectID)
135156
klog.Infof("[tracing] %v", data)
136157

137-
if dataCh, ok := a.dataChs[data.ConnectID]; ok {
138-
dataCh <- data.Data
158+
if ctx, ok := a.connContext[data.ConnectID]; ok {
159+
ctx.dataCh <- data.Data
160+
}
161+
162+
case agent.PacketType_CLOSE_REQ:
163+
closeReq := pkt.GetCloseRequest()
164+
connID := closeReq.ConnectID
165+
166+
klog.Infof("received CLOSE_REQ(id=%d)", connID)
167+
168+
if ctx, ok := a.connContext[connID]; ok {
169+
ctx.cleanup()
170+
} else {
171+
resp := &agent.Packet{
172+
Type: agent.PacketType_CLOSE_RSP,
173+
Payload: &agent.Packet_CloseResponse{CloseResponse: &agent.CloseResponse{}},
174+
}
175+
resp.GetCloseResponse().ConnectID = connID
176+
resp.GetCloseResponse().Error = "Unknown connectID"
177+
if err := a.stream.Send(resp); err != nil {
178+
klog.Warningf("close response send error: %v", err)
179+
continue
180+
}
139181
}
140182

141183
default:
@@ -144,8 +186,13 @@ func (a *AgentClient) Serve(stopCh <-chan struct{}) {
144186
}
145187
}
146188

147-
func (a *AgentClient) remoteToProxy(conn net.Conn, connID int64, cleanup func()) {
148-
defer cleanup()
189+
func (a *AgentClient) remoteToProxy(conn net.Conn, connID int64) {
190+
ctx := a.connContext[connID]
191+
if ctx == nil {
192+
return
193+
}
194+
195+
defer ctx.cleanup()
149196

150197
var buf [1 << 12]byte
151198
resp := &agent.Packet{
@@ -154,17 +201,13 @@ func (a *AgentClient) remoteToProxy(conn net.Conn, connID int64, cleanup func())
154201

155202
for {
156203
n, err := conn.Read(buf[:])
204+
157205
if err == io.EOF {
206+
klog.Info("connection EOF")
158207
return
159208
} else if err != nil {
160-
klog.Errorf("connection read error: %v", err)
161-
resp.Payload = &agent.Packet_Data{Data: &agent.Data{
162-
Error: err.Error(),
163-
ConnectID: connID,
164-
}}
165-
if err := a.stream.Send(resp); err != nil {
166-
klog.Warningf("stream send error: %v", err)
167-
}
209+
klog.Warningf("connection read error: %v", err)
210+
return
168211
} else {
169212
resp.Payload = &agent.Packet_Data{Data: &agent.Data{
170213
Data: buf[:n],
@@ -177,10 +220,15 @@ func (a *AgentClient) remoteToProxy(conn net.Conn, connID int64, cleanup func())
177220
}
178221
}
179222

180-
func (a *AgentClient) proxyToRemote(conn net.Conn, dataCh <-chan []byte, cleanup func()) {
181-
defer cleanup()
223+
func (a *AgentClient) proxyToRemote(conn net.Conn, connID int64) {
224+
ctx := a.connContext[connID]
225+
if ctx == nil {
226+
return
227+
}
228+
229+
defer ctx.cleanup()
182230

183-
for d := range dataCh {
231+
for d := range ctx.dataCh {
184232
pos := 0
185233
for {
186234
n, err := conn.Write(d[pos:])

0 commit comments

Comments
 (0)