Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions pkg/port-forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,25 @@ func handleConnection(con net.Conn, req *PortForwardRequest) {
log.Error("error closing connection: ", err)
}
fmt.Printf("Connection closed from %s => %d\n", con.RemoteAddr().String(), req.Port)
var e *websocket.CloseError
if errors.As(errRet, &e) && e.Code != websocket.CloseNormalClosure {
log.Error("connection terminated badly with ", e)
if IsPermanentCloseError(errRet) {
log.Error("Port-forward connection rejected: check your permissions or run 'qovery auth'")
} else if IsAgentResponseTimeout(errRet) {
log.Warnf("Port-forward timed out (agent could not reach the pod or set up the forward). Reconnect to try again.")
} else if IsInternalServerError(errRet) {
log.Warnf("%s Reconnect to try again.", ServiceUnavailableMessage("Port-forward"))
} else if errRet != nil {
var e *websocket.CloseError
if !errors.As(errRet, &e) || e.Code != websocket.CloseNormalClosure {
log.Error("Port-forward connection terminated: ", errRet)
}
}
}()

wsConn, err := mkWebsocketConn(req)
if err != nil {
log.Fatal("error while creating websocket connection", err)
errRet = err
log.Errorf("error while creating websocket connection: %v", err)
return
}
defer func() {
if err := wsConn.ws.Close(); err != nil {
Expand Down
47 changes: 35 additions & 12 deletions pkg/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import (
const StdinBufferSize = 4096
const ReconnectDelay = 5 * time.Second
const PingInterval = 30 * time.Second
const ReadTimeout = 60 * time.Second

// ReadTimeout must be > 2 × PingInterval so that a healthy connection always receives a pong
// before the deadline fires. The pong handler resets the deadline on every pong received.
const ReadTimeout = 75 * time.Second

type TerminalSize interface {
SetTtySize(width uint16, height uint16)
Expand Down Expand Up @@ -104,7 +107,7 @@ func ExecShell(req TerminalSize, path string) {

done := make(chan struct{})
wg.Add(1)
go readWebsocketConnection(ctx, wsConn, currentConsole, done, &normalExit, &wg)
go readWebsocketConnection(ctx, cancel, wsConn, currentConsole, done, &normalExit, &wg)

pingTicker := time.NewTicker(PingInterval)

Expand Down Expand Up @@ -144,7 +147,9 @@ func ExecShell(req TerminalSize, path string) {
}

// Do NOT close stdIn — readUserConsole owns it and it is used across reconnects.
time.Sleep(ReconnectDelay)
if ctx.Err() == nil && !normalExit.Load() && !userCancelled.Load() {
time.Sleep(ReconnectDelay)
}
}

wg.Wait()
Expand Down Expand Up @@ -173,7 +178,7 @@ func createWebsocketConn(req interface{}, path string) (*websocket.Conn, error)
return conn, err
}

func readWebsocketConnection(ctx context.Context, wsConn *websocket.Conn, currentConsole console.Console, done chan struct{}, normalExit *atomic.Bool, wg *sync.WaitGroup) {
func readWebsocketConnection(ctx context.Context, cancel context.CancelFunc, wsConn *websocket.Conn, currentConsole console.Console, done chan struct{}, normalExit *atomic.Bool, wg *sync.WaitGroup) {
defer wg.Done()

var once sync.Once
Expand All @@ -189,6 +194,16 @@ func readWebsocketConnection(ctx context.Context, wsConn *websocket.Conn, curren
}
defer safeClose()

// Set an initial read deadline. The pong handler refreshes it on every
// pong so that idle-but-healthy sessions are not torn down; only truly
// dead connections (no pong for ReadTimeout) are detected and closed.
_ = wsConn.SetReadDeadline(time.Now().Add(ReadTimeout))
// SetReadDeadline failure in the pong handler would surface as a ReadMessage error on
// the next iteration, but cannot happen on a healthy net.Conn.
wsConn.SetPongHandler(func(string) error {
return wsConn.SetReadDeadline(time.Now().Add(ReadTimeout))
})

for {
select {
case <-ctx.Done():
Expand All @@ -197,16 +212,24 @@ func readWebsocketConnection(ctx context.Context, wsConn *websocket.Conn, curren
msgType, msg, err := wsConn.ReadMessage()
if err != nil {
var e *websocket.CloseError
if errors.As(err, &e) {
if e.Code == websocket.CloseNormalClosure {
log.Info("** shell terminated bye **")
normalExit.Store(true)
} else {
log.Errorf("connection closed by server: %v", e)
}
if !errors.As(err, &e) {
log.Errorf("error while reading on websocket: %v", err)
return
}
log.Errorf("error while reading on websocket: %v", err)
switch {
case e.Code == websocket.CloseNormalClosure:
log.Info("** shell terminated bye **")
normalExit.Store(true)
case e.Code == 1007 || e.Code == 1008: // same as IsPermanentCloseError
log.Errorf("Shell connection rejected: check your permissions or run 'qovery auth'")
cancel()
case IsAgentResponseTimeout(err): // must come before generic 1011 branch
log.Warnf("Shell session timed out while the agent was preparing your connection. Retrying...")
case e.Code == 1011:
log.Warnf("%s Retrying...", ServiceUnavailableMessage("Shell"))
default:
log.Errorf("connection closed by server: %v", e)
}
return
}

Expand Down
68 changes: 68 additions & 0 deletions pkg/wserror.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package pkg

import (
"errors"
"strings"

"github.com/gorilla/websocket"
)

// IsPermanentCloseError reports whether err is (or wraps) a websocket close
// error that should NOT be retried (permission denied, auth/policy violation).
//
// Every other close code (abnormal closure, going away, internal server
// error, ...) is treated as transient and yields false, as does any error
// that does not contain a *websocket.CloseError.
func IsPermanentCloseError(err error) bool {
	var closeErr *websocket.CloseError
	if !errors.As(err, &closeErr) {
		return false
	}
	switch closeErr.Code {
	case websocket.CloseInvalidFramePayloadData, // 1007 — used by gateway for permission errors
		websocket.ClosePolicyViolation: // 1008 — used for auth/token errors
		return true
	default:
		return false
	}
}

// IsInternalServerError reports whether err is (or wraps) a websocket close
// error with code 1011 (Internal Error). Errors that do not contain a
// *websocket.CloseError yield false.
func IsInternalServerError(err error) bool {
	var closeErr *websocket.CloseError
	return errors.As(err, &closeErr) && closeErr.Code == websocket.CloseInternalServerErr
}

// IsAgentResponseTimeout reports whether err is (or wraps) a websocket close
// error indicating that K8s operations on the shell-agent side timed out, or
// that the gateway timed out waiting for the agent to respond. All are
// transient and resolve once the pod's Kubernetes exec API is responsive again.
//
// IsAgentResponseTimeout is a strict subset of IsInternalServerError (both
// match close code 1011). Always check IsAgentResponseTimeout before
// IsInternalServerError, otherwise the specific timeout message is swallowed
// by the generic 1011 branch.
//
// The close reason text is matched against these substrings:
//   - "exceeded for receiving agent response" — gateway wait (shell_gateway.rs DEFAULT_AGENT_RESPONSE_TIMEOUT)
//   - "while connecting to pod" — shell-agent K8s exec timeout (shell.rs KUBE_OPERATION_TIMEOUT)
//   - "while setting up port forward" — shell-agent K8s port-forward timeout (port_forward.rs KUBE_PORT_FORWARD_TIMEOUT)
//   - "Retry budget exhausted" — shell-agent retry budget guard (shell.rs / port_forward.rs)
func IsAgentResponseTimeout(err error) bool {
	var closeErr *websocket.CloseError
	if !errors.As(err, &closeErr) || closeErr.Code != websocket.CloseInternalServerErr {
		return false
	}
	// Matching is case-sensitive and substring-based: the surrounding text
	// (durations, remaining budget) varies from one close frame to the next.
	for _, marker := range [...]string{
		"exceeded for receiving agent response",
		"while connecting to pod",
		"while setting up port forward",
		"Retry budget exhausted",
	} {
		if strings.Contains(closeErr.Text, marker) {
			return true
		}
	}
	return false
}

// ServiceUnavailableMessage builds the user-facing sentence shown when the
// given feature (e.g. "Shell" or "Port-forward") cannot be served because the
// cluster agent is unreachable. The feature name is used verbatim as the
// start of the message.
func ServiceUnavailableMessage(feature string) string {
	const advice = " is not available. Please verify that the cluster hosting this service is running and healthy."
	return feature + advice
}
154 changes: 154 additions & 0 deletions pkg/wserror_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package pkg

import (
"errors"
"fmt"
"strings"
"testing"

"github.com/gorilla/websocket"
)

// TestIsAgentResponseTimeout exercises IsAgentResponseTimeout with every known
// timeout reason on close code 1011, plus the negative cases: an unrelated
// 1011 reason, a matching reason on the wrong close code, and an error that is
// not a websocket close error. It also checks that a wrapped close error is
// still recognised.
func TestIsAgentResponseTimeout(t *testing.T) {
	// closeErr builds a websocket close error with the given code and reason.
	closeErr := func(code int, text string) error {
		return &websocket.CloseError{Code: code, Text: text}
	}
	const gatewayTimeout = "Deadline of 90s exceeded for receiving agent response"

	type testCase struct {
		name string
		err  error
		want bool
	}
	cases := []testCase{
		{"1011 gateway wait timeout", closeErr(1011, gatewayTimeout), true},
		{"1011 shell-agent K8s exec timeout", closeErr(1011, "Timed out after 45s while connecting to pod"), true},
		{"1011 shell-agent K8s port-forward timeout", closeErr(1011, "Timed out after 45s while setting up port forward"), true},
		{
			"1011 shell-agent retry budget exhausted (exec)",
			closeErr(1011, "Retry budget exhausted: only 2s remaining, need at least 45s for K8s exec setup"),
			true,
		},
		{
			"1011 shell-agent retry budget exhausted (port-forward)",
			closeErr(1011, "Retry budget exhausted: only 1s remaining, need at least 45s for K8s port-forward setup"),
			true,
		},
		{"1011 with different reason falls through to IsInternalServerError", closeErr(1011, "some other internal error"), false},
		{"wrong close code", closeErr(1007, "exceeded for receiving agent response"), false},
		{"non-websocket error", errors.New("plain network error"), false},
		{"wrapped 1011 gateway timeout", fmt.Errorf("read failed: %w", closeErr(1011, gatewayTimeout)), true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := IsAgentResponseTimeout(tc.err)
			if got != tc.want {
				t.Errorf("IsAgentResponseTimeout() = %v, want %v", got, tc.want)
			}
		})
	}
}

// TestIsAgentResponseTimeoutBeforeIsInternalServerError pins down the overlap
// between the two classifiers: a 1011 close error carrying a timeout reason
// satisfies BOTH IsAgentResponseTimeout and IsInternalServerError, because the
// timeout case is a strict subset of 1011. This is why callers must test
// IsAgentResponseTimeout first — otherwise the generic 1011 branch swallows
// the more specific timeout message.
func TestIsAgentResponseTimeoutBeforeIsInternalServerError(t *testing.T) {
	timeoutReasons := []string{
		"Deadline of 90s exceeded for receiving agent response",
		"Timed out after 45s while connecting to pod",
		"Timed out after 45s while setting up port forward",
		"Retry budget exhausted: only 2s remaining, need at least 45s for K8s exec setup",
		"Retry budget exhausted: only 1s remaining, need at least 45s for K8s port-forward setup",
	}

	for _, reason := range timeoutReasons {
		closeErr := &websocket.CloseError{Code: 1011, Text: reason}

		isTimeout := IsAgentResponseTimeout(closeErr)
		if !isTimeout {
			t.Errorf("IsAgentResponseTimeout(%q) = false, want true", reason)
		}

		isInternal := IsInternalServerError(closeErr)
		if !isInternal {
			t.Errorf("IsInternalServerError(%q) = false, want true (timeout is a subset of 1011)", reason)
		}
	}
}

// TestIsInternalServerError checks that IsInternalServerError matches close
// code 1011 only — directly or through a wrapping error — and rejects other
// close codes and errors that are not websocket close errors.
func TestIsInternalServerError(t *testing.T) {
	type testCase struct {
		name string
		err  error
		want bool
	}
	cases := []testCase{
		{
			name: "1011 matches",
			err:  &websocket.CloseError{Code: 1011, Text: "anything"},
			want: true,
		},
		{
			name: "1007 does not match",
			err:  &websocket.CloseError{Code: 1007, Text: ""},
			want: false,
		},
		{
			name: "non-websocket error",
			err:  errors.New("plain error"),
			want: false,
		},
		{
			name: "wrapped 1011",
			err:  fmt.Errorf("wrap: %w", &websocket.CloseError{Code: 1011, Text: "x"}),
			want: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := IsInternalServerError(tc.err)
			if got != tc.want {
				t.Errorf("IsInternalServerError() = %v, want %v", got, tc.want)
			}
		})
	}
}

// TestIsPermanentCloseError checks that only close codes 1007 and 1008 are
// classified as permanent (including when wrapped), while 1011, 1000 and
// errors that are not websocket close errors are classified as transient.
func TestIsPermanentCloseError(t *testing.T) {
	type testCase struct {
		name string
		err  error
		want bool
	}
	cases := []testCase{
		{name: "1007 is permanent", err: &websocket.CloseError{Code: 1007}, want: true},
		{name: "1008 is permanent", err: &websocket.CloseError{Code: 1008}, want: true},
		{name: "1011 is transient", err: &websocket.CloseError{Code: 1011}, want: false},
		{name: "1000 is transient", err: &websocket.CloseError{Code: 1000}, want: false},
		{name: "non-websocket error", err: errors.New("plain error"), want: false},
		{name: "wrapped 1008", err: fmt.Errorf("wrap: %w", &websocket.CloseError{Code: 1008}), want: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := IsPermanentCloseError(tc.err)
			if got != tc.want {
				t.Errorf("IsPermanentCloseError() = %v, want %v", got, tc.want)
			}
		})
	}
}

// TestServiceUnavailableMessage checks the shape of the user-facing message
// for each feature name: it must start with the feature, mention both
// "cluster" and "running", and end with a period.
func TestServiceUnavailableMessage(t *testing.T) {
	features := []string{"Shell", "Port-forward"}

	for _, feature := range features {
		msg := ServiceUnavailableMessage(feature)

		startsWithFeature := strings.HasPrefix(msg, feature)
		if !startsWithFeature {
			t.Errorf("ServiceUnavailableMessage(%q): expected prefix %q, got: %q", feature, feature, msg)
		}

		mentionsCluster := strings.Contains(msg, "cluster")
		if !mentionsCluster {
			t.Errorf("ServiceUnavailableMessage(%q): expected 'cluster' in message, got: %q", feature, msg)
		}

		mentionsRunning := strings.Contains(msg, "running")
		if !mentionsRunning {
			t.Errorf("ServiceUnavailableMessage(%q): expected 'running' in message, got: %q", feature, msg)
		}

		endsWithPeriod := strings.HasSuffix(msg, ".")
		if !endsWithPeriod {
			t.Errorf("ServiceUnavailableMessage(%q): expected message to end with '.', got: %q", feature, msg)
		}
	}
}
Loading