diff --git a/dagger/e2e.go b/dagger/e2e.go
index e908710a..8c95892c 100644
--- a/dagger/e2e.go
+++ b/dagger/e2e.go
@@ -702,6 +702,107 @@ spec:
 
 	fmt.Printf("reportAllImages test passed - found %d total images (vs %d with filtering)\n", len(allImagesSet), len(imagesSet))
 
+	// Create a new namespace and a pod in that namespace, then restart the app and ensure the image is reported
+	newNs := "dynamic-image-ns"
+	newPodName := "dynamic-image-pod"
+	newPodImage := "docker.io/library/busybox:1.35"
+
+	// Create namespace
+	ctr = dag.Container().From("bitnami/kubectl:latest").
+		WithFile(kubeconfigPath, kubeconfigSource.File("/kubeconfig")).
+		WithEnvVariable("KUBECONFIG", kubeconfigPath).
+		With(CacheBustingExec(
+			[]string{
+				"kubectl", "create", "namespace", newNs,
+			}))
+	out, err = ctr.Stdout(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to create namespace %s: %w", newNs, err)
+	}
+	fmt.Println(out)
+
+	// Create a pod in the new namespace
+	newNsPodYaml := "apiVersion: v1\n" +
+		"kind: Pod\n" +
+		"metadata:\n" +
+		"  name: " + newPodName + "\n" +
+		"  namespace: " + newNs + "\n" +
+		"spec:\n" +
+		"  containers:\n" +
+		"  - name: busybox\n" +
+		"    image: " + newPodImage + "\n" +
+		"    command: [\"sleep\", \"500d\"]\n"
+	newNsPodSource := source.WithNewFile("/dynamic-ns-pod.yaml", newNsPodYaml)
+
+	ctr = dag.Container().From("bitnami/kubectl:latest").
+		WithFile(kubeconfigPath, kubeconfigSource.File("/kubeconfig")).
+		WithEnvVariable("KUBECONFIG", kubeconfigPath).
+		WithFile("/tmp/dynamic-ns-pod.yaml", newNsPodSource.File("/dynamic-ns-pod.yaml")).
+		WithExec([]string{"kubectl", "apply", "-f", "/tmp/dynamic-ns-pod.yaml"})
+	out, err = ctr.Stdout(ctx)
+	if err != nil {
+		stderr, _ := ctr.Stderr(ctx)
+		return fmt.Errorf("failed to apply pod in new namespace: %w\n\nStderr: %s\n\nStdout: %s", err, stderr, out)
+	}
+	fmt.Println(out)
+
+	// Wait for the pod to be ready
+	ctr = dag.Container().From("bitnami/kubectl:latest").
+		WithFile(kubeconfigPath, kubeconfigSource.File("/kubeconfig")).
+		WithEnvVariable("KUBECONFIG", kubeconfigPath).
+		WithExec([]string{"kubectl", "wait", "--for=condition=ready", "pod/" + newPodName, "-n", newNs, "--timeout=1m"})
+	out, err = ctr.Stdout(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to wait for pod %s in namespace %s to be ready: %w", newPodName, newNs, err)
+	}
+	fmt.Println(out)
+
+	// Restart test-chart deployment to force a reporting message
+	ctr = dag.Container().From("bitnami/kubectl:latest").
+		WithFile(kubeconfigPath, kubeconfigSource.File("/kubeconfig")).
+		WithEnvVariable("KUBECONFIG", kubeconfigPath).
+		With(CacheBustingExec(
+			[]string{
+				"kubectl", "rollout", "restart", "deploy/test-chart",
+			}))
+	out, err = ctr.Stdout(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to restart replicated deployment: %w", err)
+	}
+	fmt.Println(out)
+
+	// Poll for the new image to appear in the running images list
+	maxAttempts = 6
+	retryDelay = 5 * time.Second
+	for attempt := 1; attempt <= maxAttempts; attempt++ {
+		var getErr error
+		allImagesSet, getErr = getRunningImages(ctx, appID, customerID, instanceAppID, tokenPlaintext)
+		if getErr != nil {
+			if attempt == maxAttempts {
+				return fmt.Errorf("failed to get running images after new namespace creation (after %d attempts): %w", maxAttempts, getErr)
+			}
+			fmt.Printf("attempt %d/%d: failed to get running images: %v\n", attempt, maxAttempts, getErr)
+			time.Sleep(retryDelay)
+			continue
+		}
+
+		if _, ok := allImagesSet[newPodImage]; ok {
+			fmt.Printf("New namespace image %s detected in running images on attempt %d\n", newPodImage, attempt)
+			break
+		}
+
+		if attempt == maxAttempts {
+			seen := make([]string, 0, len(allImagesSet))
+			for k := range allImagesSet {
+				seen = append(seen, k)
+			}
+			return fmt.Errorf("after creating namespace %s and pod %s, expected image %s not found. Seen: %v", newNs, newPodName, newPodImage, seen)
+		}
+
+		fmt.Printf("attempt %d/%d: image %s not yet reported (retrying)\n", attempt, maxAttempts, newPodImage)
+		time.Sleep(retryDelay)
+	}
+
 	return nil
 }
 
diff --git a/pkg/appstate/appstate.go b/pkg/appstate/appstate.go
index e0d6d76f..d33a567b 100644
--- a/pkg/appstate/appstate.go
+++ b/pkg/appstate/appstate.go
@@ -11,8 +11,11 @@ import (
 	reporttypes "github.com/replicatedhq/replicated-sdk/pkg/report/types"
 	"github.com/replicatedhq/replicated-sdk/pkg/store"
 	authv1 "k8s.io/api/authorization/v1"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 )
@@ -223,6 +226,16 @@ func (m *AppMonitor) runInformers(ctx context.Context, informers []types.StatusI
 		ServiceResourceKind:     runServiceController,
 		StatefulSetResourceKind: runStatefulSetController,
 	}
+	for namespace, kinds := range namespaceKinds {
+		for kind, informers := range kinds {
+			if impl, ok := kindImpls[kind]; ok {
+				goRun(impl, namespace, informers)
+			} else {
+				log.Printf("Informer requested for unsupported resource kind %v", kind)
+			}
+		}
+	}
+
 	// Start a Pod image controller per namespace
 	// When reportAllImages is true or in embedded cluster, watch all accessible namespaces
 	sdkStore := store.GetStore()
@@ -257,20 +270,65 @@ func (m *AppMonitor) runInformers(ctx context.Context, informers []types.StatusI
 		namespacesToWatch = informerNamespaces
 	}
 
-	// Filter out namespaces we don't have permission to access
-	for ns := range namespacesToWatch {
-		if canAccessPodsInNamespace(ctx, m.clientset, ns) {
-			goRun(runPodImageController, ns, nil)
+	// Track started pod watchers to avoid duplicates from dynamic namespace events
+	var startedMu sync.Mutex
+	startedNamespaces := make(map[string]struct{})
+
+	maybeStartPodWatcher := func(ns string) {
+		// Check the started set before the access review: the namespace informer
+		// delivers an Add event for every namespace that already exists, and a
+		// review is an API round trip we can skip for namespaces already watched.
+		startedMu.Lock()
+		_, started := startedNamespaces[ns]
+		startedMu.Unlock()
+		if started || !canAccessPodsInNamespace(ctx, m.clientset, ns) {
+			return
 		}
+		startedMu.Lock()
+		if _, ok := startedNamespaces[ns]; ok {
+			startedMu.Unlock()
+			return
+		}
+		startedNamespaces[ns] = struct{}{}
+		startedMu.Unlock()
+		goRun(runPodImageController, ns, nil)
 	}
-	for namespace, kinds := range namespaceKinds {
-		for kind, informers := range kinds {
-			if impl, ok := kindImpls[kind]; ok {
-				goRun(impl, namespace, informers)
-			} else {
-				log.Printf("Informer requested for unsupported resource kind %v", kind)
-			}
+
+	// Start initial pod watchers for namespaces we can access
+	for ns := range namespacesToWatch {
+		maybeStartPodWatcher(ns)
+	}
+
+	// If configured to watch all namespaces and we have list+watch perms on namespaces,
+	// start a namespace informer to dynamically watch new namespaces and spawn pod watchers
+	if shouldWatchAllNamespaces && canListNamespaces(ctx, m.clientset) && canWatchNamespaces(ctx, m.clientset) {
+		listwatch := &cache.ListWatch{
+			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+				return m.clientset.CoreV1().Namespaces().List(ctx, options)
+			},
+			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+				return m.clientset.CoreV1().Namespaces().Watch(ctx, options)
+			},
 		}
+		informer := cache.NewSharedInformer(
+			listwatch,
+			&corev1.Namespace{},
+			time.Minute,
+		)
+		informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				ns, _ := obj.(*corev1.Namespace)
+				if ns == nil {
+					return
+				}
+				maybeStartPodWatcher(ns.Name)
+			},
+		})
+		shutdown.Add(1)
+		go func() {
+			defer shutdown.Done()
+			informer.Run(ctx.Done())
+		}()
 	}
 
 	for {
@@ -370,3 +428,26 @@ func canListNamespaces(ctx context.Context, clientset kubernetes.Interface) bool
 
 	return true
 }
+
+// canWatchNamespaces reports whether the current service account is allowed to
+// watch namespaces, as determined by a SelfSubjectAccessReview. A failed review
+// request is logged and treated as "not allowed".
+func canWatchNamespaces(ctx context.Context, clientset kubernetes.Interface) bool {
+	sar := &authv1.SelfSubjectAccessReview{
+		Spec: authv1.SelfSubjectAccessReviewSpec{
+			ResourceAttributes: &authv1.ResourceAttributes{
+				Verb:     "watch",
+				Group:    "",
+				Resource: "namespaces",
+			},
+		},
+	}
+
+	result, err := clientset.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{})
+	if err != nil {
+		log.Printf("Failed to check namespace watch permission: %v", err)
+		return false
+	}
+
+	return result.Status.Allowed
+}
diff --git a/pkg/appstate/daemonsets.go b/pkg/appstate/daemonsets.go
index c154181c..bdf6f9b7 100644
--- a/pkg/appstate/daemonsets.go
+++ b/pkg/appstate/daemonsets.go
@@ -37,10 +37,10 @@ func runDaemonSetController(ctx context.Context, clientset kubernetes.Interface,
 ) {
 	listwatch := &cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-			return clientset.AppsV1().DaemonSets(targetNamespace).List(context.TODO(), options)
+			return clientset.AppsV1().DaemonSets(targetNamespace).List(ctx, options)
 		},
 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-			return clientset.AppsV1().DaemonSets(targetNamespace).Watch(context.TODO(), options)
+			return clientset.AppsV1().DaemonSets(targetNamespace).Watch(ctx, options)
 		},
 	}
 
diff --git a/pkg/appstate/deployments.go b/pkg/appstate/deployments.go
index a3e0492e..0c0af206 100644
--- a/pkg/appstate/deployments.go
+++ b/pkg/appstate/deployments.go
@@ -27,10 +27,10 @@ func runDeploymentController(
 ) {
 	listwatch := &cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-			return clientset.AppsV1().Deployments(targetNamespace).List(context.TODO(), options)
+			return clientset.AppsV1().Deployments(targetNamespace).List(ctx, options)
 		},
 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-			return clientset.AppsV1().Deployments(targetNamespace).Watch(context.TODO(), options)
+			return clientset.AppsV1().Deployments(targetNamespace).Watch(ctx, options)
 		},
 	}
 	informer := cache.NewSharedInformer(
diff --git a/pkg/appstate/ingress.go b/pkg/appstate/ingress.go
index 096f6956..4daa9d87 100644
--- a/pkg/appstate/ingress.go
+++ b/pkg/appstate/ingress.go
@@ -31,10 +31,10 @@ func runIngressController(
 ) {
 	listwatch := &cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-			return clientset.NetworkingV1().Ingresses(targetNamespace).List(context.TODO(), options)
+			return clientset.NetworkingV1().Ingresses(targetNamespace).List(ctx, options)
 		},
 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-			return clientset.NetworkingV1().Ingresses(targetNamespace).Watch(context.TODO(), options)
+			return clientset.NetworkingV1().Ingresses(targetNamespace).Watch(ctx, options)
 		},
 	}
 	informer := cache.NewSharedInformer(
diff --git a/pkg/appstate/persistentvolumeclaim.go b/pkg/appstate/persistentvolumeclaim.go
index 9c71f709..abc4a7ce 100644
--- a/pkg/appstate/persistentvolumeclaim.go
+++ b/pkg/appstate/persistentvolumeclaim.go
@@ -27,10 +27,10 @@ func runPersistentVolumeClaimController(
 ) {
 	listwatch := &cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-			return clientset.CoreV1().PersistentVolumeClaims(targetNamespace).List(context.TODO(), options)
+			return clientset.CoreV1().PersistentVolumeClaims(targetNamespace).List(ctx, options)
 		},
 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-			return clientset.CoreV1().PersistentVolumeClaims(targetNamespace).Watch(context.TODO(), options)
+			return clientset.CoreV1().PersistentVolumeClaims(targetNamespace).Watch(ctx, options)
 		},
 	}
 	informer := cache.NewSharedInformer(
diff --git a/pkg/appstate/pods.go b/pkg/appstate/pods.go
index 7e25805b..a1008414 100644
--- a/pkg/appstate/pods.go
+++ b/pkg/appstate/pods.go
@@ -21,10 +21,10 @@ import (
 func runPodImageController(ctx context.Context, clientset kubernetes.Interface, targetNamespace string, _ []appstatetypes.StatusInformer, _ chan<- appstatetypes.ResourceState) {
 	listwatch := &cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-			return clientset.CoreV1().Pods(targetNamespace).List(context.TODO(), options)
+			return clientset.CoreV1().Pods(targetNamespace).List(ctx, options)
 		},
 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-			return clientset.CoreV1().Pods(targetNamespace).Watch(context.TODO(), options)
+			return clientset.CoreV1().Pods(targetNamespace).Watch(ctx, options)
 		},
 	}
 	informer := cache.NewSharedInformer(
diff --git a/pkg/appstate/service.go b/pkg/appstate/service.go
index 67ac630f..b94c52d9 100644
--- a/pkg/appstate/service.go
+++ b/pkg/appstate/service.go
@@ -28,10 +28,10 @@ func runServiceController(
 ) {
 	listwatch := &cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-			return clientset.CoreV1().Services(targetNamespace).List(context.TODO(), options)
+			return clientset.CoreV1().Services(targetNamespace).List(ctx, options)
 		},
 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-			return clientset.CoreV1().Services(targetNamespace).Watch(context.TODO(), options)
+			return clientset.CoreV1().Services(targetNamespace).Watch(ctx, options)
 		},
 	}
 	informer := cache.NewSharedInformer(
diff --git a/pkg/appstate/statefulsets.go b/pkg/appstate/statefulsets.go
index b24ad425..8f77322f 100644
--- a/pkg/appstate/statefulsets.go
+++ b/pkg/appstate/statefulsets.go
@@ -38,10 +38,10 @@ func runStatefulSetController(
 ) {
 	listwatch := &cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-			return clientset.AppsV1().StatefulSets(targetNamespace).List(context.TODO(), options)
+			return clientset.AppsV1().StatefulSets(targetNamespace).List(ctx, options)
 		},
 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-			return clientset.AppsV1().StatefulSets(targetNamespace).Watch(context.TODO(), options)
+			return clientset.AppsV1().StatefulSets(targetNamespace).Watch(ctx, options)
 		},
 	}
 	informer := cache.NewSharedInformer(