Benchmark load balancing #4

Open

atoniolo76 wants to merge 63 commits into main from benchmark-load-balancing

Conversation

@atoniolo76
Owner

Create infrastructure for clusters, load balancing, tokenizers, and more. Overhaul the existing DB system with stateless calls to Lambda's backend. Add commands for interacting with the various components specific to GORGO load balancing. Add policies such as prefix-tree and least-load for inference users. Integrate @Orgo as a provider.

atoniolo76 and others added 30 commits January 25, 2026 15:32
…ancer.go to use sglang_metrics.go for updating availability state, remove WrapHandler to make Handler() flow more streamlined, update lb.go, and remove claude bloat.
…uest into trie before execution

[ ] network policy can use the existing metric polling with timestamps to get and update real time inter-node latency metrics
…er with go->rust bindings. Integrated GetTokenCount into new GORGO Policy for loadbalancer.go. Added FlushSGLangCache() in sglang_metrics.go for use in accurate logistic regression of server response times varying with token count from 100 requests in wildchat_flat.jsonl
…r tokenizer_cgo.go compilation on Linux, and include tokenizer test directly in TestClusterSGLang
… when running TestClusterSGLang. Added endpoint in loadbalancer.go for testing tokenizer functionality
…ancer/delete binary before reuploading gotoni binary
Copilot AI left a comment

Pull request overview

This PR introduces a comprehensive load balancing infrastructure for distributed GPU cluster management. It adds event-driven tracing (Perfetto-compatible), tokenizer sidecar integration (Rust), SGLang cluster management, and multiple load balancing policies (GORGO, prefix-tree, least-loaded). The PR also refactors instance state management to use Lambda API as the source of truth instead of local database storage.

Changes:

  • Added distributed tracing system with Perfetto format support
  • Implemented Rust tokenizer sidecar with Unix socket communication
  • Created cluster management infrastructure for SGLang deployments
  • Added load balancing policies and configuration system
  • Removed local DB dependency for instance state, using Lambda API instead
  • Added comprehensive benchmarking tools using WildChat-1M dataset

Reviewed changes

Copilot reviewed 39 out of 42 changed files in this pull request and generated 12 comments.

File | Description
pkg/trace/trace.go | New event-driven tracing system (Perfetto format)
pkg/cluster/trace.go | Duplicate of pkg/trace/trace.go
pkg/tokenizer/*.go | Tokenizer sidecar management and client
pkg/tokenizer/main.rs | Rust tokenizer service with Unix socket
pkg/sglang/sglang.go | SGLang Docker deployment and diagnostics
pkg/cluster/*.go | Cluster management, metrics, build/deploy
pkg/config/constants.go | Centralized configuration constants
pkg/remote/client.go | Refactored SSH key lookup from Lambda API
cmd/*.go | New commands for cluster, tokenizer, LB management
benchmark/*.py | Geo-distributed benchmarking tools
go.mod | Added tokenizers dependency (with conflicts)


Comment on lines +52 to +115
.expect("Failed to download tokenizer.json");

if !response.status().is_success() {
panic!("Failed to download tokenizer: HTTP {}", response.status());
}

let bytes = response.bytes().await
.expect("Failed to read tokenizer bytes");

// Cache the tokenizer
std::fs::write(TOKENIZER_CACHE_PATH, &bytes)
.expect("Failed to cache tokenizer");

println!("Tokenizer cached to: {}", TOKENIZER_CACHE_PATH);

Tokenizer::from_bytes(&bytes)
.expect("Failed to parse tokenizer.json")
}

#[tokio::main]
async fn main() {
println!("Loading tokenizer model: {}", MODEL_ID);

// Load the Mistral tokenizer
let tokenizer: Arc<Tokenizer> = Arc::new(load_or_download_tokenizer().await);
println!("Tokenizer loaded successfully");

// Build the router with shared state
let tk_for_count = Arc::clone(&tokenizer);

let app = Router::new()
.route("/count", post(move |Json(payload): Json<TokenizeRequest>| {
let tk: Arc<Tokenizer> = Arc::clone(&tk_for_count);
async move {
let encoding = tk.encode(payload.text, false).expect("Tokenization failed");
Json(TokenizeResponse {
count: encoding.get_ids().len(),
})
}
}))
.route("/health", get(|| async {
Json(HealthResponse {
status: "ok".to_string(),
model: MODEL_ID.to_string(),
})
}));

// Unix Domain Socket path
let socket_path = "/tmp/tokenizer.sock";

// Clean up old socket if it exists
let _ = std::fs::remove_file(socket_path);

println!("Starting tokenizer sidecar on {}", socket_path);

let listener = UnixListener::bind(socket_path).expect("Failed to bind to socket");

// Make socket world-readable/writable
std::fs::set_permissions(socket_path, std::fs::Permissions::from_mode(0o777))
.expect("Failed to set socket permissions");

// Serve using hyper with Unix socket
loop {
let (stream, _) = listener.accept().await.expect("Accept failed");

Copilot AI Jan 29, 2026

The tokenizer sidecar uses expect() for error handling in several places (lines 52, 59, 63, 68, 86, 107, 111, 115), which will panic the entire process if the operation fails. For a production service, these should use proper error handling with ? operator or unwrap_or_else() with logging to allow graceful degradation rather than crashing.

Comment on lines +493 to 501
// RemoveInstanceFromConfig removes an instance from the config
// Deprecated: Instance state is now managed by Lambda API, not local DB.
// This function is kept for backwards compatibility but does nothing meaningful.
func RemoveInstanceFromConfig(instanceID string) error {
// Instance state is managed by Lambda API, not local DB
// No action needed - when an instance is terminated via Lambda API,
// it will no longer appear in ListRunningInstances
return nil
}

Copilot AI Jan 29, 2026

The RemoveInstanceFromConfig function is deprecated but returns no error and does nothing. Callers expecting this function to clean up instance state will silently fail. Consider either removing this function entirely or returning an error indicating it's no longer supported to make the deprecation explicit.

Comment on lines +93 to +106
tokenizerMu.RLock()
available := tokenizerAvailable
tokenizerMu.RUnlock()

if !available {
// Try to reconnect periodically
go func() {
tokenizerMu.Lock()
defer tokenizerMu.Unlock()
if !tokenizerAvailable {
tokenizerAvailable = checkTokenizerHealth()
}
}()
return fallbackTokenCount(text)

Copilot AI Jan 29, 2026

The pkg/cluster/tokenizer.go file has a potential race condition. The tokenizerAvailable flag is read without lock on line 94 and checked again on line 102-103, but another goroutine could modify it between these checks. The reconnection goroutine spawned on line 99 should be synchronized to prevent multiple concurrent reconnection attempts.

Comment on lines +1 to +573
/*
Copyright © 2025 ALESSIO TONIOLO

trace.go provides event-driven tracing for the load balancer.

Design principles (inspired by PyTorch profiler):
- NOT constantly running - only active when explicitly started
- Event-driven with hooks inserted at key points
- Produces Perfetto-compatible traces (Chrome Trace Format)
- Supports distributed tracing across nodes via header propagation
- Minimal overhead when disabled

Usage:
1. Start trace via endpoint: POST /lb/trace/start
2. Run workload (benchmark)
3. Stop trace via endpoint: POST /lb/trace/stop
4. Returns Perfetto JSON that can be loaded in chrome://tracing or ui.perfetto.dev
*/
package serve

import (
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
)

// TraceEvent represents a single event in Chrome Trace Format (Perfetto-compatible)
// See: https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU
type TraceEvent struct {
Name string `json:"name"` // Event name
Cat string `json:"cat"` // Category (e.g., "lb", "forward", "queue")
Ph string `json:"ph"` // Phase: B=begin, E=end, X=complete, i=instant
Ts int64 `json:"ts"` // Timestamp in microseconds
Dur int64 `json:"dur,omitempty"` // Duration in microseconds (for ph=X)
Pid int `json:"pid"` // Process ID (node ID hash)
Tid int `json:"tid"` // Thread ID (request ID hash)
Args map[string]interface{} `json:"args,omitempty"` // Additional metadata
ID string `json:"id,omitempty"` // For async events (flow)
Scope string `json:"s,omitempty"` // Scope for flow events
BindingPt string `json:"bp,omitempty"` // Binding point for flow events
}

// TraceSession manages an active tracing session
type TraceSession struct {
ID string
StartTime time.Time
NodeID string

events []TraceEvent
eventsMu sync.Mutex

// Atomic flag for fast "is tracing active" check
active atomic.Bool

// For distributed tracing - events received from other nodes
remoteEvents []TraceEvent
remoteEventsMu sync.Mutex
}

// Tracer is the global tracer instance
type Tracer struct {
session *TraceSession
sessionMu sync.RWMutex
nodeID string
nodePID int // Hash of node ID for Perfetto PID
}

// NewTracer creates a new tracer instance
func NewTracer(nodeID string) *Tracer {
return &Tracer{
nodeID: nodeID,
nodePID: hashString(nodeID) % 10000, // Keep PID small for readability
}
}

// IsActive returns true if tracing is currently active (fast path)
func (t *Tracer) IsActive() bool {
t.sessionMu.RLock()
defer t.sessionMu.RUnlock()
return t.session != nil && t.session.active.Load()
}

// Start begins a new tracing session
func (t *Tracer) Start() string {
t.sessionMu.Lock()
defer t.sessionMu.Unlock()

sessionID := fmt.Sprintf("trace-%d", time.Now().UnixNano())
t.session = &TraceSession{
ID: sessionID,
StartTime: time.Now(),
NodeID: t.nodeID,
events: make([]TraceEvent, 0, 1000),
}
t.session.active.Store(true)

// Record trace start event
t.session.events = append(t.session.events, TraceEvent{
Name: "trace_session",
Cat: "trace",
Ph: "B",
Ts: 0, // Relative to trace start
Pid: t.nodePID,
Tid: 0,
Args: map[string]interface{}{
"session_id": sessionID,
"node_id": t.nodeID,
},
})

return sessionID
}

// Stop ends the tracing session and returns the Perfetto JSON
func (t *Tracer) Stop() ([]byte, error) {
t.sessionMu.Lock()
defer t.sessionMu.Unlock()

if t.session == nil {
return nil, fmt.Errorf("no active trace session")
}

t.session.active.Store(false)

// Record trace end event
endTs := time.Since(t.session.StartTime).Microseconds()
t.session.events = append(t.session.events, TraceEvent{
Name: "trace_session",
Cat: "trace",
Ph: "E",
Ts: endTs,
Pid: t.nodePID,
Tid: 0,
})

// Merge remote events
t.session.eventsMu.Lock()
t.session.remoteEventsMu.Lock()
allEvents := make([]TraceEvent, 0, len(t.session.events)+len(t.session.remoteEvents))
allEvents = append(allEvents, t.session.events...)
allEvents = append(allEvents, t.session.remoteEvents...)
t.session.remoteEventsMu.Unlock()
t.session.eventsMu.Unlock()

// Build Perfetto trace - just the events array for direct compatibility
// Perfetto/Chrome Trace Viewer expects a plain array or {"traceEvents": [...]}
// We output just the array for maximum compatibility

// Add metadata as a special "M" (metadata) event at the start
metadataEvent := TraceEvent{
Name: "process_name",
Cat: "__metadata",
Ph: "M",
Ts: 0,
Pid: t.nodePID,
Tid: 0,
Args: map[string]interface{}{
"name": t.nodeID,
},
}

// Prepend metadata event
finalEvents := make([]TraceEvent, 0, len(allEvents)+1)
finalEvents = append(finalEvents, metadataEvent)
finalEvents = append(finalEvents, allEvents...)

// Clear session
t.session = nil

return json.MarshalIndent(finalEvents, "", " ")
}

// GetStatus returns current trace status
func (t *Tracer) GetStatus() map[string]interface{} {
t.sessionMu.RLock()
defer t.sessionMu.RUnlock()

if t.session == nil {
return map[string]interface{}{
"active": false,
"node_id": t.nodeID,
}
}

t.session.eventsMu.Lock()
eventCount := len(t.session.events)
t.session.eventsMu.Unlock()

return map[string]interface{}{
"active": t.session.active.Load(),
"session_id": t.session.ID,
"node_id": t.nodeID,
"duration_ms": float64(time.Since(t.session.StartTime).Microseconds()) / 1000.0,
"event_count": eventCount,
"start_time": t.session.StartTime.Format(time.RFC3339),
}
}

// =====================================================
// TRACE HOOKS - Insert these at key points in code
// =====================================================

// TraceRequestReceived records when a request arrives at the LB
func (t *Tracer) TraceRequestReceived(requestID string, method, path, remoteAddr string) {
if !t.IsActive() {
return
}

t.recordEvent(TraceEvent{
Name: "request",
Cat: "lb",
Ph: "B", // Begin
Tid: hashString(requestID) % 1000,
Args: map[string]interface{}{
"request_id": requestID,
"method": method,
"path": path,
"remote_addr": remoteAddr,
},
})
}

// TraceRequestComplete records when a request finishes
func (t *Tracer) TraceRequestComplete(requestID string, statusCode int, servedBy string) {
if !t.IsActive() {
return
}

t.recordEvent(TraceEvent{
Name: "request",
Cat: "lb",
Ph: "E", // End
Tid: hashString(requestID) % 1000,
Args: map[string]interface{}{
"request_id": requestID,
"status_code": statusCode,
"served_by": servedBy,
},
})
}

// TraceQueueEnter records when a request enters the queue
func (t *Tracer) TraceQueueEnter(requestID string, queueDepth int) {
if !t.IsActive() {
return
}

t.recordEvent(TraceEvent{
Name: "queue",
Cat: "queue",
Ph: "B",
Tid: hashString(requestID) % 1000,
Args: map[string]interface{}{
"request_id": requestID,
"queue_depth": queueDepth,
"action": "enqueue",
},
})
}

// TraceQueueExit records when a request leaves the queue
func (t *Tracer) TraceQueueExit(requestID string, waitTimeMs float64, exitReason string) {
if !t.IsActive() {
return
}

t.recordEvent(TraceEvent{
Name: "queue",
Cat: "queue",
Ph: "E",
Tid: hashString(requestID) % 1000,
Args: map[string]interface{}{
"request_id": requestID,
"wait_time_ms": waitTimeMs,
"exit_reason": exitReason, // "local_process", "forwarded_to_peer", "timeout", "cancelled"
},
})
}

// TraceQueueForwardAttempt records when the queue processor attempts to forward a request
func (t *Tracer) TraceQueueForwardAttempt(requestID string, queueDepth int, peerSelected bool, targetPeer string) {
if !t.IsActive() {
return
}

t.recordEvent(TraceEvent{
Name: "queue_forward_attempt",
Cat: "queue",
Ph: "i", // Instant event
Tid: hashString(requestID) % 1000,
Args: map[string]interface{}{
"request_id": requestID,
"queue_depth": queueDepth,
"peer_selected": peerSelected,
"target_peer": targetPeer,
},
})
}

// TraceQueueProbe records when the queue probe runs
func (t *Tracer) TraceQueueProbe(queueDepth int, peersAvailable int, forwarded int) {
if !t.IsActive() {
return
}

t.recordEvent(TraceEvent{
Name: "queue_probe",
Cat: "queue",
Ph: "i", // Instant event
Tid: 0, // Use thread 0 for system events
Args: map[string]interface{}{
"queue_depth": queueDepth,
"peers_available": peersAvailable,
"forwarded": forwarded,
},
})
}

// TraceForwardStart records when we start forwarding to a peer
// Returns a flow ID for stitching with remote trace
func (t *Tracer) TraceForwardStart(requestID, targetNode, targetIP string) string {
if !t.IsActive() {
return ""
}

flowID := fmt.Sprintf("%s-%s", requestID, targetNode)

// Record the forward span start
t.recordEvent(TraceEvent{
Name: "forward_to_peer",
Cat: "forward",
Ph: "B",
Tid: hashString(requestID) % 1000,
Args: map[string]interface{}{
"request_id": requestID,
"target_node": targetNode,
"target_ip": targetIP,
"flow_id": flowID,
},
})

// Record flow event (arrow in Perfetto)
t.recordEvent(TraceEvent{
Name: "forward_flow",
Cat: "forward",
Ph: "s", // Flow start
ID: flowID,
Tid: hashString(requestID) % 1000,
Scope: "forward",
})

return flowID
}

// TraceForwardEnd records when forwarding completes
func (t *Tracer) TraceForwardEnd(requestID string, statusCode int, durationMs float64) {
if !t.IsActive() {
return
}

t.recordEvent(TraceEvent{
Name: "forward_to_peer",
Cat: "forward",
Ph: "E",
Tid: hashString(requestID) % 1000,
Args: map[string]interface{}{
"request_id": requestID,
"status_code": statusCode,
"duration_ms": durationMs,
},
})
}

// TraceLocalProcessStart records the start of local SGLang processing
func (t *Tracer) TraceLocalProcessStart(requestID string) {
if !t.IsActive() {
return
}

t.recordEvent(TraceEvent{
Name: "local_sglang",
Cat: "process",
Ph: "B",
Tid: hashString(requestID) % 1000,
Args: map[string]interface{}{
"request_id": requestID,
},
})
}

func (t *Tracer) TraceLocalProcessEnd(requestID string, statusCode int) {
if !t.IsActive() {
return
}

t.recordEvent(TraceEvent{
Name: "local_sglang",
Cat: "process",
Ph: "E",
Tid: hashString(requestID) % 1000,
Args: map[string]interface{}{
"request_id": requestID,
"status_code": statusCode,
},
})
}

// TracePolicyDecision records a policy decision (instant event)
func (t *Tracer) TracePolicyDecision(requestID, policy, decision string, metadata map[string]interface{}) {
if !t.IsActive() {
return
}

args := map[string]interface{}{
"request_id": requestID,
"policy": policy,
"decision": decision,
}
for k, v := range metadata {
args[k] = v
}

t.recordEvent(TraceEvent{
Name: "policy_decision",
Cat: "policy",
Ph: "i", // Instant event
Tid: hashString(requestID) % 1000,
Args: args,
})
}

// TraceCapacityCheck records capacity check result
func (t *Tracer) TraceCapacityCheck(requestID string, hasCapacity bool, runningReqs, waitingReqs int) {
if !t.IsActive() {
return
}

t.recordEvent(TraceEvent{
Name: "capacity_check",
Cat: "lb",
Ph: "i",
Tid: hashString(requestID) % 1000,
Args: map[string]interface{}{
"request_id": requestID,
"has_capacity": hasCapacity,
"running_reqs": runningReqs,
"waiting_reqs": waitingReqs,
},
})
}

// TraceTokenization records how long tokenization took (critical for GORGO overhead)
func (t *Tracer) TraceTokenization(requestID string, textLen, tokenCount int, durationUs int64) {
if !t.IsActive() {
return
}

t.recordEvent(TraceEvent{
Name: "tokenize",
Cat: "gorgo",
Ph: "X", // Complete event with duration
Tid: hashString(requestID) % 1000,
Dur: durationUs,
Args: map[string]interface{}{
"request_id": requestID,
"text_len": textLen,
"token_count": tokenCount,
"duration_us": durationUs,
},
})
}

// TraceGORGODecision records GORGO's routing decision with cost calculation
func (t *Tracer) TraceGORGODecision(requestID string, decision string, metadata map[string]interface{}) {
if !t.IsActive() {
return
}

args := map[string]interface{}{
"request_id": requestID,
"decision": decision,
}
for k, v := range metadata {
args[k] = v
}

t.recordEvent(TraceEvent{
Name: "gorgo_decision",
Cat: "gorgo",
Ph: "i",
Tid: hashString(requestID) % 1000,
Args: args,
})
}

// =====================================================
// DISTRIBUTED TRACING - Receive events from other nodes
// =====================================================

// ReceiveRemoteEvents merges trace events from a forwarded request
func (t *Tracer) ReceiveRemoteEvents(events []TraceEvent) {
if !t.IsActive() {
return
}

t.sessionMu.RLock()
session := t.session
t.sessionMu.RUnlock()

if session == nil {
return
}

session.remoteEventsMu.Lock()
session.remoteEvents = append(session.remoteEvents, events...)
session.remoteEventsMu.Unlock()
}

// GetTraceHeaders returns headers to propagate trace context
func (t *Tracer) GetTraceHeaders(requestID string) map[string]string {
if !t.IsActive() {
return nil
}

t.sessionMu.RLock()
defer t.sessionMu.RUnlock()

if t.session == nil {
return nil
}

return map[string]string{
"X-Trace-ID": t.session.ID,
"X-Trace-Request": requestID,
"X-Trace-Origin": t.nodeID,
}
}

// =====================================================
// INTERNAL HELPERS
// =====================================================

func (t *Tracer) recordEvent(event TraceEvent) {
t.sessionMu.RLock()
session := t.session
t.sessionMu.RUnlock()

if session == nil || !session.active.Load() {
return
}

// Set timestamp relative to trace start
event.Ts = time.Since(session.StartTime).Microseconds()
event.Pid = t.nodePID

session.eventsMu.Lock()
session.events = append(session.events, event)
session.eventsMu.Unlock()
}

// hashString creates a simple hash for consistent thread IDs
func hashString(s string) int {
h := 0
for _, c := range s {
h = 31*h + int(c)
}
if h < 0 {
h = -h
}
return h
}

Copilot AI Jan 29, 2026

The pkg/trace/trace.go and pkg/cluster/trace.go files are identical duplicates (573 lines each). This violates the DRY principle and creates maintenance burden. One should be removed and the other should be imported where needed.

Comment on lines +110 to +111
std::fs::set_permissions(socket_path, std::fs::Permissions::from_mode(0o777))
.expect("Failed to set socket permissions");

Copilot AI Jan 29, 2026

In the Rust tokenizer sidecar, the socket permissions are set to 0o777 (world-readable/writable). This is a security risk as any user on the system can connect to the tokenizer socket and potentially cause denial of service or other issues. Consider using more restrictive permissions like 0o660 and ensuring proper group ownership.

)

except Exception as e:
status = "error"

Copilot AI Jan 29, 2026

Variable status is not used.

Comment on lines 236 to 238
self.logger.error(f"[{request_id}] Request failed: {e}")
return web.json_response(
{"error": {"message": str(e), "type": "server_error"}},

Copilot AI Jan 29, 2026

Variable error_msg is not used.

Suggested change
- self.logger.error(f"[{request_id}] Request failed: {e}")
- return web.json_response(
-     {"error": {"message": str(e), "type": "server_error"}},
+ self.logger.error(f"[{request_id}] Request failed: {error_msg}")
+ return web.json_response(
+     {"error": {"message": error_msg, "type": "server_error"}},

import sys
import argparse
from collections import defaultdict
from typing import Dict, List, Tuple, Optional

Copilot AI Jan 29, 2026

Import of 'Optional' is not used.

Suggested change
- from typing import Dict, List, Tuple, Optional
+ from typing import Dict, List, Tuple

import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

Copilot AI Jan 29, 2026

Import of 'Optional' is not used.
Import of 'List' is not used.

Suggested change
- from typing import Any, Dict, List, Optional, Tuple
+ from typing import Any, Dict, Tuple

Comment on lines +134 to +151
# Build GuideLLM command
GUIDELLM_CMD="guidellm benchmark \
--target http://localhost:$PROXY_PORT/v1 \
--profile $PROFILE \
--max-seconds $DURATION \
--data $SCRIPT_DIR/wildchat_guidellm.jsonl \
--data-column-mapper '{\"text_column\": \"prompt\"}' \
--output-path $OUTPUT_DIR"

# Add rate for profiles that need it
if [ "$PROFILE" = "poisson" ] || [ "$PROFILE" = "constant" ] || [ "$PROFILE" = "concurrent" ] || [ "$PROFILE" = "throughput" ]; then
GUIDELLM_CMD="$GUIDELLM_CMD --rate $RATE"
fi

echo "Running: $GUIDELLM_CMD"
echo ""

eval $GUIDELLM_CMD

Copilot AI Jan 29, 2026

The use of eval $GUIDELLM_CMD with command-line-controlled variables like PROFILE, RATE, DURATION, and OUTPUT_DIR creates a command injection risk: a crafted argument (for example, --profile 'poisson; rm -rf ~') would cause arbitrary shell commands to execute in the context of whoever runs this script. To prevent this, avoid eval entirely and invoke guidellm directly using a properly quoted argument list (e.g., building an array and calling it without eval) so user-supplied values cannot break out of their argument positions.

Suggested change
- # Build GuideLLM command
- GUIDELLM_CMD="guidellm benchmark \
-   --target http://localhost:$PROXY_PORT/v1 \
-   --profile $PROFILE \
-   --max-seconds $DURATION \
-   --data $SCRIPT_DIR/wildchat_guidellm.jsonl \
-   --data-column-mapper '{\"text_column\": \"prompt\"}' \
-   --output-path $OUTPUT_DIR"
- # Add rate for profiles that need it
- if [ "$PROFILE" = "poisson" ] || [ "$PROFILE" = "constant" ] || [ "$PROFILE" = "concurrent" ] || [ "$PROFILE" = "throughput" ]; then
-   GUIDELLM_CMD="$GUIDELLM_CMD --rate $RATE"
- fi
- echo "Running: $GUIDELLM_CMD"
- echo ""
- eval $GUIDELLM_CMD
+ # Build GuideLLM command as an argument array to avoid command injection
+ GUIDELLM_ARGS=(
+   benchmark
+   --target "http://localhost:$PROXY_PORT/v1"
+   --profile "$PROFILE"
+   --max-seconds "$DURATION"
+   --data "$SCRIPT_DIR/wildchat_guidellm.jsonl"
+   --data-column-mapper '{"text_column": "prompt"}'
+   --output-path "$OUTPUT_DIR"
+ )
+ # Add rate for profiles that need it
+ if [ "$PROFILE" = "poisson" ] || [ "$PROFILE" = "constant" ] || [ "$PROFILE" = "concurrent" ] || [ "$PROFILE" = "throughput" ]; then
+   GUIDELLM_ARGS+=( --rate "$RATE" )
+ fi
+ # Show the command being run
+ printf 'Running: guidellm'
+ for arg in "${GUIDELLM_ARGS[@]}"; do
+   printf ' %q' "$arg"
+ done
+ printf '\n\n'
+ guidellm "${GUIDELLM_ARGS[@]}"

@atoniolo76
Owner Author

atoniolo76 commented Jan 29, 2026 via email

abinayadinesh1 and others added 16 commits January 28, 2026 18:58
Adds a new CLI command to visualize network latencies between cluster nodes:
- `gotoni lb latency --all` shows latencies from all nodes to their peers
- `--matrix` flag displays as a latency matrix for easier comparison
- Helps understand inter-region transport costs for GORGO/GORGO2 decisions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
SSH keys are now looked up directly from ~/.ssh directory by name,
without requiring the local database. The Lambda API provides the
SSH key name for each instance, and we find the corresponding .pem
file in the user's ssh directory.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
- Add pkg/proxy/proxy.go: Centralized HTTP proxy with GORGO-style routing
  - Global request tracking across all SGLang servers
  - Cost calculation: queued_tokens * ms_per_token + running_tokens * ms_per_token * 0.5
  - EWMA latency tracking with dedicated probing
  - Prefix tree for KV cache-aware routing

- Add cmd/proxy.go: CLI commands for proxy management
  - proxy start/stop/status: Local proxy control
  - proxy deploy: Build, upload, and start proxy on remote instance
  - proxy servers add/remove: Manage SGLang server pool
  - proxy tune: Adjust GORGO parameters at runtime
  - proxy latency: Display latencies to all servers
  - --remote flag: Resolve instance names via Lambda API

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
…+ lb installation, Update README for Research
Updated README to include './' before commands for consistency.
