Skip to content

Commit 02cf6dd

Browse files
committed
feat(placement): implement optimistic resource accounting with feature flags
Signed-off-by: MorningTZH <morningtzh@yeah.net>
1 parent 130e2b7 commit 02cf6dd

File tree

8 files changed

+182
-33
lines changed

8 files changed

+182
-33
lines changed

packages/api/internal/orchestrator/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func (o *Orchestrator) connectToNode(ctx context.Context, discovered nodemanager
3131
connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), nodeConnectTimeout)
3232
defer cancel()
3333

34-
orchestratorNode, err := nodemanager.New(connectCtx, o.tel.TracerProvider, o.tel.MeterProvider, discovered)
34+
orchestratorNode, err := nodemanager.New(connectCtx, o.tel.TracerProvider, o.tel.MeterProvider, discovered, o.featureFlagsClient)
3535
if err != nil {
3636
return nil, err
3737
}
@@ -61,7 +61,7 @@ func (o *Orchestrator) connectToClusterNode(ctx context.Context, cluster *cluste
6161
connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), nodeConnectTimeout)
6262
defer cancel()
6363

64-
orchestratorNode, err := nodemanager.NewClusterNode(connectCtx, i.GetClient(), cluster.ID, cluster.SandboxDomain, i)
64+
orchestratorNode, err := nodemanager.NewClusterNode(connectCtx, i.GetClient(), cluster.ID, cluster.SandboxDomain, i, o.featureFlagsClient)
6565
if err != nil {
6666
logger.L().Error(ctx, "Failed to create node", zap.Error(err))
6767

packages/api/internal/orchestrator/delete_instance.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,5 +157,10 @@ func (o *Orchestrator) killSandboxOnNode(ctx context.Context, node *nodemanager.
157157
return fmt.Errorf("failed to delete sandbox '%s': %w", sbx.SandboxID, err)
158158
}
159159

160+
node.OptimisticRemove(ctx, nodemanager.SandboxResources{
161+
CPUs: sbx.VCpu,
162+
MiBMemory: sbx.RamMB,
163+
})
164+
160165
return nil
161166
}

packages/api/internal/orchestrator/nodemanager/mock.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/e2b-dev/infra/packages/api/internal/api"
1414
"github.com/e2b-dev/infra/packages/api/internal/clusters"
15+
"github.com/e2b-dev/infra/packages/shared/pkg/featureflags"
1516
"github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator"
1617
infogrpc "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator-info"
1718
templatemanager "github.com/e2b-dev/infra/packages/shared/pkg/grpc/template-manager"
@@ -119,6 +120,13 @@ func WithSandboxCreateError(err error) TestOptions {
119120
}
120121
}
121122

123+
// WithFeatureFlags sets a custom feature flags client for the test node
124+
func WithFeatureFlags(ff *featureflags.Client) TestOptions {
125+
return func(node *TestNode) {
126+
node.featureflags = ff
127+
}
128+
}
129+
122130
// MockSandboxClientCustom allows custom error logic per call
123131
type MockSandboxClientCustom struct {
124132
orchestrator.SandboxServiceClient

packages/api/internal/orchestrator/nodemanager/node.go

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/e2b-dev/infra/packages/api/internal/api"
1616
"github.com/e2b-dev/infra/packages/api/internal/clusters"
1717
"github.com/e2b-dev/infra/packages/shared/pkg/consts"
18+
"github.com/e2b-dev/infra/packages/shared/pkg/featureflags"
1819
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
1920
"github.com/e2b-dev/infra/packages/shared/pkg/machineinfo"
2021
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
@@ -50,6 +51,9 @@ type Node struct {
5051

5152
PlacementMetrics PlacementMetrics
5253

54+
// featureflags is the feature flags client for feature flag checks
55+
featureflags *featureflags.Client
56+
5357
mutex sync.RWMutex
5458
}
5559

@@ -58,6 +62,7 @@ func New(
5862
tracerProvider trace.TracerProvider,
5963
meterProvider metric.MeterProvider,
6064
discoveredNode NomadServiceDiscovery,
65+
ff *featureflags.Client,
6166
) (*Node, error) {
6267
client, err := NewClient(tracerProvider, meterProvider, discoveredNode.OrchestratorAddress)
6368
if err != nil {
@@ -99,6 +104,8 @@ func New(
99104
createSuccess: atomic.Uint64{},
100105
createFails: atomic.Uint64{},
101106
},
107+
108+
featureflags: ff,
102109
}
103110

104111
n.UpdateMetricsFromServiceInfoResponse(nodeInfo)
@@ -108,7 +115,7 @@ func New(
108115
return n, nil
109116
}
110117

111-
func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID uuid.UUID, sandboxDomain *string, i *clusters.Instance) (*Node, error) {
118+
func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID uuid.UUID, sandboxDomain *string, i *clusters.Instance, ff *featureflags.Client) (*Node, error) {
112119
info := i.GetInfo()
113120
status, ok := OrchestratorToApiNodeStateMapper[info.Status]
114121
if !ok {
@@ -135,9 +142,10 @@ func NewClusterNode(ctx context.Context, client *clusters.GRPCClient, clusterID
135142
createFails: atomic.Uint64{},
136143
},
137144

138-
client: client,
139-
status: status,
140-
meta: nodeMetadata,
145+
client: client,
146+
status: status,
147+
meta: nodeMetadata,
148+
featureflags: ff,
141149
}
142150

143151
nodeClient, ctx := n.GetClient(ctx)
@@ -178,11 +186,28 @@ func (n *Node) IsNomadManaged() bool {
178186
return n.NomadNodeShortID != UnknownNomadNodeShortID
179187
}
180188

181-
func (n *Node) OptimisticAdd(res SandboxResources) {
182-
n.metricsMu.Lock()
183-
defer n.metricsMu.Unlock()
184-
185-
// Directly accumulate to the current metrics view
186-
n.metrics.CpuAllocated += uint32(res.CPUs)
187-
n.metrics.MemoryAllocatedBytes += uint64(res.MiBMemory) * 1024 * 1024 // Note: CpuPercent is difficult to estimate, usually just updating Allocated is sufficient for the scheduling algorithm
188-
}
189+
func (n *Node) OptimisticAdd(ctx context.Context, res SandboxResources) {
190+
if n.featureflags != nil && !n.featureflags.BoolFlag(ctx, featureflags.OptimisticResourceAccountingFlag) {
191+
return
192+
}
193+
194+
n.metricsMu.Lock()
195+
defer n.metricsMu.Unlock()
196+
197+
// Directly accumulate to the current metrics view
198+
n.metrics.CpuAllocated += uint32(res.CPUs)
199+
n.metrics.MemoryAllocatedBytes += uint64(res.MiBMemory) * 1024 * 1024 // Note: CpuPercent is difficult to estimate, usually just updating Allocated is sufficient for the scheduling algorithm
200+
}
201+
202+
func (n *Node) OptimisticRemove(ctx context.Context, res SandboxResources) {
203+
if n.featureflags != nil && !n.featureflags.BoolFlag(ctx, featureflags.OptimisticResourceAccountingFlag) {
204+
return
205+
}
206+
207+
n.metricsMu.Lock()
208+
defer n.metricsMu.Unlock()
209+
210+
// Directly subtract from the current metrics view
211+
n.metrics.CpuAllocated -= uint32(res.CPUs)
212+
n.metrics.MemoryAllocatedBytes -= uint64(res.MiBMemory) * 1024 * 1024
213+
}
Lines changed: 113 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,134 @@
11
package nodemanager_test
22

33
import (
4+
"context"
45
"testing"
56

7+
"github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldtestdata"
68
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
710

811
"github.com/e2b-dev/infra/packages/api/internal/api"
912
"github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager"
13+
"github.com/e2b-dev/infra/packages/shared/pkg/featureflags"
1014
)
1115

12-
func TestNode_OptimisticAdd(t *testing.T) {
16+
func TestNode_OptimisticAdd_FlagEnabled(t *testing.T) {
1317
t.Parallel()
14-
15-
// Initialize an idle node
16-
node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 0, 4)
18+
19+
// 1. Create a LaunchDarkly test data source
20+
td := ldtestdata.DataSource()
21+
22+
// 2. Set the feature flag under test to true
23+
td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(true))
24+
25+
// 3. Create a Feature Flag client with the test data source
26+
ffClient, err := featureflags.NewClientWithDatasource(td)
27+
require.NoError(t, err)
28+
29+
// 4. Initialize Node with the injected ffClient
30+
node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 0, 4, nodemanager.WithFeatureFlags(ffClient))
1731
initialMetrics := node.Metrics()
1832

19-
// Simulate the resources to be allocated
33+
// 5. Call the method
2034
res := nodemanager.SandboxResources{
2135
CPUs: 2,
22-
MiBMemory: 1024, // 1GB
36+
MiBMemory: 1024,
2337
}
38+
node.OptimisticAdd(context.Background(), res)
2439

25-
// Perform optimistic addition
26-
node.OptimisticAdd(res)
40+
// 6. Assert: When flag is enabled, resources should be successfully accumulated
2741
newMetrics := node.Metrics()
28-
29-
// Verify that CPU and memory are increased as expected
3042
assert.Equal(t, initialMetrics.CpuAllocated+uint32(res.CPUs), newMetrics.CpuAllocated)
3143
assert.Equal(t, initialMetrics.MemoryAllocatedBytes+uint64(res.MiBMemory)*1024*1024, newMetrics.MemoryAllocatedBytes)
32-
}
44+
}
45+
46+
func TestNode_OptimisticAdd_FlagDisabled(t *testing.T) {
47+
t.Parallel()
48+
49+
// 1. Create a LaunchDarkly test data source
50+
td := ldtestdata.DataSource()
51+
52+
// 2. Set the feature flag under test to false
53+
td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(false))
54+
55+
// 3. Create a Feature Flag client with the test data source
56+
ffClient, err := featureflags.NewClientWithDatasource(td)
57+
require.NoError(t, err)
58+
59+
// 4. Initialize Node with the injected ffClient
60+
node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 0, 4, nodemanager.WithFeatureFlags(ffClient))
61+
initialMetrics := node.Metrics()
62+
63+
// 5. Call the method
64+
res := nodemanager.SandboxResources{
65+
CPUs: 2,
66+
MiBMemory: 1024,
67+
}
68+
node.OptimisticAdd(context.Background(), res)
69+
70+
// 6. Assert: When flag is disabled, return early, resources should not be accumulated
71+
newMetrics := node.Metrics()
72+
assert.Equal(t, initialMetrics.CpuAllocated, newMetrics.CpuAllocated)
73+
assert.Equal(t, initialMetrics.MemoryAllocatedBytes, newMetrics.MemoryAllocatedBytes)
74+
}
75+
76+
func TestNode_OptimisticRemove_FlagEnabled(t *testing.T) {
77+
t.Parallel()
78+
79+
// 1. Create a LaunchDarkly test data source
80+
td := ldtestdata.DataSource()
81+
82+
// 2. Set the feature flag under test to true
83+
td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(true))
84+
85+
// 3. Create a Feature Flag client with the test data source
86+
ffClient, err := featureflags.NewClientWithDatasource(td)
87+
require.NoError(t, err)
88+
89+
// 4. Initialize Node with the injected ffClient - some resources are already allocated at initialization
90+
node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 4, 8192, nodemanager.WithFeatureFlags(ffClient))
91+
initialMetrics := node.Metrics()
92+
93+
// 5. Call the method
94+
res := nodemanager.SandboxResources{
95+
CPUs: 2,
96+
MiBMemory: 1024,
97+
}
98+
node.OptimisticRemove(context.Background(), res)
99+
100+
// 6. Assert: When flag is enabled, resources should be successfully deducted
101+
newMetrics := node.Metrics()
102+
assert.Equal(t, initialMetrics.CpuAllocated-uint32(res.CPUs), newMetrics.CpuAllocated)
103+
assert.Equal(t, initialMetrics.MemoryAllocatedBytes-uint64(res.MiBMemory)*1024*1024, newMetrics.MemoryAllocatedBytes)
104+
}
105+
106+
func TestNode_OptimisticRemove_FlagDisabled(t *testing.T) {
107+
t.Parallel()
108+
109+
// 1. Create a LaunchDarkly test data source
110+
td := ldtestdata.DataSource()
111+
112+
// 2. Set the feature flag under test to false
113+
td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(false))
114+
115+
// 3. Create a Feature Flag client with the test data source
116+
ffClient, err := featureflags.NewClientWithDatasource(td)
117+
require.NoError(t, err)
118+
119+
// 4. Initialize Node with the injected ffClient - some resources are already allocated at initialization
120+
node := nodemanager.NewTestNode("test-node", api.NodeStatusReady, 4, 8192, nodemanager.WithFeatureFlags(ffClient))
121+
initialMetrics := node.Metrics()
122+
123+
// 5. Call the method
124+
res := nodemanager.SandboxResources{
125+
CPUs: 2,
126+
MiBMemory: 1024,
127+
}
128+
node.OptimisticRemove(context.Background(), res)
129+
130+
// 6. Assert: When flag is disabled, return early, resources should remain unchanged
131+
newMetrics := node.Metrics()
132+
assert.Equal(t, initialMetrics.CpuAllocated, newMetrics.CpuAllocated)
133+
assert.Equal(t, initialMetrics.MemoryAllocatedBytes, newMetrics.MemoryAllocatedBytes)
134+
}

packages/api/internal/orchestrator/placement/placement.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node
8282
// Optimistic update: assume resources are occupied after successful creation.
8383
// Manually update node.metrics with the newly allocated resources.
8484
// This will be overwritten by the next real Metrics report for auto-correction.
85-
node.OptimisticAdd(nodemanager.SandboxResources{
85+
node.OptimisticAdd(ctx, nodemanager.SandboxResources{
8686
CPUs: sbxRequest.GetSandbox().GetVcpu(),
8787
MiBMemory: sbxRequest.GetSandbox().GetRamMb(),
8888
})

packages/api/internal/orchestrator/placement/placement_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldtestdata"
1011
"github.com/stretchr/testify/assert"
1112
"github.com/stretchr/testify/mock"
1213
"github.com/stretchr/testify/require"
@@ -15,6 +16,7 @@ import (
1516

1617
"github.com/e2b-dev/infra/packages/api/internal/api"
1718
"github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager"
19+
"github.com/e2b-dev/infra/packages/shared/pkg/featureflags"
1820
"github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator"
1921
"github.com/e2b-dev/infra/packages/shared/pkg/machineinfo"
2022
)
@@ -218,12 +220,18 @@ func TestPlaceSandbox_TriggersOptimisticUpdate(t *testing.T) {
218220
t.Parallel()
219221
ctx := t.Context()
220222

223+
// Enable the optimistic resource accounting flag for this test
224+
td := ldtestdata.DataSource()
225+
td.Update(td.Flag(featureflags.OptimisticResourceAccountingFlag.Key()).VariationForAll(true))
226+
ffClient, err := featureflags.NewClientWithDatasource(td)
227+
require.NoError(t, err)
228+
221229
// Create a node and record the initial allocated CPU
222-
node1 := nodemanager.NewTestNode("node1", api.NodeStatusReady, 0, 4)
230+
node1 := nodemanager.NewTestNode("node1", api.NodeStatusReady, 0, 4, nodemanager.WithFeatureFlags(ffClient))
223231
initialCpuAllocated := node1.Metrics().CpuAllocated
224232

225233
nodes := []*nodemanager.Node{node1}
226-
234+
227235
// Mock algorithm directly returns node1
228236
algorithm := &mockAlgorithm{}
229237
algorithm.On("chooseNode", mock.Anything, nodes, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
@@ -242,8 +250,8 @@ func TestPlaceSandbox_TriggersOptimisticUpdate(t *testing.T) {
242250

243251
require.NoError(t, err)
244252
assert.NotNil(t, resultNode)
245-
253+
246254
// Verify: After successful placement, the node's CpuAllocated should be increased by 2 from the base
247255
updatedCpuAllocated := resultNode.Metrics().CpuAllocated
248256
assert.Equal(t, initialCpuAllocated+2, updatedCpuAllocated, "Node metrics should be optimistically updated after placement")
249-
}
257+
}

packages/shared/pkg/featureflags/flags.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,10 @@ var (
115115
// of synchronous. Only safe to enable after PeerToPeerChunkTransferFlag is ON.
116116
PeerToPeerAsyncCheckpointFlag = newBoolFlag("peer-to-peer-async-checkpoint", false)
117117

118-
PersistentVolumesFlag = newBoolFlag("can-use-persistent-volumes", env.IsDevelopment())
119-
ExecutionMetricsOnWebhooksFlag = newBoolFlag("execution-metrics-on-webhooks", false) // TODO: Remove NLT 20250315
120-
SandboxLabelBasedSchedulingFlag = newBoolFlag("sandbox-label-based-scheduling", false)
118+
PersistentVolumesFlag = newBoolFlag("can-use-persistent-volumes", env.IsDevelopment())
119+
ExecutionMetricsOnWebhooksFlag = newBoolFlag("execution-metrics-on-webhooks", false) // TODO: Remove NLT 20250315
120+
SandboxLabelBasedSchedulingFlag = newBoolFlag("sandbox-label-based-scheduling", false)
121+
OptimisticResourceAccountingFlag = newBoolFlag("optimistic-resource-accounting", true)
121122
)
122123

123124
type IntFlag struct {

0 commit comments

Comments (0)