Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

Expand Down Expand Up @@ -99,6 +100,11 @@ func AddToManager(ctx *pkgctx.ControllerManagerContext, mgr manager.Manager) err
GetPriority: kubeutil.GetVirtualMachineReconcilePriority,
})

// Filter UPDATE events that do not have updated metadata or spec.
// Please note, this does not apply to GENERIC events -- the async watcher
// and volume controller will still result in the VM being reconciled.
builder = builder.WithEventFilter(predicate.GenerationChangedPredicate{})

builder = builder.Watches(&vmopv1.VirtualMachineClass{},
handler.EnqueueRequestsFromMapFunc(classToVMMapperFn(ctx, r.Client)))

Expand All @@ -114,11 +120,24 @@ func AddToManager(ctx *pkgctx.ControllerManagerContext, mgr manager.Manager) err
builder = builder.WatchesRawSource(source.Channel(
cource.FromContextWithBuffer(ctx, "VirtualMachine", 100),
&kubeutil.EnqueueRequestForObject{
Logger: ctrl.Log.WithName("asyncvmqueue"),
Logger: ctrl.Log.WithName("vmqueue.async"),
GetPriority: kubeutil.GetVirtualMachineReconcilePriority,
}))
}

builder = builder.WatchesRawSource(source.Channel(
cource.FromContextWithBuffer(ctx, "VirtualMachineVolumes", 100),
&kubeutil.EnqueueRequestForObject{
Logger: ctrl.Log.WithName("vmqueue.volumes"),
GetPriority: func(
_ context.Context,
_ kubeutil.EventType,
_, _ client.Object, _ int) int {

return 50
},
}))

if pkgcfg.FromContext(ctx).Features.FastDeploy {
builder = builder.Watches(
&vmopv1.VirtualMachineImageCache{},
Expand Down
15 changes: 15 additions & 0 deletions controllers/virtualmachine/volume/volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apierrorsutil "k8s.io/apimachinery/pkg/util/errors"
Expand All @@ -26,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -44,6 +46,7 @@ import (
"github.com/vmware-tanzu/vm-operator/pkg/providers/vsphere/constants"
"github.com/vmware-tanzu/vm-operator/pkg/record"
pkgutil "github.com/vmware-tanzu/vm-operator/pkg/util"
"github.com/vmware-tanzu/vm-operator/pkg/util/kube/cource"
vmopv1util "github.com/vmware-tanzu/vm-operator/pkg/util/vmopv1"
)

Expand Down Expand Up @@ -216,12 +219,15 @@ type Reconciler struct {
// controller can block for a long time, consuming all of the workers.
func (r *Reconciler) Reconcile(ctx context.Context, request ctrl.Request) (_ ctrl.Result, reterr error) {
ctx = pkgcfg.JoinContext(ctx, r.Context)
ctx = cource.JoinContext(ctx, r.Context)

vm := &vmopv1.VirtualMachine{}
if err := r.Get(ctx, request.NamespacedName, vm); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

origVM := vm.DeepCopy()

volCtx := &pkgctx.VolumeContext{
Context: ctx,
Logger: pkglog.FromContextOrDefault(ctx),
Expand Down Expand Up @@ -251,6 +257,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, request ctrl.Request) (_ ctr
reterr = err
}
volCtx.Logger.Error(err, "patch failed")
} else if !apiequality.Semantic.DeepEqual(
vm.Status.Volumes,
origVM.Status.Volumes) {

// Notify the VM channel that the VM's status.volumes was updated.
chanSource := cource.FromContextWithBuffer(ctx, "VirtualMachineVolumes", 100)
chanSource <- event.GenericEvent{
Object: vm,
}
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (
pkgcfg "github.com/vmware-tanzu/vm-operator/pkg/config"
pkgctx "github.com/vmware-tanzu/vm-operator/pkg/context"
providerfake "github.com/vmware-tanzu/vm-operator/pkg/providers/fake"
"github.com/vmware-tanzu/vm-operator/pkg/util/kube/cource"
"github.com/vmware-tanzu/vm-operator/test/builder"
)

var intgFakeVMProvider = providerfake.NewVMProvider()

var suite = builder.NewTestSuiteForControllerWithContext(
pkgcfg.NewContextWithDefaultConfig(),
cource.WithContext(pkgcfg.NewContextWithDefaultConfig()),
volume.AddToManager,
func(ctx *pkgctx.ControllerManagerContext, _ ctrlmgr.Manager) error {
ctx.VMProvider = intgFakeVMProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
providerfake "github.com/vmware-tanzu/vm-operator/pkg/providers/fake"
"github.com/vmware-tanzu/vm-operator/pkg/providers/vsphere/constants"
"github.com/vmware-tanzu/vm-operator/pkg/util"
"github.com/vmware-tanzu/vm-operator/pkg/util/kube/cource"
"github.com/vmware-tanzu/vm-operator/pkg/util/ptr"
vmopv1util "github.com/vmware-tanzu/vm-operator/pkg/util/vmopv1"
"github.com/vmware-tanzu/vm-operator/test/builder"
Expand Down Expand Up @@ -151,6 +152,7 @@ func unitTestsReconcile() {

JustBeforeEach(func() {
ctx = suite.NewUnitTestContextForController()
ctx.Context = cource.WithContext(ctx.Context)

// Replace the fake client with our own that has the expected index.
ctx.Client = fake.NewClientBuilder().
Expand Down
15 changes: 15 additions & 0 deletions controllers/virtualmachine/volumebatch/volumebatch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,23 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
storagehelpers "k8s.io/component-helpers/storage/volume"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

cnsv1alpha1 "github.com/vmware-tanzu/vm-operator/external/vsphere-csi-driver/api/v1alpha1"
pkgerr "github.com/vmware-tanzu/vm-operator/pkg/errors"
"github.com/vmware-tanzu/vm-operator/pkg/util/kube/cource"
"github.com/vmware-tanzu/vm-operator/pkg/util/ptr"

vmopv1 "github.com/vmware-tanzu/vm-operator/api/v1alpha5"
Expand Down Expand Up @@ -144,12 +147,15 @@ type Reconciler struct {
// Reconcile reconciles a VirtualMachine object and processes the volumes for batch attachment.
func (r *Reconciler) Reconcile(ctx context.Context, request ctrl.Request) (_ ctrl.Result, reterr error) {
ctx = pkgcfg.JoinContext(ctx, r.Context)
ctx = cource.JoinContext(ctx, r.Context)

vm := &vmopv1.VirtualMachine{}
if err := r.Get(ctx, request.NamespacedName, vm); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

origVM := vm.DeepCopy()

volCtx := &pkgctx.VolumeContext{
Context: ctx,
Logger: pkglog.FromContextOrDefault(ctx),
Expand Down Expand Up @@ -179,6 +185,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, request ctrl.Request) (_ ctr
reterr = err
}
volCtx.Logger.Error(err, "patch failed")
} else if !apiequality.Semantic.DeepEqual(
vm.Status.Volumes,
origVM.Status.Volumes) {

// Notify the VM channel that the VM's status.volumes was updated.
chanSource := cource.FromContextWithBuffer(ctx, "VirtualMachineVolumes", 100)
chanSource <- event.GenericEvent{
Object: vm,
}
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (
pkgcfg "github.com/vmware-tanzu/vm-operator/pkg/config"
pkgctx "github.com/vmware-tanzu/vm-operator/pkg/context"
providerfake "github.com/vmware-tanzu/vm-operator/pkg/providers/fake"
"github.com/vmware-tanzu/vm-operator/pkg/util/kube/cource"
"github.com/vmware-tanzu/vm-operator/test/builder"
)

var intgFakeVMProvider = providerfake.NewVMProvider()

var suite = builder.NewTestSuiteForControllerWithContext(
pkgcfg.NewContextWithDefaultConfig(),
cource.WithContext(pkgcfg.NewContextWithDefaultConfig()),
volumebatch.AddToManager,
func(ctx *pkgctx.ControllerManagerContext, _ ctrlmgr.Manager) error {
ctx.VMProvider = intgFakeVMProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
providerfake "github.com/vmware-tanzu/vm-operator/pkg/providers/fake"
"github.com/vmware-tanzu/vm-operator/pkg/providers/vsphere/constants"
"github.com/vmware-tanzu/vm-operator/pkg/util"
"github.com/vmware-tanzu/vm-operator/pkg/util/kube/cource"
"github.com/vmware-tanzu/vm-operator/pkg/util/ptr"
)

Expand Down Expand Up @@ -153,6 +154,7 @@ func unitTestsReconcile() {
JustBeforeEach(func() {

ctx = suite.NewUnitTestContextForController()
ctx.Context = cource.WithContext(ctx.Context)

// Replace the fake client with our own that has the expected index.
ctx.Client = fake.NewClientBuilder().
Expand Down
Loading