Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/core/200-roles/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ rules:
- apiGroups: [""]
resources: ["namespaces/finalizers"] # finalizers are needed for the owner reference of the webhook
verbs: ["update"]
- apiGroups: ["discovery.k8s.io"]
resources: ["endpointslices"]
verbs: ["get", "list", "create", "update", "delete", "patch", "watch"]
- apiGroups: ["apps"]
resources: ["deployments", "deployments/finalizers"] # finalizers are needed for the owner reference of the webhook
verbs: ["get", "list", "create", "update", "delete", "patch", "watch"]
Expand Down
6 changes: 3 additions & 3 deletions pkg/autoscaler/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ import (
"knative.dev/pkg/hash"
)

const prefix = "autoscaler-bucket"
const BucketPrefix = "autoscaler-bucket"

// IsBucketHost returns true if the given host is a host of a K8S Service
// of a bucket.
func IsBucketHost(host string) bool {
// Currently checking prefix is ok as only requests sent via bucket service
// have host with the prefix. Maybe use regexp for improvement.
return strings.HasPrefix(host, prefix)
return strings.HasPrefix(host, BucketPrefix)
}

// AutoscalerBucketName returns the name of the Autoscaler bucket with given `ordinal`
// and `total` bucket count.
func AutoscalerBucketName(ordinal, total uint32) string {
return strings.ToLower(fmt.Sprintf("%s-%02d-of-%02d", prefix, ordinal, total))
return strings.ToLower(fmt.Sprintf("%s-%02d-of-%02d", BucketPrefix, ordinal, total))
}

// AutoscalerBucketSet returns a hash.BucketSet consisting of Autoscaler
Expand Down
50 changes: 29 additions & 21 deletions pkg/autoscaler/statforwarder/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ import (
"github.com/google/go-cmp/cmp"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
ktesting "k8s.io/client-go/testing"
"k8s.io/utils/ptr"
fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake"
fakeleaseinformer "knative.dev/pkg/client/injection/kube/informers/coordination/v1/lease/fake"
fakeendpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints/fake"
fakeserviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake"
fakeendpointsliceinformer "knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/fake"
"knative.dev/pkg/hash"
rtesting "knative.dev/pkg/reconciler/testing"
"knative.dev/pkg/system"
Expand Down Expand Up @@ -89,7 +91,7 @@ func must(t *testing.T, err error) {
func TestForwarderReconcile(t *testing.T) {
ctx, cancel, informers := rtesting.SetupFakeContextWithCancel(t)
kubeClient := fakekubeclient.Get(ctx)
endpoints := fakeendpointsinformer.Get(ctx)
endpoints := fakeendpointsliceinformer.Get(ctx)
service := fakeserviceinformer.Get(ctx)
lease := fakeleaseinformer.Get(ctx)

Expand Down Expand Up @@ -124,28 +126,34 @@ func TestForwarderReconcile(t *testing.T) {
t.Fatal("Timeout to get the Service:", lastErr)
}

wantSubsets := []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{
IP: testIP1,
want := discoveryv1.EndpointSlice{
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{testIP1},
Conditions: discoveryv1.EndpointConditions{
Ready: ptr.To(true),
Serving: ptr.To(true),
Terminating: ptr.To(false),
},
}},
Ports: []corev1.EndpointPort{{
Name: autoscalerPortName,
Port: autoscalerPort,
Protocol: corev1.ProtocolTCP,
Ports: []discoveryv1.EndpointPort{{
Name: ptr.To(autoscalerPortName),
Port: ptr.To[int32](autoscalerPort),
Protocol: ptr.To(corev1.ProtocolTCP),
}},
}}
}

// Check the endpoints got updated.
el := endpoints.Lister().Endpoints(testNs)
el := endpoints.Lister().EndpointSlices(testNs)
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
got, err := el.Get(bucket1)
if err != nil {
lastErr = err
return false, nil //nolint:nilerr
}

if !cmp.Equal(wantSubsets, got.Subsets) {
lastErr = fmt.Errorf("Got Subsets = %v, want = %v", got.Subsets, wantSubsets)
if diff := cmp.Diff(want.Endpoints, got.Endpoints); diff != "" {
lastErr = fmt.Errorf("resulting endpoints are different (-want, +got) %s", diff)
return false, nil
}
return true, nil
Expand All @@ -160,7 +168,7 @@ func TestForwarderReconcile(t *testing.T) {
lease.Informer().GetIndexer().Add(l)

// Check that the endpoints got updated.
wantSubsets[0].Addresses[0].IP = testIP2
want.Endpoints[0].Addresses[0] = testIP2
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) {
// Check the endpoints get updated.
got, err := el.Get(bucket1)
Expand All @@ -169,8 +177,8 @@ func TestForwarderReconcile(t *testing.T) {
return false, nil //nolint:nilerr
}

if !cmp.Equal(wantSubsets, got.Subsets) {
lastErr = fmt.Errorf("Got Subsets = %v, want = %v", got.Subsets, wantSubsets)
if !cmp.Equal(want.Endpoints, got.Endpoints) {
lastErr = fmt.Errorf("Got Subsets = %v, want = %v", got.Endpoints, want.Endpoints)
return false, nil
}
return true, nil
Expand Down Expand Up @@ -240,7 +248,7 @@ func TestForwarderRetryOnEndpointsCreationFailure(t *testing.T) {

endpointsCreation := 0
retried := make(chan struct{})
kubeClient.PrependReactor("create", "endpoints",
kubeClient.PrependReactor("create", "endpointslices",
func(action ktesting.Action) (bool, runtime.Object, error) {
endpointsCreation++
if endpointsCreation == 2 {
Expand All @@ -264,7 +272,7 @@ func TestForwarderRetryOnEndpointsCreationFailure(t *testing.T) {
func TestForwarderRetryOnEndpointsUpdateFailure(t *testing.T) {
ctx, cancel, informers := rtesting.SetupFakeContextWithCancel(t)
kubeClient := fakekubeclient.Get(ctx)
endpoints := fakeendpointsinformer.Get(ctx)
endpoints := fakeendpointsliceinformer.Get(ctx)
lease := fakeleaseinformer.Get(ctx)

waitInformers, err := rtesting.RunAndSyncInformers(ctx, informers...)
Expand All @@ -282,7 +290,7 @@ func TestForwarderRetryOnEndpointsUpdateFailure(t *testing.T) {

endpointsUpdate := 0
retried := make(chan struct{})
kubeClient.PrependReactor("update", "endpoints",
kubeClient.PrependReactor("update", "endpointslices",
func(action ktesting.Action) (bool, runtime.Object, error) {
endpointsUpdate++
if endpointsUpdate == 2 {
Expand All @@ -293,13 +301,13 @@ func TestForwarderRetryOnEndpointsUpdateFailure(t *testing.T) {
},
)

e := &corev1.Endpoints{
e := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: bucket1,
Namespace: testNs,
},
}
kubeClient.CoreV1().Endpoints(testNs).Create(ctx, e, metav1.CreateOptions{})
kubeClient.DiscoveryV1().EndpointSlices(testNs).Create(ctx, e, metav1.CreateOptions{})
endpoints.Informer().GetIndexer().Add(e)
kubeClient.CoordinationV1().Leases(testNs).Create(ctx, testLease, metav1.CreateOptions{})
lease.Informer().GetIndexer().Add(testLease)
Expand Down
55 changes: 30 additions & 25 deletions pkg/autoscaler/statforwarder/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@ import (
"go.uber.org/zap"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
corev1listers "k8s.io/client-go/listers/core/v1"
discoveryv1listers "k8s.io/client-go/listers/discovery/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"
kubeclient "knative.dev/pkg/client/injection/kube/client"
leaseinformer "knative.dev/pkg/client/injection/kube/informers/coordination/v1/lease"
endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints"
endpointsliceinformer "knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice"
"knative.dev/pkg/controller"
"knative.dev/pkg/hash"
"knative.dev/pkg/logging"
Expand All @@ -50,13 +52,12 @@ func LeaseBasedProcessor(ctx context.Context, f *Forwarder, accept statProcessor
if err != nil {
return err
}
endpointsInformer := endpointsinformer.Get(ctx)
lt := &leaseTracker{
logger: logging.FromContext(ctx),
selfIP: selfIP,
bs: f.bs,
kc: kubeclient.Get(ctx),
endpointsLister: endpointsInformer.Lister(),
endpointsLister: endpointsliceinformer.Get(ctx).Lister(),
id2ip: make(map[string]string),
accept: accept,
fwd: f,
Expand Down Expand Up @@ -86,7 +87,7 @@ type leaseTracker struct {
bs *hash.BucketSet

kc kubernetes.Interface
endpointsLister corev1listers.EndpointsLister
endpointsLister discoveryv1listers.EndpointSliceLister

// id2ip stores the IP extracted from the holder identity to avoid
// string split each time.
Expand Down Expand Up @@ -236,21 +237,31 @@ func (f *leaseTracker) createService(ctx context.Context, ns, n string) error {
// name, and the Forwarder.selfIP. If the Endpoints object already
// exists, it will update the Endpoints with the Forwarder.selfIP.
func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string) error {
wantSubsets := []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{
IP: f.selfIP,
want := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: n,
Namespace: ns,
},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{f.selfIP},
Conditions: discoveryv1.EndpointConditions{
Ready: ptr.To(true),
Serving: ptr.To(true),
Terminating: ptr.To(false),
},
}},
Ports: []corev1.EndpointPort{{
Name: autoscalerPortName,
Port: autoscalerPort,
Protocol: corev1.ProtocolTCP,
Ports: []discoveryv1.EndpointPort{{
Name: ptr.To(autoscalerPortName),
Port: ptr.To[int32](autoscalerPort),
Protocol: ptr.To(corev1.ProtocolTCP),
}},
}}
}

exists := true
var lastErr error
if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) {
e, err := f.endpointsLister.Endpoints(ns).Get(n)
got, err := f.endpointsLister.EndpointSlices(ns).Get(n)
if apierrs.IsNotFound(err) {
exists = false
return true, nil
Expand All @@ -262,13 +273,13 @@ func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string
return false, nil //nolint:nilerr
}

if equality.Semantic.DeepEqual(wantSubsets, e.Subsets) {
if equality.Semantic.DeepEqual(want.Endpoints, got.Endpoints) {
return true, nil
}

want := e.DeepCopy()
want.Subsets = wantSubsets
if _, lastErr = f.kc.CoreV1().Endpoints(ns).Update(ctx, want, metav1.UpdateOptions{}); lastErr != nil {
e := got.DeepCopy()
e.Endpoints = want.Endpoints
if _, lastErr = f.kc.DiscoveryV1().EndpointSlices(ns).Update(ctx, e, metav1.UpdateOptions{}); lastErr != nil {
// Do not return the error to cause a retry.
return false, nil //nolint:nilerr
}
Expand All @@ -284,13 +295,7 @@ func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string
}

if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) {
_, lastErr = f.kc.CoreV1().Endpoints(ns).Create(ctx, &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: n,
Namespace: ns,
},
Subsets: wantSubsets,
}, metav1.CreateOptions{})
_, lastErr = f.kc.DiscoveryV1().EndpointSlices(ns).Create(ctx, want, metav1.CreateOptions{})
// Do not return the error to cause a retry.
return lastErr == nil, nil
}); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions pkg/cleanup/cmd/cleanup/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"knative.dev/pkg/environment"
"knative.dev/pkg/logging"
"knative.dev/pkg/system"
autoscalerbucket "knative.dev/serving/pkg/autoscaler/bucket"
)

const (
Expand Down Expand Up @@ -106,6 +107,21 @@ func main() {
if err = client.RbacV1().ClusterRoles().Delete(context.Background(), "knative-serving-certmanager", metav1.DeleteOptions{}); err != nil && !apierrs.IsNotFound(err) {
logger.Fatal("failed to delete clusterrole knative-serving-certmanager: ", err)
}

epList, err := client.CoreV1().Endpoints(system.Namespace()).List(context.Background(), metav1.ListOptions{})
if err != nil {
logger.Fatal("failed to fetch endpoints: ", err)
}

for _, eps := range epList.Items {
if strings.HasPrefix(eps.GetName(), autoscalerbucket.BucketPrefix) {
err := client.CoreV1().Endpoints(system.Namespace()).Delete(context.Background(), eps.GetName(), metav1.DeleteOptions{})
if err != nil && !apierrs.IsNotFound(err) {
logger.Fatal("failed to delete autoscaler endpoints: ", err)
}
}
}

logger.Info("Old Serving resource deletion completed successfully")
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading