@@ -19,10 +19,16 @@ package agentserver
19
19
import (
20
20
"fmt"
21
21
"io"
22
+ "math/rand"
22
23
"net"
24
+ "strconv"
25
+ "sync"
26
+ "time"
23
27
28
+ "google.golang.org/grpc/metadata"
24
29
"k8s.io/klog"
25
30
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
31
+ "sigs.k8s.io/apiserver-network-proxy/proto/header"
26
32
)
27
33
28
34
// ProxyClientConnection...
@@ -56,22 +62,91 @@ func (c *ProxyClientConnection) send(pkt *agent.Packet) error {
56
62
57
63
// ProxyServer
58
64
type ProxyServer struct {
59
- Backend agent.AgentService_ConnectServer
65
+ mu sync.RWMutex //protects the following
66
+ // A map between agentID and its grpc connections.
67
+ // For a given agent, ProxyServer prefers backends[agentID][0] to send
68
+ // traffic, because backends[agentID][1:] are more likely to be closed
69
+ // by the agent to deduplicate connections to the same server.
70
+ backends map [string ][]agent.AgentService_ConnectServer
71
+ agentIDs []string
72
+ random * rand.Rand
60
73
61
74
// connID track
62
75
Frontends map [int64 ]* ProxyClientConnection
63
76
PendingDial map [int64 ]* ProxyClientConnection
77
+
78
+ serverID string // unique ID of this server
79
+ serverCount int // Number of proxy server instances, should be 1 unless it is a HA server.
80
+
64
81
}
65
82
66
83
var _ agent.AgentServiceServer = & ProxyServer {}
67
84
68
85
var _ agent.ProxyServiceServer = & ProxyServer {}
69
86
87
+ func (s * ProxyServer ) addBackend (agentID string , conn agent.AgentService_ConnectServer ) {
88
+ klog .Infof ("register Backend %v for agentID %s" , conn , agentID )
89
+ s .mu .Lock ()
90
+ defer s .mu .Unlock ()
91
+ _ , ok := s .backends [agentID ]
92
+ if ok {
93
+ s .backends [agentID ] = append (s .backends [agentID ], conn )
94
+ return
95
+ }
96
+ s .backends [agentID ] = []agent.AgentService_ConnectServer {conn }
97
+ s .agentIDs = append (s .agentIDs , agentID )
98
+ }
99
+
100
+ func (s * ProxyServer ) removeBackend (agentID string , conn agent.AgentService_ConnectServer ) {
101
+ klog .Infof ("remove Backend %v for agentID %s" , conn , agentID )
102
+ s .mu .Lock ()
103
+ defer s .mu .Unlock ()
104
+ backends , ok := s .backends [agentID ]
105
+ if ! ok {
106
+ klog .Warningf ("can't find agentID %s in the backends" , agentID )
107
+ return
108
+ }
109
+ var found bool
110
+ for i , c := range backends {
111
+ if c == conn {
112
+ s .backends [agentID ] = append (s .backends [agentID ][:i ], s .backends [agentID ][i + 1 :]... )
113
+ found = true
114
+ }
115
+ }
116
+ if len (s .backends [agentID ]) == 0 {
117
+ delete (s .backends , agentID )
118
+ for i := range s .agentIDs {
119
+ if s .agentIDs [i ] == agentID {
120
+ s .agentIDs [i ] = s .agentIDs [len (s .agentIDs )- 1 ]
121
+ s .agentIDs = s .agentIDs [:len (s .agentIDs )- 1 ]
122
+ break
123
+ }
124
+ }
125
+ }
126
+ if ! found {
127
+ klog .Warningf ("can't find conn %v for agentID %s in the backends" , conn , agentID )
128
+ }
129
+ }
130
+
131
+ func (s * ProxyServer ) randomBackend () (agent.AgentService_ConnectServer , error ) {
132
+ s .mu .RLock ()
133
+ defer s .mu .RUnlock ()
134
+ if len (s .backends ) == 0 {
135
+ return nil , fmt .Errorf ("No backend available" )
136
+ }
137
+ agentID := s .agentIDs [s .random .Intn (len (s .agentIDs ))]
138
+ return s .backends [agentID ][0 ], nil
139
+ }
140
+
70
141
// NewProxyServer creates a new ProxyServer instance
71
- func NewProxyServer () * ProxyServer {
142
+ func NewProxyServer (serverID string , serverCount int ) * ProxyServer {
72
143
return & ProxyServer {
73
144
Frontends : make (map [int64 ]* ProxyClientConnection ),
74
145
PendingDial : make (map [int64 ]* ProxyClientConnection ),
146
+ serverID : serverID ,
147
+ serverCount : serverCount ,
148
+ backends : make (map [string ][]agent.AgentService_ConnectServer ),
149
+ random : rand .New (rand .NewSource (time .Now ().UTC ().UnixNano ())),
75
150
}
76
151
}
77
152
@@ -113,17 +188,25 @@ func (s *ProxyServer) serveRecvFrontend(stream agent.ProxyService_ProxyServer, r
113
188
klog .Info ("start serving frontend stream" )
114
189
115
190
var firstConnID int64
191
+ // The first packet should be a DIAL_REQ, we will randomly get a
192
+ // backend from s.backends then.
193
+ var backend agent.AgentService_ConnectServer
194
+ var err error
116
195
117
196
for pkt := range recvCh {
118
197
switch pkt .Type {
119
198
case agent .PacketType_DIAL_REQ :
120
199
klog .Info (">>> Received DIAL_REQ" )
121
- if s .Backend == nil {
122
- klog .Info (">>> No backend found; drop" )
200
+ // TODO: if we track what agent has historically served
201
+ // the address, then we can send the Dial_REQ to the
202
+ // same agent. That way we save the agent from creating
203
+ // a new connection to the address.
204
+ backend , err = s .randomBackend ()
205
+ if err != nil {
206
+ klog .Errorf (">>> failed to get a backend: %v" , err )
123
207
continue
124
208
}
125
-
126
- if err := s .Backend .Send (pkt ); err != nil {
209
+ if err := backend .Send (pkt ); err != nil {
127
210
klog .Warningf (">>> DIAL_REQ to Backend failed: %v" , err )
128
211
}
129
212
s .PendingDial [pkt .GetDialRequest ().Random ] = & ProxyClientConnection {
@@ -134,16 +217,17 @@ func (s *ProxyServer) serveRecvFrontend(stream agent.ProxyService_ProxyServer, r
134
217
klog .Info (">>> DIAL_REQ sent to backend" ) // got this. but backend didn't receive anything.
135
218
136
219
case agent .PacketType_CLOSE_REQ :
137
- klog .Infof (">>> Received CLOSE_REQ(id=%d)" , pkt .GetCloseRequest ().ConnectID )
138
- if s .Backend == nil {
139
- klog .Info (">>> No backend found; drop" )
220
+ connID := pkt .GetCloseRequest ().ConnectID
221
+ klog .Infof (">>> Received CLOSE_REQ(id=%d)" , connID )
222
+ if backend == nil {
223
+ klog .Errorf ("backend has not been initialized for connID %d. Client should send a Dial Request first." , connID )
140
224
continue
141
225
}
142
-
143
- if err := s . Backend . Send ( pkt ); err != nil {
226
+ if err := backend . Send ( pkt ); err != nil {
227
+ // TODO: retry with other backends connecting to this agent.
144
228
klog .Warningf (">>> CLOSE_REQ to Backend failed: %v" , err )
145
229
}
146
- klog .Info ("CLOSE_REQ sent to backend" )
230
+ klog .Info (">>> CLOSE_REQ sent to backend" )
147
231
148
232
case agent .PacketType_DATA :
149
233
connID := pkt .GetData ().ConnectID
@@ -154,12 +238,12 @@ func (s *ProxyServer) serveRecvFrontend(stream agent.ProxyService_ProxyServer, r
154
238
klog .Warningf (">>> Data(id=%d) doesn't match first connection id %d" , firstConnID , connID )
155
239
}
156
240
157
- if s . Backend == nil {
158
- klog .Info ( ">>> No backend found; drop" )
241
+ if backend == nil {
242
+ klog .Errorf ( "backend has not been initialized for connID %d. Client should send a Dial Request first." , connID )
159
243
continue
160
244
}
161
-
162
- if err := s . Backend . Send ( pkt ); err != nil {
245
+ if err := backend . Send ( pkt ); err != nil {
246
+ // TODO: retry with other backends connecting to this agent.
163
247
klog .Warningf (">>> DATA to Backend failed: %v" , err )
164
248
}
165
249
klog .Info (">>> DATA sent to backend" )
@@ -179,10 +263,13 @@ func (s *ProxyServer) serveRecvFrontend(stream agent.ProxyService_ProxyServer, r
179
263
},
180
264
},
181
265
}
182
- if s .Backend != nil {
183
- if err := s .Backend .Send (pkt ); err != nil {
184
- klog .Warningf (">>> CLOSE_REQ to Backend failed: %v" , err )
185
- }
266
+
267
+ if backend == nil {
268
+ klog .Errorf ("backend has not been initialized for connID %d. Client should send a Dial Request first." , firstConnID )
269
+ return
270
+ }
271
+ if err := backend .Send (pkt ); err != nil {
272
+ klog .Warningf (">>> CLOSE_REQ to Backend failed: %v" , err )
186
273
}
187
274
}
188
275
@@ -196,20 +283,36 @@ func (s *ProxyServer) serveSend(stream agent.ProxyService_ProxyServer, sendCh <-
196
283
}
197
284
}
198
285
286
+ func agentID (stream agent.AgentService_ConnectServer ) (string , error ) {
287
+ md , ok := metadata .FromIncomingContext (stream .Context ())
288
+ if ! ok {
289
+ return "" , fmt .Errorf ("failed to get context" )
290
+ }
291
+ agentIDs := md .Get (header .AgentID )
292
+ if len (agentIDs ) != 1 {
293
+ return "" , fmt .Errorf ("expected one agent ID in the context, got %v" , agentIDs )
294
+ }
295
+ return agentIDs [0 ], nil
296
+ }
297
+
199
298
// Connect is for agent to connect to ProxyServer as next hop
200
299
func (s * ProxyServer ) Connect (stream agent.AgentService_ConnectServer ) error {
201
- klog .Info ("connect request from Backend" )
300
+ agentID , err := agentID (stream )
301
+ if err != nil {
302
+ return err
303
+ }
304
+ klog .Infof ("Connect request from agent %s" , agentID )
305
+ s .addBackend (agentID , stream )
306
+ defer s .removeBackend (agentID , stream )
307
+
308
+ h := metadata .Pairs (header .ServerID , s .serverID , header .ServerCount , strconv .Itoa (s .serverCount ))
309
+ if err := stream .SendHeader (h ); err != nil {
310
+ return err
311
+ }
202
312
203
313
recvCh := make (chan * agent.Packet , 10 )
204
314
stopCh := make (chan error )
205
315
206
- klog .Infof ("register Backend %v" , stream )
207
- s .Backend = stream
208
- defer func () {
209
- klog .Infof ("unregister Backend %v" , stream )
210
- s .Backend = nil
211
- }()
212
-
213
316
go s .serveRecvBackend (stream , recvCh )
214
317
215
318
defer func () {
@@ -225,6 +328,7 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error {
225
328
}
226
329
if err != nil {
227
330
klog .Warningf ("stream read error: %v" , err )
331
+ close (stopCh )
228
332
return
229
333
}
230
334
0 commit comments