Skip to content

Commit f65d2ae

Browse files
committed
address comments
1 parent fa8da60 commit f65d2ae

File tree

25 files changed

+251
-339
lines changed

25 files changed

+251
-339
lines changed

cmd/controller/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func main() {
6262
overlayUndecoratedCloudProvider,
6363
clusterState,
6464
op.InstanceTypeStore,
65-
corecontrollers.WithRegistrationHook(registrationhooks.NewPlacementGroupRegistrationHook(op.GetClient(), op.InstanceProvider, op.PlacementGroupProvider)),
65+
corecontrollers.WithRegistrationHook(registrationhooks.NewPlacementGroupRegistrationHook(op.InstanceProvider)),
6666
)...).
6767
WithControllers(ctx, controllers.NewControllers(
6868
ctx,

kwok/operator/operator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
149149
amiProvider := amifamily.NewDefaultProvider(operator.Clock, versionProvider, ssmProvider, ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
150150
placementGroupProvider := placementgroup.NewProvider(
151151
ec2api,
152-
cache.New(awscache.PlacementGroupTTL, awscache.DefaultCleanupInterval),
152+
cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval),
153153
)
154154
amiResolver := amifamily.NewDefaultResolver(cfg.Region)
155155
launchTemplateProvider := launchtemplate.NewDefaultProvider(

pkg/apis/v1/ec2nodeclass.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type EC2NodeClassSpec struct {
5757
// +kubebuilder:validation:XValidation:message="expected at least one, got none, ['name', 'id']",rule="has(self.name) || has(self.id)"
5858
// +kubebuilder:validation:XValidation:message="'name' and 'id' are mutually exclusive",rule="!(has(self.name) && has(self.id))"
5959
// +optional
60-
PlacementGroupSelector *PlacementGroupSelectorTerm `json:"placementGroupSelector,omitempty"`
60+
PlacementGroupSelector *PlacementGroupSelector `json:"placementGroupSelector,omitempty"`
6161
// AssociatePublicIPAddress controls if public IP addresses are assigned to instances that are launched with the nodeclass.
6262
// +optional
6363
AssociatePublicIPAddress *bool `json:"associatePublicIPAddress,omitempty"`
@@ -211,7 +211,7 @@ type CapacityReservationSelectorTerm struct {
211211
InstanceMatchCriteria string `json:"instanceMatchCriteria,omitempty"`
212212
}
213213

214-
type PlacementGroupSelectorTerm struct {
214+
type PlacementGroupSelector struct {
215215
// Name is the placement group name in EC2
216216
// +kubebuilder:validation:MinLength:=1
217217
// +optional

pkg/apis/v1/ec2nodeclass_status.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,13 +173,11 @@ func (in *EC2NodeClass) StatusConditions() status.ConditionSet {
173173
ConditionTypeSecurityGroupsReady,
174174
ConditionTypeInstanceProfileReady,
175175
ConditionTypeValidationSucceeded,
176+
ConditionTypePlacementGroupReady,
176177
}
177178
if CapacityReservationsEnabled {
178179
conds = append(conds, ConditionTypeCapacityReservationsReady)
179180
}
180-
if in.Spec.PlacementGroupSelector != nil {
181-
conds = append(conds, ConditionTypePlacementGroupReady)
182-
}
183181
return status.NewReadyConditions(conds...).For(in)
184182
}
185183

pkg/apis/v1/zz_generated.deepcopy.go

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/cache/cache.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ const (
3131
// updated every minute, but we want to persist the data longer in the event of an EC2 API outage. 24 hours was the
3232
// compormise made for API outage reseliency and gargage collecting entries for orphaned reservations.
3333
CapacityReservationAvailabilityTTL = 24 * time.Hour
34-
// PlacementGroupTTL is the time we will persist cached placement group data. Like capacity reservations, we persist
35-
// longer than the reconciliation interval to maintain availability during EC2 API outages.
36-
PlacementGroupTTL = 6 * time.Hour
3734
// InstanceTypesZonesAndOfferingsTTL is the time before we refresh instance types, zones, and offerings at EC2
3835
InstanceTypesZonesAndOfferingsTTL = 5 * time.Minute
3936
// InstanceProfileTTL is the time before we refresh checking instance profile existence at IAM

pkg/cache/unavailableofferings.go

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,28 @@ import (
2222
"sync/atomic"
2323

2424
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
25+
"github.com/awslabs/operatorpkg/option"
2526
"sigs.k8s.io/controller-runtime/pkg/log"
2627

2728
"github.com/patrickmn/go-cache"
2829
)
2930

30-
// PlacementGroupScope optionally scopes ICE cache entries to a specific placement group and partition.
31-
// When set, ICE entries are isolated so that placement-group-specific failures don't block non-PG launches.
32-
type PlacementGroupScope struct {
33-
// ID is the placement group ID (e.g., "pg-0123456789abcdef0")
34-
ID string
35-
// Partition is the partition number as a string (e.g., "3"). Empty when no specific partition is targeted
36-
// or for non-partition placement groups.
37-
Partition string
31+
// UnavailableOfferingsOption is a functional option for scoping ICE cache entries.
32+
type UnavailableOfferingsOption = option.Function[unavailableOfferingsOptions]
33+
34+
type unavailableOfferingsOptions struct {
35+
placementGroupID string
36+
placementGroupPartition string
37+
}
38+
39+
// WithPlacementGroup scopes an ICE cache entry to a specific placement group ID.
40+
func WithPlacementGroup(id string) UnavailableOfferingsOption {
41+
return func(o *unavailableOfferingsOptions) { o.placementGroupID = id }
42+
}
43+
44+
// WithPlacementGroupPartition further scopes an ICE cache entry to a specific partition.
45+
func WithPlacementGroupPartition(partition string) UnavailableOfferingsOption {
46+
return func(o *unavailableOfferingsOptions) { o.placementGroupPartition = partition }
3847
}
3948

4049
// UnavailableOfferings stores any offerings that return ICE (insufficient capacity errors) when
@@ -93,8 +102,8 @@ func (u *UnavailableOfferings) SeqNum(instanceType ec2types.InstanceType) uint64
93102
// The pgScope parameter scopes the lookup so that an ICE from a placement group launch
94103
// does not incorrectly prevent launches of the same instance type + zone without that placement group.
95104
// When a partition is specified in the scope, the lookup is further scoped to that partition.
96-
func (u *UnavailableOfferings) IsUnavailable(instanceType ec2types.InstanceType, zone, capacityType string, pgScope ...PlacementGroupScope) bool {
97-
_, offeringFound := u.offeringCache.Get(u.key(instanceType, zone, capacityType, pgScope...))
105+
func (u *UnavailableOfferings) IsUnavailable(instanceType ec2types.InstanceType, zone, capacityType string, opts ...UnavailableOfferingsOption) bool {
106+
_, offeringFound := u.offeringCache.Get(u.key(instanceType, zone, capacityType, opts...))
98107
_, capacityTypeFound := u.capacityTypeCache.Get(capacityType)
99108
_, azFound := u.azCache.Get(zone)
100109
return offeringFound || capacityTypeFound || azFound
@@ -104,7 +113,8 @@ func (u *UnavailableOfferings) IsUnavailable(instanceType ec2types.InstanceType,
104113
// The pgScope parameter scopes the cache entry so that placement-group-specific ICEs don't
105114
// block non-PG launches of the same instance type + zone. When a partition is specified, the cache
106115
// entry is further scoped so only that partition is marked unavailable.
107-
func (u *UnavailableOfferings) MarkUnavailable(ctx context.Context, instanceType ec2types.InstanceType, zone, capacityType string, unavailableReason map[string]string, pgScope ...PlacementGroupScope) {
116+
func (u *UnavailableOfferings) MarkUnavailable(ctx context.Context, instanceType ec2types.InstanceType, zone, capacityType string, unavailableReason map[string]string, opts ...UnavailableOfferingsOption) {
117+
resolved := option.Resolve(opts...)
108118
// even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL
109119
logValues := []any{
110120
"reason", unavailableReason["reason"],
@@ -113,10 +123,10 @@ func (u *UnavailableOfferings) MarkUnavailable(ctx context.Context, instanceType
113123
"capacity-type", capacityType,
114124
"ttl", UnavailableOfferingsTTL,
115125
}
116-
if len(pgScope) > 0 && pgScope[0].ID != "" {
117-
logValues = append(logValues, "placement-group-id", pgScope[0].ID)
118-
if pgScope[0].Partition != "" {
119-
logValues = append(logValues, "placement-group-partition", pgScope[0].Partition)
126+
if resolved.placementGroupID != "" {
127+
logValues = append(logValues, "placement-group-id", resolved.placementGroupID)
128+
if resolved.placementGroupPartition != "" {
129+
logValues = append(logValues, "placement-group-partition", resolved.placementGroupPartition)
120130
}
121131
}
122132
// Add fleetID if provided
@@ -126,7 +136,7 @@ func (u *UnavailableOfferings) MarkUnavailable(ctx context.Context, instanceType
126136
logValues = append(logValues, key, unavailableReason[key])
127137
}
128138
log.FromContext(ctx).WithValues(logValues...).V(1).Info("removing offering from offerings")
129-
u.offeringCache.SetDefault(u.key(instanceType, zone, capacityType, pgScope...), struct{}{})
139+
u.offeringCache.SetDefault(u.key(instanceType, zone, capacityType, opts...), struct{}{})
130140
u.offeringCacheSeqNumMu.Lock()
131141
u.offeringCacheSeqNum[instanceType]++
132142
u.offeringCacheSeqNumMu.Unlock()
@@ -156,12 +166,13 @@ func (u *UnavailableOfferings) Flush() {
156166
// When a placement group scope is provided, the PG ID (and optionally partition) is included in the key
157167
// to scope ICE entries per placement group and partition.
158168
// Format: <capacityType>:<instanceType>:<zone>[:<pgID>[:<partition>]]
159-
func (u *UnavailableOfferings) key(instanceType ec2types.InstanceType, zone string, capacityType string, pgScope ...PlacementGroupScope) string {
160-
if len(pgScope) > 0 && pgScope[0].ID != "" {
161-
if pgScope[0].Partition != "" {
162-
return fmt.Sprintf("%s:%s:%s:%s:%s", capacityType, instanceType, zone, pgScope[0].ID, pgScope[0].Partition)
169+
func (u *UnavailableOfferings) key(instanceType ec2types.InstanceType, zone string, capacityType string, opts ...UnavailableOfferingsOption) string {
170+
resolved := option.Resolve(opts...)
171+
if resolved.placementGroupID != "" {
172+
if resolved.placementGroupPartition != "" {
173+
return fmt.Sprintf("%s:%s:%s:%s:%s", capacityType, instanceType, zone, resolved.placementGroupID, resolved.placementGroupPartition)
163174
}
164-
return fmt.Sprintf("%s:%s:%s:%s", capacityType, instanceType, zone, pgScope[0].ID)
175+
return fmt.Sprintf("%s:%s:%s:%s", capacityType, instanceType, zone, resolved.placementGroupID)
165176
}
166177
return fmt.Sprintf("%s:%s:%s", capacityType, instanceType, zone)
167178
}

pkg/cloudprovider/cloudprovider.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *karpv1.NodeClaim)
149149
instanceType, _ := lo.Find(instanceTypes, func(i *cloudprovider.InstanceType) bool {
150150
return i.Name == string(instance.Type)
151151
})
152-
nc := c.instanceToNodeClaim(instance, instanceType, nodeClass)
152+
nc := c.instanceToNodeClaim(ctx, instance, instanceType, nodeClass)
153153
nc.Annotations = lo.Assign(nc.Annotations, map[string]string{
154154
v1.AnnotationEC2NodeClassHash: nodeClass.Hash(),
155155
v1.AnnotationEC2NodeClassHashVersion: v1.EC2NodeClassHashVersion,
@@ -173,7 +173,7 @@ func (c *CloudProvider) List(ctx context.Context) ([]*karpv1.NodeClaim, error) {
173173
if client.IgnoreNotFound(err) != nil {
174174
return nil, fmt.Errorf("resolving nodeclass, %w", err)
175175
}
176-
nodeClaims = append(nodeClaims, c.instanceToNodeClaim(it, instanceType, nc))
176+
nodeClaims = append(nodeClaims, c.instanceToNodeClaim(ctx, it, instanceType, nc))
177177
}
178178
return nodeClaims, nil
179179
}
@@ -196,7 +196,7 @@ func (c *CloudProvider) Get(ctx context.Context, providerID string) (*karpv1.Nod
196196
if client.IgnoreNotFound(err) != nil {
197197
return nil, fmt.Errorf("resolving nodeclass, %w", err)
198198
}
199-
return c.instanceToNodeClaim(instance, instanceType, nc), nil
199+
return c.instanceToNodeClaim(ctx, instance, instanceType, nc), nil
200200
}
201201

202202
// GetInstanceTypes returns all available InstanceTypes
@@ -411,7 +411,7 @@ func (c *CloudProvider) resolveNodePoolFromInstance(ctx context.Context, instanc
411411
}
412412

413413
//nolint:gocyclo
414-
func (c *CloudProvider) instanceToNodeClaim(i *instance.Instance, instanceType *cloudprovider.InstanceType, nodeClass *v1.EC2NodeClass) *karpv1.NodeClaim {
414+
func (c *CloudProvider) instanceToNodeClaim(ctx context.Context, i *instance.Instance, instanceType *cloudprovider.InstanceType, nodeClass *v1.EC2NodeClass) *karpv1.NodeClaim {
415415
nodeClaim := &karpv1.NodeClaim{}
416416
labels := map[string]string{}
417417
annotations := map[string]string{}
@@ -466,7 +466,7 @@ func (c *CloudProvider) instanceToNodeClaim(i *instance.Instance, instanceType *
466466
}
467467
// Placement group labels
468468
if nodeClass != nil {
469-
if pg := c.placementGroupProvider.GetForNodeClass(nodeClass); pg != nil {
469+
if pg, _ := c.placementGroupProvider.Get(ctx, nodeClass); pg != nil {
470470
labels[v1.LabelPlacementGroupID] = pg.ID
471471
}
472472
}

pkg/cloudprovider/drift.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (c *CloudProvider) isNodeClassDrifted(ctx context.Context, nodeClaim *karpv
6666
return "", fmt.Errorf("calculating subnet drift, %w", err)
6767
}
6868
capacityReservationsDrifted := c.isCapacityReservationDrifted(instance, nodeClass)
69-
placementGroupDrifted := c.isPlacementGroupDrifted(nodeClaim, nodeClass)
69+
placementGroupDrifted := c.isPlacementGroupDrifted(ctx, nodeClaim, nodeClass)
7070
drifted := lo.FindOrElse([]cloudprovider.DriftReason{
7171
securitygroupDrifted,
7272
subnetDrifted,
@@ -151,10 +151,10 @@ func (c *CloudProvider) isCapacityReservationDrifted(instance *instance.Instance
151151

152152
// isPlacementGroupDrifted checks if the node's placement group ID label no longer matches the EC2NodeClass's
153153
// resolved placement group. This covers scenarios where placementGroupSelector was added, removed, or changed.
154-
func (c *CloudProvider) isPlacementGroupDrifted(nodeClaim *karpv1.NodeClaim, nodeClass *v1.EC2NodeClass) cloudprovider.DriftReason {
154+
func (c *CloudProvider) isPlacementGroupDrifted(ctx context.Context, nodeClaim *karpv1.NodeClaim, nodeClass *v1.EC2NodeClass) cloudprovider.DriftReason {
155155
nodeClaimPGID := nodeClaim.Labels[v1.LabelPlacementGroupID]
156156
var nodeClassPGID string
157-
if pg := c.placementGroupProvider.GetForNodeClass(nodeClass); pg != nil {
157+
if pg, _ := c.placementGroupProvider.Get(ctx, nodeClass); pg != nil {
158158
nodeClassPGID = pg.ID
159159
}
160160
if nodeClaimPGID != nodeClassPGID {

pkg/cloudprovider/registrationhooks/placementgrouphook.go

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,11 @@ import (
1919
"fmt"
2020
"strconv"
2121

22-
"k8s.io/apimachinery/pkg/types"
23-
"sigs.k8s.io/controller-runtime/pkg/client"
24-
2522
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
2623
"sigs.k8s.io/karpenter/pkg/cloudprovider"
2724

2825
v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
2926
"github.com/aws/karpenter-provider-aws/pkg/providers/instance"
30-
"github.com/aws/karpenter-provider-aws/pkg/providers/placementgroup"
3127
"github.com/aws/karpenter-provider-aws/pkg/utils"
3228
)
3329

@@ -38,16 +34,12 @@ import (
3834
// taint is removed, so that TopologySpreadConstraints using the partition topology key always
3935
// see accurate partition data.
4036
type PlacementGroupRegistrationHook struct {
41-
kubeClient client.Client
42-
instanceProvider instance.Provider
43-
placementGroupProvider placementgroup.Provider
37+
instanceProvider instance.Provider
4438
}
4539

46-
func NewPlacementGroupRegistrationHook(kubeClient client.Client, instanceProvider instance.Provider, placementGroupProvider placementgroup.Provider) *PlacementGroupRegistrationHook {
40+
func NewPlacementGroupRegistrationHook(instanceProvider instance.Provider) *PlacementGroupRegistrationHook {
4741
return &PlacementGroupRegistrationHook{
48-
kubeClient: kubeClient,
49-
instanceProvider: instanceProvider,
50-
placementGroupProvider: placementGroupProvider,
42+
instanceProvider: instanceProvider,
5143
}
5244
}
5345

@@ -56,46 +48,34 @@ func (h *PlacementGroupRegistrationHook) Name() string {
5648
}
5749

5850
func (h *PlacementGroupRegistrationHook) Registered(ctx context.Context, nodeClaim *karpv1.NodeClaim) (cloudprovider.NodeLifecycleHookResult, error) {
59-
// Resolve the EC2NodeClass from the NodeClaim's nodeClassRef
60-
nodeClass := &v1.EC2NodeClass{}
61-
if err := h.kubeClient.Get(ctx, types.NamespacedName{Name: nodeClaim.Spec.NodeClassRef.Name}, nodeClass); err != nil {
62-
return cloudprovider.NodeLifecycleHookResult{}, fmt.Errorf("resolving ec2nodeclass for placement group hook, %w", err)
63-
}
64-
65-
// Check if the EC2NodeClass has a partition placement group
66-
pg := h.placementGroupProvider.GetForNodeClass(nodeClass)
67-
if pg == nil || pg.Strategy != placementgroup.StrategyPartition {
51+
if _, ok := nodeClaim.Labels[v1.LabelPlacementGroupID]; !ok {
6852
return cloudprovider.NodeLifecycleHookResult{}, nil
6953
}
7054

71-
// Check if the partition label is already populated on the NodeClaim
7255
if _, ok := nodeClaim.Labels[v1.LabelPlacementGroupPartition]; ok {
7356
return cloudprovider.NodeLifecycleHookResult{}, nil
7457
}
7558

76-
// We need the providerID to look up the instance
7759
if nodeClaim.Status.ProviderID == "" {
7860
return cloudprovider.NodeLifecycleHookResult{Requeue: true}, nil
7961
}
8062

81-
// Parse the instance ID from the provider ID
8263
instanceID, err := utils.ParseInstanceID(nodeClaim.Status.ProviderID)
8364
if err != nil {
84-
return cloudprovider.NodeLifecycleHookResult{Requeue: true}, fmt.Errorf("parsing instance ID from provider ID, %w", err)
65+
return cloudprovider.NodeLifecycleHookResult{}, fmt.Errorf("parsing instance ID from provider ID, %w", err)
8566
}
8667

87-
// Get the instance details, skipping cache to get fresh partition data
8868
inst, err := h.instanceProvider.Get(ctx, instanceID, instance.SkipCache)
8969
if err != nil {
90-
return cloudprovider.NodeLifecycleHookResult{Requeue: true}, fmt.Errorf("describing instance for partition number, %w", err)
70+
return cloudprovider.NodeLifecycleHookResult{}, fmt.Errorf("describing instance for partition number, %w", err)
9171
}
9272

93-
// Check if the partition number has been assigned
73+
// If the instance doesn't have a partition number, it's not in a partition placement group
74+
// (e.g., cluster or spread PGs don't have partitions). Nothing to gate.
9475
if inst.PartitionNumber == nil {
95-
return cloudprovider.NodeLifecycleHookResult{Requeue: true}, nil
76+
return cloudprovider.NodeLifecycleHookResult{}, nil
9677
}
9778

98-
// Set the partition label on the NodeClaim so it gets synced to the Node during registration
9979
nodeClaim.Labels[v1.LabelPlacementGroupPartition] = strconv.FormatInt(int64(*inst.PartitionNumber), 10)
10080
return cloudprovider.NodeLifecycleHookResult{}, nil
10181
}

0 commit comments

Comments
 (0)