-
Notifications
You must be signed in to change notification settings - Fork 272
Expand file tree
/
Copy pathplacement.go
More file actions
115 lines (94 loc) · 4.2 KB
/
placement.go
File metadata and controls
115 lines (94 loc) · 4.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package placement
import (
"context"
"fmt"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager"
"github.com/e2b-dev/infra/packages/api/internal/utils"
"github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
"github.com/e2b-dev/infra/packages/shared/pkg/machineinfo"
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
)
var (
	// tracer creates the spans emitted by this package.
	tracer = otel.Tracer("github.com/e2b-dev/infra/packages/api/internal/orchestrator/placement")

	// errSandboxCreateFailed is the generic error PlaceSandbox returns once
	// all of its placement attempts have failed.
	errSandboxCreateFailed = fmt.Errorf("failed to create a new sandbox, if the problem persists, contact us")
)
// Algorithm is the strategy PlaceSandbox uses to pick a node for a sandbox.
// Implementations should choose an optimal node based on available resources
// and current load distribution.
//
// How PlaceSandbox calls chooseNode:
//   - nodes is the full list of cluster nodes.
//   - nodesExcluded is keyed by node ID and holds the nodes on which creation
//     already failed during this placement.
//   - requested carries the vCPU count and memory (MiB) of the sandbox request.
//   - buildMachineInfo, filterByLabels and requiredLabels are forwarded
//     unchanged from PlaceSandbox's own arguments.
//
// A non-nil error aborts the placement and is returned to the caller as is.
// PlaceSandbox dereferences the returned node without a nil check, so an
// implementation must not return (nil, nil).
type Algorithm interface {
	chooseNode(
		ctx context.Context,
		nodes []*nodemanager.Node,
		nodesExcluded map[string]struct{},
		requested nodemanager.SandboxResources,
		buildMachineInfo machineinfo.MachineInfo,
		filterByLabels bool,
		requiredLabels []string,
	) (*nodemanager.Node, error)
}
// PlaceSandbox creates the sandbox described by sbxRequest on one of
// clusterNodes and returns the node that accepted it.
//
// When preferredNode is non-nil it is tried first; after it fails (or when it
// is nil) nodes are selected through algorithm.chooseNode. A failed create is
// handled according to its gRPC status code:
//   - codes.ResourceExhausted: the placement is recorded as skipped on that
//     node and another selection is made. The node is not excluded and the
//     attempt counter is not advanced.
//   - anything else (errors without a gRPC status are treated as
//     codes.Internal): the node is excluded from further selection, the
//     failure is recorded and logged, and the attempt counter advances.
//
// Errors:
//   - ctx is done before an attempt starts: an error wrapping ctx.Err(), so
//     callers can distinguish cancellation from a deadline with errors.Is.
//   - every cluster node has been excluded: "no nodes available".
//   - algorithm.chooseNode fails: its error, returned unchanged.
//   - maxRetries attempts have failed: errSandboxCreateFailed.
func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*nodemanager.Node, preferredNode *nodemanager.Node, sbxRequest *orchestrator.SandboxCreateRequest, buildMachineInfo machineinfo.MachineInfo, labelFilteringEnabled bool, requiredLabels []string) (*nodemanager.Node, error) {
	ctx, span := tracer.Start(ctx, "place-sandbox")
	defer span.End()

	// The sandbox ID and resource demand are read once instead of on every use.
	// NOTE(review): assumes node.SandboxCreate does not change these request
	// fields between attempts — confirm.
	sandboxID := sbxRequest.GetSandbox().GetSandboxId()
	resources := nodemanager.SandboxResources{
		CPUs:      sbxRequest.GetSandbox().GetVcpu(),
		MiBMemory: sbxRequest.GetSandbox().GetRamMb(),
	}

	// IDs of nodes on which creation failed with a non-exhaustion error.
	nodesExcluded := make(map[string]struct{})

	// A non-nil node at the top of the loop means "try this one first"; it is
	// reset to nil after every failure so the algorithm takes over.
	node := preferredNode

	for attempt := 0; attempt < maxRetries; {
		// Non-blocking cancellation check before starting another attempt.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, fmt.Errorf("request timed out during %d. attempt: %w", attempt+1, ctxErr)
		}

		if node != nil {
			telemetry.ReportEvent(ctx, "Placing sandbox on the preferred node", telemetry.WithNodeID(node.ID))
		} else {
			if len(nodesExcluded) >= len(clusterNodes) {
				return nil, fmt.Errorf("no nodes available")
			}

			var chooseErr error
			node, chooseErr = algorithm.chooseNode(ctx, clusterNodes, nodesExcluded, resources, buildMachineInfo, labelFilteringEnabled, requiredLabels)
			if chooseErr != nil {
				return nil, chooseErr
			}

			telemetry.ReportEvent(ctx, "Placing sandbox on the node", telemetry.WithNodeID(node.ID))
		}

		node.PlacementMetrics.StartPlacing(sandboxID, resources)

		// Distinct names keep the function-level ctx and span from being
		// shadowed. The span is ended explicitly rather than deferred so that
		// it covers exactly one create call per loop iteration.
		createCtx, createSpan := tracer.Start(ctx, "create-sandbox")
		createSpan.SetAttributes(
			telemetry.WithNodeID(node.ID),
			telemetry.WithClusterID(node.ClusterID),
		)
		err := node.SandboxCreate(createCtx, sbxRequest)
		createSpan.End()

		if err == nil {
			node.PlacementMetrics.Success(sandboxID)

			// Optimistic update: assume resources are occupied after successful creation.
			// Manually update node.metrics with the newly allocated resources.
			// This will be overwritten by the next real Metrics report for auto-correction.
			node.OptimisticAdd(createCtx, resources)

			return node, nil
		}

		failedNode := node
		node = nil // force a fresh selection on the next iteration

		// Errors that do not carry a gRPC status are handled as Internal.
		statusCode := codes.Internal
		if st, ok := status.FromError(err); ok {
			statusCode = st.Code()
		}

		switch statusCode {
		case codes.ResourceExhausted:
			// NOTE(review): this branch neither excludes the node nor advances
			// attempt, so repeated ResourceExhausted responses are bounded only
			// by ctx — confirm callers always pass a context with a deadline.
			failedNode.PlacementMetrics.Skip(sandboxID)
			logger.L().Warn(createCtx, "Node exhausted, trying another node", logger.WithSandboxID(sandboxID), logger.WithNodeID(failedNode.ID))
		default:
			nodesExcluded[failedNode.ID] = struct{}{}
			failedNode.PlacementMetrics.Fail(sandboxID)
			logger.L().Error(createCtx, "Failed to create sandbox", logger.WithSandboxID(sandboxID), logger.WithNodeID(failedNode.ID), zap.Int("attempt", attempt+1), zap.Error(utils.UnwrapGRPCError(err)))
			attempt++
		}
	}

	return nil, errSandboxCreateFailed
}