diff --git a/packages/api/internal/orchestrator/client.go b/packages/api/internal/orchestrator/client.go index 918680c77e..28a5fc9308 100644 --- a/packages/api/internal/orchestrator/client.go +++ b/packages/api/internal/orchestrator/client.go @@ -31,7 +31,7 @@ func (o *Orchestrator) connectToNode(ctx context.Context, discovered nodemanager connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), nodeConnectTimeout) defer cancel() - orchestratorNode, err := nodemanager.New(connectCtx, o.tel.TracerProvider, o.tel.MeterProvider, discovered) + orchestratorNode, err := nodemanager.New(connectCtx, o.tel.TracerProvider, o.tel.MeterProvider, discovered, o.featureFlagsClient) if err != nil { return nil, err } @@ -61,7 +61,7 @@ func (o *Orchestrator) connectToClusterNode(ctx context.Context, cluster *cluste connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), nodeConnectTimeout) defer cancel() - orchestratorNode, err := nodemanager.NewClusterNode(connectCtx, i.GetClient(), cluster.ID, cluster.SandboxDomain, i) + orchestratorNode, err := nodemanager.NewClusterNode(connectCtx, i.GetClient(), cluster.ID, cluster.SandboxDomain, i, o.featureFlagsClient) if err != nil { logger.L().Error(ctx, "Failed to create node", zap.Error(err)) diff --git a/packages/api/internal/orchestrator/delete_instance.go b/packages/api/internal/orchestrator/delete_instance.go index 729bb4c450..29eee285f7 100644 --- a/packages/api/internal/orchestrator/delete_instance.go +++ b/packages/api/internal/orchestrator/delete_instance.go @@ -157,5 +157,10 @@ func (o *Orchestrator) killSandboxOnNode(ctx context.Context, node *nodemanager. 
return fmt.Errorf("failed to delete sandbox '%s': %w", sbx.SandboxID, err) } + node.OptimisticRemove(ctx, nodemanager.SandboxResources{ + CPUs: sbx.VCpu, + MiBMemory: sbx.RamMB, + }) + return nil } diff --git a/packages/api/internal/orchestrator/nodemanager/mock.go b/packages/api/internal/orchestrator/nodemanager/mock.go index f82512784d..42faba1d18 100644 --- a/packages/api/internal/orchestrator/nodemanager/mock.go +++ b/packages/api/internal/orchestrator/nodemanager/mock.go @@ -12,6 +12,7 @@ import ( "github.com/e2b-dev/infra/packages/api/internal/api" "github.com/e2b-dev/infra/packages/api/internal/clusters" + "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" infogrpc "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator-info" templatemanager "github.com/e2b-dev/infra/packages/shared/pkg/grpc/template-manager" @@ -119,6 +120,13 @@ func WithSandboxCreateError(err error) TestOptions { } } +// WithFeatureFlags sets a custom feature flags client for the test node +func WithFeatureFlags(ff *featureflags.Client) TestOptions { + return func(node *TestNode) { + node.featureflags = ff + } +} + // MockSandboxClientCustom allows custom error logic per call type MockSandboxClientCustom struct { orchestrator.SandboxServiceClient diff --git a/packages/api/internal/orchestrator/nodemanager/node.go b/packages/api/internal/orchestrator/nodemanager/node.go index dd7f4fc634..dbc690a137 100644 --- a/packages/api/internal/orchestrator/nodemanager/node.go +++ b/packages/api/internal/orchestrator/nodemanager/node.go @@ -15,6 +15,7 @@ import ( "github.com/e2b-dev/infra/packages/api/internal/api" "github.com/e2b-dev/infra/packages/api/internal/clusters" "github.com/e2b-dev/infra/packages/shared/pkg/consts" + "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" "github.com/e2b-dev/infra/packages/shared/pkg/logger" "github.com/e2b-dev/infra/packages/shared/pkg/machineinfo" 
"github.com/e2b-dev/infra/packages/shared/pkg/smap" @@ -50,6 +51,9 @@ type Node struct { PlacementMetrics PlacementMetrics + // featureflags is the feature flags client for feature flag checks + featureflags *featureflags.Client + mutex sync.RWMutex } @@ -58,6 +62,7 @@ func New( tracerProvider trace.TracerProvider, meterProvider metric.MeterProvider, discoveredNode NomadServiceDiscovery, + ff *featureflags.Client, ) (*Node, error) { client, err := NewClient(tracerProvider, meterProvider, discoveredNode.OrchestratorAddress) if err != nil { @@ -99,6 +104,8 @@ func New( createSuccess: atomic.Uint64{}, createFails: atomic.Uint64{}, }, + + featureflags: ff, } n.UpdateMetricsFromServiceInfoResponse(nodeInfo) @@ -108,7 +115,7 @@ func New( return n, nil } -func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID uuid.UUID, sandboxDomain *string, i *clusters.Instance) (*Node, error) { +func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID uuid.UUID, sandboxDomain *string, i *clusters.Instance, ff *featureflags.Client) (*Node, error) { info := i.GetInfo() status, ok := OrchestratorToApiNodeStateMapper[info.Status] if !ok { @@ -135,9 +142,10 @@ func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID createFails: atomic.Uint64{}, }, - client: client, - status: status, - meta: nodeMetadata, + client: client, + status: status, + meta: nodeMetadata, + featureflags: ff, } nodeClient, ctx := n.GetClient(ctx) @@ -177,3 +185,29 @@ func (n *Node) GetClient(ctx context.Context) (*clusters.GRPCClient, context.Con func (n *Node) IsNomadManaged() bool { return n.NomadNodeShortID != UnknownNomadNodeShortID } + +func (n *Node) OptimisticAdd(ctx context.Context, res SandboxResources) { + if n.featureflags != nil && !n.featureflags.BoolFlag(ctx, featureflags.OptimisticResourceAccountingFlag) { + return + } + + n.metricsMu.Lock() + defer n.metricsMu.Unlock() + + // Directly accumulate to the current metrics view + 
n.metrics.CpuAllocated += uint32(res.CPUs) + n.metrics.MemoryAllocatedBytes += uint64(res.MiBMemory) * 1024 * 1024 // Note: CpuPercent is difficult to estimate, usually just updating Allocated is sufficient for the scheduling algorithm +} + +func (n *Node) OptimisticRemove(ctx context.Context, res SandboxResources) { + if n.featureflags != nil && !n.featureflags.BoolFlag(ctx, featureflags.OptimisticResourceAccountingFlag) { + return + } + + n.metricsMu.Lock() + defer n.metricsMu.Unlock() + + // Directly subtract from the current metrics view + n.metrics.CpuAllocated -= uint32(res.CPUs) + n.metrics.MemoryAllocatedBytes -= uint64(res.MiBMemory) * 1024 * 1024 +} diff --git a/packages/api/internal/orchestrator/nodemanager/node_test.go b/packages/api/internal/orchestrator/nodemanager/node_test.go new file mode 100644 index 0000000000..0ef528cd15 --- /dev/null +++ b/packages/api/internal/orchestrator/nodemanager/node_test.go @@ -0,0 +1,134 @@ +package nodemanager_test + +import ( + "context" + "testing" + + "github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldtestdata" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/e2b-dev/infra/packages/api/internal/api" + "github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager" + "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" +) + +func TestNode_OptimisticAdd_FlagEnabled(t *testing.T) { + t.Parallel() + + // 1. Create a LaunchDarkly test data source + td := ldtestdata.DataSource() + + // 2. Set the feature flag under test to true + td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(true)) + + // 3. Create a Feature Flag client with the test data source + ffClient, err := featureflags.NewClientWithDatasource(td) + require.NoError(t, err) + + // 4. 
Initialize Node with the injected ffClient + node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 0, 4, nodemanager.WithFeatureFlags(ffClient)) + initialMetrics := node.Metrics() + + // 5. Call the method + res := nodemanager.SandboxResources{ + CPUs: 2, + MiBMemory: 1024, + } + node.OptimisticAdd(context.Background(), res) + + // 6. Assert: When flag is enabled, resources should be successfully accumulated + newMetrics := node.Metrics() + assert.Equal(t, initialMetrics.CpuAllocated+uint32(res.CPUs), newMetrics.CpuAllocated) + assert.Equal(t, initialMetrics.MemoryAllocatedBytes+uint64(res.MiBMemory)*1024*1024, newMetrics.MemoryAllocatedBytes) +} + +func TestNode_OptimisticAdd_FlagDisabled(t *testing.T) { + t.Parallel() + + // 1. Create a LaunchDarkly test data source + td := ldtestdata.DataSource() + + // 2. Set the feature flag under test to false + td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(false)) + + // 3. Create a Feature Flag client with the test data source + ffClient, err := featureflags.NewClientWithDatasource(td) + require.NoError(t, err) + + // 4. Initialize Node with the injected ffClient + node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 0, 4, nodemanager.WithFeatureFlags(ffClient)) + initialMetrics := node.Metrics() + + // 5. Call the method + res := nodemanager.SandboxResources{ + CPUs: 2, + MiBMemory: 1024, + } + node.OptimisticAdd(context.Background(), res) + + // 6. Assert: When flag is disabled, return early, resources should not be accumulated + newMetrics := node.Metrics() + assert.Equal(t, initialMetrics.CpuAllocated, newMetrics.CpuAllocated) + assert.Equal(t, initialMetrics.MemoryAllocatedBytes, newMetrics.MemoryAllocatedBytes) +} + +func TestNode_OptimisticRemove_FlagEnabled(t *testing.T) { + t.Parallel() + + // 1. Create a LaunchDarkly test data source + td := ldtestdata.DataSource() + + // 2. 
Set the feature flag under test to true + td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(true)) + + // 3. Create a Feature Flag client with the test data source + ffClient, err := featureflags.NewClientWithDatasource(td) + require.NoError(t, err) + + // 4. Initialize Node with the injected ffClient - some resources are already allocated at initialization + node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 4, 8192, nodemanager.WithFeatureFlags(ffClient)) + initialMetrics := node.Metrics() + + // 5. Call the method + res := nodemanager.SandboxResources{ + CPUs: 2, + MiBMemory: 1024, + } + node.OptimisticRemove(context.Background(), res) + + // 6. Assert: When flag is enabled, resources should be successfully deducted + newMetrics := node.Metrics() + assert.Equal(t, initialMetrics.CpuAllocated-uint32(res.CPUs), newMetrics.CpuAllocated) + assert.Equal(t, initialMetrics.MemoryAllocatedBytes-uint64(res.MiBMemory)*1024*1024, newMetrics.MemoryAllocatedBytes) +} + +func TestNode_OptimisticRemove_FlagDisabled(t *testing.T) { + t.Parallel() + + // 1. Create a LaunchDarkly test data source + td := ldtestdata.DataSource() + + // 2. Set the feature flag under test to false + td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(false)) + + // 3. Create a Feature Flag client with the test data source + ffClient, err := featureflags.NewClientWithDatasource(td) + require.NoError(t, err) + + // 4. Initialize Node with the injected ffClient - some resources are already allocated at initialization + node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 4, 8192, nodemanager.WithFeatureFlags(ffClient)) + initialMetrics := node.Metrics() + + // 5. Call the method + res := nodemanager.SandboxResources{ + CPUs: 2, + MiBMemory: 1024, + } + node.OptimisticRemove(context.Background(), res) + + // 6. 
Assert: When flag is disabled, return early, resources should remain unchanged + newMetrics := node.Metrics() + assert.Equal(t, initialMetrics.CpuAllocated, newMetrics.CpuAllocated) + assert.Equal(t, initialMetrics.MemoryAllocatedBytes, newMetrics.MemoryAllocatedBytes) +} diff --git a/packages/api/internal/orchestrator/placement/placement.go b/packages/api/internal/orchestrator/placement/placement.go index b07e75501f..f1d755a329 100644 --- a/packages/api/internal/orchestrator/placement/placement.go +++ b/packages/api/internal/orchestrator/placement/placement.go @@ -79,6 +79,14 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node if err == nil { node.PlacementMetrics.Success(sbxRequest.GetSandbox().GetSandboxId()) + // Optimistic update: assume resources are occupied after successful creation. + // Manually update node.metrics with the newly allocated resources. + // This will be overwritten by the next real Metrics report for auto-correction. + node.OptimisticAdd(ctx, nodemanager.SandboxResources{ + CPUs: sbxRequest.GetSandbox().GetVcpu(), + MiBMemory: sbxRequest.GetSandbox().GetRamMb(), + }) + return node, nil } diff --git a/packages/api/internal/orchestrator/placement/placement_benchmark_test.go b/packages/api/internal/orchestrator/placement/placement_benchmark_test.go index 7b6ad043c7..4bc5332611 100644 --- a/packages/api/internal/orchestrator/placement/placement_benchmark_test.go +++ b/packages/api/internal/orchestrator/placement/placement_benchmark_test.go @@ -49,156 +49,270 @@ type LiveSandbox struct { PlacementLatency time.Duration } -// SimulatedNode represents a node with realistic resource tracking -type SimulatedNode struct { - *nodemanager.Node +// NodeSimulator defines the common behavior of simulated nodes +type NodeSimulator interface { + // GetNode returns the underlying nodemanager.Node object for use by the algorithm + GetNode() *nodemanager.Node + // PlaceSandbox attempts to place a Sandbox on the node, returns 
whether successful + PlaceSandbox(sbx *LiveSandbox) bool + // RemoveSandbox removes a Sandbox from the node + RemoveSandbox(sandboxID string) + // GetUtilization returns the current CPU and memory utilization (0-100) + GetUtilization() (float64, float64) + // GetSandboxCount returns the number of currently running Sandboxes + GetSandboxCount() int +} + +// NodeFactory defines the function signature for creating simulated nodes +type NodeFactory func(id string, config BenchmarkConfig) NodeSimulator +// StandardNode implements the original SimulatedNode logic (real-time metric synchronization) +type StandardNode struct { + *nodemanager.Node mu sync.RWMutex sandboxes map[string]*LiveSandbox totalPlacements int64 - rejectedPlacements int64 - lastUpdateTime time.Time + rejectedPlacements int64 // counts failed placements due to capacity } -// BenchmarkMetrics contains detailed metrics from the benchmark -type BenchmarkMetrics struct { - // Placement metrics - TotalPlacements int64 - SuccessfulPlacements int64 - FailedPlacements int64 - AvgPlacementTime time.Duration - MaxPlacementTime time.Duration - MinPlacementTime time.Duration - P50PlacementTime time.Duration - P95PlacementTime time.Duration - P99PlacementTime time.Duration - - // Node utilization metrics - AvgNodeCPUUtilization float64 - MaxNodeCPUUtilization float64 - MinNodeCPUUtilization float64 - AvgNodeMemUtilization float64 - MaxNodeMemUtilization float64 - MinNodeMemUtilization float64 - - // Load distribution metrics - CPULoadStdDev float64 - MemLoadStdDev float64 - LoadImbalanceCoefficient float64 -} +// Ensure StandardNode implements the interface +var _ NodeSimulator = &StandardNode{} -// createSimulatedNodes creates nodes with realistic resource tracking -func createSimulatedNodes(config BenchmarkConfig) []*SimulatedNode { - nodes := make([]*SimulatedNode, config.NumNodes) - for i := range config.NumNodes { - // Create base node - baseNode := nodemanager.NewTestNode( - fmt.Sprintf("node-%d", i), +func 
NewStandardNode(id string, config BenchmarkConfig) NodeSimulator { + return &StandardNode{ + Node: nodemanager.NewTestNode( + id, api.NodeStatusReady, - 0, // Start with no load + 0, config.NodeCPUCapacity, nodemanager.WithSandboxSleepingClient(config.SandboxCreateDuration), - ) - - simNode := &SimulatedNode{ - Node: baseNode, - sandboxes: make(map[string]*LiveSandbox), - lastUpdateTime: time.Now(), - } - nodes[i] = simNode + ), + sandboxes: make(map[string]*LiveSandbox), } +} - return nodes +func (n *StandardNode) GetNode() *nodemanager.Node { + return n.Node } -// placeSandbox places a sandbox on the node -func (n *SimulatedNode) placeSandbox(sbx *LiveSandbox) bool { +func (n *StandardNode) PlaceSandbox(sbx *LiveSandbox) bool { n.mu.Lock() defer n.mu.Unlock() metrics := n.Metrics() - // Check capacity with overcommit - if metrics.CpuAllocated+uint32(sbx.RequestedCPU) > metrics.CpuCount*4 { // 4x overcommit + // Check capacity with overcommit (Original Logic) + if metrics.CpuAllocated+uint32(sbx.RequestedCPU) > metrics.CpuCount*4 { atomic.AddInt64(&n.rejectedPlacements, 1) - return false } + // Real-time update: directly modify Node Metrics n.UpdateMetricsFromServiceInfoResponse(&orchestrator.ServiceInfoResponse{ - MetricSandboxesRunning: uint32(len(n.sandboxes)) + 1, - // Host system usage metrics - MetricCpuPercent: metrics.CpuPercent + uint32(sbx.ActualCPUUsage*100), - MetricMemoryUsedBytes: metrics.MemoryUsedBytes + uint64(sbx.ActualMemUsage), - // Host system total resources - MetricCpuCount: metrics.CpuCount, - MetricMemoryTotalBytes: metrics.MemoryTotalBytes, - // Allocated resources to sandboxes + MetricSandboxesRunning: uint32(len(n.sandboxes)) + 1, + MetricCpuPercent: metrics.CpuPercent + uint32(sbx.ActualCPUUsage*100), + MetricMemoryUsedBytes: metrics.MemoryUsedBytes + uint64(sbx.ActualMemUsage), + MetricCpuCount: metrics.CpuCount, + MetricMemoryTotalBytes: metrics.MemoryTotalBytes, MetricCpuAllocated: metrics.CpuAllocated + uint32(sbx.RequestedCPU), 
MetricMemoryAllocatedBytes: metrics.MemoryAllocatedBytes + uint64(sbx.RequestedMemory)*1024*1024, }) n.sandboxes[sbx.ID] = sbx atomic.AddInt64(&n.totalPlacements, 1) - return true } -// removeSandbox removes a sandbox from the node -func (n *SimulatedNode) removeSandbox(sandboxID string) { +func (n *StandardNode) RemoveSandbox(sandboxID string) { n.mu.Lock() defer n.mu.Unlock() metrics := n.Metrics() - if sbx, exists := n.sandboxes[sandboxID]; exists { n.UpdateMetricsFromServiceInfoResponse(&orchestrator.ServiceInfoResponse{ - MetricSandboxesRunning: uint32(len(n.sandboxes)) - 1, - - MetricCpuPercent: metrics.CpuPercent - uint32(sbx.ActualCPUUsage*100), - MetricMemoryUsedBytes: metrics.MemoryUsedBytes - uint64(sbx.ActualMemUsage), - + MetricSandboxesRunning: uint32(len(n.sandboxes)) - 1, + MetricCpuPercent: metrics.CpuPercent - uint32(sbx.ActualCPUUsage*100), + MetricMemoryUsedBytes: metrics.MemoryUsedBytes - uint64(sbx.ActualMemUsage), MetricCpuAllocated: metrics.CpuAllocated - uint32(sbx.RequestedCPU), MetricMemoryAllocatedBytes: metrics.MemoryAllocatedBytes - uint64(sbx.RequestedMemory)*1024*1024, - - MetricCpuCount: metrics.CpuCount, - MetricMemoryTotalBytes: metrics.MemoryTotalBytes, + MetricCpuCount: metrics.CpuCount, + MetricMemoryTotalBytes: metrics.MemoryTotalBytes, }) - delete(n.sandboxes, sandboxID) } } -// getUtilization returns current CPU and memory utilization percentages -func (n *SimulatedNode) getUtilization() (cpuUtil, memUtil float64) { +func (n *StandardNode) GetUtilization() (float64, float64) { n.mu.RLock() defer n.mu.RUnlock() - metrics := n.Metrics() + var cpuUtil, memUtil float64 if metrics.CpuCount > 0 { cpuUtil = ((float64(metrics.CpuPercent) / 100) / float64(metrics.CpuCount)) * 100 } if metrics.MemoryTotalBytes > 0 { memUtil = (float64(metrics.MemoryUsedBytes) / float64(metrics.MemoryTotalBytes)) * 100 } - return cpuUtil, memUtil } +func (n *StandardNode) GetSandboxCount() int { + n.mu.RLock() + defer n.mu.RUnlock() + return 
len(n.sandboxes) +} + +// LaggyNode simulates metric lag. +// It maintains both the “real” state and “reported” state (Node Metrics). +// Real state is only synced to Node Metrics when SyncMetrics() is called. +type LaggyNode struct { + *StandardNode // Embed StandardNode to reuse logic + + // Internal resource state (hidden from Orchestrator until SyncMetrics is called) + realCpuAllocated uint32 + realMemAllocated uint64 + realSandboxesRunning uint32 +} + +func NewLaggyNode(id string, config BenchmarkConfig) NodeSimulator { + base := NewStandardNode(id, config).(*StandardNode) + return &LaggyNode{ + StandardNode: base, + } +} + +// PlaceSandbox Override: only update real state, not Node Metrics +func (n *LaggyNode) PlaceSandbox(sbx *LiveSandbox) bool { + n.mu.Lock() + defer n.mu.Unlock() + + // 1. Admission control based on real capacity (simulating node-side rejection) + // Note: we use realCpuAllocated for the check + metrics := n.Node.Metrics() + if n.realCpuAllocated+uint32(sbx.RequestedCPU) > metrics.CpuCount*4 { + atomic.AddInt64(&n.rejectedPlacements, 1) + return false + } + + // 2. 
Update real state + n.sandboxes[sbx.ID] = sbx + n.realSandboxesRunning++ + n.realCpuAllocated += uint32(sbx.RequestedCPU) + n.realMemAllocated += uint64(sbx.RequestedMemory) * 1024 * 1024 + + atomic.AddInt64(&n.totalPlacements, 1) + + // Key: intentionally do NOT call UpdateMetricsFromServiceInfoResponse + // The metrics visible to Orchestrator remain unchanged until SyncMetrics is called + return true +} + +func (n *LaggyNode) RemoveSandbox(sandboxID string) { + n.mu.Lock() + defer n.mu.Unlock() + + if sbx, exists := n.sandboxes[sandboxID]; exists { + n.realSandboxesRunning-- + n.realCpuAllocated -= uint32(sbx.RequestedCPU) + n.realMemAllocated -= uint64(sbx.RequestedMemory) * 1024 * 1024 + delete(n.sandboxes, sandboxID) + } +} + +// SyncMetrics simulates heartbeat reporting, syncing real state to Orchestrator +func (n *LaggyNode) SyncMetrics() { + n.mu.Lock() + defer n.mu.Unlock() + + metrics := n.Node.Metrics() + + // Calculate real CPU usage (simplified as linear accumulation here, may be more complex in production) + var totalActualCpuUsage float64 + var totalActualMemUsage float64 + for _, sbx := range n.sandboxes { + totalActualCpuUsage += sbx.ActualCPUUsage + totalActualMemUsage += sbx.ActualMemUsage + } + + n.UpdateMetricsFromServiceInfoResponse(&orchestrator.ServiceInfoResponse{ + MetricSandboxesRunning: n.realSandboxesRunning, + MetricCpuPercent: uint32(totalActualCpuUsage * 100), + MetricMemoryUsedBytes: uint64(totalActualMemUsage), + MetricCpuAllocated: n.realCpuAllocated, + MetricMemoryAllocatedBytes: n.realMemAllocated, + + // Static fields remain unchanged + MetricCpuCount: metrics.CpuCount, + MetricMemoryTotalBytes: metrics.MemoryTotalBytes, + }) +} + +// BenchmarkMetrics contains detailed metrics from the benchmark +type BenchmarkMetrics struct { + // Placement metrics + TotalPlacements int64 + SuccessfulPlacements int64 + FailedPlacements int64 + AvgPlacementTime time.Duration + MaxPlacementTime time.Duration + MinPlacementTime time.Duration + 
P50PlacementTime time.Duration + P95PlacementTime time.Duration + P99PlacementTime time.Duration + + // Node utilization metrics + AvgNodeCPUUtilization float64 + MaxNodeCPUUtilization float64 + MinNodeCPUUtilization float64 + AvgNodeMemUtilization float64 + MaxNodeMemUtilization float64 + MinNodeMemUtilization float64 + + // Load distribution metrics + CPULoadStdDev float64 + MemLoadStdDev float64 + LoadImbalanceCoefficient float64 +} + +// createSimulatedNodes creates nodes using a factory function +func createSimulatedNodes(config BenchmarkConfig, factory NodeFactory) []NodeSimulator { + if factory == nil { + factory = NewStandardNode // Default to standard node + } + + nodes := make([]NodeSimulator, config.NumNodes) + for i := range config.NumNodes { + nodes[i] = factory(fmt.Sprintf("node-%d", i), config) + } + return nodes +} + +// Helper function: converts NodeSimulator slice to *nodemanager.Node slice (for algorithm use) +func toNodeManagerNodes(simNodes []NodeSimulator) []*nodemanager.Node { + nodes := make([]*nodemanager.Node, len(simNodes)) + for i, n := range simNodes { + nodes[i] = n.GetNode() + } + return nodes +} + // runBenchmark runs a comprehensive placement benchmark with lifecycle tracking -func runBenchmark(b *testing.B, algorithm Algorithm, config BenchmarkConfig) *BenchmarkMetrics { +func runBenchmark(b *testing.B, algorithm Algorithm, config BenchmarkConfig, nodeFactory NodeFactory) *BenchmarkMetrics { b.Helper() - ctx, cancel := context.WithTimeout(b.Context(), config.BenchmarkDuration) + parentCtx := b.Context() + if parentCtx == nil { + parentCtx = context.Background() + } + ctx, cancel := context.WithTimeout(parentCtx, config.BenchmarkDuration) defer cancel() - // Create simulated nodes - simNodes := createSimulatedNodes(config) + // Create nodes using factory + simNodes := createSimulatedNodes(config, nodeFactory) - // Convert to nodemanager.Node slice for algorithm - nodes := make([]*nodemanager.Node, len(simNodes)) - nodeMap := 
make(map[string]*SimulatedNode) - for i, n := range simNodes { - nodes[i] = n.Node - nodeMap[n.ID] = n + // Convert node list + nodes := toNodeManagerNodes(simNodes) + nodeMap := make(map[string]NodeSimulator) + for _, n := range simNodes { + nodeMap[n.GetNode().ID] = n } // Initialize metrics @@ -234,7 +348,7 @@ func runBenchmark(b *testing.B, algorithm Algorithm, config BenchmarkConfig) *Be if now.Sub(sandbox.StartTime) > sandbox.PlannedDuration { // Remove from node if node, exists := nodeMap[sandbox.NodeID]; exists { - node.removeSandbox(sandbox.ID) + node.RemoveSandbox(sandbox.ID) } // Remove from active list activeSandboxes.Delete(key) @@ -296,8 +410,9 @@ func runBenchmark(b *testing.B, algorithm Algorithm, config BenchmarkConfig) *Be return func() { placementStart := time.Now() node, err := PlaceSandbox(ctx, algorithm, nodes, nil, &orchestratorgrpc.SandboxCreateRequest{Sandbox: &orchestratorgrpc.SandboxConfig{ - Vcpu: sbx.RequestedCPU, - RamMb: sbx.RequestedMemory, + SandboxId: sbx.ID, + Vcpu: sbx.RequestedCPU, + RamMb: sbx.RequestedMemory, }}, machineinfo.MachineInfo{}, false, nil) placementTime := time.Since(placementStart) @@ -320,7 +435,7 @@ func runBenchmark(b *testing.B, algorithm Algorithm, config BenchmarkConfig) *Be // Find the simulated node and place the sandbox if simNode, exists := nodeMap[node.ID]; exists { sbx.NodeID = node.ID - if simNode.placeSandbox(sbx) { + if simNode.PlaceSandbox(sbx) { activeSandboxes.Store(sbx.ID, sbx) metrics.SuccessfulPlacements++ atomic.AddInt64(&recentSuccesses, 1) @@ -358,7 +473,7 @@ func runBenchmark(b *testing.B, algorithm Algorithm, config BenchmarkConfig) *Be } // calculateFinalMetrics calculates comprehensive final metrics -func calculateFinalMetrics(metrics *BenchmarkMetrics, nodes []*SimulatedNode, placementTimes []time.Duration) { +func calculateFinalMetrics(metrics *BenchmarkMetrics, nodes []NodeSimulator, placementTimes []time.Duration) { // Calculate placement time metrics if len(placementTimes) > 0 { 
var totalTime time.Duration @@ -383,7 +498,7 @@ func calculateFinalMetrics(metrics *BenchmarkMetrics, nodes []*SimulatedNode, pl metrics.MinNodeMemUtilization = 100.0 for _, node := range nodes { - cpuUtil, memUtil := node.getUtilization() + cpuUtil, memUtil := node.GetUtilization() cpuUtils = append(cpuUtils, cpuUtil) memUtils = append(memUtils, memUtil) @@ -458,7 +573,7 @@ func BenchmarkPlacementComparison(t *testing.B) { for _, alg := range algorithms { t.Run(alg.name, func(t *testing.B) { - metrics := runBenchmark(t, alg.algo, config) + metrics := runBenchmark(t, alg.algo, config, NewStandardNode) t.Logf("\n=== %s Results ===", alg.name) t.Logf("Placement Performance:") @@ -488,3 +603,189 @@ func BenchmarkPlacementComparison(t *testing.B) { }) } } + +// BenchmarkPlacementDistribution visualizes load distribution across nodes (simulating metric lag) +// Run command: go test -v -bench=BenchmarkPlacementDistribution -run=^$ ./internal/orchestrator/placement +func BenchmarkPlacementDistribution(b *testing.B) { + // Scenario config: “thundering herd” test under extremely high concurrency + // Use LaggyNode to simulate metric reporting delay: Orchestrator always sees stale metrics + config := BenchmarkConfig{ + NumNodes: 10, // Fewer nodes to make load hotspots more apparent + SandboxStartRate: 20, // 20 requests per second (burst traffic) + AvgSandboxCPU: 1, // 1 vCPU per Sandbox + AvgSandboxMemory: 1024, // 1024 MiB + CPUVariance: 0.0, // Fixed spec for easier distribution observation + MemoryVariance: 0.0, + ActualUsageRatio: 1.0, // Assume full utilization + ActualUsageVariance: 0.0, + SandboxDuration: 1 * time.Minute, // Long enough to ensure no release during test + BenchmarkDuration: 25 * time.Second, // Run long enough to trigger at least one heartbeat sync (20s ticker) + NodeCPUCapacity: 192, // 192 vCPU per Node + NodeMemoryCapacity: 1024 * 1024 * 1024 * 512, // 512 GiB per Node + SandboxCreateDuration: time.Millisecond, + } + + algorithms := []struct { + 
name string + algo Algorithm + }{ + // Compare algorithms here. Expect LeastBusy to have serious hotspot issues. + // {"LeastBusy", &LeastBusyAlgorithm{}}, + {"BestOfK_K3", NewBestOfK(DefaultBestOfKConfig())}, + {"BestOfK_K5", NewBestOfK(BestOfKConfig{R: 4, K: 5, Alpha: 0.5})}, + } + + for _, alg := range algorithms { + b.Run(alg.name, func(b *testing.B) { + b.Logf("Running distribution test for %s with LaggyNodes...", alg.name) + + parentCtx := b.Context() + if parentCtx == nil { + parentCtx = context.Background() + } + ctx, cancel := context.WithTimeout(parentCtx, config.BenchmarkDuration) + defer cancel() + + // 1. Create nodes using NewLaggyNode (default: only update internal real state, not Metrics) + simNodes := createSimulatedNodes(config, NewLaggyNode) + nodes := toNodeManagerNodes(simNodes) + nodeMap := make(map[string]NodeSimulator) + + // 2. Warmup: apply slight random baseline noise to nodes + // This breaks the initial tie in LeastBusy state, causing it to quickly lock onto a “victim” + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for _, n := range simNodes { + nodeMap[n.GetNode().ID] = n + if ln, ok := n.(*LaggyNode); ok { + ln.realCpuAllocated = uint32(rng.Intn(5)) // 0-4 CPU random baseline noise + ln.SyncMetrics() // Initial sync once + } + } + + // 3. Start heartbeat simulator + // Simulates reporting metrics every 20s in a real environment. + // All requests between heartbeats see stale metrics. + go func() { + ticker := time.NewTicker(20 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + for _, n := range simNodes { + if ln, ok := n.(*LaggyNode); ok { + ln.SyncMetrics() + } + } + } + } + }() + + // 4. 
Concurrently generate Sandbox requests + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(time.Second / time.Duration(config.SandboxStartRate)) + defer ticker.Stop() + + var sandboxIDCounter int64 + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + sandboxID := atomic.AddInt64(&sandboxIDCounter, 1) + sbx := &LiveSandbox{ + ID: fmt.Sprintf("sbx-%d", sandboxID), + RequestedCPU: config.AvgSandboxCPU, + RequestedMemory: config.AvgSandboxMemory, + ActualCPUUsage: float64(config.AvgSandboxCPU), + ActualMemUsage: float64(config.AvgSandboxMemory) * 1024 * 1024, + StartTime: time.Now(), + } + + wg.Add(1) + go func(s *LiveSandbox) { + defer wg.Done() + + // Execute placement algorithm + node, err := PlaceSandbox(ctx, alg.algo, nodes, nil, &orchestratorgrpc.SandboxCreateRequest{ + Sandbox: &orchestratorgrpc.SandboxConfig{ + SandboxId: s.ID, + Vcpu: s.RequestedCPU, + RamMb: s.RequestedMemory, + }, + }, machineinfo.MachineInfo{}, false, nil) + + if err == nil && node != nil { + if simNode, ok := nodeMap[node.ID]; ok { + // Placement successful, but Metrics won't update immediately (LaggyNode feature) + simNode.PlaceSandbox(s) + } + } + }(sbx) + } + } + }() + + <-ctx.Done() + wg.Wait() + + // 5. 
Result visualization + b.Logf("\n=== Distribution Results (Laggy Metrics): %s ===", alg.name) + + maxCount := 0 + totalCount := 0 + for _, n := range simNodes { + count := n.GetSandboxCount() + totalCount += count + if count > maxCount { + maxCount = count + } + } + + if totalCount == 0 { + b.Log("No sandboxes placed.") + return + } + + // Print ASCII histogram + for i, n := range simNodes { + count := n.GetSandboxCount() + barLen := 0 + if maxCount > 0 { + barLen = int(float64(count) / float64(maxCount) * 40) // Max 40 characters + } + + bar := "" + for k := 0; k < barLen; k++ { + bar += "█" + } + + // Force Sync before printing to ensure we see the final real values + if ln, ok := n.(*LaggyNode); ok { + ln.SyncMetrics() + } + cpuUtil, _ := n.GetUtilization() + + b.Logf("Node %02d |%s| %d (CPU: %.1f%%)", i, fmt.Sprintf("%-40s", bar), count, cpuUtil) + } + + // Calculate distribution statistics (CV coefficient of variation) + avg := float64(totalCount) / float64(len(simNodes)) + var sumDiffSq float64 + for _, n := range simNodes { + diff := float64(n.GetSandboxCount()) - avg + sumDiffSq += diff * diff + } + stdDev := math.Sqrt(sumDiffSq / float64(len(simNodes))) + cv := 0.0 + if avg > 0 { + cv = stdDev / avg + } + b.Logf("Stats: Total=%d, Avg=%.1f, StdDev=%.2f, Imbalance(CV)=%.3f", totalCount, avg, stdDev, cv) + }) + } +} \ No newline at end of file diff --git a/packages/api/internal/orchestrator/placement/placement_best_of_K.go b/packages/api/internal/orchestrator/placement/placement_best_of_K.go index 32675124e2..f88065daa5 100644 --- a/packages/api/internal/orchestrator/placement/placement_best_of_K.go +++ b/packages/api/internal/orchestrator/placement/placement_best_of_K.go @@ -38,7 +38,15 @@ func DefaultBestOfKConfig() BestOfKConfig { // Score calculates the placement score for this node func (b *BestOfK) Score(node *nodemanager.Node, resources nodemanager.SandboxResources, config BestOfKConfig) float64 { metrics := node.Metrics() - reserved := 
metrics.CpuAllocated + + // Get locally recorded resources that haven't been reported yet. + pendingCPUs := int64(0) + for _, res := range node.PlacementMetrics.InProgress() { + pendingCPUs += res.CPUs + } + + // Combine allocated resources with in-progress allocations + reserved := metrics.CpuAllocated + uint32(pendingCPUs) // 1 CPU used = 100% CPU percent usageAvg := float64(metrics.CpuPercent) / 100 diff --git a/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go b/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go index a143cb2865..c13affc950 100644 --- a/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go +++ b/packages/api/internal/orchestrator/placement/placement_best_of_K_test.go @@ -65,6 +65,36 @@ func TestBestOfK_Score_PreferBiggerNode(t *testing.T) { assert.Greater(t, score, score2) } +func TestBestOfK_Score_WithPendingResources(t *testing.T) { + t.Parallel() + config := DefaultBestOfKConfig() + algo := NewBestOfK(config).(*BestOfK) + + // Create two nodes with identical base loads + nodeNormal := nodemanager.NewTestNode("node-normal", api.NodeStatusReady, 0, 4) + nodeWithPending := nodemanager.NewTestNode("node-pending", api.NodeStatusReady, 0, 4) + + // Inject InProgress resources into nodeWithPending using StartPlacing + // This simulates a Sandbox that is currently being placed but hasn't fully started + pendingRes := nodemanager.SandboxResources{ + CPUs: 2, + MiBMemory: 1024, + } + nodeWithPending.PlacementMetrics.StartPlacing("pending-sbx-1", pendingRes) + + reqResources := nodemanager.SandboxResources{ + CPUs: 1, + MiBMemory: 512, + } + + scoreNormal := algo.Score(nodeNormal, reqResources, config) + scorePending := algo.Score(nodeWithPending, reqResources, config) + + // A node with pending resources has a higher 'reserved' CPU count, + // so its calculated Score should be greater (meaning worse/lower priority) + assert.Greater(t, scorePending, scoreNormal, "Node with pending
resources should receive a higher (worse) score") +} + func TestBestOfK_CanFit(t *testing.T) { t.Parallel() config := DefaultBestOfKConfig() diff --git a/packages/api/internal/orchestrator/placement/placement_test.go b/packages/api/internal/orchestrator/placement/placement_test.go index 0b699d1334..3d27d43a8d 100644 --- a/packages/api/internal/orchestrator/placement/placement_test.go +++ b/packages/api/internal/orchestrator/placement/placement_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldtestdata" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -15,6 +16,7 @@ import ( "github.com/e2b-dev/infra/packages/api/internal/api" "github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager" + "github.com/e2b-dev/infra/packages/shared/pkg/featureflags" "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator" "github.com/e2b-dev/infra/packages/shared/pkg/machineinfo" ) @@ -213,3 +215,43 @@ func TestPlaceSandbox_ResourceExhausted(t *testing.T) { // Verify node1 was NOT excluded (ResourceExhausted nodes should be retried) algorithm.AssertNumberOfCalls(t, "chooseNode", 2) } + +func TestPlaceSandbox_TriggersOptimisticUpdate(t *testing.T) { + t.Parallel() + ctx := t.Context() + + // Enable the optimistic resource accounting flag for this test + td := ldtestdata.DataSource() + td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(true)) + ffClient, err := featureflags.NewClientWithDatasource(td) + require.NoError(t, err) + + // Create a node and record the initial allocated CPU + node1 := nodemanager.NewTestNode("node1", api.NodeStatusReady, 0, 4, nodemanager.WithFeatureFlags(ffClient)) + initialCpuAllocated := node1.Metrics().CpuAllocated + + nodes := []*nodemanager.Node{node1} + + // Mock algorithm directly returns node1 + algorithm := &mockAlgorithm{} + algorithm.On("chooseNode", mock.Anything, nodes, 
mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(node1, nil) + + // Request 2 vCPUs + sbxRequest := &orchestrator.SandboxCreateRequest{ + Sandbox: &orchestrator.SandboxConfig{ + SandboxId: "test-optimistic-sandbox", + Vcpu: 2, + RamMb: 1024, + }, + } + + resultNode, err := PlaceSandbox(ctx, algorithm, nodes, nil, sbxRequest, machineinfo.MachineInfo{}, false, nil) + + require.NoError(t, err) + assert.NotNil(t, resultNode) + + // Verify: After successful placement, the node's CpuAllocated should be increased by 2 from the base + updatedCpuAllocated := resultNode.Metrics().CpuAllocated + assert.Equal(t, initialCpuAllocated+2, updatedCpuAllocated, "Node metrics should be optimistically updated after placement") +} diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 4f9c31eec8..b241a32e7d 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -115,9 +115,10 @@ var ( // of synchronous. Only safe to enable after PeerToPeerChunkTransferFlag is ON. PeerToPeerAsyncCheckpointFlag = newBoolFlag("peer-to-peer-async-checkpoint", false) - PersistentVolumesFlag = newBoolFlag("can-use-persistent-volumes", env.IsDevelopment()) - ExecutionMetricsOnWebhooksFlag = newBoolFlag("execution-metrics-on-webhooks", false) // TODO: Remove NLT 20250315 - SandboxLabelBasedSchedulingFlag = newBoolFlag("sandbox-label-based-scheduling", false) + PersistentVolumesFlag = newBoolFlag("can-use-persistent-volumes", env.IsDevelopment()) + ExecutionMetricsOnWebhooksFlag = newBoolFlag("execution-metrics-on-webhooks", false) // TODO: Remove NLT 20250315 + SandboxLabelBasedSchedulingFlag = newBoolFlag("sandbox-label-based-scheduling", false) + OptimisticResourceAccountingFlag = newBoolFlag("optimistic-resource-accounting", true) ) type IntFlag struct {