Skip to content

Commit 60be8ae

Browse files
committed
Fix router path and set no timeout for invoke request
Signed-off-by: VanderChen <vanderchen@outlook.com>
1 parent fdefd40 commit 60be8ae

File tree

12 files changed

+611
-169
lines changed

12 files changed

+611
-169
lines changed

Dockerfile.router

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Multi-stage build for agentcube-router
2+
FROM golang:1.24.9-alpine AS builder
3+
4+
# Build arguments for multi-architecture support
5+
ARG TARGETOS=linux
6+
ARG TARGETARCH
7+
8+
WORKDIR /workspace
9+
10+
# Copy go mod files
11+
COPY go.mod go.sum ./
12+
RUN go mod download
13+
14+
# Copy source code
15+
COPY cmd/ cmd/
16+
COPY pkg/ pkg/
17+
COPY client-go/ client-go/
18+
19+
# Build with dynamic architecture support
20+
# Supports amd64, arm64, arm/v7, etc.
21+
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} \
22+
go build -o agentcube-router ./cmd/router
23+
24+
# Runtime image
25+
FROM alpine:3.19
26+
27+
RUN apk --no-cache add ca-certificates
28+
29+
WORKDIR /app
30+
31+
# Copy binary from builder
32+
COPY --from=builder /workspace/agentcube-router .
33+
34+
# Run as non-root user
35+
RUN adduser -D -u 1000 router
36+
USER router
37+
38+
EXPOSE 8080
39+
40+
ENTRYPOINT ["/app/agentcube-router"]
41+
CMD ["--port=8080", "--debug"]

Makefile

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ build: generate ## Build agentcube-apiserver binary
7272
@echo "Building agentcube-apiserver..."
7373
go build -o bin/agentcube-apiserver ./cmd/workload-manager
7474

75+
build-router: ## Build agentcube-router binary
76+
@echo "Building agentcube-router..."
77+
go build -o bin/agentcube-router ./cmd/router
78+
7579
build-agentd: generate ## Build agentd binary
7680
@echo "Building agentd..."
7781
go build -o bin/agentd ./cmd/agentd
@@ -80,7 +84,7 @@ build-test-tunnel: ## Build test-tunnel tool
8084
@echo "Building test-tunnel..."
8185
go build -o bin/test-tunnel ./cmd/test-tunnel
8286

83-
build-all: build build-agentd build-test-tunnel ## Build all binaries
87+
build-all: build build-router build-agentd build-test-tunnel ## Build all binaries
8488

8589
# Run server (development mode)
8690
run:
@@ -140,13 +144,18 @@ install: build
140144

141145
# Docker image variables
142146
APISERVER_IMAGE ?= agentcube-apiserver:latest
147+
ROUTER_IMAGE ?= agentcube-router:latest
143148
IMAGE_REGISTRY ?= ""
144149

145150
# Docker and Kubernetes targets
146151
docker-build:
147152
@echo "Building Docker image..."
148153
docker build -t $(APISERVER_IMAGE) .
149154

155+
docker-build-router: ## Build router Docker image
156+
@echo "Building router Docker image..."
157+
docker build -f Dockerfile.router -t $(ROUTER_IMAGE) .
158+
150159
# Multi-architecture build (supports amd64, arm64)
151160
docker-buildx:
152161
@echo "Building multi-architecture Docker image..."

cmd/router/main.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"os"
88
"os/signal"
99
"syscall"
10-
"time"
1110

1211
"github.com/volcano-sh/agentcube/pkg/router"
1312
)
@@ -30,12 +29,7 @@ func main() {
3029

3130
// Create Router API server configuration
3231
config := &router.Config{
33-
Port: *port,
34-
SandboxEndpoints: []string{
35-
"http://sandbox-1:8080",
36-
"http://sandbox-2:8080",
37-
"http://sandbox-3:8080",
38-
}, // Default sandbox endpoints, can be configured via env vars
32+
Port: *port,
3933
Debug: *debug,
4034
EnableTLS: *enableTLS,
4135
TLSCert: *tlsCert,
@@ -69,8 +63,10 @@ func main() {
6963
select {
7064
case <-ctx.Done():
7165
log.Println("Received shutdown signal, shutting down gracefully...")
72-
// Wait for server to finish shutting down
73-
time.Sleep(2 * time.Second)
66+
// Cancel the context to trigger server shutdown
67+
cancel()
68+
// Wait for server goroutine to exit after graceful shutdown is complete
69+
<-errCh
7470
case err := <-errCh:
7571
log.Fatalf("Server error: %v", err)
7672
}

k8s/agentcube-router.yaml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,13 @@ rules:
4343
verbs: ["update"]
4444
- apiGroups: ["runtime.agentcube.volcano.sh"]
4545
resources: ["agentruntimes"]
46-
verbs: ["get", "list", "watch"]
46+
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
4747
- apiGroups: ["runtime.agentcube.volcano.sh"]
4848
resources: ["agentruntimes/status"]
49-
verbs: ["update", "patch"]
49+
verbs: ["get", "update", "patch"]
50+
- apiGroups: ["runtime.agentcube.volcano.sh"]
51+
resources: ["agentruntimes/finalizers"]
52+
verbs: ["update"]
5053
- apiGroups: [""]
5154
resources: ["pods"]
5255
verbs: ["get", "list", "watch"]
@@ -100,9 +103,10 @@ spec:
100103
value: "127.0.0.1:6379"
101104
- name: REDIS_PASSWORD
102105
value: ""
106+
- name: WORKLOAD_MGR_URL
107+
value: "http://agentcube-workload-manager:8080"
103108
args:
104109
- --port=8080
105-
- --runtime-class-name=
106110
- --debug
107111
resources:
108112
requests:

pkg/agentd/agentd_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestReconciler_Reconcile_WithLastActivity(t *testing.T) {
3838
Name: "test-sandbox",
3939
Namespace: "default",
4040
Annotations: map[string]string{
41-
"agentcube.volcano.sh/last-activity": now.Add(-5 * time.Minute).Format(time.RFC3339),
41+
"last-activity-time": now.Add(-5 * time.Minute).Format(time.RFC3339),
4242
},
4343
},
4444
Status: sandboxv1alpha1.SandboxStatus{
@@ -61,7 +61,7 @@ func TestReconciler_Reconcile_WithLastActivity(t *testing.T) {
6161
Name: "test-sandbox",
6262
Namespace: "default",
6363
Annotations: map[string]string{
64-
"agentcube.volcano.sh/last-activity": now.Add(-20 * time.Minute).Format(time.RFC3339),
64+
"last-activity-time": now.Add(-20 * time.Minute).Format(time.RFC3339),
6565
},
6666
},
6767
Status: sandboxv1alpha1.SandboxStatus{
@@ -83,7 +83,7 @@ func TestReconciler_Reconcile_WithLastActivity(t *testing.T) {
8383
Name: "test-sandbox",
8484
Namespace: "default",
8585
Annotations: map[string]string{
86-
"agentcube.volcano.sh/last-activity": now.Add(-20 * time.Minute).Format(time.RFC3339),
86+
"last-activity-time": now.Add(-20 * time.Minute).Format(time.RFC3339),
8787
},
8888
},
8989
Status: sandboxv1alpha1.SandboxStatus{

pkg/router/apiserver.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,6 @@ func NewServer(config *Config) (*Server, error) {
6060
if config.MaxConnsPerHost <= 0 {
6161
config.MaxConnsPerHost = 10 // Default 10 connections per host
6262
}
63-
if config.SessionExpireDuration <= 0 {
64-
config.SessionExpireDuration = 3600 // Default 1 hour
65-
}
6663

6764
// Initialize Redis client
6865
redisOptions, err := makeRedisOptions()
@@ -148,10 +145,10 @@ func (s *Server) setupRoutes() {
148145
v1.Use(s.concurrencyLimitMiddleware()) // Apply concurrency limit to API routes
149146

150147
// Agent invoke requests
151-
v1.Any("/namespaces/:agentNamespace/agent-runtimes/:agentName/invocations/*path", s.handleAgentInvoke)
148+
v1.POST("/namespaces/:namespace/agent-runtimes/:name/invocations/*path", s.handleAgentInvoke)
152149

153-
// Code interpreter invoke requests - use different base path to avoid conflicts
154-
v1.Any("/code-namespaces/:namespace/code-interpreters/:name/invocations/*path", s.handleCodeInterpreterInvoke)
150+
// Code interpreter invoke requests
151+
v1.POST("/namespaces/:namespace/code-interpreters/:name/invocations/*path", s.handleCodeInterpreterInvoke)
155152
}
156153

157154
// Start starts the Router API server

pkg/router/apiserver_test.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ func TestNewServer(t *testing.T) {
4343
RequestTimeout: 60,
4444
MaxIdleConns: 200,
4545
MaxConnsPerHost: 20,
46-
SessionExpireDuration: 7200,
4746
EnableRedis: true,
4847
Debug: true,
4948
},
@@ -114,9 +113,6 @@ func TestNewServer(t *testing.T) {
114113
if server.config.MaxConnsPerHost <= 0 {
115114
t.Error("MaxConnsPerHost should have been set to default")
116115
}
117-
if server.config.SessionExpireDuration <= 0 {
118-
t.Error("SessionExpireDuration should have been set to default")
119-
}
120116
}
121117
})
122118
}
@@ -159,10 +155,6 @@ func TestServer_DefaultValues(t *testing.T) {
159155
if server.config.MaxConnsPerHost != 10 {
160156
t.Errorf("Expected default MaxConnsPerHost 10, got %d", server.config.MaxConnsPerHost)
161157
}
162-
163-
if server.config.SessionExpireDuration != 3600 {
164-
t.Errorf("Expected default SessionExpireDuration 3600, got %d", server.config.SessionExpireDuration)
165-
}
166158
}
167159

168160
func TestServer_ConcurrencyLimitMiddleware(t *testing.T) {

pkg/router/handlers.go

Lines changed: 38 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package router
22

33
import (
4-
"context"
4+
"fmt"
55
"log"
66
"net/http"
77
"net/http/httputil"
@@ -42,33 +42,34 @@ func (s *Server) handleHealthReady(c *gin.Context) {
4242
})
4343
}
4444

45-
// handleAgentInvoke handles agent invocation requests
46-
func (s *Server) handleAgentInvoke(c *gin.Context) {
47-
agentNamespace := c.Param("agentNamespace")
48-
agentName := c.Param("agentName")
49-
path := c.Param("path")
50-
51-
log.Printf("Agent invoke request: namespace=%s, agent=%s, path=%s", agentNamespace, agentName, path)
45+
// handleInvoke is a private helper function that handles invocation requests for both agents and code interpreters
46+
func (s *Server) handleInvoke(c *gin.Context, namespace, name, path, kind string) {
47+
log.Printf("%s invoke request: namespace=%s, name=%s, path=%s", kind, namespace, name, path)
5248

5349
// Extract session ID from header
5450
sessionID := c.GetHeader("x-agentcube-session-id")
5551

5652
// Get sandbox info from session manager
57-
sandbox, err := s.sessionManager.GetSandboxBySession(sessionID, agentNamespace, agentName, "AgentRuntime")
53+
sandbox, err := s.sessionManager.GetSandboxBySession(c.Request.Context(), sessionID, namespace, name, kind)
5854
if err != nil {
59-
log.Printf("Failed to get sandbox info: %v", err)
60-
c.JSON(http.StatusInternalServerError, gin.H{
61-
"error": "internal server error",
62-
"code": "INTERNAL_ERROR",
55+
log.Printf("Failed to get sandbox info: %v, session id %s", err, sessionID)
56+
c.JSON(http.StatusBadRequest, gin.H{
57+
"error": fmt.Sprintf("Invalid session id %s", sessionID),
58+
"code": "BadRequest",
6359
})
6460
return
6561
}
6662

6763
// Select the sandbox endpoint: use the first entry point whose Path is a prefix of the request path
6864
var endpoint string
6965
for _, ep := range sandbox.EntryPoints {
70-
if ep.Path == path || ep.Path == "" {
71-
endpoint = ep.Endpoint
66+
if strings.HasPrefix(path, ep.Path) {
67+
// Prepend the lower-cased protocol as the URL scheme when one is set and the endpoint does not already contain "://"
68+
if ep.Protocol != "" && !strings.Contains(ep.Endpoint, "://") {
69+
endpoint = strings.ToLower(ep.Protocol) + "://" + ep.Endpoint
70+
} else {
71+
endpoint = ep.Endpoint
72+
}
7273
break
7374
}
7475
}
@@ -83,12 +84,19 @@ func (s *Server) handleAgentInvoke(c *gin.Context) {
8384
})
8485
return
8586
}
86-
endpoint = sandbox.EntryPoints[0].Endpoint
87+
// Fallback entry point: prepend the lower-cased protocol as the URL scheme when one is set and the endpoint does not already contain "://"
88+
if sandbox.EntryPoints[0].Protocol != "" && !strings.Contains(sandbox.EntryPoints[0].Endpoint, "://") {
89+
endpoint = strings.ToLower(sandbox.EntryPoints[0].Protocol) + "://" + sandbox.EntryPoints[0].Endpoint
90+
} else {
91+
endpoint = sandbox.EntryPoints[0].Endpoint
92+
}
8793
}
8894

95+
log.Printf("The selected entrypoint for session-id %s to sandbox is %s", sandbox.SessionID, endpoint)
96+
8997
// Update session activity in Redis when receiving request
9098
if sandbox.SessionID != "" && sandbox.SandboxID != "" {
91-
if err := s.redisClient.UpdateSandboxLastActivity(c.Request.Context(), sandbox.SandboxID, time.Now()); err != nil {
99+
if err := s.redisClient.UpdateSessionLastActivity(c.Request.Context(), sandbox.SessionID, time.Now()); err != nil {
92100
log.Printf("Failed to update sandbox last activity for request: %v", err)
93101
}
94102
}
@@ -97,59 +105,20 @@ func (s *Server) handleAgentInvoke(c *gin.Context) {
97105
s.forwardToSandbox(c, endpoint, path, sandbox.SessionID)
98106
}
99107

108+
// handleAgentInvoke handles agent invocation requests
109+
func (s *Server) handleAgentInvoke(c *gin.Context) {
110+
namespace := c.Param("namespace")
111+
name := c.Param("name")
112+
path := c.Param("path")
113+
s.handleInvoke(c, namespace, name, path, "AgentRuntime")
114+
}
115+
100116
// handleCodeInterpreterInvoke handles code interpreter invocation requests
101117
func (s *Server) handleCodeInterpreterInvoke(c *gin.Context) {
102118
namespace := c.Param("namespace")
103119
name := c.Param("name")
104120
path := c.Param("path")
105-
106-
log.Printf("Code interpreter invoke request: namespace=%s, name=%s, path=%s", namespace, name, path)
107-
108-
// Extract session ID from header
109-
sessionID := c.GetHeader("x-agentcube-session-id")
110-
111-
// Get sandbox info from session manager
112-
sandbox, err := s.sessionManager.GetSandboxBySession(sessionID, namespace, name, "CodeInterpreter")
113-
if err != nil {
114-
log.Printf("Failed to get sandbox info: %v", err)
115-
c.JSON(http.StatusInternalServerError, gin.H{
116-
"error": "internal server error",
117-
"code": "INTERNAL_ERROR",
118-
})
119-
return
120-
}
121-
122-
// Extract endpoint from sandbox - find matching entry point by path
123-
var endpoint string
124-
for _, ep := range sandbox.EntryPoints {
125-
if ep.Path == path || ep.Path == "" {
126-
endpoint = ep.Endpoint
127-
break
128-
}
129-
}
130-
131-
// If no matching endpoint found, use the first one as fallback
132-
if endpoint == "" {
133-
if len(sandbox.EntryPoints) == 0 {
134-
log.Printf("No entry points found for sandbox: %s", sandbox.SandboxID)
135-
c.JSON(http.StatusInternalServerError, gin.H{
136-
"error": "internal server error",
137-
"code": "INTERNAL_ERROR",
138-
})
139-
return
140-
}
141-
endpoint = sandbox.EntryPoints[0].Endpoint
142-
}
143-
144-
// Update session activity in Redis when receiving request
145-
if sandbox.SessionID != "" && sandbox.SandboxID != "" {
146-
if err := s.redisClient.UpdateSandboxLastActivity(c.Request.Context(), sandbox.SandboxID, time.Now()); err != nil {
147-
log.Printf("Failed to update sandbox last activity for request: %v", err)
148-
}
149-
}
150-
151-
// Forward request to sandbox with session ID
152-
s.forwardToSandbox(c, endpoint, path, sandbox.SessionID)
121+
s.handleInvoke(c, namespace, name, path, "CodeInterpreter")
153122
}
154123

155124
// forwardToSandbox forwards the request to the specified sandbox endpoint
@@ -228,10 +197,10 @@ func (s *Server) forwardToSandbox(c *gin.Context, endpoint, path, sessionID stri
228197
return nil
229198
}
230199

231-
// Set timeout for the proxy request using configured timeout
232-
ctx, cancel := context.WithTimeout(c.Request.Context(), time.Duration(s.config.RequestTimeout)*time.Second)
233-
defer cancel()
234-
c.Request = c.Request.WithContext(ctx)
200+
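// Deliberately no timeout on proxied invoke requests so long-running operations are not cut off; the former RequestTimeout-based deadline is left commented out below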
201+
// ctx, cancel := context.WithTimeout(c.Request.Context(), time.Duration(s.config.RequestTimeout)*time.Second)
202+
// defer cancel()
203+
// c.Request = c.Request.WithContext(ctx)
235204

236205
// Use the proxy to serve the request
237206
proxy.ServeHTTP(c.Writer, c.Request)

0 commit comments

Comments
 (0)