Skip to content

Commit b82dffb

Browse files
authored
Merge pull request #22 from anfernee/improve
Several improvements: unit test, logging, comments
2 parents 856ad2a + 1d53e53 commit b82dffb

File tree

7 files changed

+425
-61
lines changed

7 files changed

+425
-61
lines changed

cmd/client/main.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"net"
2828
"net/http"
2929
"os"
30+
"os/signal"
3031

3132
"github.com/spf13/cobra"
3233
"github.com/spf13/pflag"
@@ -68,7 +69,6 @@ type GrpcProxyClientOptions struct {
6869
proxyHost string
6970
proxyPort int
7071
mode string
71-
7272
}
7373

7474
func (o *GrpcProxyClientOptions) Flags() *pflag.FlagSet {
@@ -83,6 +83,7 @@ func (o *GrpcProxyClientOptions) Flags() *pflag.FlagSet {
8383
flags.StringVar(&o.proxyHost, "proxyHost", o.proxyHost, "The host of the proxy server.")
8484
flags.IntVar(&o.proxyPort, "proxyPort", o.proxyPort, "The port the proxy server is listening on.")
8585
flags.StringVar(&o.mode, "mode", o.mode, "Mode can be either 'grpc' or 'http-connect'.")
86+
8687
return flags
8788
}
8889

@@ -130,9 +131,6 @@ func (o *GrpcProxyClientOptions) Validate() error {
130131
if o.requestPort > 49151 {
131132
return fmt.Errorf("please do not try to use ephemeral port %d for the request server port", o.requestPort)
132133
}
133-
if o.requestPort < 1024 {
134-
return fmt.Errorf("please do not try to use reserved port %d for the request server port", o.requestPort)
135-
}
136134
if o.proxyPort > 49151 {
137135
return fmt.Errorf("please do not try to use ephemeral port %d for the proxy server port", o.proxyPort)
138136
}
@@ -182,7 +180,6 @@ func (c *Client) run(o *GrpcProxyClientOptions) error {
182180
// Run remote simple http service on server side as
183181
// "python -m SimpleHTTPServer"
184182

185-
186183
dialer, err := c.getDialer(o)
187184
if err != nil {
188185
return fmt.Errorf("failed to get dialer for client, got %v", err)
@@ -202,6 +199,8 @@ func (c *Client) run(o *GrpcProxyClientOptions) error {
202199
if err != nil {
203200
return fmt.Errorf("failed to send request to client, got %v", err)
204201
}
202+
defer response.Body.Close() // TODO: proxy server should handle the case where Body isn't closed.
203+
205204
data, err := ioutil.ReadAll(response.Body)
206205
if err != nil {
207206
return fmt.Errorf("failed to read response from client, got %v", err)
@@ -233,6 +232,17 @@ func (c *Client) getDialer(o *GrpcProxyClientOptions) (func(ctx context.Context,
233232
})
234233

235234
var proxyConn net.Conn
235+
236+
// Setup signal handler
237+
ch := make(chan os.Signal, 1)
238+
signal.Notify(ch)
239+
240+
go func() {
241+
<-ch
242+
err := proxyConn.Close()
243+
klog.Infof("connection closed: %v", err)
244+
}()
245+
236246
switch o.mode {
237247
case "grpc":
238248
dialOption := grpc.WithTransportCredentials(transportCreds)

pkg/agent/agentclient/client.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
2929
)
3030

31+
// AgentClient runs on the node network side. It connects to proxy server and establishes
32+
// a stream connection from which it sends and receives network traffic.
3133
type AgentClient struct {
3234
nextConnID int64
3335
connContext map[int64]*connContext
@@ -36,6 +38,17 @@ type AgentClient struct {
3638
stream agent.AgentService_ConnectClient
3739
}
3840

41+
// NewAgentClient creates an AgentClient
42+
func NewAgentClient(address string) *AgentClient {
43+
a := &AgentClient{
44+
connContext: make(map[int64]*connContext),
45+
address: address,
46+
}
47+
48+
return a
49+
}
50+
51+
// connContext tracks a connection from agent to node network.
3952
type connContext struct {
4053
conn net.Conn
4154
cleanFunc func()
@@ -47,15 +60,14 @@ func (c *connContext) cleanup() {
4760
c.cleanOnce.Do(c.cleanFunc)
4861
}
4962

50-
func NewAgentClient(address string) *AgentClient {
51-
a := &AgentClient{
52-
connContext: make(map[int64]*connContext),
53-
address: address,
54-
}
55-
56-
return a
57-
}
58-
63+
// Connect connnects to proxy server to establish a gRPC stream,
64+
// on which the proxied traffic is multiplexed through the stream
65+
// and piped to the local connection. It register itself as a
66+
// backend from proxy server, so proxy server will route traffic
67+
// to this agent.
68+
//
69+
// The caller needs to call Serve to start serving proxy requests
70+
// coming from proxy server.
5971
func (a *AgentClient) Connect(opts ...grpc.DialOption) error {
6072
c, err := grpc.Dial(a.address, opts...)
6173
if err != nil {
@@ -72,6 +84,10 @@ func (a *AgentClient) Connect(opts ...grpc.DialOption) error {
7284
return nil
7385
}
7486

87+
// Serve starts to serve proxied requests from proxy server over the
88+
// gRPC stream. Successful Connect is required before Serve. The
89+
// The requests include things like opening a connection to a server,
90+
// streaming data and close the connection.
7591
func (a *AgentClient) Serve(stopCh <-chan struct{}) {
7692
for {
7793
select {
@@ -81,8 +97,6 @@ func (a *AgentClient) Serve(stopCh <-chan struct{}) {
8197
default:
8298
}
8399

84-
klog.Info("waiting packets...")
85-
86100
pkt, err := a.stream.Recv()
87101
if err == io.EOF {
88102
klog.Info("received EOF, exit")
@@ -121,6 +135,7 @@ func (a *AgentClient) Serve(stopCh <-chan struct{}) {
121135
conn: conn,
122136
dataCh: dataCh,
123137
cleanFunc: func() {
138+
klog.Infof("close connection(id=%d)", connID)
124139
resp := &agent.Packet{
125140
Type: agent.PacketType_CLOSE_RSP,
126141
Payload: &agent.Packet_CloseResponse{CloseResponse: &agent.CloseResponse{}},
@@ -201,11 +216,13 @@ func (a *AgentClient) remoteToProxy(conn net.Conn, connID int64) {
201216

202217
for {
203218
n, err := conn.Read(buf[:])
219+
klog.Infof("received %d bytes from proxy server", n)
204220

205221
if err == io.EOF {
206222
klog.Info("connection EOF")
207223
return
208224
} else if err != nil {
225+
// Normal when receive a CLOSE_REQ
209226
klog.Warningf("connection read error: %v", err)
210227
return
211228
} else {

pkg/agent/agentserver/server.go

Lines changed: 73 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,16 @@ func (c *ProxyClientConnection) send(pkt *agent.Packet) error {
3939
stream := c.Grpc
4040
return stream.Send(pkt)
4141
} else if c.Mode == "http-connect" {
42-
if pkt.Type != agent.PacketType_DATA {
43-
return nil
42+
if pkt.Type == agent.PacketType_CLOSE_RSP {
43+
return c.Http.Close()
44+
} else if pkt.Type == agent.PacketType_DATA {
45+
_, err := c.Http.Write(pkt.GetData().Data)
46+
return err
47+
} else {
48+
return fmt.Errorf("attempt to send via unrecognized connection type %v", pkt.Type)
4449
}
45-
writer := c.Http
46-
_, err := writer.Write(pkt.GetData().Data)
47-
return err
4850
} else {
49-
return fmt.Errorf("attempt to send via unrecognized connection type %q", c.Mode)
51+
return fmt.Errorf("attempt to send via unrecognized connection mode %q", c.Mode)
5052
}
5153
}
5254

@@ -84,6 +86,7 @@ func (s *ProxyServer) Proxy(stream agent.ProxyService_ProxyServer) error {
8486
close(recvCh)
8587
}()
8688

89+
// Start goroutine to receive packets from frontend and push to recvCh
8790
go func() {
8891
for {
8992
in, err := stream.Recv()
@@ -92,7 +95,8 @@ func (s *ProxyServer) Proxy(stream agent.ProxyService_ProxyServer) error {
9295
return
9396
}
9497
if err != nil {
95-
klog.Warningf("stream read error: %v", err)
98+
klog.Warningf(">>> Stream read from frontend error: %v", err)
99+
close(stopCh)
96100
return
97101
}
98102

@@ -104,57 +108,82 @@ func (s *ProxyServer) Proxy(stream agent.ProxyService_ProxyServer) error {
104108
}
105109

106110
func (s *ProxyServer) serveRecvFrontend(stream agent.ProxyService_ProxyServer, recvCh <-chan *agent.Packet) {
107-
klog.Info("start serve recv ...")
111+
klog.Info("start serving frontend stream")
112+
113+
var firstConnID int64
114+
108115
for pkt := range recvCh {
109116
switch pkt.Type {
110117
case agent.PacketType_DIAL_REQ:
111-
klog.Info("received DIAL_REQ")
118+
klog.Info(">>> Received DIAL_REQ")
112119
if s.Backend == nil {
113-
klog.Info("no backend found; drop")
120+
klog.Info(">>> No backend found; drop")
114121
continue
115122
}
116123

117124
if err := s.Backend.Send(pkt); err != nil {
118-
klog.Warningf("send packet to Backend failed: %v", err)
125+
klog.Warningf(">>> DIAL_REQ to Backend failed: %v", err)
119126
}
120127
s.PendingDial[pkt.GetDialRequest().Random] = &ProxyClientConnection{
121128
Mode: "grpc",
122129
Grpc: stream,
123130
connected: make(chan struct{}),
124131
}
125-
klog.Info("DIAL_REQ sent to backend") // got this. but backend didn't receive anything.
132+
klog.Info(">>> DIAL_REQ sent to backend") // got this. but backend didn't receive anything.
126133

127134
case agent.PacketType_CLOSE_REQ:
128-
klog.Infof("received CLOSE_REQ(id=%d)", pkt.GetCloseRequest().ConnectID)
135+
klog.Infof(">>> Received CLOSE_REQ(id=%d)", pkt.GetCloseRequest().ConnectID)
129136
if s.Backend == nil {
130-
klog.Info("no backend found; drop")
137+
klog.Info(">>> No backend found; drop")
131138
continue
132139
}
133140

134141
if err := s.Backend.Send(pkt); err != nil {
135-
klog.Warningf("send packet to Backend failed: %v", err)
142+
klog.Warningf(">>> CLOSE_REQ to Backend failed: %v", err)
136143
}
137144
klog.Info("CLOSE_REQ sent to backend")
138145

139146
case agent.PacketType_DATA:
140-
klog.Infof("received DATA(id=%d)", pkt.GetData().ConnectID)
147+
connID := pkt.GetData().ConnectID
148+
klog.Infof(">>> Received DATA(id=%d)", connID)
149+
if firstConnID == 0 {
150+
firstConnID = connID
151+
} else if firstConnID != connID {
152+
klog.Warningf(">>> Data(id=%d) doesn't match first connection id %d", firstConnID, connID)
153+
}
154+
141155
if s.Backend == nil {
142-
klog.Info("no backend found; drop")
156+
klog.Info(">>> No backend found; drop")
143157
continue
144158
}
145159

146160
if err := s.Backend.Send(pkt); err != nil {
147-
klog.Warningf("send packet to Backend failed: %v", err)
161+
klog.Warningf(">>> DATA to Backend failed: %v", err)
148162
}
149-
klog.Info("DATA sent to backend")
163+
klog.Info(">>> DATA sent to backend")
150164

151165
default:
152-
klog.Infof("Ignore %v packet coming from frontend", pkt.Type)
166+
klog.Infof(">>> Ignore %v packet coming from frontend", pkt.Type)
167+
}
168+
}
169+
170+
klog.Infof(">>> Close streaming (id=%d)", firstConnID)
171+
172+
pkt := &agent.Packet{
173+
Type: agent.PacketType_CLOSE_REQ,
174+
Payload: &agent.Packet_CloseRequest{
175+
CloseRequest: &agent.CloseRequest{
176+
ConnectID: firstConnID,
177+
},
178+
},
179+
}
180+
if s.Backend != nil {
181+
if err := s.Backend.Send(pkt); err != nil {
182+
klog.Warningf(">>> CLOSE_REQ to Backend failed: %v", err)
153183
}
154184
}
155185
}
156186

157-
// Ignored now
158187
func (s *ProxyServer) serveSend(stream agent.ProxyService_ProxyServer, sendCh <-chan *agent.Packet) {
159188
klog.Info("start serve send ...")
160189
for pkt := range sendCh {
@@ -206,40 +235,56 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error {
206235

207236
// route the packet back to the correct client
208237
func (s *ProxyServer) serveRecvBackend(stream agent.AgentService_ConnectServer, recvCh <-chan *agent.Packet) {
238+
var firstConnID int64
239+
209240
for pkt := range recvCh {
210241
switch pkt.Type {
211242
case agent.PacketType_DIAL_RSP:
212243
resp := pkt.GetDialResponse()
213-
klog.Warningf("Received dial response for %d, connectID is %d", resp.Random, resp.ConnectID)
244+
firstConnID = resp.ConnectID
245+
klog.Infof("<<< Received DIAL_RSP(rand=%d, id=%d)", resp.Random, resp.ConnectID)
246+
214247
if client, ok := s.PendingDial[resp.Random]; !ok {
215-
klog.Warning("DialResp not recognized; dropped")
248+
klog.Warning("<<< DialResp not recognized; dropped")
216249
} else {
217250
err := client.send(pkt)
218251
delete(s.PendingDial, resp.Random)
219252
if err != nil {
220-
klog.Warningf("dial response send to client stream error: %v", err)
253+
klog.Warningf("<<< DIAL_RSP send to client stream error: %v", err)
221254
} else {
222255
client.connectID = resp.ConnectID
223256
s.Frontends[resp.ConnectID] = client
224257
close(client.connected)
225258
}
226259
}
260+
227261
case agent.PacketType_DATA:
228262
resp := pkt.GetData()
263+
klog.Infof("<<< Received DATA(id=%d)", resp.ConnectID)
229264
if client, ok := s.Frontends[resp.ConnectID]; ok {
230265
if err := client.send(pkt); err != nil {
231-
klog.Warningf("data send to client stream error: %v", err)
266+
klog.Warningf("<<< DATA send to client stream error: %v", err)
267+
} else {
268+
klog.Infof("<<< DATA sent to frontend")
232269
}
233270
}
271+
234272
case agent.PacketType_CLOSE_RSP:
235273
resp := pkt.GetCloseResponse()
274+
klog.Infof("<<< Received CLOSE_RSP(id=%d)", resp.ConnectID)
236275
if client, ok := s.Frontends[resp.ConnectID]; ok {
237276
if err := client.send(pkt); err != nil {
238-
klog.Warningf("close response send to client stream error: %v", err)
277+
// Normal when frontend closes it.
278+
klog.Warningf("<<< CLOSE_RSP send to client stream error: %v", err)
279+
} else {
280+
klog.Infof("<<< CLOSE_RSP sent to frontend")
239281
}
240282
}
283+
241284
default:
242-
klog.Warningf("unrecognized packet %+v", pkt)
285+
klog.Warningf("<<< Unrecognized packet %+v", pkt)
243286
}
244287
}
245-
}
288+
289+
klog.Infof("<<< Close streaming (id=%d)", firstConnID)
290+
}

pkg/agent/agentserver/tunnel.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,18 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
9999

100100
klog.Infof("Starting proxy to %q", r.Host)
101101
pkt := make([]byte, 1<<12)
102+
102103
for {
103104
n, err := conn.Read(pkt[:])
104105
if err == io.EOF {
105-
// TODO: Close remote..
106+
klog.Warningf("EOF from %v", r.Host)
106107
break
107108
}
108109
if err != nil {
109110
klog.Errorf("Received error on connection %v", err)
110111
break
111112
}
113+
112114
packet := &agent.Packet{
113115
Type: agent.PacketType_DATA,
114116
Payload: &agent.Packet_Data{

0 commit comments

Comments
 (0)