diff --git a/pkg/cache/settings.go b/pkg/cache/settings.go index 692ac2674..6e060c196 100644 --- a/pkg/cache/settings.go +++ b/pkg/cache/settings.go @@ -30,7 +30,10 @@ func (f *noopSettings) IsExcludedResource(_, _, _ string) bool { // Settings caching customizations type Settings struct { // ResourceHealthOverride contains health assessment overrides + // Deprecated: use ResourceHealthOverrideContext insttead. ResourceHealthOverride health.HealthOverride + // ResourceHealthOverrideContext contains health assessment overrides + ResourceHealthOverrideContext health.HealthOverrideContext // ResourcesFilter holds filter that excludes resources ResourcesFilter kube.ResourceFilter } @@ -54,7 +57,7 @@ func SetPopulateResourceInfoHandler(handler OnPopulateResourceInfoHandler) Updat // SetSettings updates caching settings func SetSettings(settings Settings) UpdateSettingsFunc { return func(cache *clusterCache) { - cache.settings = Settings{settings.ResourceHealthOverride, settings.ResourcesFilter} + cache.settings = Settings{settings.ResourceHealthOverride, settings.ResourceHealthOverrideContext, settings.ResourcesFilter} } } diff --git a/pkg/diff/diff.go b/pkg/diff/diff.go index 1cacc2dff..7af0f5504 100644 --- a/pkg/diff/diff.go +++ b/pkg/diff/diff.go @@ -61,9 +61,15 @@ func (n *noopNormalizer) Normalize(_ *unstructured.Unstructured) error { return nil } +func (n *noopNormalizer) NormalizeContext(_ context.Context, _ *unstructured.Unstructured) error { + return nil +} + // Normalizer updates resource before comparing it type Normalizer interface { + // Deprecated: use NormalizeContext instead Normalize(un *unstructured.Unstructured) error + NormalizeContext(ctx context.Context, un *unstructured.Unstructured) error } // GetNoopNormalizer returns normalizer that does not apply any resource modifications diff --git a/pkg/health/health.go b/pkg/health/health.go index c615deea9..b3fcc6fea 100644 --- a/pkg/health/health.go +++ b/pkg/health/health.go @@ -1,6 +1,8 @@ package health import ( + "context" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -33,6 +35,13 @@ type HealthOverride interface { GetResourceHealth(obj *unstructured.Unstructured) (*HealthStatus, error) } +// Implements custom health assessment that overrides built-in assessment +type HealthOverrideContext interface { + GetResourceHealth(ctx context.Context, obj *unstructured.Unstructured) (*HealthStatus, error) +} + +type HealthOverrideFuncContext func(ctx context.Context, obj *unstructured.Unstructured) (*HealthStatus, error) + // Holds health assessment results type HealthStatus struct { Status HealthStatusCode `json:"status,omitempty"` @@ -66,6 +75,25 @@ func IsWorse(current, new HealthStatusCode) bool { // GetResourceHealth returns the health of a k8s resource func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOverride) (health *HealthStatus, err error) { + healthOverrideContext := func(_ context.Context, obj *unstructured.Unstructured) (*HealthStatus, error) { + return healthOverride.GetResourceHealth(obj) + } + return getResourceHealth(context.Background(), obj, healthOverrideContext) +} + +// GetResourceHealth returns the health of a k8s resource +func GetResourceHealthContext(ctx context.Context, obj *unstructured.Unstructured, healthOverride HealthOverrideContext) (health *HealthStatus, err error) { + var healthOverrideFunc HealthOverrideFuncContext + if healthOverride != nil { + healthOverrideFunc = func(ctx context.Context, obj *unstructured.Unstructured) (*HealthStatus, error) { + return healthOverride.GetResourceHealth(ctx, obj) + } + } + return getResourceHealth(ctx, obj, healthOverrideFunc) +} + +// GetResourceHealth returns the health of a k8s resource +func getResourceHealth(ctx context.Context, obj *unstructured.Unstructured, healthOverride HealthOverrideFuncContext) (health *HealthStatus, err error) { if obj.GetDeletionTimestamp() != nil && !hook.HasHookFinalizer(obj) { return &HealthStatus{ Status: HealthStatusProgressing, @@ -74,7 +102,7 @@ func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOver } if healthOverride != nil { - health, err := healthOverride.GetResourceHealth(obj) + health, err := healthOverride(ctx, obj) if err != nil { health = &HealthStatus{ Status: HealthStatusUnknown, diff --git a/pkg/health/health_test.go b/pkg/health/health_test.go index ef945eb46..cd34eec1a 100644 --- a/pkg/health/health_test.go +++ b/pkg/health/health_test.go @@ -5,6 +5,7 @@ Package provides functionality that allows assessing the health state of a Kuber package health import ( + "context" "os" "testing" @@ -28,7 +29,7 @@ func getHealthStatus(t *testing.T, yamlPath string) *HealthStatus { var obj unstructured.Unstructured err = yaml.Unmarshal(yamlBytes, &obj) require.NoError(t, err) - health, err := GetResourceHealth(&obj, nil) + health, err := GetResourceHealthContext(context.Background(), &obj, nil) require.NoError(t, err) return health } diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go index 3420a351c..7cf10702a 100644 --- a/pkg/sync/sync_context.go +++ b/pkg/sync/sync_context.go @@ -50,9 +50,16 @@ func (r *reconciledResource) key() kubeutil.ResourceKey { type SyncContext interface { // Terminate terminates sync operation. The method is asynchronous: it starts deletion is related K8S resources // such as in-flight resource hooks, updates operation status, and exists without waiting for resource completion. + // Deprecated: use TerminateContext instead Terminate() + // TerminateContext terminates sync operation. The method is asynchronous: it starts deletion is related K8S resources + // such as in-flight resource hooks, updates operation status, and exists without waiting for resource completion. + TerminateContext(ctx context.Context) // Executes next synchronization step and updates operation status. + // Deprecated: use SyncContext instead Sync() + // Executes next synchronization step and updates operation status. + SyncContext(ctx context.Context) // Returns current sync operation state and information about resources synchronized so far. GetState() (common.OperationPhase, string, []common.ResourceSyncResult) } @@ -75,12 +82,20 @@ func WithPermissionValidator(validator common.PermissionValidator) SyncOpt { } // WithHealthOverride sets specified health override +// Deprecated: use WithHealthOverrideContext instead func WithHealthOverride(override health.HealthOverride) SyncOpt { return func(ctx *syncContext) { ctx.healthOverride = override } } +// WithHealthOverrideContext sets specified health override +func WithHealthOverrideContext(override health.HealthOverrideContext) SyncOpt { + return func(ctx *syncContext) { + ctx.healthOverrideContext = override + } +} + // WithInitialState sets sync operation initial state func WithInitialState(phase common.OperationPhase, message string, results []common.ResourceSyncResult, startedAt metav1.Time) SyncOpt { return func(ctx *syncContext) { @@ -308,11 +323,17 @@ const ( ) // getOperationPhase returns a health status from a _live_ unstructured object -func (sc *syncContext) getOperationPhase(obj *unstructured.Unstructured) (common.OperationPhase, string, error) { +func (sc *syncContext) getOperationPhase(ctx context.Context, obj *unstructured.Unstructured) (common.OperationPhase, string, error) { phase := common.OperationSucceeded message := obj.GetName() + " created" - resHealth, err := health.GetResourceHealth(obj, sc.healthOverride) + var resHealth *health.HealthStatus + var err error + if sc.healthOverrideContext != nil { + resHealth, err = health.GetResourceHealthContext(ctx, obj, sc.healthOverrideContext) + } else if sc.healthOverride != nil { + resHealth, err = health.GetResourceHealth(obj, sc.healthOverride) + } if err != nil { return "", "", err } @@ -333,18 +354,19 @@ func (sc *syncContext) getOperationPhase(obj *unstructured.Unstructured) (common } type syncContext struct { - healthOverride health.HealthOverride - permissionValidator common.PermissionValidator - resources map[kubeutil.ResourceKey]reconciledResource - hooks []*unstructured.Unstructured - config *rest.Config - rawConfig *rest.Config - dynamicIf dynamic.Interface - disco discovery.DiscoveryInterface - extensionsclientset *clientset.Clientset - kubectl kubeutil.Kubectl - resourceOps kubeutil.ResourceOperations - namespace string + healthOverride health.HealthOverride + healthOverrideContext health.HealthOverrideContext + permissionValidator common.PermissionValidator + resources map[kubeutil.ResourceKey]reconciledResource + hooks []*unstructured.Unstructured + config *rest.Config + rawConfig *rest.Config + dynamicIf dynamic.Interface + disco discovery.DiscoveryInterface + extensionsclientset *clientset.Clientset + kubectl kubeutil.Kubectl + resourceOps kubeutil.ResourceOperations + namespace string dryRun bool skipDryRun bool @@ -403,8 +425,19 @@ func (sc *syncContext) setRunningPhase(tasks []*syncTask, isPendingDeletion bool } } -// sync has performs the actual apply or hook based sync +// Sync has performs the actual apply or hook based sync +// Deprecated: use SyncContext instead func (sc *syncContext) Sync() { + sc.SyncContext(context.Background()) +} + +// SyncContext has performs the actual apply or hook based sync +func (sc *syncContext) SyncContext(ctx context.Context) { + sc.sync(ctx) +} + +// sync has performs the actual apply or hook based sync +func (sc *syncContext) sync(ctx context.Context) { sc.log.WithValues("skipHooks", sc.skipHooks, "started", sc.started()).Info("Syncing") tasks, ok := sc.getSyncTasks() if !ok { @@ -441,7 +474,7 @@ func (sc *syncContext) Sync() { }) { if task.isHook() { // update the hook's result - operationState, message, err := sc.getOperationPhase(task.liveObj) + operationState, message, err := sc.getOperationPhase(ctx, task.liveObj) if err != nil { sc.setResourceResult(task, "", common.OperationError, fmt.Sprintf("failed to get resource health: %v", err)) } else { @@ -449,7 +482,13 @@ func (sc *syncContext) Sync() { } } else { // this must be calculated on the live object - healthStatus, err := health.GetResourceHealth(task.liveObj, sc.healthOverride) + var healthStatus *health.HealthStatus + var err error + if sc.healthOverrideContext != nil { + healthStatus, err = health.GetResourceHealthContext(ctx, task.liveObj, sc.healthOverrideContext) + } else if sc.healthOverride != nil { + healthStatus, err = health.GetResourceHealth(task.liveObj, sc.healthOverride) + } if err == nil { sc.log.WithValues("task", task, "healthStatus", healthStatus).V(1).Info("attempting to update health of running task") if healthStatus == nil { @@ -1176,8 +1215,17 @@ func (sc *syncContext) hasCRDOfGroupKind(group string, kind string) bool { return false } -// terminate looks for any running jobs/workflow hooks and deletes the resource +// Deprecated: use TerminateContext instead func (sc *syncContext) Terminate() { + sc.TerminateContext(context.Background()) +} + +func (sc *syncContext) TerminateContext(ctx context.Context) { + sc.terminate(ctx) +} + +// terminate looks for any running jobs/workflow hooks and deletes the resource +func (sc *syncContext) terminate(ctx context.Context) { terminateSuccessful := true sc.log.V(1).Info("terminating") tasks, _ := sc.getSyncTasks() @@ -1190,7 +1238,7 @@ func (sc *syncContext) Terminate() { terminateSuccessful = false continue } - phase, msg, err := sc.getOperationPhase(task.liveObj) + phase, msg, err := sc.getOperationPhase(ctx, task.liveObj) if err != nil { sc.setOperationPhase(common.OperationError, fmt.Sprintf("Failed to get hook health: %v", err)) return diff --git a/pkg/utils/kube/ctl.go b/pkg/utils/kube/ctl.go index 8af205ad4..8adcf6943 100644 --- a/pkg/utils/kube/ctl.go +++ b/pkg/utils/kube/ctl.go @@ -28,6 +28,8 @@ type CleanupFunc func() type OnKubectlRunFunc func(command string) (CleanupFunc, error) +type OnKubectlRunFuncContext func(ctx context.Context, command string) (CleanupFunc, error) + type Kubectl interface { ManageResources(config *rest.Config, openAPISchema openapi.Resources) (ResourceOperations, func(), error) LoadOpenAPISchema(config *rest.Config) (openapi.Resources, *managedfields.GvkParser, error) @@ -39,13 +41,16 @@ type Kubectl interface { GetAPIResources(config *rest.Config, preferred bool, resourceFilter ResourceFilter) ([]APIResourceInfo, error) GetServerVersion(config *rest.Config) (string, error) NewDynamicClient(config *rest.Config) (dynamic.Interface, error) + // Deprecated: use SetOnKubectlRunContext instead. SetOnKubectlRun(onKubectlRun OnKubectlRunFunc) + SetOnKubectlRunContext(onKubectlRun OnKubectlRunFuncContext) } type KubectlCmd struct { - Log logr.Logger - Tracer tracing.Tracer - OnKubectlRun OnKubectlRunFunc + Log logr.Logger + Tracer tracing.Tracer + OnKubectlRun OnKubectlRunFunc + OnKubectlRunContext OnKubectlRunFuncContext } type APIResourceInfo struct { @@ -292,11 +297,23 @@ func (k *KubectlCmd) ManageResources(config *rest.Config, openAPISchema openapi. openAPISchema: openAPISchema, tracer: k.Tracer, log: k.Log, - onKubectlRun: k.OnKubectlRun, + onKubectlRun: k.OnKubectlRunContext, }, cleanup, nil } +// Deprecated: use ManageServerSideDiffDryRunsContext instead. func ManageServerSideDiffDryRuns(config *rest.Config, openAPISchema openapi.Resources, tracer tracing.Tracer, log logr.Logger, onKubectlRun OnKubectlRunFunc) (diff.KubeApplier, func(), error) { + onKubectlRunContext := func(_ context.Context, command string) (CleanupFunc, error) { + return onKubectlRun(command) + } + return manageServerSideDiffDryRunsContext(config, openAPISchema, tracer, log, onKubectlRunContext) +} + +func ManageServerSideDiffDryRunsContext(config *rest.Config, openAPISchema openapi.Resources, tracer tracing.Tracer, log logr.Logger, onKubectlRunContext OnKubectlRunFuncContext) (diff.KubeApplier, func(), error) { + return manageServerSideDiffDryRunsContext(config, openAPISchema, tracer, log, onKubectlRunContext) +} + +func manageServerSideDiffDryRunsContext(config *rest.Config, openAPISchema openapi.Resources, tracer tracing.Tracer, log logr.Logger, onKubectlRunContext OnKubectlRunFuncContext) (diff.KubeApplier, func(), error) { f, err := os.CreateTemp(utils.TempDir, "") if err != nil { return nil, nil, fmt.Errorf("failed to generate temp file for kubeconfig: %w", err) @@ -317,7 +334,7 @@ func ManageServerSideDiffDryRuns(config *rest.Config, openAPISchema openapi.Reso openAPISchema: openAPISchema, tracer: tracer, log: log, - onKubectlRun: onKubectlRun, + onKubectlRun: onKubectlRunContext, }, cleanup, nil } @@ -356,6 +373,10 @@ func (k *KubectlCmd) SetOnKubectlRun(onKubectlRun OnKubectlRunFunc) { k.OnKubectlRun = onKubectlRun } +func (k *KubectlCmd) SetOnKubectlRunContext(onKubectlRunContext OnKubectlRunFuncContext) { + k.OnKubectlRunContext = onKubectlRunContext +} + func RunAllAsync(count int, action func(i int) error) error { g, ctx := errgroup.WithContext(context.Background()) loop: diff --git a/pkg/utils/kube/kubetest/mock.go b/pkg/utils/kube/kubetest/mock.go index 2faa669bf..29ebf468a 100644 --- a/pkg/utils/kube/kubetest/mock.go +++ b/pkg/utils/kube/kubetest/mock.go @@ -96,6 +96,9 @@ func (k *MockKubectlCmd) LoadOpenAPISchema(_ *rest.Config) (openapi.Resources, * func (k *MockKubectlCmd) SetOnKubectlRun(_ kube.OnKubectlRunFunc) { } +func (k *MockKubectlCmd) SetOnKubectlRunContext(_ kube.OnKubectlRunFuncContext) { +} + func (k *MockKubectlCmd) ManageResources(_ *rest.Config, _ openapi.Resources) (kube.ResourceOperations, func(), error) { return &MockResourceOps{}, func() { }, nil diff --git a/pkg/utils/kube/resource_ops.go b/pkg/utils/kube/resource_ops.go index 039749ac3..0c94f1113 100644 --- a/pkg/utils/kube/resource_ops.go +++ b/pkg/utils/kube/resource_ops.go @@ -50,7 +50,7 @@ type kubectlResourceOperations struct { config *rest.Config log logr.Logger tracer tracing.Tracer - onKubectlRun OnKubectlRunFunc + onKubectlRun OnKubectlRunFuncContext fact cmdutil.Factory openAPISchema openapi.Resources } @@ -60,7 +60,7 @@ type kubectlServerSideDiffDryRunApplier struct { config *rest.Config log logr.Logger tracer tracing.Tracer - onKubectlRun OnKubectlRunFunc + onKubectlRun OnKubectlRunFuncContext fact cmdutil.Factory openAPISchema openapi.Resources } @@ -193,7 +193,7 @@ func (k *kubectlServerSideDiffDryRunApplier) runResourceCommand(obj *unstructure // See: https://github.com/kubernetes/kubernetes/issues/66353 // `auth reconcile` will delete and recreate the resource if necessary func (k *kubectlResourceOperations) rbacReconcile(ctx context.Context, obj *unstructured.Unstructured, fileName string, dryRunStrategy cmdutil.DryRunStrategy) (string, error) { - cleanup, err := processKubectlRun(k.onKubectlRun, "auth") + cleanup, err := processKubectlRun(ctx, k.onKubectlRun, "auth") if err != nil { return "", fmt.Errorf("error processing kubectl run auth: %w", err) } @@ -227,7 +227,7 @@ func (k *kubectlResourceOperations) ReplaceResource(ctx context.Context, obj *un defer span.Finish() k.log.Info(fmt.Sprintf("Replacing resource %s/%s in cluster: %s, namespace: %s", obj.GetKind(), obj.GetName(), k.config.Host, obj.GetNamespace())) return k.runResourceCommand(ctx, obj, dryRunStrategy, func(ioStreams genericclioptions.IOStreams, fileName string) error { - cleanup, err := processKubectlRun(k.onKubectlRun, "replace") + cleanup, err := processKubectlRun(ctx, k.onKubectlRun, "replace") if err != nil { return err } @@ -248,7 +248,7 @@ func (k *kubectlResourceOperations) CreateResource(ctx context.Context, obj *uns span.SetBaggageItem("name", obj.GetName()) defer span.Finish() return k.runResourceCommand(ctx, obj, dryRunStrategy, func(ioStreams genericclioptions.IOStreams, fileName string) error { - cleanup, err := processKubectlRun(k.onKubectlRun, "create") + cleanup, err := processKubectlRun(ctx, k.onKubectlRun, "create") if err != nil { return err } @@ -301,7 +301,7 @@ func (k *kubectlResourceOperations) UpdateResource(ctx context.Context, obj *uns } // ApplyResource performs an apply of a unstructured resource -func (k *kubectlServerSideDiffDryRunApplier) ApplyResource(_ context.Context, obj *unstructured.Unstructured, dryRunStrategy cmdutil.DryRunStrategy, force bool, validate bool, serverSideApply bool, manager string) (string, error) { +func (k *kubectlServerSideDiffDryRunApplier) ApplyResource(ctx context.Context, obj *unstructured.Unstructured, dryRunStrategy cmdutil.DryRunStrategy, force bool, validate bool, serverSideApply bool, manager string) (string, error) { span := k.tracer.StartSpan("ApplyResource") span.SetBaggageItem("kind", obj.GetKind()) span.SetBaggageItem("name", obj.GetName()) @@ -312,7 +312,7 @@ func (k *kubectlServerSideDiffDryRunApplier) ApplyResource(_ context.Context, ob "serverSideApply", serverSideApply).Info(fmt.Sprintf("Running server-side diff. Dry run applying resource %s/%s in cluster: %s, namespace: %s", obj.GetKind(), obj.GetName(), k.config.Host, obj.GetNamespace())) return k.runResourceCommand(obj, func(ioStreams genericclioptions.IOStreams, fileName string) error { - cleanup, err := processKubectlRun(k.onKubectlRun, "apply") + cleanup, err := processKubectlRun(ctx, k.onKubectlRun, "apply") if err != nil { return err } @@ -343,7 +343,7 @@ func (k *kubectlResourceOperations) ApplyResource(ctx context.Context, obj *unst "serverSideDiff", true).Info(fmt.Sprintf("Applying resource %s/%s in cluster: %s, namespace: %s", obj.GetKind(), obj.GetName(), k.config.Host, obj.GetNamespace())) return k.runResourceCommand(ctx, obj, dryRunStrategy, func(ioStreams genericclioptions.IOStreams, fileName string) error { - cleanup, err := processKubectlRun(k.onKubectlRun, "apply") + cleanup, err := processKubectlRun(ctx, k.onKubectlRun, "apply") if err != nil { return err } @@ -616,9 +616,9 @@ func (k *kubectlResourceOperations) authReconcile(ctx context.Context, obj *unst return strings.Join(out, ". "), nil } -func processKubectlRun(onKubectlRun OnKubectlRunFunc, cmd string) (CleanupFunc, error) { +func processKubectlRun(ctx context.Context, onKubectlRun OnKubectlRunFuncContext, cmd string) (CleanupFunc, error) { if onKubectlRun != nil { - return onKubectlRun(cmd) + return onKubectlRun(ctx, cmd) } return func() {}, nil }