Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 17 additions & 19 deletions slice/api/v1alpha1/slice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,25 @@ import (
)

// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this note is related to Type. Can we add a blank line between them to avoid confusion?

type Type string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type Type string
type AcceleratorType string

Maybe like this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the slice API, this is just copying the canonical version of this API (exists in some doc), and we need to match that.

Copy link
Collaborator

@mbobrovskyi mbobrovskyi Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like this

Suggested change
type Type string
// +kubebuilder:validation:Enum=v6e;tpu-v7x
type Type string


const (
TypeV6e Type = "v6e"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
TypeV6e Type = "v6e"
AcceleratorTypeV6e Type = "v6e"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we relax validation for it in IsValidTPUAccelerator()?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to implement logic supporting v6 before relaxing this.

TypeTpu7x Type = "tpu-v7x"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
TypeTpu7x Type = "tpu-v7x"
AcceleratorTypeTpu7x Type = "tpu-v7x"

)

// SliceSpec defines the desired state of Slice.
type SliceSpec struct {
// AcceleratorType specifies the type of accelerator used in this slice.
// Type specifies the type of accelerator used in this slice, e.g., "v6e", "tpu-v7x".
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Type specifies the type of accelerator used in this slice, e.g., "v6e", "tpu-v7x".
// type specifies the type of accelerator used in this slice, e.g., "v6e", "tpu-v7x".

// +kubebuilder:validation:Immutable
Type string `json:"type"`
// +kubebuilder:validation:Enum=v6e;tpu-v7x
Copy link
Collaborator

@mbobrovskyi mbobrovskyi Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this validation to type?

Type Type `json:"type"`

// Topology represents the network topology of the slice.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Topology represents the network topology of the slice.
// topology represents the network topology of the slice.

// It defines the physical arrangement of TPU chips in a 2D or 3D mesh.
// The topology must be specified in `<X>x<Y>` or `<X>x<Y>x<Z>` format.
// +kubebuilder:validation:Immutable
// +kubebuilder:validation:Pattern=^\d+x\d+(x\d+)?$
Topology string `json:"topology"`

// Partition Ids denotes the set of partitions to use to form a slice
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Partition Ids denotes the set of partitions to use to form a slice
// partitionIds denotes the set of partitions to use to form a slice

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the same for status.

Expand All @@ -55,7 +65,7 @@ type SliceStatus struct {
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Type",type=string,JSONPath=`.spec.type`
// +kubebuilder:printcolumn:name="Topology",type=string,JSONPath=`.spec.topology`
// +kubebuilder:printcolumn:name="Status",type=string,JSONPath=`.status.conditions[0].type`
// +kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.conditions[?(@.type=="Ready")].reason`
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// Slice is the Schema for the slices API.
type Slice struct {
Expand All @@ -75,22 +85,10 @@ type SliceList struct {
Items []Slice `json:"items"`
}

// SliceConditionType defines the type of condition
type SliceConditionType string

const (
// Forming means the slice is being created and configured.
Forming SliceConditionType = "Forming"
// Ready means the slice is fully operational.
Ready SliceConditionType = "Ready"
// Degraded means the slice is operational but with reduced capacity or performance.
Degraded SliceConditionType = "Degraded"
// Deformed means the slice is being torn down.
Deformed SliceConditionType = "Deformed"
// Error means the slice has encountered an error and is not operational.
Error SliceConditionType = "Error"
)

func init() {
SchemeBuilder.Register(&Slice{}, &SliceList{})
}

const (
SliceStateConditionType = "Ready"
)
78 changes: 31 additions & 47 deletions slice/internal/controller/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}

deleted, _, errored, _ := r.groupSlices(slices)
deleted, toDelete, _ := r.groupSlices(slices)
if len(deleted) > 0 {
log.V(3).Info(
"Waiting for deleted Slices to be cleaned up; skipping reconciliation for now",
Expand All @@ -181,12 +181,12 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if len(errored) > 0 {
if len(toDelete) > 0 {
log.V(3).Info(
"Deleting errored Slices",
"erroredSlices", klog.KObjSlice(errored),
"Deleting Slices",
"slices", klog.KObjSlice(toDelete),
)
err = r.deleteSlices(ctx, errored)
err = r.deleteSlices(ctx, toDelete)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -256,22 +256,13 @@ func (r *WorkloadReconciler) cleanupSlices(ctx context.Context, wl *kueue.Worklo
return false, err
}

deleted, deformed, errored, other := r.groupSlices(slices)
deleted, errored, other := r.groupSlices(slices)

if len(deleted) == len(slices) {
log.V(3).Info("All slices already deleted; finishing cleanup")
return true, nil
}

if len(deformed) > 0 {
log.V(3).Info("Found Slices in deformed state; cleaning them up", "deformedSlices", klog.KObjSlice(deformed))
// We still need to delete deformed Slices because requeueing causes a conflict error during Slice creation.
err = r.deleteSlices(ctx, deformed)
if err != nil {
return false, err
}
}

if len(other)+len(errored) > 0 {
terminated, err := r.ownerPodsFinished(ctx, wl)
if err != nil || !terminated {
Expand Down Expand Up @@ -313,21 +304,19 @@ func (r *WorkloadReconciler) findWorkloadSlices(ctx context.Context, wl *kueue.W
// - A slice containing Slice objects to delete (errored or stale).
// - A slice containing other Slice objects (active/valid slices).
func (r *WorkloadReconciler) groupSlices(slices []v1alpha1.Slice) ([]v1alpha1.Slice, []v1alpha1.Slice, []v1alpha1.Slice, []v1alpha1.Slice) {
var deleted, deformed, errored, other []v1alpha1.Slice
func (r *WorkloadReconciler) groupSlices(slices []v1alpha1.Slice) ([]v1alpha1.Slice, []v1alpha1.Slice, []v1alpha1.Slice) {
var deleted, toDelete, other []v1alpha1.Slice
for _, slice := range slices {
switch {
case !slice.DeletionTimestamp.IsZero():
deleted = append(deleted, slice)
case core.Deformed(&slice):
deformed = append(deformed, slice)
case core.IsError(&slice):
errored = append(errored, slice)
case core.IsError(&slice) || core.IsStale(&slice):
toDelete = append(toDelete, slice)
default:
other = append(other, slice)
}
}
return deleted, deformed, errored, other
return deleted, toDelete, other
}

func (r *WorkloadReconciler) deleteSlices(ctx context.Context, slices []v1alpha1.Slice) error {
Expand Down Expand Up @@ -507,7 +496,7 @@ func shouldCreateSliceForPodSetAssignment(wl *kueue.Workload, psa kueue.PodSetAs
return false
}

func parseTopologyAssignmentIntoNodeSelector(slice *v1alpha1.Slice, topologyAssignment *kueue.TopologyAssignment, nodes map[string]corev1.Node) {
func parseTopologyAssignmentIntoPartitionIDs(slice *v1alpha1.Slice, topologyAssignment *kueue.TopologyAssignment, nodes map[string]corev1.Node) {
subBlockIDs := sets.New[string]()
// we already validated that all assignments have a valid level,
// in validateRelevantWorkload.
Expand All @@ -529,10 +518,10 @@ func (r *WorkloadReconciler) createSlice(ctx context.Context, wl *kueue.Workload
if err := controllerutil.SetControllerReference(wl, slice, r.client.Scheme()); err != nil {
return nil, err
}
parseTopologyAssignmentIntoNodeSelector(slice, psa.TopologyAssignment, nodes)
parseTopologyAssignmentIntoPartitionIDs(slice, psa.TopologyAssignment, nodes)

ps := podset.FindPodSetByName(wl.Spec.PodSets, psa.Name)
slice.Spec.Type = core.GetTPUAccelerator(ps.Template)
slice.Spec.Type = v1alpha1.Type(core.GetTPUAccelerator(ps.Template))
slice.Spec.Topology = core.GetTPUTopology(ps.Template)

if err := r.client.Create(ctx, slice); err != nil {
Expand Down Expand Up @@ -594,8 +583,8 @@ func (r *WorkloadReconciler) syncAdmissionCheckStatus(ctx context.Context, wl *k
if ac.Message != originalMessage {
// Log error messages, if any.
for _, slice := range slices {
cond := meta.FindStatusCondition(slice.Status.Conditions, string(v1alpha1.Error))
if cond != nil && cond.Status == metav1.ConditionTrue {
cond := meta.FindStatusCondition(slice.Status.Conditions, v1alpha1.SliceStateConditionType)
if cond != nil && cond.Status == metav1.ConditionFalse && cond.Reason == string(core.MMIGHealthStatusFailed) {
log.V(2).Info(
"WARNING: The Slice is not operational due to an error",
"slice", klog.KObj(&slice),
Expand All @@ -608,21 +597,16 @@ func (r *WorkloadReconciler) syncAdmissionCheckStatus(ctx context.Context, wl *k
return nil
}

func groupSlicesByState(slices []v1alpha1.Slice) (map[v1alpha1.SliceConditionType][]v1alpha1.Slice, []v1alpha1.Slice) {
slicesByState := make(map[v1alpha1.SliceConditionType][]v1alpha1.Slice)
func groupSlicesByState(slices []v1alpha1.Slice) (map[core.MMIGHealthStatus][]v1alpha1.Slice, []v1alpha1.Slice) {
slicesByState := make(map[core.MMIGHealthStatus][]v1alpha1.Slice)
var noState []v1alpha1.Slice
for _, slice := range slices {
foundState := false
for _, status := range core.SliceStates {
if meta.IsStatusConditionTrue(slice.Status.Conditions, string(status)) {
slicesByState[status] = append(slicesByState[status], slice)
foundState = true
break
}
}
if !foundState {
cond := meta.FindStatusCondition(slice.Status.Conditions, string(v1alpha1.SliceStateConditionType))
if cond == nil {
noState = append(noState, slice)
continue
}
slicesByState[core.MMIGHealthStatus(cond.Reason)] = append(slicesByState[core.MMIGHealthStatus(cond.Reason)], slice)
}
return slicesByState, noState
}
Expand All @@ -631,17 +615,17 @@ func prepareAdmissionCheckStatus(ac *kueue.AdmissionCheckState, slices []v1alpha
slicesByState, noState := groupSlicesByState(slices)

switch {
case len(slicesByState[v1alpha1.Deformed]) > 0:
ac.State = kueue.CheckStateRejected
case len(slicesByState[v1alpha1.Error]) > 0:
ac.State = kueue.CheckStateRetry
case len(slices) == len(slicesByState[v1alpha1.Degraded])+len(slicesByState[v1alpha1.Ready]):
case len(slices) == len(slicesByState[core.MMIGHealthStatusActiveDegraded])+len(slicesByState[core.MMIGHealthStatusActive]):
ac.State = kueue.CheckStateReady
case len(slicesByState[core.MMIGHealthStatusFailed]) > 0:
ac.State = kueue.CheckStateRetry
case len(slicesByState[core.MMIGHealthStatusDeactivating])+len(slicesByState[core.MMIGHealthStatusIncomplete])+len(slicesByState[core.MMIGHealthStatusUnknown]) > 0:
ac.State = kueue.CheckStateRejected
}

var stateMessages []string
if len(noState) > 0 {
stateMessages = append(stateMessages, fmt.Sprintf("%d Created", len(noState)))
stateMessages = append(stateMessages, fmt.Sprintf("%d CREATED", len(noState)))
}

for _, state := range core.SliceStates {
Expand All @@ -652,10 +636,10 @@ func prepareAdmissionCheckStatus(ac *kueue.AdmissionCheckState, slices []v1alpha

ac.Message = fmt.Sprintf("Slices are in states: %s", strings.Join(stateMessages, ", "))

if len(slicesByState[v1alpha1.Error]) > 0 {
if len(slicesByState[core.MMIGHealthStatusFailed]) > 0 {
var errMessages []string
for _, slice := range slicesByState[v1alpha1.Error] {
cond := meta.FindStatusCondition(slice.Status.Conditions, string(v1alpha1.Error))
for _, slice := range slicesByState[core.MMIGHealthStatusFailed] {
cond := meta.FindStatusCondition(slice.Status.Conditions, string(v1alpha1.SliceStateConditionType))
errMessages = append(errMessages, cond.Message)
}
ac.Message += ". Errors: " + strings.Join(errMessages, "; ")
Expand Down
Loading
Loading