Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions devenv/server-entrypoint.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
#!/bin/sh
set -e

# Clean up stale CNI state from previous runs. After a container restart the
# cni0 bridge may linger with a zeroed MAC (00:00:00:00:00:00), causing the
# bridge plugin to fail with "could not set bridge's mac: invalid argument".
ip link delete cni0 2>/dev/null || true
rm -rf /var/lib/cni/networks/* /var/lib/cni/results/* 2>/dev/null || true

# Ensure IP forwarding and subnet MASQUERADE for CNI.
sysctl -w net.ipv4.ip_forward=1 2>/dev/null || true
iptables -t nat -C POSTROUTING -s 10.88.0.0/16 ! -o cni0 -j MASQUERADE 2>/dev/null || \
Expand Down
6 changes: 6 additions & 0 deletions docker/server-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ set -e

MCP_IMAGE="${MCP_IMAGE:-docker.io/library/memoh-mcp:latest}"

# ---- Clean up stale CNI state from previous runs ----
# After a container restart the cni0 bridge may linger with a zeroed MAC
# (00:00:00:00:00:00), causing "could not set bridge's mac: invalid argument".
ip link delete cni0 2>/dev/null || true
rm -rf /var/lib/cni/networks/* /var/lib/cni/results/* 2>/dev/null || true

# ---- Ensure IP forwarding and subnet MASQUERADE for CNI ----
sysctl -w net.ipv4.ip_forward=1 2>/dev/null || true
iptables -t nat -C POSTROUTING -s 10.88.0.0/16 ! -o cni0 -j MASQUERADE 2>/dev/null || \
Expand Down
28 changes: 23 additions & 5 deletions internal/containerd/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
Expand Down Expand Up @@ -51,19 +52,26 @@ func setupCNINetwork(ctx context.Context, task client.Task, containerID string,
}
result, err := cni.Setup(ctx, containerID, netnsPath)
if err != nil {
if !isDuplicateAllocationError(err) && !isVethExistsError(err) {
retryable := isDuplicateAllocationError(err) || isVethExistsError(err) || isBridgeMACError(err)
if !retryable {
return "", err
}
// Stale IPAM allocation or veth exists (e.g. after container restart with persisted
// /var/lib/cni). Remove may fail if the previous iptables/veth state
// is already gone; ignore the error so the retry Setup still runs.
if isBridgeMACError(err) {
// Stale bridge with zeroed MAC after container restart; delete it so
// the plugin can recreate a healthy one.
_ = exec.CommandContext(ctx, "ip", "link", "delete", "cni0").Run()
}
_ = cni.Remove(ctx, containerID, netnsPath)
result, err = cni.Setup(ctx, containerID, netnsPath)
if err != nil {
return "", err
}
}
return extractIP(result), nil
ip := extractIP(result)
if ip == "" {
return "", fmt.Errorf("cni setup returned no usable IP for %s", containerID)
}
return ip, nil
}

func extractIP(result *gocni.Result) string {
Expand Down Expand Up @@ -139,3 +147,13 @@ func isVethExistsError(err error) bool {
}
return strings.Contains(err.Error(), "already exists")
}

// isBridgeMACError reports whether err is the CNI bridge plugin's failure to
// assign a MAC address — typically caused by a stale cni0 bridge with a zeroed
// MAC that survived a container restart.
func isBridgeMACError(err error) bool {
	if err == nil {
		return false
	}
	text := err.Error()
	if !strings.Contains(text, "set bridge") {
		return false
	}
	return strings.Contains(text, "mac")
}
66 changes: 37 additions & 29 deletions internal/handlers/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,8 @@ func (h *ContainerdHandler) ensureContainerAndTask(ctx context.Context, containe
}
if len(tasks) > 0 {
if tasks[0].Status == ctr.TaskStatusRunning {
if netResult, netErr := h.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
ContainerID: containerID,
CNIBinDir: h.cfg.CNIBinaryDir,
CNIConfDir: h.cfg.CNIConfigDir,
}); netErr != nil {
h.logger.Warn("network re-setup failed for running task",
slog.String("container_id", containerID), slog.Any("error", netErr))
} else if netResult.IP != "" && h.manager != nil {
h.manager.SetContainerIP(botID, netResult.IP)
if err := h.setupNetworkOrFail(ctx, containerID, botID); err != nil {
return err
}
return nil
}
Expand All @@ -270,17 +263,37 @@ func (h *ContainerdHandler) ensureContainerAndTask(ctx context.Context, containe
if err := h.service.StartContainer(ctx, containerID, nil); err != nil {
return err
}
if netResult, netErr := h.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
ContainerID: containerID,
CNIBinDir: h.cfg.CNIBinaryDir,
CNIConfDir: h.cfg.CNIConfigDir,
}); netErr != nil {
h.logger.Warn("network setup failed, task kept running",
slog.String("container_id", containerID), slog.Any("error", netErr))
} else if netResult.IP != "" && h.manager != nil {
h.manager.SetContainerIP(botID, netResult.IP)
return h.setupNetworkOrFail(ctx, containerID, botID)
}

// setupNetworkOrFail attempts CNI network setup with one retry. Returns an error
// if no usable IP is obtained — callers must not silently ignore this.
//
// On success the resolved IP is published to the MCP manager (when present) so
// gRPC clients can reach the container.
func (h *ContainerdHandler) setupNetworkOrFail(ctx context.Context, containerID, botID string) error {
	var lastErr error
	for attempt := 0; attempt < 2; attempt++ {
		netResult, err := h.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
			ContainerID: containerID,
			CNIBinDir:   h.cfg.CNIBinaryDir,
			CNIConfDir:  h.cfg.CNIConfigDir,
		})
		if err != nil {
			lastErr = err
			h.logger.Warn("network setup attempt failed",
				slog.String("container_id", containerID),
				slog.Int("attempt", attempt+1),
				slog.Any("error", err))
			continue
		}
		// A nil error with an empty IP is still a failure: the container would
		// be unreachable, so treat it as retryable.
		if netResult.IP == "" {
			lastErr = fmt.Errorf("network setup returned no IP for %s", containerID)
			continue
		}
		if h.manager != nil {
			h.manager.SetContainerIP(botID, netResult.IP)
		}
		return nil
	}
	// All attempts exhausted; surface the last error instead of silently
	// returning success (the stray success path was a diff-merge artifact).
	return fmt.Errorf("network setup failed for container %s: %w", containerID, lastErr)
}

// botContainerID resolves container_id for a bot from the database.
Expand Down Expand Up @@ -967,20 +980,15 @@ func (h *ContainerdHandler) ReconcileContainers(ctx context.Context) {
slog.String("bot_id", botID), slog.Any("error", dbErr))
}
}
if netResult, netErr := h.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
ContainerID: containerID,
CNIBinDir: h.cfg.CNIBinaryDir,
CNIConfDir: h.cfg.CNIConfigDir,
}); netErr != nil {
h.logger.Warn("reconcile: network re-setup failed for running task",
if netErr := h.setupNetworkOrFail(ctx, containerID, botID); netErr != nil {
h.logger.Error("reconcile: network setup failed for running task, container unreachable",
slog.String("bot_id", botID),
slog.String("container_id", containerID),
slog.Any("error", netErr))
} else if netResult.IP != "" && h.manager != nil {
h.manager.SetContainerIP(botID, netResult.IP)
} else {
h.logger.Info("reconcile: container healthy",
slog.String("bot_id", botID), slog.String("container_id", containerID))
}
h.logger.Info("reconcile: container healthy",
slog.String("bot_id", botID), slog.String("container_id", containerID))
continue
}

Expand Down
45 changes: 26 additions & 19 deletions internal/mcp/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,34 +116,39 @@ func (m *Manager) SetContainerIP(botID, ip string) {
}

// recoverContainerIP attempts to restore the container IP by re-running CNI setup.
// CNI plugins are idempotent - calling Setup again returns the existing IP allocation.
// CNI plugins are idempotent — calling Setup again returns the existing IP allocation.
// Retries up to 2 times to tolerate transient CNI failures (IPAM lock contention, etc.).
func (m *Manager) recoverContainerIP(botID string) (string, error) {
ctx := context.Background()
containerID := m.containerID(botID)

// First check if container exists and get basic info
info, err := m.service.GetContainer(ctx, containerID)
if err != nil {
return "", err
}

// Check if IP is stored in labels (if we ever add label persistence)
if ip, ok := info.Labels["mcp.container_ip"]; ok {
return ip, nil
}

// Container exists but IP not cached - need to re-setup network to get IP
// This happens after server restart when in-memory cache is lost
netResult, err := m.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
ContainerID: containerID,
CNIBinDir: m.cfg.CNIBinaryDir,
CNIConfDir: m.cfg.CNIConfigDir,
})
if err != nil {
return "", fmt.Errorf("network setup for IP recovery: %w", err)
const maxAttempts = 2
var lastErr error
for i := 0; i < maxAttempts; i++ {
netResult, err := m.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{
ContainerID: containerID,
CNIBinDir: m.cfg.CNIBinaryDir,
CNIConfDir: m.cfg.CNIConfigDir,
})
if err != nil {
lastErr = err
m.logger.Warn("IP recovery attempt failed",
slog.String("bot_id", botID), slog.Int("attempt", i+1), slog.Any("error", err))
time.Sleep(time.Duration(i+1) * 500 * time.Millisecond)
continue
}
return netResult.IP, nil
}

return netResult.IP, nil
return "", fmt.Errorf("network setup for IP recovery after %d attempts: %w", maxAttempts, lastErr)
}

// MCPClient returns a gRPC client for the given bot's container.
Expand Down Expand Up @@ -295,12 +300,14 @@ func (m *Manager) Start(ctx context.Context, botID string) error {
}
return err
}
if netResult.IP != "" {
m.mu.Lock()
m.containerIPs[botID] = netResult.IP
m.mu.Unlock()
m.logger.Info("container network ready", slog.String("bot_id", botID), slog.String("ip", netResult.IP))
if netResult.IP == "" {
if stopErr := m.service.StopContainer(ctx, m.containerID(botID), &ctr.StopTaskOptions{Force: true}); stopErr != nil {
m.logger.Warn("cleanup: stop task failed", slog.String("container_id", m.containerID(botID)), slog.Any("error", stopErr))
}
return fmt.Errorf("network setup returned no IP for bot %s", botID)
}
m.SetContainerIP(botID, netResult.IP)
m.logger.Info("container network ready", slog.String("bot_id", botID), slog.String("ip", netResult.IP))
return nil
}

Expand Down
23 changes: 15 additions & 8 deletions internal/mcp/mcpclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"io"
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
Expand All @@ -20,11 +21,14 @@ import (
pb "github.com/memohai/memoh/internal/mcp/mcpcontainer"
)

const connectingTimeout = 30 * time.Second

// Client wraps a gRPC connection to a single MCP container.
type Client struct {
	conn   *grpc.ClientConn
	svc    pb.ContainerServiceClient
	target string
	// createdAt records when the connection was dialed; the pool uses it to
	// evict connections stuck in the Connecting state past connectingTimeout.
	createdAt time.Time
}

// NewClientFromConn wraps an existing gRPC connection into a Client.
Expand All @@ -47,9 +51,10 @@ func Dial(_ context.Context, ip string) (*Client, error) {
return nil, fmt.Errorf("grpc dial %s: %w", target, err)
}
return &Client{
conn: conn,
svc: pb.NewContainerServiceClient(conn),
target: target,
conn: conn,
svc: pb.NewContainerServiceClient(conn),
target: target,
createdAt: time.Now(),
}, nil
}

Expand Down Expand Up @@ -308,12 +313,14 @@ func (p *Pool) MCPClient(ctx context.Context, botID string) (*Client, error) {
}

// Get returns a cached client or dials a new one.
// Stale connections (Shutdown / TransientFailure) are evicted automatically.
// Stale connections (Shutdown / TransientFailure / stuck Connecting) are evicted automatically.
func (p *Pool) Get(ctx context.Context, botID string) (*Client, error) {
p.mu.RLock()
if c, ok := p.clients[botID]; ok {
state := c.conn.GetState()
if state != connectivity.Shutdown && state != connectivity.TransientFailure {
stale := state == connectivity.Shutdown || state == connectivity.TransientFailure ||
(state == connectivity.Connecting && time.Since(c.createdAt) > connectingTimeout)
if !stale {
p.mu.RUnlock()
return c, nil
}
Expand Down