From 8bbc6a81c80a0045ece0b1ee74a2fd73a6d63f3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E7=A9=BA?= Date: Sat, 5 Jul 2025 16:07:01 +0800 Subject: [PATCH] refactor: Move and update utility functions, adjust polling and logging - Moved `AppendFunc` and `Filter` from `internal/utils/k8s.go` to `pkg/utils/datastructure.go` - Updated `initialSyncDelay` variable name in `internal/manager/run.go` - Increased polling timeout for HTTPRoutePolicy status in --- internal/controller/apisixroute_controller.go | 4 +- internal/manager/run.go | 5 ++- internal/utils/k8s.go | 13 ------- pkg/utils/datastructure.go | 13 +++++++ test/e2e/crds/gatewayproxy.go | 2 +- test/e2e/framework/ingress.go | 2 +- test/e2e/framework/k8s.go | 39 +++++++------------ test/e2e/ingress/ingress.go | 2 +- 8 files changed, 35 insertions(+), 45 deletions(-) diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index bce279135c..99c1c680b8 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -607,14 +607,14 @@ func (r *ApisixRouteReconciler) filterEndpointSlicesBySubsetLabels(ctx context.C in[i] = r.filterEndpointSliceByTargetPod(ctx, in[i], labels) } - return utils.Filter(in, func(v discoveryv1.EndpointSlice) bool { + return pkgutils.Filter(in, func(v discoveryv1.EndpointSlice) bool { return len(v.Endpoints) > 0 }) } // filterEndpointSliceByTargetPod filters item.Endpoints which is not a subset of labels func (r *ApisixRouteReconciler) filterEndpointSliceByTargetPod(ctx context.Context, item discoveryv1.EndpointSlice, labels map[string]string) discoveryv1.EndpointSlice { - item.Endpoints = utils.Filter(item.Endpoints, func(v discoveryv1.Endpoint) bool { + item.Endpoints = pkgutils.Filter(item.Endpoints, func(v discoveryv1.Endpoint) bool { if v.TargetRef == nil || v.TargetRef.Kind != KindPod { return true } diff --git a/internal/manager/run.go b/internal/manager/run.go index bb34b9cb5e..6f4ba42fce 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -174,13 +174,14 @@ func Run(ctx context.Context, logger logr.Logger) error { go func() { setupLog.Info("starting provider sync") - initalSyncDelay := config.ControllerConfig.ProviderConfig.InitSyncDelay.Duration - time.AfterFunc(initalSyncDelay, func() { + initialSyncDelay := config.ControllerConfig.ProviderConfig.InitSyncDelay.Duration + time.AfterFunc(initialSyncDelay, func() { setupLog.Info("trying to initialize provider") if err := provider.Sync(ctx); err != nil { setupLog.Error(err, "unable to sync resources to provider") return } + setupLog.Info("All cache synced successfully") }) syncPeriod := config.ControllerConfig.ProviderConfig.SyncPeriod.Duration diff --git a/internal/utils/k8s.go b/internal/utils/k8s.go index 023831f1c2..aa1054b2aa 100644 --- a/internal/utils/k8s.go +++ b/internal/utils/k8s.go @@ -66,19 +66,6 @@ func MatchHostDef(host string) bool { return hostDefRegex.MatchString(host) } -func AppendFunc[T any](s []T, keep func(v T) bool, values ...T) []T { - for _, v := range values { - if keep(v) { - s = append(s, v) - } - } - return s -} - -func Filter[T any](s []T, keep func(v T) bool) []T { - return AppendFunc(make([]T, 0), keep, s...) -} - func IsSubsetOf(a, b map[string]string) bool { if len(a) == 0 { // Empty labels matches everything. diff --git a/pkg/utils/datastructure.go b/pkg/utils/datastructure.go index 4467cf8758..38cfa13419 100644 --- a/pkg/utils/datastructure.go +++ b/pkg/utils/datastructure.go @@ -58,3 +58,16 @@ func DedupComparable[T comparable](s []T) []T { } return results } + +func AppendFunc[T any](s []T, keep func(v T) bool, values ...T) []T { + for _, v := range values { + if keep(v) { + s = append(s, v) + } + } + return s +} + +func Filter[T any](s []T, keep func(v T) bool) []T { + return AppendFunc(make([]T, 0), keep, s...) +} diff --git a/test/e2e/crds/gatewayproxy.go b/test/e2e/crds/gatewayproxy.go index 70773f6fb5..0b914adf2c 100644 --- a/test/e2e/crds/gatewayproxy.go +++ b/test/e2e/crds/gatewayproxy.go @@ -188,7 +188,7 @@ spec: } By(fmt.Sprintf("wait for keyword: %s", keyword)) - s.WaitControllerManagerLog(keyword, 60, time.Minute) + s.WaitControllerManagerLog(s.Namespace(), keyword, 60, time.Minute) }) }) }) diff --git a/test/e2e/framework/ingress.go b/test/e2e/framework/ingress.go index e3e09747e9..a3e58693ee 100644 --- a/test/e2e/framework/ingress.go +++ b/test/e2e/framework/ingress.go @@ -67,5 +67,5 @@ func (f *Framework) DeployIngress(opts IngressDeployOpts) { LabelSelector: "control-plane=controller-manager", }) f.GomegaT.Expect(err).ToNot(HaveOccurred(), "waiting for controller-manager pod ready") - f.WaitControllerManagerLog("All cache synced successfully", 0, time.Minute) + f.WaitControllerManagerLog(opts.Namespace, "All cache synced successfully", 60, time.Minute) } diff --git a/test/e2e/framework/k8s.go b/test/e2e/framework/k8s.go index 3fb7159832..ba80ae7585 100644 --- a/test/e2e/framework/k8s.go +++ b/test/e2e/framework/k8s.go @@ -48,6 +48,8 @@ import ( "k8s.io/client-go/tools/remotecommand" "k8s.io/kubectl/pkg/scheme" "k8s.io/utils/ptr" + + "github.com/apache/apisix-ingress-controller/pkg/utils" ) // buildRestConfig builds the rest.Config object from kubeconfig filepath and @@ -207,11 +209,14 @@ func (f *Framework) GetPodIP(namespace, selector string) string { return pods[0].Status.PodIP } -func (f *Framework) GetPods(namespace, selector string) []corev1.Pod { +func (f *Framework) GetPods(namespace, selector string, filters ...func(pod corev1.Pod) bool) []corev1.Pod { podList, err := f.clientset.CoreV1().Pods(cmp.Or(namespace, _namespace)).List(f.Context, metav1.ListOptions{ LabelSelector: selector, }) f.GomegaT.Expect(err).ShouldNot(HaveOccurred()) + for _, filter := range filters { + podList.Items = utils.Filter(podList.Items, filter) + } return podList.Items } @@ -295,27 +300,10 @@ func (f *Framework) NewExpectResponse(httpBody any) *httpexpect.Response { }) } -// ListPods query pods by label selector. -func (f *Framework) ListPods(selector string) []corev1.Pod { - pods, err := f.clientset.CoreV1().Pods(_namespace).List(context.TODO(), metav1.ListOptions{ - LabelSelector: selector, +func (f *Framework) ListRunningPods(namespace, selector string) []corev1.Pod { + return f.GetPods(namespace, selector, func(pod corev1.Pod) bool { + return pod.Status.Phase == corev1.PodRunning && pod.DeletionTimestamp == nil }) - f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), "list pod: ", selector) - return pods.Items -} - -func (f *Framework) ListRunningPods(selector string) []corev1.Pod { - pods, err := f.clientset.CoreV1().Pods(_namespace).List(context.TODO(), metav1.ListOptions{ - LabelSelector: selector, - }) - f.GomegaT.Expect(err).ShouldNot(HaveOccurred(), "list pod: ", selector) - runningPods := make([]corev1.Pod, 0) - for _, p := range pods.Items { - if p.Status.Phase == corev1.PodRunning && p.DeletionTimestamp == nil { - runningPods = append(runningPods, p) - } - } - return runningPods } // ExecCommandInPod exec cmd in specify pod and return the output from stdout and stderr @@ -362,12 +350,13 @@ func (f *Framework) GetPodLogs(name string, previous bool) string { return string(logs) } -func (f *Framework) WaitControllerManagerLog(keyword string, sinceSeconds int64, timeout time.Duration) { - f.WaitPodsLog("control-plane=controller-manager", keyword, sinceSeconds, timeout) +func (f *Framework) WaitControllerManagerLog(namespace, keyword string, sinceSeconds int64, timeout time.Duration) { + f.WaitPodsLog(namespace, "control-plane=controller-manager", keyword, sinceSeconds, timeout) } -func (f *Framework) WaitPodsLog(selector, keyword string, sinceSeconds int64, timeout time.Duration) { - pods := f.ListRunningPods(selector) +func (f *Framework) WaitPodsLog(namespace, selector, keyword string, sinceSeconds int64, timeout time.Duration) { + pods := f.ListRunningPods(namespace, selector) + f.GomegaT.Expect(pods).ToNot(BeEmpty()) wg := sync.WaitGroup{} for _, p := range pods { wg.Add(1) diff --git a/test/e2e/ingress/ingress.go b/test/e2e/ingress/ingress.go index 0c85c53291..e079833392 100644 --- a/test/e2e/ingress/ingress.go +++ b/test/e2e/ingress/ingress.go @@ -746,7 +746,7 @@ spec: err = s.DeleteResource("Ingress", "default") Expect(err).NotTo(HaveOccurred(), "delete Ingress") - err = framework.PollUntilHTTPRoutePolicyHaveStatus(s.K8sClient, 8*time.Second, + err = framework.PollUntilHTTPRoutePolicyHaveStatus(s.K8sClient, 20*time.Second, types.NamespacedName{Namespace: s.Namespace(), Name: "http-route-policy-0"}, func(hrp *v1alpha1.HTTPRoutePolicy) bool { return len(hrp.Status.Ancestors) == 0