Skip to content

Commit c981bd3

Browse files
authored
Merge pull request kubernetes-sigs#767 from kinvolk/imran/backport-prs-to-release-0.33
Backport PRs to release 0.33
2 parents 3714065 + 9367a46 commit c981bd3

File tree

5 files changed

+117
-79
lines changed

5 files changed

+117
-79
lines changed

pkg/agent/client.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,13 @@ func (a *Client) Serve() {
354354
if status.Code(err) == codes.Canceled {
355355
klog.V(2).InfoS("stream canceled", "serverID", a.serverID, "agentID", a.agentID)
356356
} else {
357-
klog.ErrorS(err, "could not read stream", "serverID", a.serverID, "agentID", a.agentID)
357+
select {
358+
case <-a.stopCh:
359+
klog.V(5).InfoS("could not read stream because agent client is shutting down", "serverID", a.serverID, "agentID", a.agentID, "err", err)
360+
default:
361+
// If stopCh is not closed, this is a legitimate, unexpected error.
362+
klog.ErrorS(err, "could not read stream", "serverID", a.serverID, "agentID", a.agentID)
363+
}
358364
}
359365
return
360366
}
@@ -407,7 +413,13 @@ func (a *Client) Serve() {
407413
closePkt.GetCloseResponse().ConnectID = connID
408414
}
409415
if err := a.Send(closePkt); err != nil {
410-
klog.ErrorS(err, "close response failure", "")
416+
if err == io.EOF {
417+
klog.V(4).InfoS("received EOF; connection already closed", "connectionID", connID, "dialID", dialReq.Random, "err", err)
418+
} else if _, ok := a.connManager.Get(connID); !ok {
419+
klog.V(5).InfoS("connection already closed", "connectionID", connID, "dialID", dialReq.Random, "err", err)
420+
} else {
421+
klog.ErrorS(err, "close response failure", "connectionID", connID, "dialID", dialReq.Random)
422+
}
411423
}
412424
close(dataCh)
413425
a.connManager.Delete(connID)

pkg/server/backend_manager.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -299,10 +299,10 @@ func containIDType(idTypes []header.IdentifierType, idType header.IdentifierType
299299
// addBackend adds a backend.
300300
func (s *DefaultBackendStorage) addBackend(identifier string, idType header.IdentifierType, backend *Backend) {
301301
if !containIDType(s.idTypes, idType) {
302-
klog.V(4).InfoS("fail to add backend", "backend", identifier, "error", &ErrWrongIDType{idType, s.idTypes})
302+
klog.V(3).InfoS("fail to add backend", "backend", identifier, "error", &ErrWrongIDType{idType, s.idTypes})
303303
return
304304
}
305-
klog.V(5).InfoS("Register backend for agent", "agentID", identifier)
305+
klog.V(2).InfoS("Register backend for agent", "agentID", identifier)
306306
s.mu.Lock()
307307
defer s.mu.Unlock()
308308
_, ok := s.backends[identifier]
@@ -327,7 +327,7 @@ func (s *DefaultBackendStorage) removeBackend(identifier string, idType header.I
327327
klog.ErrorS(&ErrWrongIDType{idType, s.idTypes}, "fail to remove backend")
328328
return
329329
}
330-
klog.V(5).InfoS("Remove connection for agent", "agentID", identifier)
330+
klog.V(2).InfoS("Remove connection for agent", "agentID", identifier)
331331
s.mu.Lock()
332332
defer s.mu.Unlock()
333333
backends, ok := s.backends[identifier]
@@ -400,7 +400,7 @@ func (s *DefaultBackendStorage) GetRandomBackend() (*Backend, error) {
400400
return nil, &ErrNotFound{}
401401
}
402402
agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))]
403-
klog.V(5).InfoS("Pick agent as backend", "agentID", agentID)
403+
klog.V(3).InfoS("Pick agent as backend", "agentID", agentID)
404404
// always return the first connection to an agent, because the agent
405405
// will close later connections if there are multiple.
406406
return s.backends[agentID][0], nil

pkg/server/server.go

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,53 @@ const (
107107
destHostKey key = iota
108108
)
109109

110+
// mapDialErrorToHTTPStatus maps common TCP/network error strings to appropriate HTTP status codes
111+
func mapDialErrorToHTTPStatus(errStr string) int {
112+
// Convert to lowercase for case-insensitive matching
113+
errLower := strings.ToLower(errStr)
114+
115+
// Check each error pattern and return appropriate status code
116+
switch {
117+
// Timeouts - backend didn't respond in time -> 504 Gateway Timeout
118+
case strings.Contains(errLower, "i/o timeout"),
119+
strings.Contains(errLower, "deadline exceeded"),
120+
strings.Contains(errLower, "context deadline exceeded"),
121+
strings.Contains(errLower, "timeout"):
122+
return 504
123+
124+
// Resource exhaustion errors -> 503 Service Unavailable
125+
case strings.Contains(errLower, "too many open files"),
126+
strings.Contains(errLower, "socket: too many open files"):
127+
return 503
128+
129+
// Connection errors -> 502 Bad Gateway
130+
case strings.Contains(errLower, "connection refused"),
131+
strings.Contains(errLower, "connection reset by peer"),
132+
strings.Contains(errLower, "broken pipe"),
133+
strings.Contains(errLower, "network is unreachable"),
134+
strings.Contains(errLower, "no route to host"),
135+
strings.Contains(errLower, "host is unreachable"),
136+
strings.Contains(errLower, "network is down"):
137+
return 502
138+
139+
// DNS resolution failures -> 502 Bad Gateway
140+
case strings.Contains(errLower, "no such host"),
141+
strings.Contains(errLower, "name resolution"),
142+
strings.Contains(errLower, "lookup") && strings.Contains(errLower, "no such host"):
143+
return 502
144+
145+
// TLS/SSL errors -> 502 Bad Gateway
146+
case strings.Contains(errLower, "tls"),
147+
strings.Contains(errLower, "ssl"),
148+
strings.Contains(errLower, "certificate"):
149+
return 502
150+
151+
// Default to 502 Bad Gateway for unknown proxy errors
152+
default:
153+
return 502
154+
}
155+
}
156+
110157
func (c *ProxyClientConnection) send(pkt *client.Packet) error {
111158
defer func(start time.Time) { metrics.Metrics.ObserveFrontendWriteLatency(time.Since(start)) }(time.Now())
112159
if c.Mode == ModeGRPC {
@@ -121,11 +168,21 @@ func (c *ProxyClientConnection) send(pkt *client.Packet) error {
121168
_, err := c.HTTP.Write(pkt.GetData().Data)
122169
return err
123170
} else if pkt.Type == client.PacketType_DIAL_RSP {
124-
if pkt.GetDialResponse().Error != "" {
125-
body := bytes.NewBufferString(pkt.GetDialResponse().Error)
171+
dialErr := pkt.GetDialResponse().Error
172+
if dialErr != "" {
173+
// // Map the error to appropriate HTTP status code
174+
statusCode := mapDialErrorToHTTPStatus(dialErr)
175+
statusText := http.StatusText(statusCode)
176+
body := bytes.NewBufferString(dialErr)
126177
t := http.Response{
127-
StatusCode: 503,
178+
StatusCode: statusCode,
179+
Status: fmt.Sprintf("%d %s", statusCode, statusText),
128180
Body: io.NopCloser(body),
181+
Header: http.Header{
182+
"Content-Type": []string{"text/plain; charset=utf-8"},
183+
},
184+
Proto: "HTTP/1.1",
185+
ProtoMinor: 1,
129186
}
130187

131188
t.Write(c.HTTP)
@@ -718,7 +775,7 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error {
718775
}
719776
agentID := backend.GetAgentID()
720777

721-
klog.V(5).InfoS("Connect request from agent", "agentID", agentID, "serverID", s.serverID)
778+
klog.V(2).InfoS("Connect request from agent", "agentID", agentID, "serverID", s.serverID)
722779
labels := runpprof.Labels(
723780
"serverCount", strconv.Itoa(s.serverCount),
724781
"agentID", agentID,
@@ -945,7 +1002,7 @@ func (s *ProxyServer) serveRecvBackend(backend *Backend, agentID string, recvCh
9451002
klog.V(5).InfoS("Ignoring unrecognized packet from backend", "packet", pkt, "agentID", agentID)
9461003
}
9471004
}
948-
klog.V(5).InfoS("Close backend of agent", "agentID", agentID)
1005+
klog.V(3).InfoS("Close backend of agent", "agentID", agentID)
9491006
}
9501007

9511008
func (s *ProxyServer) sendBackendClose(backend *Backend, connectID int64, random int64, reason string) {

pkg/server/tunnel.go

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
4040
defer metrics.Metrics.HTTPConnectionDec()
4141

4242
klog.V(2).InfoS("Received request for host", "method", r.Method, "host", r.Host, "userAgent", r.UserAgent())
43-
if r.TLS != nil {
43+
if r.TLS != nil && len(r.TLS.PeerCertificates) > 0 {
4444
klog.V(2).InfoS("TLS", "commonName", r.TLS.PeerCertificates[0].Subject.CommonName)
4545
}
4646
if r.Method != http.MethodConnect {
@@ -60,14 +60,6 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
6060
return
6161
}
6262

63-
// Send the HTTP 200 OK status after a successful hijack
64-
_, err = conn.Write([]byte("HTTP/1.1 200 Connection Established\r\n\r\n"))
65-
if err != nil {
66-
klog.ErrorS(err, "failed to send 200 connection established")
67-
conn.Close()
68-
return
69-
}
70-
7163
var closeOnce sync.Once
7264
defer closeOnce.Do(func() { conn.Close() })
7365

@@ -104,15 +96,23 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
10496
connected: connected,
10597
start: time.Now(),
10698
backend: backend,
99+
dialID: random,
100+
agentID: backend.GetAgentID(),
107101
}
108102
t.Server.PendingDial.Add(random, connection)
109103
if err := backend.Send(dialRequest); err != nil {
110-
klog.ErrorS(err, "failed to tunnel dial request")
104+
klog.ErrorS(err, "failed to tunnel dial request", "host", r.Host, "dialID", connection.dialID, "agentID", connection.agentID)
105+
// Send proper HTTP error response
106+
conn.Write([]byte(fmt.Sprintf("HTTP/1.1 502 Bad Gateway\r\nContent-Type: text/plain; charset=utf-8\r\n\r\nFailed to tunnel dial request: %v\r\n", err)))
107+
conn.Close()
111108
return
112109
}
113110
ctxt := backend.Context()
114111
if ctxt.Err() != nil {
115-
klog.ErrorS(err, "context reports failure")
112+
klog.ErrorS(ctxt.Err(), "context reports failure")
113+
conn.Write([]byte(fmt.Sprintf("HTTP/1.1 502 Bad Gateway\r\nContent-Type: text/plain; charset=utf-8\r\n\r\nBackend context error: %v\r\n", ctxt.Err())))
114+
conn.Close()
115+
return
116116
}
117117

118118
select {
@@ -123,6 +123,15 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
123123

124124
select {
125125
case <-connection.connected: // Waiting for response before we begin full communication.
126+
// Now that connection is established, send 200 OK to switch to tunnel mode
127+
_, err = conn.Write([]byte("HTTP/1.1 200 Connection Established\r\n\r\n"))
128+
if err != nil {
129+
klog.ErrorS(err, "failed to send 200 connection established", "host", r.Host, "agentID", connection.agentID)
130+
conn.Close()
131+
return
132+
}
133+
klog.V(3).InfoS("Connection established, sent 200 OK", "host", r.Host, "agentID", connection.agentID, "connectionID", connection.connectID)
134+
126135
case <-closed: // Connection was closed before being established
127136
}
128137

@@ -142,22 +151,22 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
142151
conn.Close()
143152
}()
144153

145-
klog.V(3).InfoS("Starting proxy to host", "host", r.Host)
146-
pkt := make([]byte, 1<<15) // Match GRPC Window size
147-
148154
connID := connection.connectID
149155
agentID := connection.agentID
156+
klog.V(3).InfoS("Starting proxy to host", "host", r.Host, "agentID", agentID, "connectionID", connID)
157+
158+
pkt := make([]byte, 1<<15) // Match GRPC Window size
150159
var acc int
151160

152161
for {
153162
n, err := bufrw.Read(pkt[:])
154163
acc += n
155164
if err == io.EOF {
156-
klog.V(1).InfoS("EOF from host", "host", r.Host)
165+
klog.V(1).InfoS("EOF from host", "host", r.Host, "agentID", agentID, "connectionID", connID)
157166
break
158167
}
159168
if err != nil {
160-
klog.ErrorS(err, "Received failure on connection")
169+
klog.ErrorS(err, "Received failure on connection", "host", r.Host, "agentID", agentID, "connectionID", connID)
161170
break
162171
}
163172

@@ -172,7 +181,7 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
172181
}
173182
err = backend.Send(packet)
174183
if err != nil {
175-
klog.ErrorS(err, "error sending packet")
184+
klog.ErrorS(err, "error sending packet", "host", r.Host, "agentID", agentID, "connectionID", connID)
176185
break
177186
}
178187
klog.V(5).InfoS("Forwarding data on tunnel to agent",

tests/proxy_test.go

Lines changed: 11 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,7 @@ func TestFailedDNSLookupProxy_HTTPCONN(t *testing.T) {
689689
t.Error(err)
690690
}
691691

692-
urlString := "http://thissssssxxxxx.com:80"
692+
urlString := "http://thisdefinitelydoesnotexist.com:80"
693693
serverURL, _ := url.Parse(urlString)
694694

695695
// Send HTTP-Connect request
@@ -705,36 +705,12 @@ func TestFailedDNSLookupProxy_HTTPCONN(t *testing.T) {
705705
t.Errorf("reading HTTP response from CONNECT: %v", err)
706706
}
707707

708-
if res.StatusCode != 200 {
709-
t.Errorf("expect 200; got %d", res.StatusCode)
710-
}
711-
if br.Buffered() > 0 {
712-
t.Error("unexpected extra buffer")
713-
}
714-
dialer := func(_, _ string) (net.Conn, error) {
715-
return conn, nil
716-
}
717-
718-
c := &http.Client{
719-
Transport: &http.Transport{
720-
Dial: dialer,
721-
},
722-
}
723-
724-
resp, err := c.Get(urlString)
725-
if err != nil {
726-
t.Error(err)
708+
if res.StatusCode != 502 {
709+
t.Errorf("expect 502; got %d", res.StatusCode)
727710
}
728711

729-
if resp.StatusCode != 503 {
730-
t.Errorf("expect 503; got %d", res.StatusCode)
731-
}
732-
733-
body, err := io.ReadAll(resp.Body)
734-
resp.Body.Close()
735-
if !strings.Contains(err.Error(), "connection reset by peer") {
736-
t.Error(err)
737-
}
712+
body, err := io.ReadAll(res.Body)
713+
res.Body.Close()
738714

739715
if !strings.Contains(string(body), "no such host") {
740716
t.Errorf("Unexpected error: %v", err)
@@ -779,37 +755,21 @@ func TestFailedDial_HTTPCONN(t *testing.T) {
779755
br := bufio.NewReader(conn)
780756
res, err := http.ReadResponse(br, nil)
781757
if err != nil {
782-
t.Fatalf("reading HTTP response from CONNECT: %v", err)
783-
}
784-
if res.StatusCode != 200 {
785-
t.Fatalf("expect 200; got %d", res.StatusCode)
758+
t.Errorf("reading HTTP response from CONNECT: %v", err)
786759
}
787760

788-
dialer := func(_, _ string) (net.Conn, error) {
789-
return conn, nil
761+
if res.StatusCode != 502 {
762+
t.Errorf("expect 502; got %d", res.StatusCode)
790763
}
791764

792-
c := &http.Client{
793-
Transport: &http.Transport{
794-
Dial: dialer,
795-
},
796-
}
797-
798-
resp, err := c.Get(server.URL)
765+
body, err := io.ReadAll(res.Body)
766+
res.Body.Close()
799767
if err != nil {
800-
t.Fatal(err)
801-
}
802-
803-
body, err := io.ReadAll(resp.Body)
804-
resp.Body.Close()
805-
if err == nil {
806768
t.Fatalf("Expected error reading response body; response=%q", body)
807-
} else if !strings.Contains(err.Error(), "connection reset by peer") {
808-
t.Error(err)
809769
}
810770

811771
if !strings.Contains(string(body), "connection refused") {
812-
t.Errorf("Unexpected error: %v", err)
772+
t.Errorf("Expected 'connection refused' in error body, got: %s", string(body))
813773
}
814774

815775
if err := ps.Metrics().ExpectServerDialFailure(metricsserver.DialFailureErrorResponse, 1); err != nil {

0 commit comments

Comments
 (0)