Skip to content

Commit e2b9c0a

Browse files
author
smiletan
committed
fe support reconcile consistent
1 parent d143fbb commit e2b9c0a

File tree

4 files changed

+50
-7
lines changed

4 files changed

+50
-7
lines changed

pkg/controller/disaggregated_cluster_controller.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,8 @@ func (dc *DisaggregatedClusterReconciler) reorganizeStatus(ddc *dv1.DorisDisaggr
262262
//update component status.
263263
if err := sc.UpdateComponentStatus(ddc); err != nil {
264264
klog.Errorf("DorisClusterReconciler reconcile update component %s status failed.err=%s\n", sc.GetControllerName(), err.Error())
265-
return requeueIfError(err)
265+
// If the update fails, the cluster status will not be green and a later step requeues after 5 seconds, so returning the error here is not needed.
266+
//return requeueIfError(err)
266267
}
267268
}
268269

@@ -273,6 +274,12 @@ func (dc *DisaggregatedClusterReconciler) reorganizeStatus(ddc *dv1.DorisDisaggr
273274
} else if ddc.Status.FEStatus.Phase != dv1.Ready || ddc.Status.ClusterHealth.CGAvailableCount < ddc.Status.ClusterHealth.CGCount {
274275
ddc.Status.ClusterHealth.Health = dv1.Yellow
275276
}
277+
278+
// If any component is not ready, the cluster should be reconciled again.
279+
if ddc.Status.MetaServiceStatus.Phase != dv1.Ready || ddc.Status.FEStatus.Phase != dv1.Ready || ddc.Status.ClusterHealth.CGAvailableCount != ddc.Status.ClusterHealth.CGCount {
280+
return ctrl.Result{Requeue: true}, nil
281+
}
282+
276283
return ctrl.Result{}, nil
277284
}
278285

pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package disaggregated_fe
1919

2020
import (
2121
"context"
22+
"errors"
2223
"fmt"
2324
"github.com/apache/doris-operator/api/disaggregated/v1"
2425
"github.com/apache/doris-operator/pkg/common/utils/k8s"
@@ -57,7 +58,7 @@ func New(mgr ctrl.Manager) *DisaggregatedFEController {
5758

5859
func (dfc *DisaggregatedFEController) Sync(ctx context.Context, obj client.Object) error {
5960
ddc := obj.(*v1.DorisDisaggregatedCluster)
60-
//TODO: check ms status
61+
// Deploy FE only when the meta service is available.
6162
if !dfc.msAvailable(ddc) {
6263
dfc.K8srecorder.Event(ddc, string(sc.EventNormal), string(sc.WaitMetaServiceAvailable), "meta service have not ready.")
6364
return nil
@@ -85,6 +86,7 @@ func (dfc *DisaggregatedFEController) Sync(ctx context.Context, obj client.Objec
8586
svc := dfc.newService(ddc, confMap)
8687

8788
st := dfc.NewStatefulset(ddc, confMap)
89+
// Initialize the FE status at the start; the resource processing steps may use the status to record progress.
8890
dfc.initialFEStatus(ddc)
8991

9092
event, err := dfc.DefaultReconcileService(ctx, svcInternal)
@@ -195,7 +197,23 @@ func (dfc *DisaggregatedFEController) UpdateComponentStatus(obj client.Object) e
195197
ddc := obj.(*v1.DorisDisaggregatedCluster)
196198

197199
stfName := ddc.GetFEStatefulsetName()
200+
sts, err := k8s.GetStatefulSet(context.Background(), dfc.K8sclient, ddc.Namespace, stfName)
201+
if err != nil {
202+
klog.Errorf("DisaggregatedFEController UpdateComponentStatus get statefulset %s failed, err=%s", stfName, err.Error())
203+
return err
204+
}
198205

206+
// Check whether the statefulset has been updated: if this reconcile updated the sts, exclude the case where the old sts is fetched and the pods are not yet updated.
207+
updateStatefulsetKey := strings.ToLower(fmt.Sprintf(v1.UpdateStatefulsetName, ddc.GetFEStatefulsetName()))
208+
if _, updated := ddc.Annotations[updateStatefulsetKey]; updated {
209+
generation := dfc.DisaggregatedSubDefaultController.ReturnStatefulsetUpdatedGeneration(sts, updateStatefulsetKey)
210+
// If this reconcile did not update the statefulset, the generation is not compared.
211+
if ddc.Generation != generation {
212+
return errors.New("waiting statefulset upd ated")
213+
}
214+
}
215+
216+
updateRevision := sts.Status.UpdateRevision
199217
// FEStatus
200218
feSpec := ddc.Spec.FeSpec
201219
electionNumber := ddc.GetElectionNumber()
@@ -204,6 +222,8 @@ func (dfc *DisaggregatedFEController) UpdateComponentStatus(obj client.Object) e
204222
if err := dfc.K8sclient.List(context.Background(), &podList, client.InNamespace(ddc.Namespace), client.MatchingLabels(selector)); err != nil {
205223
return err
206224
}
225+
// Check whether all pods are controlled by the new statefulset revision.
226+
allUpdated := dfc.DisaggregatedSubDefaultController.StatefulsetControlledPodsAllUseNewUpdateRevision(updateRevision, podList.Items)
207227
for _, pod := range podList.Items {
208228

209229
if ready := k8s.PodIsReady(&pod.Status); ready {
@@ -227,7 +247,7 @@ func (dfc *DisaggregatedFEController) UpdateComponentStatus(obj client.Object) e
227247
}
228248
// all fe pods are Ready, FEStatus.Phase is Ready,
229249
// for ClusterHealth.Health is green
230-
if masterAliveReplicas == electionNumber && availableReplicas == *(feSpec.Replicas) {
250+
if allUpdated && masterAliveReplicas == electionNumber && availableReplicas == *(feSpec.Replicas) {
231251
ddc.Status.FEStatus.Phase = v1.Ready
232252
}
233253

@@ -288,7 +308,23 @@ func (dfc *DisaggregatedFEController) reconcileStatefulset(ctx context.Context,
288308

289309
// apply fe StatefulSet
290310
if err := k8s.ApplyStatefulSet(ctx, dfc.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
291-
return resource.StatefulsetDeepEqualWithKey(st, est, v1.DisaggregatedSpecHashValueAnnotation, false)
311+
//store annotations "doris.disaggregated.cluster/generation={generation}" on statefulset
312+
//store annotations "doris.disaggregated.cluster/update-{uniqueid}=true/false" on DorisDisaggregatedCluster
313+
equal := resource.StatefulsetDeepEqualWithKey(st, est, v1.DisaggregatedSpecHashValueAnnotation, false)
314+
if !equal {
315+
if len(st.Annotations) == 0 {
316+
st.Annotations = map[string]string{}
317+
}
318+
st_annos := (resource.Annotations)(st.Annotations)
319+
st_annos.Add(v1.UpdateStatefulsetGeneration, strconv.FormatInt(cluster.Generation, 10))
320+
if len(cluster.Annotations) == 0 {
321+
cluster.Annotations = map[string]string{}
322+
}
323+
ddc_annos := (resource.Annotations)(cluster.Annotations)
324+
msUniqueIdKey := strings.ToLower(fmt.Sprintf(v1.UpdateStatefulsetName, cluster.GetFEStatefulsetName()))
325+
ddc_annos.Add(msUniqueIdKey, "true")
326+
}
327+
return equal
292328
}); err != nil {
293329
klog.Errorf("disaggregatedFEController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
294330
return &sc.Event{Type: sc.EventWarning, Reason: sc.FEApplyResourceFailed, Message: err.Error()}, err

pkg/controller/sub_controller/disaggregated_cluster/metaservice/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,10 @@ func (dms *DisaggregatedMSController) UpdateComponentStatus(obj client.Object) e
9595
}
9696

9797
//check statefulset updated or not, if this reconcile update the sts, so we should exclude the circumstance that get old sts and the pods not updated.
98-
//if this reconcile not update statefulset will not check the generation equals or not.
9998
updateStatefulsetKey := strings.ToLower(fmt.Sprintf(v1.UpdateStatefulsetName, ddc.GetMSStatefulsetName()))
10099
if _, updated := ddc.Annotations[updateStatefulsetKey]; updated {
101100
generation := dms.DisaggregatedSubDefaultController.ReturnStatefulsetUpdatedGeneration(sts, updateStatefulsetKey)
101+
//if this reconcile not update statefulset will not check the generation equals or not.
102102
if ddc.Generation != generation {
103103
return errors.New("waiting statefulset updated")
104104
}

pkg/controller/sub_controller/disaggregated_subcontroller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,8 @@ func(d *DisaggregatedSubDefaultController) StatefulsetControlledPodsAllUseNewUpd
364364

365365

366366
for _, pod := range pods {
367-
annos := pod.Annotations
368-
podControlledRevision := annos[resource.POD_CONTROLLER_REVISION_HASH_KEY]
367+
labels := pod.Labels
368+
podControlledRevision := labels[resource.POD_CONTROLLER_REVISION_HASH_KEY]
369369
//if use selector filter pods have one controlled by new revision of statefulset, represents the new revision is working.
370370
if stsUpdateRevision != podControlledRevision {
371371
return false

0 commit comments

Comments
 (0)