@@ -161,7 +161,12 @@ func BuildOCI(opts *coreoptions.AutoscalerOptions, do cloudprovider.NodeGroupDis
if strings.HasPrefix(ocidType, npconsts.OciNodePoolResourceIdent) && nodepoolTagsFound {
klog.Fatalf("-nodes and -node-group-auto-discovery parameters cannot be used together.")
} else if strings.HasPrefix(ocidType, npconsts.OciNodePoolResourceIdent) || nodepoolTagsFound {
manager, err := nodepools.CreateNodePoolManager(opts.CloudConfig, opts.NodeGroupAutoDiscovery, do, createKubeClient(opts.AutoscalingOptions))
manager, err := nodepools.CreateNodePoolManager(
opts.CloudConfig,
opts.NodeGroupAutoDiscovery,
do,
createKubeClient(opts.AutoscalingOptions),
opts.AutoscalingOptions.NodeGroupDefaults.MaxNodeProvisionTime)
if err != nil {
klog.Fatalf("Could not create OCI OKE cloud provider: %v", err)
}
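For context, the value threaded through the call above comes from the autoscaler's per-node-group defaults (`opts.AutoscalingOptions.NodeGroupDefaults.MaxNodeProvisionTime`). Below is a minimal, self-contained sketch of that relationship; the option structs are mirrored locally rather than imported, so the type and field names here are illustrative stand-ins for the real ones in cluster-autoscaler's config package.

package main

import (
	"fmt"
	"time"
)

// Local mirrors of the upstream option structs, trimmed to the one field
// this PR cares about; the real types live in cluster-autoscaler's config
// package.
type nodeGroupAutoscalingOptions struct {
	// MaxNodeProvisionTime bounds how long a node may stay unregistered
	// before it is considered failed. Zero or negative disables the check.
	MaxNodeProvisionTime time.Duration
}

type autoscalingOptions struct {
	NodeGroupDefaults nodeGroupAutoscalingOptions
}

func main() {
	var opts autoscalingOptions
	opts.NodeGroupDefaults.MaxNodeProvisionTime = 15 * time.Minute
	// This is the expression passed to CreateNodePoolManager above.
	fmt.Println(opts.NodeGroupDefaults.MaxNodeProvisionTime) // 15m0s
}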
142 changes: 104 additions & 38 deletions cluster-autoscaler/cloudprovider/oci/nodepools/oci_manager.go
@@ -12,6 +12,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/pkg/errors"
@@ -87,7 +88,7 @@ type okeClient interface {
}

// CreateNodePoolManager creates a NodePoolManager that can manage autoscaling node pools
func CreateNodePoolManager(cloudConfigPath string, nodeGroupAutoDiscoveryList []string, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, kubeClient kubernetes.Interface) (NodePoolManager, error) {
func CreateNodePoolManager(cloudConfigPath string, nodeGroupAutoDiscoveryList []string, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, kubeClient kubernetes.Interface, maxNodeProvisionTime time.Duration) (NodePoolManager, error) {

var err error
var configProvider common.ConfigurationProvider
@@ -153,14 +154,16 @@ func CreateNodePoolManager(cloudConfigPath string, nodeGroupAutoDiscoveryList []
registeredTaintsGetter := CreateRegisteredTaintsGetter()

manager := &ociManagerImpl{
cfg: cloudConfig,
okeClient: &okeClient,
computeClient: &computeClient,
staticNodePools: map[string]NodePool{},
ociShapeGetter: ociShapeGetter,
ociTagsGetter: ociTagsGetter,
registeredTaintsGetter: registeredTaintsGetter,
nodePoolCache: newNodePoolCache(&okeClient),
instanceCreationTimeCache: make(map[string]time.Time),
maxNodeProvisionTime: maxNodeProvisionTime,
}

// auto-discover node pools from compartments listed in the nodeGroupAutoDiscovery parameter
@@ -373,11 +376,15 @@ type ociManagerImpl struct {
staticNodePools map[string]NodePool
nodeGroups []nodeGroupAutoDiscovery

lastRefresh time.Time
maxNodeProvisionTime time.Duration

// caches the node pool objects received from OKE.
// All interactions with OKE's API should go through the cache.
nodePoolCache *nodePoolCache

instanceCreationTimeCache map[string]time.Time
instanceCreationTimeCacheMu sync.RWMutex
}

// Refresh triggers refresh of cached resources.
@@ -555,6 +562,38 @@ func getDisplayNamePrefix(clusterId string, nodePoolId string) string {
"-" + shortNodePoolId
}

func (m *ociManagerImpl) exceedsMaxNodeProvisionTime(node oke.Node) bool {
if node.Id == nil {
return false
}
// If not configured, treat as disabled (no timeout enforcement).
if m.maxNodeProvisionTime <= 0 {
return false
}

creationTime, found := m.GetInstanceCreationTimeFromCache(*node.Id)

if !found {
getInstanceResponse, err := m.computeClient.GetInstance(context.Background(), core.GetInstanceRequest{
InstanceId: node.Id,
})
if err != nil {
klog.V(4).Infof("OCI node %v is not found in compute: %v", *node.Name, err)
return false
}

creationTime = getInstanceResponse.TimeCreated.Time
m.SetInstanceCreationTimeInCache(*getInstanceResponse.Id, creationTime)
}

if creationTime.Add(m.maxNodeProvisionTime).After(time.Now()) {
return false
}

// Node has exceeded the allowed startup time
return true
}
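The timeout test above is phrased inversely (creation time plus the limit "not after now" means timed out). Below is a small standalone sketch of the same boundary condition with hypothetical values; timedOut is an illustrative helper, not part of this PR.

package main

import (
	"fmt"
	"time"
)

// timedOut mirrors the check in exceedsMaxNodeProvisionTime: a node has
// exceeded its provision window once creationTime+limit is no longer in
// the future. A non-positive limit disables the check entirely.
func timedOut(creationTime time.Time, limit time.Duration, now time.Time) bool {
	if limit <= 0 {
		return false
	}
	return !creationTime.Add(limit).After(now)
}

func main() {
	now := time.Now()
	created := now.Add(-20 * time.Minute)
	fmt.Println(timedOut(created, 15*time.Minute, now)) // true: 20m elapsed > 15m limit
	fmt.Println(timedOut(created, 30*time.Minute, now)) // false: still within the window
	fmt.Println(timedOut(created, 0, now))              // false: timeout disabled
}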

// GetNodePoolNodes returns NodePool nodes that are not in a terminal state.
func (m *ociManagerImpl) GetNodePoolNodes(np NodePool) ([]cloudprovider.Instance, error) {
klog.V(4).Infof("getting nodes for node pool: %q", np.Id())
@@ -567,50 +606,58 @@ func (m *ociManagerImpl) GetNodePoolNodes(np NodePool) ([]cloudprovider.Instance

var instances []cloudprovider.Instance
for _, node := range nodePool.Nodes {

if node.NodeError != nil {

// We should move away from detecting an "Out of host capacity" node error through
// string comparison. An error code specifically for out-of-host-capacity conditions
// should be set and returned in the API response.
errorClass := cloudprovider.OtherErrorClass
if *node.NodeError.Code == "LimitExceeded" ||
*node.NodeError.Code == "QuotaExceeded" ||
(*node.NodeError.Code == "InternalError" &&
strings.Contains(*node.NodeError.Message, "Out of host capacity")) {
errorClass = cloudprovider.OutOfResourcesErrorClass
// Determine whether the node has exceeded MaxNodeProvisionTime
isTimedout := node.LifecycleState == oke.NodeLifecycleStateUpdating &&
m.exceedsMaxNodeProvisionTime(node)

var errorInfo *cloudprovider.InstanceErrorInfo
// Placeholder nodes (out of host capacity or LimitExceeded) are excluded and shouldn't be pushed to the autoscaler.
// We should never expect these nodes to register (refer to func expectedToRegister in clusterstate.go).
if node.Id != nil && *node.Id != "" {
if node.NodeError != nil && isTimedout {
errorInfo = &cloudprovider.InstanceErrorInfo{
ErrorClass: cloudprovider.OtherErrorClass,
ErrorCode: *node.NodeError.Code,
ErrorMessage: *node.NodeError.Message,
}
} else if isTimedout {
// MaxNodeProvisionTime has already been exceeded, but OKE did not set an error on the node,
// so return a default timeout error.
errorInfo = &cloudprovider.InstanceErrorInfo{
ErrorClass: cloudprovider.OtherErrorClass,
ErrorCode: "MaxNodeProvisionTimeExceeded",
ErrorMessage: "MaxNodeProvisionTimeExceeded",
}
}

instances = append(instances, cloudprovider.Instance{
Id: *node.Id,
Status: &cloudprovider.InstanceStatus{
State: cloudprovider.InstanceCreating,
ErrorInfo: &cloudprovider.InstanceErrorInfo{
ErrorClass: errorClass,
ErrorCode: *node.NodeError.Code,
ErrorMessage: *node.NodeError.Message,
},
},
})

continue
}

switch node.LifecycleState {
case oke.NodeLifecycleStateDeleted:
klog.V(4).Infof("skipping instance is in deleted state: %q", *node.Id)
if _, found := m.GetInstanceCreationTimeFromCache(*node.Id); found {
m.DeleteInstanceCreationTimeInCache(*node.Id)
}
case oke.NodeLifecycleStateDeleting:
instances = append(instances, cloudprovider.Instance{
Id: *node.Id,
Status: &cloudprovider.InstanceStatus{
State: cloudprovider.InstanceDeleting,
},
})
case oke.NodeLifecycleStateCreating, oke.NodeLifecycleStateUpdating:
if _, found := m.GetInstanceCreationTimeFromCache(*node.Id); found {
m.DeleteInstanceCreationTimeInCache(*node.Id)
}
case oke.NodeLifecycleStateCreating:
// A node in the Creating state is just a placeholder node. We must NOT return it; otherwise
// the autoscaler core will expect the node to register (refer to func expectedToRegister in
// clusterstate.go) and will call deleteInstance once it reaches MaxNodeProvisionTime.
klog.V(4).Infof("skipping placeholder node in Creating state: %v", *node.Name)
case oke.NodeLifecycleStateUpdating:
instances = append(instances, cloudprovider.Instance{
Id: *node.Id,
Status: &cloudprovider.InstanceStatus{
State: cloudprovider.InstanceCreating,
ErrorInfo: errorInfo,
},
})
case oke.NodeLifecycleStateActive:
@@ -875,6 +922,25 @@ func ReasonForError(err error) metav1.StatusReason {
return metav1.StatusReasonUnknown
}

func (m *ociManagerImpl) GetInstanceCreationTimeFromCache(nodeId string) (time.Time, bool) {
m.instanceCreationTimeCacheMu.RLock()
defer m.instanceCreationTimeCacheMu.RUnlock()
creationTimestamp, found := m.instanceCreationTimeCache[nodeId]
return creationTimestamp, found
}

func (m *ociManagerImpl) SetInstanceCreationTimeInCache(nodeId string, creationTimestamp time.Time) {
m.instanceCreationTimeCacheMu.Lock()
defer m.instanceCreationTimeCacheMu.Unlock()
m.instanceCreationTimeCache[nodeId] = creationTimestamp
}

func (m *ociManagerImpl) DeleteInstanceCreationTimeInCache(nodeId string) {
m.instanceCreationTimeCacheMu.Lock()
defer m.instanceCreationTimeCacheMu.Unlock()
delete(m.instanceCreationTimeCache, nodeId)
}
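The three accessors above follow the standard RWMutex-guarded-map pattern: concurrent readers share an RLock while writers take the exclusive lock. A self-contained sketch of the same pattern under concurrent use follows; the creationCache type and node IDs are hypothetical stand-ins, not OCI calls.

package main

import (
	"fmt"
	"sync"
	"time"
)

// creationCache mirrors the map-plus-RWMutex pair embedded in
// ociManagerImpl above.
type creationCache struct {
	mu sync.RWMutex
	m  map[string]time.Time
}

func (c *creationCache) get(id string) (time.Time, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	t, ok := c.m[id]
	return t, ok
}

func (c *creationCache) set(id string, t time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[id] = t
}

func main() {
	c := &creationCache{m: make(map[string]time.Time)}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Writers and readers may interleave safely.
			c.set(fmt.Sprintf("node-%d", i), time.Now())
			c.get("node-0")
		}(i)
	}
	wg.Wait()
	_, ok := c.get("node-0")
	fmt.Println(ok) // true
}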

// APIStatus allows the conversion of errors into status objects
type APIStatus interface {
Status() metav1.Status