Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 44 additions & 22 deletions pkg/dispatcher/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ import (
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
karmadainformerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
informerworkv1aplha2 "github.com/karmada-io/karmada/pkg/generated/informers/externalversions/work/v1alpha2"
v1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
schedv1 "k8s.io/client-go/informers/scheduling/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"volcano.sh/apis/pkg/apis/scheduling/scheme"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanoclientset "volcano.sh/apis/pkg/client/clientset/versioned"
volcanoinformer "volcano.sh/apis/pkg/client/informers/externalversions"
Expand All @@ -46,6 +50,7 @@ import (

type DispatcherCacheOption struct {
WorkerNum uint32
DispatcherName string
DefaultQueueName string
KubeClientOptions kube.ClientOptions
}
Expand All @@ -54,6 +59,9 @@ type DispatcherCache struct {
mutex sync.Mutex
workerNum uint32

// dispatcherName is the name for volcano dispatcher
dispatcherName string

kubeClient kubernetes.Interface
vcClient volcanoclientset.Interface
karmadaClient karmadaclientset.Interface
Expand Down Expand Up @@ -86,6 +94,9 @@ type DispatcherCache struct {
// Its queue for unsuspend the ResourceBinding, when a ResourceBinding finish dispatch,
// The Dispatcher will add a task to here, and update the ResourceBinding.spec.Suspend = false.
unSuspendRBTaskQueue workqueue.Interface

// ResourceBinding event recorder
recorder record.EventRecorder
}

func NewDispatcherCache(option *DispatcherCacheOption) DispatcherCacheInterface {
Expand All @@ -105,11 +116,17 @@ func NewDispatcherCache(option *DispatcherCacheOption) DispatcherCacheInterface
if err != nil {
panic(fmt.Sprintf("failed to init karmadaClient, with err: %v", err))
}
eventClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed to init eventClient, with err: %v", err))
}

// Create the default queue
utils.CreateDefaultQueue(volcanoClient, option.DefaultQueueName)

sc := &DispatcherCache{
dc := &DispatcherCache{
dispatcherName: option.DispatcherName,

kubeClient: kubeClient,
workerNum: option.WorkerNum,
vcClient: volcanoClient,
Expand All @@ -130,35 +147,40 @@ func NewDispatcherCache(option *DispatcherCacheOption) DispatcherCacheInterface
unSuspendRBTaskQueue: workqueue.New(),
}

sc.queueInformer = sc.volcanoInformerFactory.Scheduling().V1beta1().Queues()
sc.queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.addQueue,
UpdateFunc: sc.updateQueue,
DeleteFunc: sc.deleteQueue,
dc.queueInformer = dc.volcanoInformerFactory.Scheduling().V1beta1().Queues()
dc.queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addQueue,
UpdateFunc: dc.updateQueue,
DeleteFunc: dc.deleteQueue,
})

sc.podGroupInformer = sc.volcanoInformerFactory.Scheduling().V1beta1().PodGroups()
sc.podGroupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.addPodGroup,
UpdateFunc: sc.updatePodGroup,
DeleteFunc: sc.deletePodGroup,
dc.podGroupInformer = dc.volcanoInformerFactory.Scheduling().V1beta1().PodGroups()
dc.podGroupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addPodGroup,
UpdateFunc: dc.updatePodGroup,
DeleteFunc: dc.deletePodGroup,
})

sc.priorityClassInformer = sc.informerFactory.Scheduling().V1().PriorityClasses()
sc.priorityClassInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.addPriorityClass,
UpdateFunc: sc.updatePriorityClass,
DeleteFunc: sc.deletePriorityClass,
dc.priorityClassInformer = dc.informerFactory.Scheduling().V1().PriorityClasses()
dc.priorityClassInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addPriorityClass,
UpdateFunc: dc.updatePriorityClass,
DeleteFunc: dc.deletePriorityClass,
})

sc.resourceBindingInformer = sc.karmadaInformerFactor.Work().V1alpha2().ResourceBindings()
sc.resourceBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.addResourceBinding,
UpdateFunc: sc.updateResourceBinding,
DeleteFunc: sc.deleteResourceBinding,
dc.resourceBindingInformer = dc.karmadaInformerFactor.Work().V1alpha2().ResourceBindings()
dc.resourceBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addResourceBinding,
UpdateFunc: dc.updateResourceBinding,
DeleteFunc: dc.deleteResourceBinding,
})

return sc
// Prepare event clients.
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: eventClient.CoreV1().Events("")})
dc.recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: option.DispatcherName})

return dc
}

func (dc *DispatcherCache) Run(stopCh <-chan struct{}) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/dispatcher/cache/resource_binding_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,24 @@ package cache
import (
"context"
"encoding/json"
"fmt"

workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"gomodules.xyz/jsonpatch/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"

"volcano.sh/volcano-global/pkg/dispatcher/api"
)

// Reasons for resourceBinding events.
const (
// FailedUnSuspendReason is added in an event when failed to unsuspend a ResourceBinding.
FailedUnSuspendReason = "FailedUnSuspend"
)

func (dc *DispatcherCache) UnSuspendResourceBinding(key types.NamespacedName) {
dc.mutex.Lock()
defer dc.mutex.Unlock()
Expand Down Expand Up @@ -84,6 +92,8 @@ func (dc *DispatcherCache) unSuspendResourceBinding(rb *workv1alpha2.ResourceBin
klog.Errorf("ResourceBindingInfo <%s/%s> not found in cache.", key.Namespace, key.Name)
} else {
// Recover the ResourceBindingInfo status to Suspended, wait for the next dispatch.
dc.recorder.Event(rbi.ResourceBinding, v1.EventTypeWarning, FailedUnSuspendReason,
fmt.Sprintf("Error unsuspending ResourceBinding: %+v", err))
rbi.DispatchStatus = api.Suspended
}
dc.mutex.Unlock()
Expand Down
5 changes: 2 additions & 3 deletions pkg/dispatcher/cache/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/utils/ptr"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanoclientset "volcano.sh/apis/pkg/client/clientset/versioned"

"volcano.sh/volcano-global/pkg/utils"
)

// CreateDefaultQueue Create the default queue.
Expand All @@ -46,7 +45,7 @@ func CreateDefaultQueue(volcanoClient volcanoclientset.Interface, queueName stri
Name: queueName,
},
Spec: schedulingv1beta1.QueueSpec{
Reclaimable: utils.ToPointer(true),
Reclaimable: ptr.To(true),
Weight: 1,
},
}, metav1.CreateOptions{})
Expand Down
4 changes: 0 additions & 4 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,3 @@ limitations under the License.
*/

package utils

func ToPointer[T any](c T) *T {
return &c
}
3 changes: 2 additions & 1 deletion pkg/webhooks/resourcebinding/mutating/mutating.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
registrationv1 "k8s.io/api/admissionregistration/v1"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"volcano.sh/volcano/pkg/webhooks/router"
"volcano.sh/volcano/pkg/webhooks/util"

Expand Down Expand Up @@ -97,6 +98,6 @@ func ResourceBindings(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResp
return util.ToAdmissionResponse(err)
}

response.PatchType = utils.ToPointer(admissionv1.PatchTypeJSONPatch)
response.PatchType = ptr.To(admissionv1.PatchTypeJSONPatch)
return response
}
4 changes: 2 additions & 2 deletions pkg/webhooks/resourcebinding/mutating/mutating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/utils/ptr"

"volcano.sh/volcano-global/pkg/utils"
"volcano.sh/volcano-global/pkg/webhooks/decoder"
)

Expand Down Expand Up @@ -89,7 +89,7 @@ func TestResourceBindings(t *testing.T) {
expectResponse: admissionv1.AdmissionResponse{
Allowed: true,
Patch: normalResponsePatch,
PatchType: utils.ToPointer(admissionv1.PatchTypeJSONPatch),
PatchType: ptr.To(admissionv1.PatchTypeJSONPatch),
},
},
{
Expand Down