Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/api/internal/orchestrator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (o *Orchestrator) connectToNode(ctx context.Context, discovered nodemanager
connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), nodeConnectTimeout)
defer cancel()

orchestratorNode, err := nodemanager.New(connectCtx, o.tel.TracerProvider, o.tel.MeterProvider, discovered)
orchestratorNode, err := nodemanager.New(connectCtx, o.tel.TracerProvider, o.tel.MeterProvider, discovered, o.featureFlagsClient)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -61,7 +61,7 @@ func (o *Orchestrator) connectToClusterNode(ctx context.Context, cluster *cluste
connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), nodeConnectTimeout)
defer cancel()

orchestratorNode, err := nodemanager.NewClusterNode(connectCtx, i.GetClient(), cluster.ID, cluster.SandboxDomain, i)
orchestratorNode, err := nodemanager.NewClusterNode(connectCtx, i.GetClient(), cluster.ID, cluster.SandboxDomain, i, o.featureFlagsClient)
if err != nil {
logger.L().Error(ctx, "Failed to create node", zap.Error(err))

Expand Down
5 changes: 5 additions & 0 deletions packages/api/internal/orchestrator/delete_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,5 +157,10 @@ func (o *Orchestrator) killSandboxOnNode(ctx context.Context, node *nodemanager.
return fmt.Errorf("failed to delete sandbox '%s': %w", sbx.SandboxID, err)
}

node.OptimisticRemove(ctx, nodemanager.SandboxResources{
CPUs: sbx.VCpu,
MiBMemory: sbx.RamMB,
})

return nil
}
8 changes: 8 additions & 0 deletions packages/api/internal/orchestrator/nodemanager/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/e2b-dev/infra/packages/api/internal/api"
"github.com/e2b-dev/infra/packages/api/internal/clusters"
"github.com/e2b-dev/infra/packages/shared/pkg/featureflags"
"github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator"
infogrpc "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator-info"
templatemanager "github.com/e2b-dev/infra/packages/shared/pkg/grpc/template-manager"
Expand Down Expand Up @@ -119,6 +120,13 @@ func WithSandboxCreateError(err error) TestOptions {
}
}

// WithFeatureFlags injects the given feature flags client into the node
// built by NewTestNode, letting tests control flag-gated behavior.
func WithFeatureFlags(ff *featureflags.Client) TestOptions {
	return func(n *TestNode) {
		n.featureflags = ff
	}
}

// MockSandboxClientCustom allows custom error logic per call
type MockSandboxClientCustom struct {
orchestrator.SandboxServiceClient
Expand Down
42 changes: 38 additions & 4 deletions packages/api/internal/orchestrator/nodemanager/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/e2b-dev/infra/packages/api/internal/api"
"github.com/e2b-dev/infra/packages/api/internal/clusters"
"github.com/e2b-dev/infra/packages/shared/pkg/consts"
"github.com/e2b-dev/infra/packages/shared/pkg/featureflags"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
"github.com/e2b-dev/infra/packages/shared/pkg/machineinfo"
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
Expand Down Expand Up @@ -50,6 +51,9 @@ type Node struct {

PlacementMetrics PlacementMetrics

// featureflags is the feature flags client for feature flag checks
featureflags *featureflags.Client

mutex sync.RWMutex
}

Expand All @@ -58,6 +62,7 @@ func New(
tracerProvider trace.TracerProvider,
meterProvider metric.MeterProvider,
discoveredNode NomadServiceDiscovery,
ff *featureflags.Client,
) (*Node, error) {
client, err := NewClient(tracerProvider, meterProvider, discoveredNode.OrchestratorAddress)
if err != nil {
Expand Down Expand Up @@ -99,6 +104,8 @@ func New(
createSuccess: atomic.Uint64{},
createFails: atomic.Uint64{},
},

featureflags: ff,
}

n.UpdateMetricsFromServiceInfoResponse(nodeInfo)
Expand All @@ -108,7 +115,7 @@ func New(
return n, nil
}

func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID uuid.UUID, sandboxDomain *string, i *clusters.Instance) (*Node, error) {
func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID uuid.UUID, sandboxDomain *string, i *clusters.Instance, ff *featureflags.Client) (*Node, error) {
info := i.GetInfo()
status, ok := OrchestratorToApiNodeStateMapper[info.Status]
if !ok {
Expand All @@ -135,9 +142,10 @@ func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID
createFails: atomic.Uint64{},
},

client: client,
status: status,
meta: nodeMetadata,
client: client,
status: status,
meta: nodeMetadata,
featureflags: ff,
}

nodeClient, ctx := n.GetClient(ctx)
Expand Down Expand Up @@ -177,3 +185,29 @@ func (n *Node) GetClient(ctx context.Context) (*clusters.GRPCClient, context.Con
// IsNomadManaged reports whether this node has a known Nomad short ID,
// i.e. it was discovered via Nomad service discovery rather than created
// without one (presumably cluster nodes — confirm against NewClusterNode).
func (n *Node) IsNomadManaged() bool {
	return n.NomadNodeShortID != UnknownNomadNodeShortID
}

// OptimisticAdd eagerly accounts the given sandbox resources as allocated on
// this node without waiting for the next real metrics report from the
// orchestrator; that report overwrites these counters, so any drift is
// self-correcting. No-op when the optimistic-accounting feature flag is off
// (a nil feature flags client counts as "on").
func (n *Node) OptimisticAdd(ctx context.Context, res SandboxResources) {
	if n.featureflags != nil && !n.featureflags.BoolFlag(ctx, featureflags.OptimisticResourceAccountingFlag) {
		return
	}

	cpuDelta := uint32(res.CPUs)
	memDelta := uint64(res.MiBMemory) * 1024 * 1024 // MiB -> bytes

	n.metricsMu.Lock()
	defer n.metricsMu.Unlock()

	// CpuPercent is hard to estimate up front; bumping the Allocated
	// counters alone is sufficient for the placement algorithm.
	n.metrics.CpuAllocated += cpuDelta
	n.metrics.MemoryAllocatedBytes += memDelta
}

// OptimisticRemove eagerly releases the given sandbox resources from this
// node's allocation counters without waiting for the next real metrics
// report; that report overwrites these counters, so any drift is
// self-correcting. No-op when the optimistic-accounting feature flag is off
// (a nil feature flags client counts as "on").
//
// The subtraction is clamped at zero: a fresh metrics report can land between
// the optimistic add and this remove (or the add may have been skipped while
// the flag was off), in which case a plain unsigned subtraction would wrap
// around to a huge value and poison the scheduler's view of this node.
func (n *Node) OptimisticRemove(ctx context.Context, res SandboxResources) {
	if n.featureflags != nil && !n.featureflags.BoolFlag(ctx, featureflags.OptimisticResourceAccountingFlag) {
		return
	}

	cpus := uint32(res.CPUs)
	memBytes := uint64(res.MiBMemory) * 1024 * 1024 // MiB -> bytes

	n.metricsMu.Lock()
	defer n.metricsMu.Unlock()

	if n.metrics.CpuAllocated >= cpus {
		n.metrics.CpuAllocated -= cpus
	} else {
		n.metrics.CpuAllocated = 0
	}
	if n.metrics.MemoryAllocatedBytes >= memBytes {
		n.metrics.MemoryAllocatedBytes -= memBytes
	} else {
		n.metrics.MemoryAllocatedBytes = 0
	}
}
134 changes: 134 additions & 0 deletions packages/api/internal/orchestrator/nodemanager/node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package nodemanager_test

import (
"context"
"testing"

"github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldtestdata"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/e2b-dev/infra/packages/api/internal/api"
"github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager"
"github.com/e2b-dev/infra/packages/shared/pkg/featureflags"
)

// TestNode_OptimisticAdd_FlagEnabled verifies that OptimisticAdd accumulates
// the sandbox resources into the node metrics when the
// OptimisticResourceAccounting flag evaluates to true.
func TestNode_OptimisticAdd_FlagEnabled(t *testing.T) {
	t.Parallel()

	// LaunchDarkly test data source with the flag under test forced on.
	source := ldtestdata.DataSource()
	source.Update(source.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(true))

	flags, err := featureflags.NewClientWithDatasource(source)
	require.NoError(t, err)

	// Node starts with no allocations; inject the flag client.
	n := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 0, 4, nodemanager.WithFeatureFlags(flags))
	before := n.Metrics()

	delta := nodemanager.SandboxResources{CPUs: 2, MiBMemory: 1024}
	n.OptimisticAdd(context.Background(), delta)

	// Flag on: both counters must grow by exactly the requested amount.
	after := n.Metrics()
	assert.Equal(t, before.CpuAllocated+uint32(delta.CPUs), after.CpuAllocated)
	assert.Equal(t, before.MemoryAllocatedBytes+uint64(delta.MiBMemory)*1024*1024, after.MemoryAllocatedBytes)
}

// TestNode_OptimisticAdd_FlagDisabled verifies that OptimisticAdd is a no-op
// when the OptimisticResourceAccounting flag evaluates to false.
func TestNode_OptimisticAdd_FlagDisabled(t *testing.T) {
	t.Parallel()

	// LaunchDarkly test data source with the flag under test forced off.
	source := ldtestdata.DataSource()
	source.Update(source.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(false))

	flags, err := featureflags.NewClientWithDatasource(source)
	require.NoError(t, err)

	// Node starts with no allocations; inject the flag client.
	n := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 0, 4, nodemanager.WithFeatureFlags(flags))
	before := n.Metrics()

	delta := nodemanager.SandboxResources{CPUs: 2, MiBMemory: 1024}
	n.OptimisticAdd(context.Background(), delta)

	// Flag off: the early return must leave the counters untouched.
	after := n.Metrics()
	assert.Equal(t, before.CpuAllocated, after.CpuAllocated)
	assert.Equal(t, before.MemoryAllocatedBytes, after.MemoryAllocatedBytes)
}

// TestNode_OptimisticRemove_FlagEnabled verifies that OptimisticRemove
// subtracts the sandbox resources from the node metrics when the
// OptimisticResourceAccounting flag evaluates to true.
func TestNode_OptimisticRemove_FlagEnabled(t *testing.T) {
	t.Parallel()

	// LaunchDarkly test data source with the flag under test forced on.
	source := ldtestdata.DataSource()
	source.Update(source.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(true))

	flags, err := featureflags.NewClientWithDatasource(source)
	require.NoError(t, err)

	// Node starts with resources already allocated so there is room to remove.
	n := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 4, 8192, nodemanager.WithFeatureFlags(flags))
	before := n.Metrics()

	delta := nodemanager.SandboxResources{CPUs: 2, MiBMemory: 1024}
	n.OptimisticRemove(context.Background(), delta)

	// Flag on: both counters must shrink by exactly the requested amount.
	after := n.Metrics()
	assert.Equal(t, before.CpuAllocated-uint32(delta.CPUs), after.CpuAllocated)
	assert.Equal(t, before.MemoryAllocatedBytes-uint64(delta.MiBMemory)*1024*1024, after.MemoryAllocatedBytes)
}

// TestNode_OptimisticRemove_FlagDisabled verifies that OptimisticRemove is a
// no-op when the OptimisticResourceAccounting flag evaluates to false.
func TestNode_OptimisticRemove_FlagDisabled(t *testing.T) {
	t.Parallel()

	// LaunchDarkly test data source with the flag under test forced off.
	source := ldtestdata.DataSource()
	source.Update(source.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(false))

	flags, err := featureflags.NewClientWithDatasource(source)
	require.NoError(t, err)

	// Node starts with resources already allocated so a change would be visible.
	n := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 4, 8192, nodemanager.WithFeatureFlags(flags))
	before := n.Metrics()

	delta := nodemanager.SandboxResources{CPUs: 2, MiBMemory: 1024}
	n.OptimisticRemove(context.Background(), delta)

	// Flag off: the early return must leave the counters untouched.
	after := n.Metrics()
	assert.Equal(t, before.CpuAllocated, after.CpuAllocated)
	assert.Equal(t, before.MemoryAllocatedBytes, after.MemoryAllocatedBytes)
}
8 changes: 8 additions & 0 deletions packages/api/internal/orchestrator/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node
if err == nil {
node.PlacementMetrics.Success(sbxRequest.GetSandbox().GetSandboxId())

// Optimistic update: assume resources are occupied after successful creation.
// Manually update node.metrics with the newly allocated resources.
// This will be overwritten by the next real Metrics report for auto-correction.
node.OptimisticAdd(ctx, nodemanager.SandboxResources{
CPUs: sbxRequest.GetSandbox().GetVcpu(),
MiBMemory: sbxRequest.GetSandbox().GetRamMb(),
})

return node, nil
}

Expand Down
Loading