From dc8678baf18377237632a15cae47a9f4dd5f86be Mon Sep 17 00:00:00 2001 From: MorningTZH Date: Mon, 23 Mar 2026 17:14:00 +0800 Subject: [PATCH 1/3] fix(placement): prevent thundering herd with optimistic resource accounting and shadow state High concurrency placement requests were causing severe "thundering herd" issues due to stale node metrics. The orchestrator would continuously schedule multiple sandboxes on the same seemingly "empty" node before it could report updated resource usage (the "invisibility gap"). This commit introduces active load prediction and optimistic resource reservation to ensure perfectly balanced placement even during metric reporting intervals. Changes: - fix(placement): factor `InProgress` pending resources into the `BestOfK` scoring calculation to predict expected load. - fix(nodemanager): implement `OptimisticAdd` to immediately reserve resources upon successful placement, bridging the gap before async metric updates arrive. - test(placement): refactor `SimulatedNode` into a `NodeSimulator` interface to support diverse node behavior simulations. - test(placement): introduce `LaggyNode` to simulate real-world scenarios with stale/delayed node metrics. - test(placement): add `BenchmarkPlacementDistribution` to visualize load distribution and verify the elimination of the thundering herd effect under high concurrency. 
Signed-off-by: MorningTZH --- .../internal/orchestrator/nodemanager/node.go | 9 + .../orchestrator/placement/placement.go | 8 + .../placement/placement_benchmark_test.go | 491 ++++++++++++++---- .../placement/placement_best_of_K.go | 10 +- 4 files changed, 422 insertions(+), 96 deletions(-) diff --git a/packages/api/internal/orchestrator/nodemanager/node.go b/packages/api/internal/orchestrator/nodemanager/node.go index dd7f4fc634..3ca96a2cb7 100644 --- a/packages/api/internal/orchestrator/nodemanager/node.go +++ b/packages/api/internal/orchestrator/nodemanager/node.go @@ -177,3 +177,12 @@ func (n *Node) GetClient(ctx context.Context) (*clusters.GRPCClient, context.Con func (n *Node) IsNomadManaged() bool { return n.NomadNodeShortID != UnknownNomadNodeShortID } + +func (n *Node) OptimisticAdd(res SandboxResources) { + n.metricsMu.Lock() + defer n.metricsMu.Unlock() + + // Directly accumulate to the current metrics view + n.metrics.CpuAllocated += uint32(res.CPUs) + n.metrics.MemoryAllocatedBytes += uint64(res.MiBMemory) * 1024 * 1024 // Note: CpuPercent is difficult to estimate, usually just updating Allocated is sufficient for the scheduling algorithm +} \ No newline at end of file diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index b07e75501f..a2a4f9ea2b 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -79,6 +79,14 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node if err == nil { node.PlacementMetrics.Success(sbxRequest.GetSandbox().GetSandboxId()) + // Optimistic update: assume resources are occupied after successful creation. + // Manually update node.metrics with the newly allocated resources. + // This will be overwritten by the next real Metrics report for auto-correction. 
+ node.OptimisticAdd(nodemanager.SandboxResources{ + CPUs: sbxRequest.GetSandbox().GetVcpu(), + MiBMemory: sbxRequest.GetSandbox().GetRamMb(), + }) + return node, nil } diff --git a/packages/api/internal/orchestrator/placement/placement_benchmark_test.go b/packages/api/internal/orchestrator/placement/placement_benchmark_test.go index 7b6ad043c7..4bc5332611 100644 --- a/packages/api/internal/orchestrator/placement/placement_benchmark_test.go +++ b/packages/api/internal/orchestrator/placement/placement_benchmark_test.go @@ -49,156 +49,270 @@ type LiveSandbox struct { PlacementLatency time.Duration } -// SimulatedNode represents a node with realistic resource tracking -type SimulatedNode struct { - *nodemanager.Node +// NodeSimulator defines the common behavior of simulated nodes +type NodeSimulator interface { + // GetNode returns the underlying nodemanager.Node object for use by the algorithm + GetNode() *nodemanager.Node + // PlaceSandbox attempts to place a Sandbox on the node, returns whether successful + PlaceSandbox(sbx *LiveSandbox) bool + // RemoveSandbox removes a Sandbox from the node + RemoveSandbox(sandboxID string) + // GetUtilization returns the current CPU and memory utilization (0-100) + GetUtilization() (float64, float64) + // GetSandboxCount returns the number of currently running Sandboxes + GetSandboxCount() int +} + +// NodeFactory defines the function signature for creating simulated nodes +type NodeFactory func(id string, config BenchmarkConfig) NodeSimulator +// StandardNode implements the original SimulatedNode logic (real-time metric synchronization) +type StandardNode struct { + *nodemanager.Node mu sync.RWMutex sandboxes map[string]*LiveSandbox totalPlacements int64 - rejectedPlacements int64 - lastUpdateTime time.Time + rejectedPlacements int64 // counts failed placements due to capacity } -// BenchmarkMetrics contains detailed metrics from the benchmark -type BenchmarkMetrics struct { - // Placement metrics - TotalPlacements int64 - 
SuccessfulPlacements int64 - FailedPlacements int64 - AvgPlacementTime time.Duration - MaxPlacementTime time.Duration - MinPlacementTime time.Duration - P50PlacementTime time.Duration - P95PlacementTime time.Duration - P99PlacementTime time.Duration - - // Node utilization metrics - AvgNodeCPUUtilization float64 - MaxNodeCPUUtilization float64 - MinNodeCPUUtilization float64 - AvgNodeMemUtilization float64 - MaxNodeMemUtilization float64 - MinNodeMemUtilization float64 - - // Load distribution metrics - CPULoadStdDev float64 - MemLoadStdDev float64 - LoadImbalanceCoefficient float64 -} +// Ensure StandardNode implements the interface +var _ NodeSimulator = &StandardNode{} -// createSimulatedNodes creates nodes with realistic resource tracking -func createSimulatedNodes(config BenchmarkConfig) []*SimulatedNode { - nodes := make([]*SimulatedNode, config.NumNodes) - for i := range config.NumNodes { - // Create base node - baseNode := nodemanager.NewTestNode( - fmt.Sprintf("node-%d", i), +func NewStandardNode(id string, config BenchmarkConfig) NodeSimulator { + return &StandardNode{ + Node: nodemanager.NewTestNode( + id, api.NodeStatusReady, - 0, // Start with no load + 0, config.NodeCPUCapacity, nodemanager.WithSandboxSleepingClient(config.SandboxCreateDuration), - ) - - simNode := &SimulatedNode{ - Node: baseNode, - sandboxes: make(map[string]*LiveSandbox), - lastUpdateTime: time.Now(), - } - nodes[i] = simNode + ), + sandboxes: make(map[string]*LiveSandbox), } +} - return nodes +func (n *StandardNode) GetNode() *nodemanager.Node { + return n.Node } -// placeSandbox places a sandbox on the node -func (n *SimulatedNode) placeSandbox(sbx *LiveSandbox) bool { +func (n *StandardNode) PlaceSandbox(sbx *LiveSandbox) bool { n.mu.Lock() defer n.mu.Unlock() metrics := n.Metrics() - // Check capacity with overcommit - if metrics.CpuAllocated+uint32(sbx.RequestedCPU) > metrics.CpuCount*4 { // 4x overcommit + // Check capacity with overcommit (Original Logic) + if 
metrics.CpuAllocated+uint32(sbx.RequestedCPU) > metrics.CpuCount*4 { atomic.AddInt64(&n.rejectedPlacements, 1) - return false } + // Real-time update: directly modify Node Metrics n.UpdateMetricsFromServiceInfoResponse(&orchestrator.ServiceInfoResponse{ - MetricSandboxesRunning: uint32(len(n.sandboxes)) + 1, - // Host system usage metrics - MetricCpuPercent: metrics.CpuPercent + uint32(sbx.ActualCPUUsage*100), - MetricMemoryUsedBytes: metrics.MemoryUsedBytes + uint64(sbx.ActualMemUsage), - // Host system total resources - MetricCpuCount: metrics.CpuCount, - MetricMemoryTotalBytes: metrics.MemoryTotalBytes, - // Allocated resources to sandboxes + MetricSandboxesRunning: uint32(len(n.sandboxes)) + 1, + MetricCpuPercent: metrics.CpuPercent + uint32(sbx.ActualCPUUsage*100), + MetricMemoryUsedBytes: metrics.MemoryUsedBytes + uint64(sbx.ActualMemUsage), + MetricCpuCount: metrics.CpuCount, + MetricMemoryTotalBytes: metrics.MemoryTotalBytes, MetricCpuAllocated: metrics.CpuAllocated + uint32(sbx.RequestedCPU), MetricMemoryAllocatedBytes: metrics.MemoryAllocatedBytes + uint64(sbx.RequestedMemory)*1024*1024, }) n.sandboxes[sbx.ID] = sbx atomic.AddInt64(&n.totalPlacements, 1) - return true } -// removeSandbox removes a sandbox from the node -func (n *SimulatedNode) removeSandbox(sandboxID string) { +func (n *StandardNode) RemoveSandbox(sandboxID string) { n.mu.Lock() defer n.mu.Unlock() metrics := n.Metrics() - if sbx, exists := n.sandboxes[sandboxID]; exists { n.UpdateMetricsFromServiceInfoResponse(&orchestrator.ServiceInfoResponse{ - MetricSandboxesRunning: uint32(len(n.sandboxes)) - 1, - - MetricCpuPercent: metrics.CpuPercent - uint32(sbx.ActualCPUUsage*100), - MetricMemoryUsedBytes: metrics.MemoryUsedBytes - uint64(sbx.ActualMemUsage), - + MetricSandboxesRunning: uint32(len(n.sandboxes)) - 1, + MetricCpuPercent: metrics.CpuPercent - uint32(sbx.ActualCPUUsage*100), + MetricMemoryUsedBytes: metrics.MemoryUsedBytes - uint64(sbx.ActualMemUsage), MetricCpuAllocated: 
metrics.CpuAllocated - uint32(sbx.RequestedCPU), MetricMemoryAllocatedBytes: metrics.MemoryAllocatedBytes - uint64(sbx.RequestedMemory)*1024*1024, - - MetricCpuCount: metrics.CpuCount, - MetricMemoryTotalBytes: metrics.MemoryTotalBytes, + MetricCpuCount: metrics.CpuCount, + MetricMemoryTotalBytes: metrics.MemoryTotalBytes, }) - delete(n.sandboxes, sandboxID) } } -// getUtilization returns current CPU and memory utilization percentages -func (n *SimulatedNode) getUtilization() (cpuUtil, memUtil float64) { +func (n *StandardNode) GetUtilization() (float64, float64) { n.mu.RLock() defer n.mu.RUnlock() - metrics := n.Metrics() + var cpuUtil, memUtil float64 if metrics.CpuCount > 0 { cpuUtil = ((float64(metrics.CpuPercent) / 100) / float64(metrics.CpuCount)) * 100 } if metrics.MemoryTotalBytes > 0 { memUtil = (float64(metrics.MemoryUsedBytes) / float64(metrics.MemoryTotalBytes)) * 100 } - return cpuUtil, memUtil } +func (n *StandardNode) GetSandboxCount() int { + n.mu.RLock() + defer n.mu.RUnlock() + return len(n.sandboxes) +} + +// LaggyNode simulates metric lag. +// It maintains both the “real” state and “reported” state (Node Metrics). +// Real state is only synced to Node Metrics when SyncMetrics() is called. +type LaggyNode struct { + *StandardNode // Embed StandardNode to reuse logic + + // Internal resource state (hidden from Orchestrator until SyncMetrics is called) + realCpuAllocated uint32 + realMemAllocated uint64 + realSandboxesRunning uint32 +} + +func NewLaggyNode(id string, config BenchmarkConfig) NodeSimulator { + base := NewStandardNode(id, config).(*StandardNode) + return &LaggyNode{ + StandardNode: base, + } +} + +// PlaceSandbox Override: only update real state, not Node Metrics +func (n *LaggyNode) PlaceSandbox(sbx *LiveSandbox) bool { + n.mu.Lock() + defer n.mu.Unlock() + + // 1. 
Admission control based on real capacity (simulating node-side rejection) + // Note: we use realCpuAllocated for the check + metrics := n.Node.Metrics() + if n.realCpuAllocated+uint32(sbx.RequestedCPU) > metrics.CpuCount*4 { + atomic.AddInt64(&n.rejectedPlacements, 1) + return false + } + + // 2. Update real state + n.sandboxes[sbx.ID] = sbx + n.realSandboxesRunning++ + n.realCpuAllocated += uint32(sbx.RequestedCPU) + n.realMemAllocated += uint64(sbx.RequestedMemory) * 1024 * 1024 + + atomic.AddInt64(&n.totalPlacements, 1) + + // Key: intentionally do NOT call UpdateMetricsFromServiceInfoResponse + // The metrics visible to Orchestrator remain unchanged until SyncMetrics is called + return true +} + +func (n *LaggyNode) RemoveSandbox(sandboxID string) { + n.mu.Lock() + defer n.mu.Unlock() + + if sbx, exists := n.sandboxes[sandboxID]; exists { + n.realSandboxesRunning-- + n.realCpuAllocated -= uint32(sbx.RequestedCPU) + n.realMemAllocated -= uint64(sbx.RequestedMemory) * 1024 * 1024 + delete(n.sandboxes, sandboxID) + } +} + +// SyncMetrics simulates heartbeat reporting, syncing real state to Orchestrator +func (n *LaggyNode) SyncMetrics() { + n.mu.Lock() + defer n.mu.Unlock() + + metrics := n.Node.Metrics() + + // Calculate real CPU usage (simplified as linear accumulation here, may be more complex in production) + var totalActualCpuUsage float64 + var totalActualMemUsage float64 + for _, sbx := range n.sandboxes { + totalActualCpuUsage += sbx.ActualCPUUsage + totalActualMemUsage += sbx.ActualMemUsage + } + + n.UpdateMetricsFromServiceInfoResponse(&orchestrator.ServiceInfoResponse{ + MetricSandboxesRunning: n.realSandboxesRunning, + MetricCpuPercent: uint32(totalActualCpuUsage * 100), + MetricMemoryUsedBytes: uint64(totalActualMemUsage), + MetricCpuAllocated: n.realCpuAllocated, + MetricMemoryAllocatedBytes: n.realMemAllocated, + + // Static fields remain unchanged + MetricCpuCount: metrics.CpuCount, + MetricMemoryTotalBytes: metrics.MemoryTotalBytes, + }) +} + +// 
BenchmarkMetrics contains detailed metrics from the benchmark +type BenchmarkMetrics struct { + // Placement metrics + TotalPlacements int64 + SuccessfulPlacements int64 + FailedPlacements int64 + AvgPlacementTime time.Duration + MaxPlacementTime time.Duration + MinPlacementTime time.Duration + P50PlacementTime time.Duration + P95PlacementTime time.Duration + P99PlacementTime time.Duration + + // Node utilization metrics + AvgNodeCPUUtilization float64 + MaxNodeCPUUtilization float64 + MinNodeCPUUtilization float64 + AvgNodeMemUtilization float64 + MaxNodeMemUtilization float64 + MinNodeMemUtilization float64 + + // Load distribution metrics + CPULoadStdDev float64 + MemLoadStdDev float64 + LoadImbalanceCoefficient float64 +} + +// createSimulatedNodes creates nodes using a factory function +func createSimulatedNodes(config BenchmarkConfig, factory NodeFactory) []NodeSimulator { + if factory == nil { + factory = NewStandardNode // Default to standard node + } + + nodes := make([]NodeSimulator, config.NumNodes) + for i := range config.NumNodes { + nodes[i] = factory(fmt.Sprintf("node-%d", i), config) + } + return nodes +} + +// Helper function: converts NodeSimulator slice to *nodemanager.Node slice (for algorithm use) +func toNodeManagerNodes(simNodes []NodeSimulator) []*nodemanager.Node { + nodes := make([]*nodemanager.Node, len(simNodes)) + for i, n := range simNodes { + nodes[i] = n.GetNode() + } + return nodes +} + // runBenchmark runs a comprehensive placement benchmark with lifecycle tracking -func runBenchmark(b *testing.B, algorithm Algorithm, config BenchmarkConfig) *BenchmarkMetrics { +func runBenchmark(b *testing.B, algorithm Algorithm, config BenchmarkConfig, nodeFactory NodeFactory) *BenchmarkMetrics { b.Helper() - ctx, cancel := context.WithTimeout(b.Context(), config.BenchmarkDuration) + parentCtx := b.Context() + if parentCtx == nil { + parentCtx = context.Background() + } + ctx, cancel := context.WithTimeout(parentCtx, config.BenchmarkDuration) 
defer cancel() - // Create simulated nodes - simNodes := createSimulatedNodes(config) + // Create nodes using factory + simNodes := createSimulatedNodes(config, nodeFactory) - // Convert to nodemanager.Node slice for algorithm - nodes := make([]*nodemanager.Node, len(simNodes)) - nodeMap := make(map[string]*SimulatedNode) - for i, n := range simNodes { - nodes[i] = n.Node - nodeMap[n.ID] = n + // Convert node list + nodes := toNodeManagerNodes(simNodes) + nodeMap := make(map[string]NodeSimulator) + for _, n := range simNodes { + nodeMap[n.GetNode().ID] = n } // Initialize metrics @@ -234,7 +348,7 @@ func runBenchmark(b *testing.B, algorithm Algorithm, config BenchmarkConfig) *Be if now.Sub(sandbox.StartTime) > sandbox.PlannedDuration { // Remove from node if node, exists := nodeMap[sandbox.NodeID]; exists { - node.removeSandbox(sandbox.ID) + node.RemoveSandbox(sandbox.ID) } // Remove from active list activeSandboxes.Delete(key) @@ -296,8 +410,9 @@ func runBenchmark(b *testing.B, algorithm Algorithm, config BenchmarkConfig) *Be return func() { placementStart := time.Now() node, err := PlaceSandbox(ctx, algorithm, nodes, nil, &orchestratorgrpc.SandboxCreateRequest{Sandbox: &orchestratorgrpc.SandboxConfig{ - Vcpu: sbx.RequestedCPU, - RamMb: sbx.RequestedMemory, + SandboxId: sbx.ID, + Vcpu: sbx.RequestedCPU, + RamMb: sbx.RequestedMemory, }}, machineinfo.MachineInfo{}, false, nil) placementTime := time.Since(placementStart) @@ -320,7 +435,7 @@ func runBenchmark(b *testing.B, algorithm Algorithm, config BenchmarkConfig) *Be // Find the simulated node and place the sandbox if simNode, exists := nodeMap[node.ID]; exists { sbx.NodeID = node.ID - if simNode.placeSandbox(sbx) { + if simNode.PlaceSandbox(sbx) { activeSandboxes.Store(sbx.ID, sbx) metrics.SuccessfulPlacements++ atomic.AddInt64(&recentSuccesses, 1) @@ -358,7 +473,7 @@ func runBenchmark(b *testing.B, algorithm Algorithm, config BenchmarkConfig) *Be } // calculateFinalMetrics calculates comprehensive final metrics 
-func calculateFinalMetrics(metrics *BenchmarkMetrics, nodes []*SimulatedNode, placementTimes []time.Duration) { +func calculateFinalMetrics(metrics *BenchmarkMetrics, nodes []NodeSimulator, placementTimes []time.Duration) { // Calculate placement time metrics if len(placementTimes) > 0 { var totalTime time.Duration @@ -383,7 +498,7 @@ func calculateFinalMetrics(metrics *BenchmarkMetrics, nodes []*SimulatedNode, pl metrics.MinNodeMemUtilization = 100.0 for _, node := range nodes { - cpuUtil, memUtil := node.getUtilization() + cpuUtil, memUtil := node.GetUtilization() cpuUtils = append(cpuUtils, cpuUtil) memUtils = append(memUtils, memUtil) @@ -458,7 +573,7 @@ func BenchmarkPlacementComparison(t *testing.B) { for _, alg := range algorithms { t.Run(alg.name, func(t *testing.B) { - metrics := runBenchmark(t, alg.algo, config) + metrics := runBenchmark(t, alg.algo, config, NewStandardNode) t.Logf("\n=== %s Results ===", alg.name) t.Logf("Placement Performance:") @@ -488,3 +603,189 @@ func BenchmarkPlacementComparison(t *testing.B) { }) } } + +// BenchmarkPlacementDistribution visualizes load distribution across nodes (simulating metric lag) +// Run command: go test -v -bench=BenchmarkPlacementDistribution -run=^$ ./internal/orchestrator/placement +func BenchmarkPlacementDistribution(b *testing.B) { + // Scenario config: “thundering herd” test under extremely high concurrency + // Use LaggyNode to simulate metric reporting delay: Orchestrator always sees stale metrics + config := BenchmarkConfig{ + NumNodes: 10, // Fewer nodes to make load hotspots more apparent + SandboxStartRate: 20, // 20 requests per second (burst traffic) + AvgSandboxCPU: 1, // 1 vCPU per Sandbox + AvgSandboxMemory: 1024, // 1024 MiB + CPUVariance: 0.0, // Fixed spec for easier distribution observation + MemoryVariance: 0.0, + ActualUsageRatio: 1.0, // Assume full utilization + ActualUsageVariance: 0.0, + SandboxDuration: 1 * time.Minute, // Long enough to ensure no release during test + 
BenchmarkDuration: 25 * time.Second, // Run long enough to trigger at least one heartbeat sync (20s ticker) + NodeCPUCapacity: 192, // 192 vCPU per Node + NodeMemoryCapacity: 1024 * 1024 * 1024 * 512, // 512 GiB per Node + SandboxCreateDuration: time.Millisecond, + } + + algorithms := []struct { + name string + algo Algorithm + }{ + // Compare algorithms here. Expect LeastBusy to have serious hotspot issues. + // {"LeastBusy", &LeastBusyAlgorithm{}}, + {"BestOfK_K3", NewBestOfK(DefaultBestOfKConfig())}, + {"BestOfK_K5", NewBestOfK(BestOfKConfig{R: 4, K: 5, Alpha: 0.5})}, + } + + for _, alg := range algorithms { + b.Run(alg.name, func(b *testing.B) { + b.Logf("Running distribution test for %s with LaggyNodes...", alg.name) + + parentCtx := b.Context() + if parentCtx == nil { + parentCtx = context.Background() + } + ctx, cancel := context.WithTimeout(parentCtx, config.BenchmarkDuration) + defer cancel() + + // 1. Create nodes using NewLaggyNode (default: only update internal real state, not Metrics) + simNodes := createSimulatedNodes(config, NewLaggyNode) + nodes := toNodeManagerNodes(simNodes) + nodeMap := make(map[string]NodeSimulator) + + // 2. Warmup: apply slight random baseline noise to nodes + // This breaks the initial tie in LeastBusy state, causing it to quickly lock onto a “victim” + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for _, n := range simNodes { + nodeMap[n.GetNode().ID] = n + if ln, ok := n.(*LaggyNode); ok { + ln.realCpuAllocated = uint32(rng.Intn(5)) // 0-4 CPU random baseline noise + ln.SyncMetrics() // Initial sync once + } + } + + // 3. Start heartbeat simulator + // Simulates reporting metrics every 20s in a real environment. + // All requests between heartbeats see stale metrics. 
+ go func() { + ticker := time.NewTicker(20 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + for _, n := range simNodes { + if ln, ok := n.(*LaggyNode); ok { + ln.SyncMetrics() + } + } + } + } + }() + + // 4. Concurrently generate Sandbox requests + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(time.Second / time.Duration(config.SandboxStartRate)) + defer ticker.Stop() + + var sandboxIDCounter int64 + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + sandboxID := atomic.AddInt64(&sandboxIDCounter, 1) + sbx := &LiveSandbox{ + ID: fmt.Sprintf("sbx-%d", sandboxID), + RequestedCPU: config.AvgSandboxCPU, + RequestedMemory: config.AvgSandboxMemory, + ActualCPUUsage: float64(config.AvgSandboxCPU), + ActualMemUsage: float64(config.AvgSandboxMemory) * 1024 * 1024, + StartTime: time.Now(), + } + + wg.Add(1) + go func(s *LiveSandbox) { + defer wg.Done() + + // Execute placement algorithm + node, err := PlaceSandbox(ctx, alg.algo, nodes, nil, &orchestratorgrpc.SandboxCreateRequest{ + Sandbox: &orchestratorgrpc.SandboxConfig{ + SandboxId: s.ID, + Vcpu: s.RequestedCPU, + RamMb: s.RequestedMemory, + }, + }, machineinfo.MachineInfo{}, false, nil) + + if err == nil && node != nil { + if simNode, ok := nodeMap[node.ID]; ok { + // Placement successful, but Metrics won't update immediately (LaggyNode feature) + simNode.PlaceSandbox(s) + } + } + }(sbx) + } + } + }() + + <-ctx.Done() + wg.Wait() + + // 5. 
Result visualization + b.Logf("\n=== Distribution Results (Laggy Metrics): %s ===", alg.name) + + maxCount := 0 + totalCount := 0 + for _, n := range simNodes { + count := n.GetSandboxCount() + totalCount += count + if count > maxCount { + maxCount = count + } + } + + if totalCount == 0 { + b.Log("No sandboxes placed.") + return + } + + // Print ASCII histogram + for i, n := range simNodes { + count := n.GetSandboxCount() + barLen := 0 + if maxCount > 0 { + barLen = int(float64(count) / float64(maxCount) * 40) // Max 40 characters + } + + bar := "" + for k := 0; k < barLen; k++ { + bar += "█" + } + + // Force Sync before printing to ensure we see the final real values + if ln, ok := n.(*LaggyNode); ok { + ln.SyncMetrics() + } + cpuUtil, _ := n.GetUtilization() + + b.Logf("Node %02d |%s| %d (CPU: %.1f%%)", i, fmt.Sprintf("%-40s", bar), count, cpuUtil) + } + + // Calculate distribution statistics (CV coefficient of variation) + avg := float64(totalCount) / float64(len(simNodes)) + var sumDiffSq float64 + for _, n := range simNodes { + diff := float64(n.GetSandboxCount()) - avg + sumDiffSq += diff * diff + } + stdDev := math.Sqrt(sumDiffSq / float64(len(simNodes))) + cv := 0.0 + if avg > 0 { + cv = stdDev / avg + } + b.Logf("Stats: Total=%d, Avg=%.1f, StdDev=%.2f, Imbalance(CV)=%.3f", totalCount, avg, stdDev, cv) + }) + } +} \ No newline at end of file diff --git a/packages/api/internal/orchestrator/placement/placement_best_of_K.go b/packages/api/internal/orchestrator/placement/placement_best_of_K.go index 32675124e2..f88065daa5 100644 --- a/packages/api/internal/orchestrator/placement/placement_best_of_K.go +++ b/packages/api/internal/orchestrator/placement/placement_best_of_K.go @@ -38,7 +38,15 @@ func DefaultBestOfKConfig() BestOfKConfig { // Score calculates the placement score for this node func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxResources, config BestOfKConfig) float64 { metrics := node.Metrics() - reserved := 
metrics.CpuAllocated + + // Get locally recorded resources that haven't been reported yet. + pendingCPUs := int64(0) + for _, res := range node.PlacementMetrics.InProgress() { + pendingCPUs += res.CPUs + } + + // Combine allocated resources with in-progress allocations + reserved := metrics.CpuAllocated + uint32(pendingCPUs) // 1 CPU used = 100% CPU percept usageAvg := float64(metrics.CpuPercent) / 100 From 130e2b7ccab0e7d9dfde145efa7737a7908a44ba Mon Sep 17 00:00:00 2001 From: MorningTZH Date: Tue, 24 Mar 2026 17:32:23 +0800 Subject: [PATCH 2/3] unit tests Signed-off-by: MorningTZH --- .../orchestrator/nodemanager/node_test.go | 32 +++++++++++++++++ .../placement/placement_best_of_K_test.go | 30 ++++++++++++++++ .../orchestrator/placement/placement_test.go | 34 +++++++++++++++++++ 3 files changed, 96 insertions(+) create mode 100644 packages/api/internal/orchestrator/nodemanager/node_test.go diff --git a/packages/api/internal/orchestrator/nodemanager/node_test.go b/packages/api/internal/orchestrator/nodemanager/node_test.go new file mode 100644 index 0000000000..4eb48efb9d --- /dev/null +++ b/packages/api/internal/orchestrator/nodemanager/node_test.go @@ -0,0 +1,32 @@ +package nodemanager_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/e2b-dev/infra/packages/api/internal/api" + "github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager" +) + +func TestNode_OptimisticAdd(t *testing.T) { + t.Parallel() + + // Initialize an idle node + node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 0, 4) + initialMetrics := node.Metrics() + + // Simulate the resources to be allocated + res := nodemanager.SandboxResources{ + CPUs: 2, + MiBMemory: 1024, // 1GB + } + + // Perform optimistic addition + node.OptimisticAdd(res) + newMetrics := node.Metrics() + + // Verify that CPU and memory are increased as expected + assert.Equal(t, initialMetrics.CpuAllocated+uint32(res.CPUs), newMetrics.CpuAllocated) + 
assert.Equal(t, initialMetrics.MemoryAllocatedBytes+uint64(res.MiBMemory)*1024*1024, newMetrics.MemoryAllocatedBytes) +} \ No newline at end of file diff --git a/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go b/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go index a143cb2865..c13affc950 100644 --- a/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go +++ b/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go @@ -65,6 +65,36 @@ func TestBestOfK_Score_PreferBiggerNode(t *testing.T) { assert.Greater(t, score, score2) } +func TestBestOfK_Score_WithPendingResources(t *testing.T) { + t.Parallel() + config := DefaultBestOfKConfig() + algo := NewBestOfK(config).(*BestOfK) + + // Create two nodes with identical base loads + nodeNormal := nodemanager.NewTestNode("node-normal", api.NodeStatusReady, 0, 4) + nodeWithPending := nodemanager.NewTestNode("node-pending", api.NodeStatusReady, 0, 4) + + // Inject InProgress resources into nodeWithPending using StartPlacing + // This simulates a Sandbox that is currently being placed but hasn't fully started + pendingRes := nodemanager.SandboxResources{ + CPUs: 2, + MiBMemory: 1024, + } + nodeWithPending.PlacementMetrics.StartPlacing("pending-sbx-1", pendingRes) + + reqResources := nodemanager.SandboxResources{ + CPUs: 1, + MiBMemory: 512, + } + + scoreNormal := algo.Score(nodeNormal, reqResources, config) + scorePending := algo.Score(nodeWithPending, reqResources, config) + + // A node with pending resources has a higher 'reserved' CPU count, + // so its calculated Score should be greater (meaning worse/lower priority) + assert.Greater(t, scorePending, scoreNormal, "Node with pending resources should receive a higher (worse) score") +} + func TestBestOfK_CanFit(t *testing.T) { t.Parallel() config := DefaultBestOfKConfig() diff --git a/packages/api/internal/orchestrator/placement/placement_test.go 
b/packages/api/internal/orchestrator/placement/placement_test.go index 0b699d1334..df9c553452 100644 --- a/packages/api/internal/orchestrator/placement/placement_test.go +++ b/packages/api/internal/orchestrator/placement/placement_test.go @@ -213,3 +213,37 @@ func TestPlaceSandbox_ResourceExhausted(t *testing.T) { // Verify node1 was NOT excluded (ResourceExhausted nodes should be retried) algorithm.AssertNumberOfCalls(t, "chooseNode", 2) } + +func TestPlaceSandbox_TriggersOptimisticUpdate(t *testing.T) { + t.Parallel() + ctx := t.Context() + + // Create a node and record the initial allocated CPU + node1 := nodemanager.NewTestNode("node1", api.NodeStatusReady, 0, 4) + initialCpuAllocated := node1.Metrics().CpuAllocated + + nodes := []*nodemanager.Node{node1} + + // Mock algorithm directly returns node1 + algorithm := &mockAlgorithm{} + algorithm.On("chooseNode", mock.Anything, nodes, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(node1, nil) + + // Request 2 vCPUs + sbxRequest := &orchestrator.SandboxCreateRequest{ + Sandbox: &orchestrator.SandboxConfig{ + SandboxId: "test-optimistic-sandbox", + Vcpu: 2, + RamMb: 1024, + }, + } + + resultNode, err := PlaceSandbox(ctx, algorithm, nodes, nil, sbxRequest, machineinfo.MachineInfo{}, false, nil) + + require.NoError(t, err) + assert.NotNil(t, resultNode) + + // Verify: After successful placement, the node's CpuAllocated should be increased by 2 from the base + updatedCpuAllocated := resultNode.Metrics().CpuAllocated + assert.Equal(t, initialCpuAllocated+2, updatedCpuAllocated, "Node metrics should be optimistically updated after placement") +} \ No newline at end of file From 02cf6dd7f6f71d196d1beca45ecb5e035db279cc Mon Sep 17 00:00:00 2001 From: MorningTZH Date: Tue, 31 Mar 2026 18:51:08 +0800 Subject: [PATCH 3/3] feat(placement): implement optimistic resource accounting with feature flags Signed-off-by: MorningTZH --- packages/api/internal/orchestrator/client.go | 4 +- 
.../internal/orchestrator/delete_instance.go | 5 + .../internal/orchestrator/nodemanager/mock.go | 8 ++ .../internal/orchestrator/nodemanager/node.go | 49 +++++-- .../orchestrator/nodemanager/node_test.go | 124 ++++++++++++++++-- .../orchestrator/placement/placement.go | 2 +- .../orchestrator/placement/placement_test.go | 16 ++- packages/shared/pkg/featureflags/flags.go | 7 +- 8 files changed, 182 insertions(+), 33 deletions(-) diff --git a/packages/api/internal/orchestrator/client.go b/packages/api/internal/orchestrator/client.go index 918680c77e..28a5fc9308 100644 --- a/packages/api/internal/orchestrator/client.go +++ b/packages/api/internal/orchestrator/client.go @@ -31,7 +31,7 @@ func (o *Orchestrator) connectToNode(ctx context.Context, discovered nodemanager connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), nodeConnectTimeout) defer cancel() - orchestratorNode, err := nodemanager.New(connectCtx, o.tel.TracerProvider, o.tel.MeterProvider, discovered) + orchestratorNode, err := nodemanager.New(connectCtx, o.tel.TracerProvider, o.tel.MeterProvider, discovered, o.featureFlagsClient) if err != nil { return nil, err } @@ -61,7 +61,7 @@ func (o *Orchestrator) connectToClusterNode(ctx context.Context, cluster *cluste connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), nodeConnectTimeout) defer cancel() - orchestratorNode, err := nodemanager.NewClusterNode(connectCtx, i.GetClient(), cluster.ID, cluster.SandboxDomain, i) + orchestratorNode, err := nodemanager.NewClusterNode(connectCtx, i.GetClient(), cluster.ID, cluster.SandboxDomain, i, o.featureFlagsClient) if err != nil { logger.L().Error(ctx, "Failed to create node", zap.Error(err)) diff --git a/packages/api/internal/orchestrator/delete_instance.go b/packages/api/internal/orchestrator/delete_instance.go index 729bb4c450..29eee285f7 100644 --- a/packages/api/internal/orchestrator/delete_instance.go +++ b/packages/api/internal/orchestrator/delete_instance.go @@ -157,5 +157,10 @@ 
func (o *Orchestrator) killSandboxOnNode(ctx context.Context, node *nodemanager. return fmt.Errorf("failed to delete sandbox '%s': %w", sbx.SandboxID, err) } + node.OptimisticRemove(ctx, nodemanager.SandboxResources{ + CPUs: sbx.VCpu, + MiBMemory: sbx.RamMB, + }) + return nil } diff --git a/packages/api/internal/orchestrator/nodemanager/mock.go b/packages/api/internal/orchestrator/nodemanager/mock.go index f82512784d..42faba1d18 100644 --- a/packages/api/internal/orchestrator/nodemanager/mock.go +++ b/packages/api/internal/orchestrator/nodemanager/mock.go @@ -12,6 +12,7 @@ import ( "github.com/e2b-dev/infra/packages/api/internal/api" "github.com/e2b-dev/infra/packages/api/internal/clusters" + "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" infogrpc "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator-info" templatemanager "github.com/e2b-dev/infra/packages/shared/pkg/grpc/template-manager" @@ -119,6 +120,13 @@ func WithSandboxCreateError(err error) TestOptions { } } +// WithFeatureFlags sets a custom feature flags client for the test node +func WithFeatureFlags(ff *featureflags.Client) TestOptions { + return func(node *TestNode) { + node.featureflags = ff + } +} + // MockSandboxClientCustom allows custom error logic per call type MockSandboxClientCustom struct { orchestrator.SandboxServiceClient diff --git a/packages/api/internal/orchestrator/nodemanager/node.go b/packages/api/internal/orchestrator/nodemanager/node.go index 3ca96a2cb7..dbc690a137 100644 --- a/packages/api/internal/orchestrator/nodemanager/node.go +++ b/packages/api/internal/orchestrator/nodemanager/node.go @@ -15,6 +15,7 @@ import ( "github.com/e2b-dev/infra/packages/api/internal/api" "github.com/e2b-dev/infra/packages/api/internal/clusters" "github.com/e2b-dev/infra/packages/shared/pkg/consts" + "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" "github.com/e2b-dev/infra/packages/shared/pkg/logger" 
"github.com/e2b-dev/infra/packages/shared/pkg/machineinfo" "github.com/e2b-dev/infra/packages/shared/pkg/smap" @@ -50,6 +51,9 @@ type Node struct { PlacementMetrics PlacementMetrics + // featureflags is the feature flags client for feature flag checks + featureflags *featureflags.Client + mutex sync.RWMutex } @@ -58,6 +62,7 @@ func New( tracerProvider trace.TracerProvider, meterProvider metric.MeterProvider, discoveredNode NomadServiceDiscovery, + ff *featureflags.Client, ) (*Node, error) { client, err := NewClient(tracerProvider, meterProvider, discoveredNode.OrchestratorAddress) if err != nil { @@ -99,6 +104,8 @@ func New( createSuccess: atomic.Uint64{}, createFails: atomic.Uint64{}, }, + + featureflags: ff, } n.UpdateMetricsFromServiceInfoResponse(nodeInfo) @@ -108,7 +115,7 @@ func New( return n, nil } -func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID uuid.UUID, sandboxDomain *string, i *clusters.Instance) (*Node, error) { +func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID uuid.UUID, sandboxDomain *string, i *clusters.Instance, ff *featureflags.Client) (*Node, error) { info := i.GetInfo() status, ok := OrchestratorToApiNodeStateMapper[info.Status] if !ok { @@ -135,9 +142,10 @@ func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID createFails: atomic.Uint64{}, }, - client: client, - status: status, - meta: nodeMetadata, + client: client, + status: status, + meta: nodeMetadata, + featureflags: ff, } nodeClient, ctx := n.GetClient(ctx) @@ -178,11 +186,28 @@ func (n *Node) IsNomadManaged() bool { return n.NomadNodeShortID != UnknownNomadNodeShortID } -func (n *Node) OptimisticAdd(res SandboxResources) { - n.metricsMu.Lock() - defer n.metricsMu.Unlock() - - // Directly accumulate to the current metrics view - n.metrics.CpuAllocated += uint32(res.CPUs) - n.metrics.MemoryAllocatedBytes += uint64(res.MiBMemory) * 1024 * 1024 // Note: CpuPercent is difficult to estimate, usually just 
updating Allocated is sufficient for the scheduling algorithm -} \ No newline at end of file +func (n *Node) OptimisticAdd(ctx context.Context, res SandboxResources) { + if n.featureflags != nil && !n.featureflags.BoolFlag(ctx, featureflags.OptimisticResourceAccountingFlag) { + return + } + + n.metricsMu.Lock() + defer n.metricsMu.Unlock() + + // Directly accumulate to the current metrics view + n.metrics.CpuAllocated += uint32(res.CPUs) + n.metrics.MemoryAllocatedBytes += uint64(res.MiBMemory) * 1024 * 1024 // Note: CpuPercent is difficult to estimate, usually just updating Allocated is sufficient for the scheduling algorithm +} + +func (n *Node) OptimisticRemove(ctx context.Context, res SandboxResources) { + if n.featureflags != nil && !n.featureflags.BoolFlag(ctx, featureflags.OptimisticResourceAccountingFlag) { + return + } + + n.metricsMu.Lock() + defer n.metricsMu.Unlock() + + // Directly subtract from the current metrics view, clamped at zero: a real metrics report may land between the optimistic add and this remove, so bare unsigned subtraction could underflow and make the node look fully loaded + n.metrics.CpuAllocated -= min(uint32(res.CPUs), n.metrics.CpuAllocated) + n.metrics.MemoryAllocatedBytes -= min(uint64(res.MiBMemory)*1024*1024, n.metrics.MemoryAllocatedBytes) +} diff --git a/packages/api/internal/orchestrator/nodemanager/node_test.go b/packages/api/internal/orchestrator/nodemanager/node_test.go index 4eb48efb9d..0ef528cd15 100644 --- a/packages/api/internal/orchestrator/nodemanager/node_test.go +++ b/packages/api/internal/orchestrator/nodemanager/node_test.go @@ -1,32 +1,134 @@ package nodemanager_test import ( + "context" "testing" + "github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldtestdata" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/e2b-dev/infra/packages/api/internal/api" "github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager" + "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" ) -func TestNode_OptimisticAdd(t *testing.T) { +func TestNode_OptimisticAdd_FlagEnabled(t *testing.T) { t.Parallel() - - // Initialize an idle node - node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 0, 
4) + + // 1. Create a LaunchDarkly test data source + td := ldtestdata.DataSource() + + // 2. Set the feature flag under test to true + td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(true)) + + // 3. Create a Feature Flag client with the test data source + ffClient, err := featureflags.NewClientWithDatasource(td) + require.NoError(t, err) + + // 4. Initialize Node with the injected ffClient + node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 0, 4, nodemanager.WithFeatureFlags(ffClient)) initialMetrics := node.Metrics() - // Simulate the resources to be allocated + // 5. Call the method res := nodemanager.SandboxResources{ CPUs: 2, - MiBMemory: 1024, // 1GB + MiBMemory: 1024, } + node.OptimisticAdd(context.Background(), res) - // Perform optimistic addition - node.OptimisticAdd(res) + // 6. Assert: When flag is enabled, resources should be successfully accumulated newMetrics := node.Metrics() - - // Verify that CPU and memory are increased as expected assert.Equal(t, initialMetrics.CpuAllocated+uint32(res.CPUs), newMetrics.CpuAllocated) assert.Equal(t, initialMetrics.MemoryAllocatedBytes+uint64(res.MiBMemory)*1024*1024, newMetrics.MemoryAllocatedBytes) -} \ No newline at end of file +} + +func TestNode_OptimisticAdd_FlagDisabled(t *testing.T) { + t.Parallel() + + // 1. Create a LaunchDarkly test data source + td := ldtestdata.DataSource() + + // 2. Set the feature flag under test to false + td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(false)) + + // 3. Create a Feature Flag client with the test data source + ffClient, err := featureflags.NewClientWithDatasource(td) + require.NoError(t, err) + + // 4. Initialize Node with the injected ffClient + node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 0, 4, nodemanager.WithFeatureFlags(ffClient)) + initialMetrics := node.Metrics() + + // 5. 
Call the method + res := nodemanager.SandboxResources{ + CPUs: 2, + MiBMemory: 1024, + } + node.OptimisticAdd(context.Background(), res) + + // 6. Assert: When flag is disabled, return early, resources should not be accumulated + newMetrics := node.Metrics() + assert.Equal(t, initialMetrics.CpuAllocated, newMetrics.CpuAllocated) + assert.Equal(t, initialMetrics.MemoryAllocatedBytes, newMetrics.MemoryAllocatedBytes) +} + +func TestNode_OptimisticRemove_FlagEnabled(t *testing.T) { + t.Parallel() + + // 1. Create a LaunchDarkly test data source + td := ldtestdata.DataSource() + + // 2. Set the feature flag under test to true + td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(true)) + + // 3. Create a Feature Flag client with the test data source + ffClient, err := featureflags.NewClientWithDatasource(td) + require.NoError(t, err) + + // 4. Initialize Node with the injected ffClient - some resources are already allocated at initialization + node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 4, 8192, nodemanager.WithFeatureFlags(ffClient)) + initialMetrics := node.Metrics() + + // 5. Call the method + res := nodemanager.SandboxResources{ + CPUs: 2, + MiBMemory: 1024, + } + node.OptimisticRemove(context.Background(), res) + + // 6. Assert: When flag is enabled, resources should be successfully deducted + newMetrics := node.Metrics() + assert.Equal(t, initialMetrics.CpuAllocated-uint32(res.CPUs), newMetrics.CpuAllocated) + assert.Equal(t, initialMetrics.MemoryAllocatedBytes-uint64(res.MiBMemory)*1024*1024, newMetrics.MemoryAllocatedBytes) +} + +func TestNode_OptimisticRemove_FlagDisabled(t *testing.T) { + t.Parallel() + + // 1. Create a LaunchDarkly test data source + td := ldtestdata.DataSource() + + // 2. Set the feature flag under test to false + td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(false)) + + // 3. 
Create a Feature Flag client with the test data source + ffClient, err := featureflags.NewClientWithDatasource(td) + require.NoError(t, err) + + // 4. Initialize Node with the injected ffClient - some resources are already allocated at initialization + node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 4, 8192, nodemanager.WithFeatureFlags(ffClient)) + initialMetrics := node.Metrics() + + // 5. Call the method + res := nodemanager.SandboxResources{ + CPUs: 2, + MiBMemory: 1024, + } + node.OptimisticRemove(context.Background(), res) + + // 6. Assert: When flag is disabled, return early, resources should remain unchanged + newMetrics := node.Metrics() + assert.Equal(t, initialMetrics.CpuAllocated, newMetrics.CpuAllocated) + assert.Equal(t, initialMetrics.MemoryAllocatedBytes, newMetrics.MemoryAllocatedBytes) +} diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index a2a4f9ea2b..f1d755a329 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -82,7 +82,7 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node // Optimistic update: assume resources are occupied after successful creation. // Manually update node.metrics with the newly allocated resources. // This will be overwritten by the next real Metrics report for auto-correction. 
- node.OptimisticAdd(nodemanager.SandboxResources{ + node.OptimisticAdd(ctx, nodemanager.SandboxResources{ CPUs: sbxRequest.GetSandbox().GetVcpu(), MiBMemory: sbxRequest.GetSandbox().GetRamMb(), }) diff --git a/packages/api/internal/orchestrator/placement/placement_test.go b/packages/api/internal/orchestrator/placement/placement_test.go index df9c553452..3d27d43a8d 100644 --- a/packages/api/internal/orchestrator/placement/placement_test.go +++ b/packages/api/internal/orchestrator/placement/placement_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldtestdata" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -15,6 +16,7 @@ import ( "github.com/e2b-dev/infra/packages/api/internal/api" "github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager" + "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" "github.com/e2b-dev/infra/packages/shared/pkg/machineinfo" ) @@ -218,12 +220,18 @@ func TestPlaceSandbox_TriggersOptimisticUpdate(t *testing.T) { t.Parallel() ctx := t.Context() + // Enable the optimistic resource accounting flag for this test + td := ldtestdata.DataSource() + td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(true)) + ffClient, err := featureflags.NewClientWithDatasource(td) + require.NoError(t, err) + // Create a node and record the initial allocated CPU - node1 := nodemanager.NewTestNode("node1", api.NodeStatusReady, 0, 4) + node1 := nodemanager.NewTestNode("node1", api.NodeStatusReady, 0, 4, nodemanager.WithFeatureFlags(ffClient)) initialCpuAllocated := node1.Metrics().CpuAllocated nodes := []*nodemanager.Node{node1} - + // Mock algorithm directly returns node1 algorithm := &mockAlgorithm{} algorithm.On("chooseNode", mock.Anything, nodes, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). 
@@ -242,8 +250,8 @@ func TestPlaceSandbox_TriggersOptimisticUpdate(t *testing.T) { require.NoError(t, err) assert.NotNil(t, resultNode) - + // Verify: After successful placement, the node's CpuAllocated should be increased by 2 from the base updatedCpuAllocated := resultNode.Metrics().CpuAllocated assert.Equal(t, initialCpuAllocated+2, updatedCpuAllocated, "Node metrics should be optimistically updated after placement") -} \ No newline at end of file +} diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 4f9c31eec8..b241a32e7d 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -115,9 +115,10 @@ var ( // of synchronous. Only safe to enable after PeerToPeerChunkTransferFlag is ON. PeerToPeerAsyncCheckpointFlag = newBoolFlag("peer-to-peer-async-checkpoint", false) - PersistentVolumesFlag = newBoolFlag("can-use-persistent-volumes", env.IsDevelopment()) - ExecutionMetricsOnWebhooksFlag = newBoolFlag("execution-metrics-on-webhooks", false) // TODO: Remove NLT 20250315 - SandboxLabelBasedSchedulingFlag = newBoolFlag("sandbox-label-based-scheduling", false) + PersistentVolumesFlag = newBoolFlag("can-use-persistent-volumes", env.IsDevelopment()) + ExecutionMetricsOnWebhooksFlag = newBoolFlag("execution-metrics-on-webhooks", false) // TODO: Remove NLT 20250315 + SandboxLabelBasedSchedulingFlag = newBoolFlag("sandbox-label-based-scheduling", false) + OptimisticResourceAccountingFlag = newBoolFlag("optimistic-resource-accounting", true) ) type IntFlag struct {