Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 114 additions & 5 deletions pkg/transport/stdio.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,25 @@ type StdioTransport struct {

// Container monitor
monitor rt.Monitor

// Retry configuration (for testing)
retryConfig *retryConfig
}

// retryConfig holds configuration for retry behavior
type retryConfig struct {
maxRetries int
initialDelay time.Duration
maxDelay time.Duration
}

// defaultRetryConfig returns the default retry configuration
func defaultRetryConfig() *retryConfig {
return &retryConfig{
maxRetries: 10,
initialDelay: 2 * time.Second,
maxDelay: 30 * time.Second,
}
}

// NewStdioTransport creates a new stdio transport.
Expand All @@ -75,6 +94,7 @@ func NewStdioTransport(
prometheusHandler: prometheusHandler,
shutdownCh: make(chan struct{}),
proxyMode: types.ProxyModeSSE, // default to SSE for backward compatibility
retryConfig: defaultRetryConfig(),
}
}

Expand Down Expand Up @@ -292,7 +312,7 @@ func (t *StdioTransport) IsRunning(_ context.Context) (bool, error) {
}

// processMessages handles the message exchange between the client and container.
func (t *StdioTransport) processMessages(ctx context.Context, stdin io.WriteCloser, stdout io.ReadCloser) {
func (t *StdioTransport) processMessages(ctx context.Context, _ io.WriteCloser, stdout io.ReadCloser) {
// Create a context that will be canceled when shutdown is signaled
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -318,14 +338,94 @@ func (t *StdioTransport) processMessages(ctx context.Context, stdin io.WriteClos
return
case msg := <-messageCh:
logger.Info("Process incoming messages and sending message to container")
if err := t.sendMessageToContainer(ctx, stdin, msg); err != nil {
// Use t.stdin instead of parameter so it uses the current stdin after re-attachment
t.mutex.Lock()
currentStdin := t.stdin
t.mutex.Unlock()
if err := t.sendMessageToContainer(ctx, currentStdin, msg); err != nil {
logger.Errorf("Error sending message to container: %v", err)
}
logger.Info("Messages processed")
}
}
}

// attemptReattachment tries to re-attach to a container that has lost its stdout connection.
// Returns true if re-attachment was successful, false otherwise.
func (t *StdioTransport) attemptReattachment(ctx context.Context, stdout io.ReadCloser) bool {
if t.deployer == nil || t.containerName == "" {
return false
}

maxRetries := t.retryConfig.maxRetries
initialDelay := t.retryConfig.initialDelay

for attempt := 0; attempt < maxRetries; attempt++ {
if attempt > 0 {
// Use exponential backoff: 2s, 4s, 8s, 16s, 30s, 30s...
// Calculate shift amount safely to prevent overflow
var shiftAmount uint
if attempt <= 1 {
shiftAmount = 0
} else if attempt-1 <= 30 {
// Safe: we've verified attempt-1 is within bounds for uint
shiftAmount = uint(attempt - 1) // #nosec G115
} else {
shiftAmount = 30 // Cap to prevent overflow
}
delay := initialDelay * time.Duration(1<<shiftAmount)
if delay > t.retryConfig.maxDelay {
delay = t.retryConfig.maxDelay
}
logger.Infof("Retry attempt %d/%d after %v", attempt+1, maxRetries, delay)
time.Sleep(delay)
}

running, checkErr := t.deployer.IsWorkloadRunning(ctx, t.containerName)
if checkErr != nil {
// Check if error is due to Docker being unavailable
if strings.Contains(checkErr.Error(), "EOF") || strings.Contains(checkErr.Error(), "connection refused") {
logger.Warnf("Docker socket unavailable (attempt %d/%d), will retry", attempt+1, maxRetries)
continue
}
logger.Warnf("Error checking if container is running (attempt %d/%d): %v", attempt+1, maxRetries, checkErr)
continue
}

if !running {
logger.Infof("Container not running (attempt %d/%d)", attempt+1, maxRetries)
return false
}

logger.Warn("Container is still running after stdout EOF - attempting to re-attach")

// Try to re-attach to the container
newStdin, newStdout, attachErr := t.deployer.AttachToWorkload(ctx, t.containerName)
if attachErr == nil {
logger.Info("Successfully re-attached to container - restarting message processing")

// Close old stdout
_ = stdout.Close()

// Update stdio references
t.mutex.Lock()
t.stdin = newStdin
t.stdout = newStdout
t.mutex.Unlock()

// Start ONLY the stdout reader, not the full processMessages
// The existing processMessages goroutine is still running and handling stdin
go t.processStdout(ctx, newStdout)
logger.Info("Restarted stdout processing with new pipe")
return true
}
logger.Errorf("Failed to re-attach to container (attempt %d/%d): %v", attempt+1, maxRetries, attachErr)
}

logger.Warn("Failed to re-attach after all retry attempts")
return false
}

// processStdout reads from the container's stdout and processes JSON-RPC messages.
func (t *StdioTransport) processStdout(ctx context.Context, stdout io.ReadCloser) {
// Create a buffer for accumulating data
Expand All @@ -343,7 +443,14 @@ func (t *StdioTransport) processStdout(ctx context.Context, stdout io.ReadCloser
n, err := stdout.Read(readBuffer)
if err != nil {
if err == io.EOF {
logger.Info("Container stdout closed")
logger.Warn("Container stdout closed - checking if container is still running")

// Try to re-attach to the container
if t.attemptReattachment(ctx, stdout) {
return
}

logger.Info("Container stdout closed - exiting read loop")
} else {
logger.Errorf("Error reading from container stdout: %v", err)
}
Expand Down Expand Up @@ -418,11 +525,13 @@ func sanitizeBinaryString(input string) string {
}

// isSpace reports whether r is a space character as defined by JSON.
// These are the valid space characters in this implementation:
// These are the valid space characters in JSON:
// - ' ' (U+0020, SPACE)
// - '\t' (U+0009, HORIZONTAL TAB)
// - '\n' (U+000A, LINE FEED)
// - '\r' (U+000D, CARRIAGE RETURN)
func isSpace(r rune) bool {
return r == ' ' || r == '\n'
return r == ' ' || r == '\t' || r == '\n' || r == '\r'
}

// parseAndForwardJSONRPC parses a JSON-RPC message and forwards it.
Expand Down
Loading
Loading