Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions pkg/common/cacheruntime.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,9 @@ const (
)

type CacheRuntimeValue struct {
// RuntimeIdentity is used to identify the runtime (name/namespace)
RuntimeIdentity RuntimeIdentity `json:"runtimeIdentity"`

Master *CacheRuntimeComponentValue `json:"master,omitempty"`
Worker *CacheRuntimeComponentValue `json:"worker,omitempty"`
Client *CacheRuntimeComponentValue `json:"client,omitempty"`
Master *CacheRuntimeComponentValue
Worker *CacheRuntimeComponentValue
Client *CacheRuntimeComponentValue
}

// CacheRuntimeComponentValue is the common value for building CacheRuntimeValue.
Expand All @@ -54,12 +51,32 @@ type CacheRuntimeComponentValue struct {
Replicas int32
PodTemplateSpec corev1.PodTemplateSpec
Owner *OwnerReference
ComponentType ComponentType `json:"componentType,omitempty"`
ComponentType ComponentType

// Service name, can be not same as Component name
Service *CacheRuntimeComponentServiceConfig
}

// CacheRuntimeStatusValue contains only the fields needed for status update
type CacheRuntimeStatusValue struct {
Master *ComponentStatusInfo
Worker *ComponentStatusInfo
Client *ComponentStatusInfo
}

// ComponentIdentity contains minimal identity information for component status queries
type ComponentIdentity struct {
Name string
Namespace string
}

// ComponentStatusInfo contains the minimal information needed for status updates
type ComponentStatusInfo struct {
ComponentIdentity
Enabled bool
WorkloadType metav1.TypeMeta
}

// CacheRuntimeConfig defines the config of runtime, will be auto mounted by configmap in the component pod.
type CacheRuntimeConfig struct {
// Mounts from Dataset Spec
Expand Down
3 changes: 2 additions & 1 deletion pkg/common/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ const (

// The dataset annotation
// i.e. fluid.io/dataset
LabelAnnotationDataset = LabelAnnotationPrefix + "dataset"
LabelAnnotationDataset = LabelAnnotationPrefix + "dataset"
LabelAnnotationDatasetPlacement = LabelAnnotationPrefix + "dataset-placement"

// LabelAnnotationDatasetId indicates the uuid of the dataset
// i.e. fluid.io/dataset-uuid
Expand Down
12 changes: 6 additions & 6 deletions pkg/ctrl/affinity.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ func (e *Helper) BuildWorkersAffinity(workers *appsv1.StatefulSet) (workersToUpd
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "fluid.io/dataset",
Key: common.LabelAnnotationDataset,
Operator: metav1.LabelSelectorOpExists,
},
},
},
TopologyKey: "kubernetes.io/hostname",
TopologyKey: common.K8sNodeNameLabelKey,
},
},
}
Expand All @@ -100,12 +100,12 @@ func (e *Helper) BuildWorkersAffinity(workers *appsv1.StatefulSet) (workersToUpd
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "fluid.io/dataset",
Key: common.LabelAnnotationDataset,
Operator: metav1.LabelSelectorOpExists,
},
},
},
TopologyKey: "kubernetes.io/hostname",
TopologyKey: common.K8sNodeNameLabelKey,
},
},
},
Expand All @@ -114,13 +114,13 @@ func (e *Helper) BuildWorkersAffinity(workers *appsv1.StatefulSet) (workersToUpd
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "fluid.io/dataset-placement",
Key: common.LabelAnnotationDatasetPlacement,
Operator: metav1.LabelSelectorOpIn,
Values: []string{string(datav1alpha1.ExclusiveMode)},
},
},
},
TopologyKey: "kubernetes.io/hostname",
TopologyKey: common.K8sNodeNameLabelKey,
},
},
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ctrl/affinity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestBuildWorkersAffinity(t *testing.T) {
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "fluid.io/dataset-placement",
Key: common.LabelAnnotationDatasetPlacement,
Operator: metav1.LabelSelectorOpIn,
Values: []string{"Exclusive"},
},
Expand Down Expand Up @@ -319,7 +319,7 @@ func TestBuildWorkersAffinityForEFCRuntime(t *testing.T) {
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "fluid.io/dataset-placement",
Key: common.LabelAnnotationDatasetPlacement,
Operator: metav1.LabelSelectorOpIn,
Values: []string{"Exclusive"},
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddc/cache/component/component_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (

type ComponentManager interface {
Reconciler(ctx context.Context, component *common.CacheRuntimeComponentValue) error
ConstructComponentStatus(todo context.Context, value *common.CacheRuntimeComponentValue) (v1alpha1.RuntimeComponentStatus, error)
GetNodeAffinity(value *common.CacheRuntimeComponentValue) (*corev1.NodeAffinity, error)
ConstructComponentStatus(todo context.Context, identity *common.ComponentIdentity) (v1alpha1.RuntimeComponentStatus, error)
GetNodeAffinity(identity *common.ComponentIdentity) (*corev1.NodeAffinity, error)
}

func NewComponentHelper(workloadType metav1.TypeMeta, client client.Client) ComponentManager {
Expand Down
24 changes: 16 additions & 8 deletions pkg/ddc/cache/component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ func setupTestClient() client.Client {
return fake.NewFakeClientWithScheme(scheme)
}

// newComponentIdentity creates a ComponentIdentity from a CacheRuntimeComponentValue
func newComponentIdentity(component *common.CacheRuntimeComponentValue) *common.ComponentIdentity {
return &common.ComponentIdentity{
Name: component.Name,
Namespace: component.Namespace,
}
}

var _ = Describe("StatefulSetManager", func() {
var (
manager *StatefulSetManager
Expand Down Expand Up @@ -233,7 +241,7 @@ var _ = Describe("StatefulSetManager", func() {
}
Expect(manager.client.Create(ctx, sts)).To(Succeed())

status, err := manager.ConstructComponentStatus(ctx, component)
status, err := manager.ConstructComponentStatus(ctx, newComponentIdentity(component))
Expect(err).NotTo(HaveOccurred())
Expect(status.DesiredReplicas).To(Equal(int32(3)))
Expect(status.ReadyReplicas).To(Equal(int32(3)))
Expand Down Expand Up @@ -261,7 +269,7 @@ var _ = Describe("StatefulSetManager", func() {
}
Expect(manager.client.Create(ctx, sts)).To(Succeed())

status, err := manager.ConstructComponentStatus(ctx, component)
status, err := manager.ConstructComponentStatus(ctx, newComponentIdentity(component))
Expect(err).NotTo(HaveOccurred())
Expect(status.DesiredReplicas).To(Equal(int32(3)))
Expect(status.ReadyReplicas).To(Equal(int32(2)))
Expand All @@ -287,14 +295,14 @@ var _ = Describe("StatefulSetManager", func() {
}
Expect(manager.client.Create(ctx, sts)).To(Succeed())

status, err := manager.ConstructComponentStatus(ctx, component)
status, err := manager.ConstructComponentStatus(ctx, newComponentIdentity(component))
Expect(err).NotTo(HaveOccurred())
Expect(status.ReadyReplicas).To(Equal(int32(0)))
Expect(status.Phase).To(Equal(datav1alpha1.RuntimePhaseNotReady))
})

It("should return error when StatefulSet doesn't exist", func() {
_, err := manager.ConstructComponentStatus(ctx, component)
_, err := manager.ConstructComponentStatus(ctx, newComponentIdentity(component))
Expect(err).To(HaveOccurred())
})
})
Expand Down Expand Up @@ -397,7 +405,7 @@ var _ = Describe("DaemonSetManager", func() {
}
Expect(manager.client.Create(ctx, ds)).To(Succeed())

status, err := manager.ConstructComponentStatus(ctx, component)
status, err := manager.ConstructComponentStatus(ctx, newComponentIdentity(component))
Expect(err).NotTo(HaveOccurred())
Expect(status.DesiredReplicas).To(Equal(int32(3)))
Expect(status.ReadyReplicas).To(Equal(int32(3)))
Expand All @@ -422,7 +430,7 @@ var _ = Describe("DaemonSetManager", func() {
}
Expect(manager.client.Create(ctx, ds)).To(Succeed())

status, err := manager.ConstructComponentStatus(ctx, component)
status, err := manager.ConstructComponentStatus(ctx, newComponentIdentity(component))
Expect(err).NotTo(HaveOccurred())
Expect(status.DesiredReplicas).To(Equal(int32(3)))
Expect(status.ReadyReplicas).To(Equal(int32(2)))
Expand All @@ -448,7 +456,7 @@ var _ = Describe("DaemonSetManager", func() {
}
Expect(manager.client.Create(ctx, ds)).To(Succeed())

status, err := manager.ConstructComponentStatus(ctx, component)
status, err := manager.ConstructComponentStatus(ctx, newComponentIdentity(component))
Expect(err).NotTo(HaveOccurred())
Expect(status.DesiredReplicas).To(Equal(int32(3)))
Expect(status.ReadyReplicas).To(Equal(int32(0)))
Expand All @@ -458,7 +466,7 @@ var _ = Describe("DaemonSetManager", func() {
})

It("should return error when DaemonSet doesn't exist", func() {
_, err := manager.ConstructComponentStatus(ctx, component)
_, err := manager.ConstructComponentStatus(ctx, newComponentIdentity(component))
Expect(err).To(HaveOccurred())
})
})
Expand Down
10 changes: 5 additions & 5 deletions pkg/ddc/cache/component/daemonset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (s *DaemonSetManager) Reconciler(ctx context.Context, component *common.Cac
return reconcileService(ctx, s.client, component)
}

func (s *DaemonSetManager) GetNodeAffinity(component *common.CacheRuntimeComponentValue) (*corev1.NodeAffinity, error) {
ds, err := kubeclient.GetDaemonset(s.client, component.Name, component.Namespace)
func (s *DaemonSetManager) GetNodeAffinity(identity *common.ComponentIdentity) (*corev1.NodeAffinity, error) {
ds, err := kubeclient.GetDaemonset(s.client, identity.Name, identity.Namespace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,14 +114,14 @@ func (s *DaemonSetManager) constructDaemonSet(component *common.CacheRuntimeComp
return ds
}

func (s *DaemonSetManager) ConstructComponentStatus(ctx context.Context, component *common.CacheRuntimeComponentValue) (datav1alpha1.RuntimeComponentStatus, error) {
func (s *DaemonSetManager) ConstructComponentStatus(ctx context.Context, identity *common.ComponentIdentity) (datav1alpha1.RuntimeComponentStatus, error) {
logger := log.FromContext(ctx)
logger.Info("start to ConstructComponentStatus")

ds := &appsv1.DaemonSet{}
err := s.client.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, ds)
err := s.client.Get(ctx, types.NamespacedName{Name: identity.Name, Namespace: identity.Namespace}, ds)
if err != nil {
logger.Error(err, fmt.Sprintf("failed to get component: %s/%s", component.Namespace, component.Name))
logger.Error(err, fmt.Sprintf("failed to get component: %s/%s", identity.Namespace, identity.Name))
return datav1alpha1.RuntimeComponentStatus{}, err
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/ddc/cache/component/statefulset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (s *StatefulSetManager) Reconciler(ctx context.Context, component *common.C
return reconcileService(ctx, s.client, component)
}

func (s *StatefulSetManager) GetNodeAffinity(component *common.CacheRuntimeComponentValue) (*corev1.NodeAffinity, error) {
sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace)
func (s *StatefulSetManager) GetNodeAffinity(identity *common.ComponentIdentity) (*corev1.NodeAffinity, error) {
sts, err := kubeclient.GetStatefulSet(s.client, identity.Name, identity.Namespace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -121,13 +121,13 @@ func (s *StatefulSetManager) constructStatefulSet(component *common.CacheRuntime
return sts
}

func (s *StatefulSetManager) ConstructComponentStatus(ctx context.Context, component *common.CacheRuntimeComponentValue) (datav1alpha1.RuntimeComponentStatus, error) {
func (s *StatefulSetManager) ConstructComponentStatus(ctx context.Context, identity *common.ComponentIdentity) (datav1alpha1.RuntimeComponentStatus, error) {
logger := log.FromContext(ctx)
logger.Info("start to ConstructComponentStatus")

sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace)
sts, err := kubeclient.GetStatefulSet(s.client, identity.Name, identity.Namespace)
if err != nil {
logger.Error(err, fmt.Sprintf("failed to get component: %s/%s", component.Namespace, component.Name))
logger.Error(err, fmt.Sprintf("failed to get component: %s/%s", identity.Namespace, identity.Name))
return datav1alpha1.RuntimeComponentStatus{}, err
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/ddc/cache/engine/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ func (e *CacheEngine) SetupClientInternal(clientValue *common.CacheRuntimeCompon
return err
}

clientStatus, err := manager.ConstructComponentStatus(context.TODO(), clientValue)
identity := &common.ComponentIdentity{
Name: clientValue.Name,
Namespace: clientValue.Namespace,
}
clientStatus, err := manager.ConstructComponentStatus(context.TODO(), identity)
if err != nil {
return err
}
Expand Down
Loading
Loading