
Commit 33aadd8

dev: baseline reconciler

1 parent d202e9d

File tree: 1 file changed, +87 −57 lines

pkg/controller/chk/worker-reconciler-chk.go

Lines changed: 87 additions & 57 deletions
@@ -17,7 +17,7 @@ package chk
 import (
     "context"
     "errors"
-    "sync"
+    "fmt"
     "time"

     meta "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -26,7 +26,7 @@ import (
     apiChk "github.com/altinity/clickhouse-operator/pkg/apis/clickhouse-keeper.altinity.com/v1"
     api "github.com/altinity/clickhouse-operator/pkg/apis/clickhouse.altinity.com/v1"
     "github.com/altinity/clickhouse-operator/pkg/apis/common/types"
-    "github.com/altinity/clickhouse-operator/pkg/controller/chi/metrics"
+    "github.com/altinity/clickhouse-operator/pkg/controller/chk/metrics"
     "github.com/altinity/clickhouse-operator/pkg/controller/common"
     a "github.com/altinity/clickhouse-operator/pkg/controller/common/announcer"
     "github.com/altinity/clickhouse-operator/pkg/controller/common/statefulset"
@@ -46,20 +46,35 @@ func (w *worker) reconcileCR(ctx context.Context, old, new *apiChk.ClickHouseKee

     common.LogOldAndNew("non-normalized yet (native)", old, new)

+    switch {
+    case w.isAfterFinalizerInstalled(old, new):
+        w.a.M(new).F().Info("isAfterFinalizerInstalled - continue reconcile-1")
+    case w.isGenerationTheSame(old, new):
+        log.V(2).M(new).F().Info("isGenerationTheSame() - nothing to do here, exit")
+        return nil
+    }
+
     w.a.M(new).S().P()
     defer w.a.M(new).E().P()

+    metrics.CRInitZeroValues(ctx, new)
+    metrics.CRReconcilesStarted(ctx, new)
+    startTime := time.Now()
+
     new = w.buildCR(ctx, new)

     switch {
     case new.EnsureRuntime().ActionPlan.HasActionsToDo():
         w.a.M(new).F().Info("ActionPlan has actions - continue reconcile")
+    case w.isAfterFinalizerInstalled(new.GetAncestorT(), new):
+        w.a.M(new).F().Info("isAfterFinalizerInstalled - continue reconcile-2")
     default:
         w.a.M(new).F().Info("ActionPlan has no actions - abort reconcile")
         return nil
     }

     w.markReconcileStart(ctx, new)
+    w.excludeFromMonitoring(new)
     w.setHostStatusesPreliminary(ctx, new)

     if err := w.reconcile(ctx, new); err != nil {
@@ -71,6 +86,7 @@ func (w *worker) reconcileCR(ctx context.Context, old, new *apiChk.ClickHouseKee
         err = common.ErrCRUDAbort
         w.markReconcileCompletedUnsuccessfully(ctx, new, err)
         if errors.Is(err, common.ErrCRUDAbort) {
+            metrics.CRReconcilesAborted(ctx, new)
         }
     } else {
         // Reconcile successful
@@ -81,8 +97,12 @@
         }

         w.clean(ctx, new)
+        w.addToMonitoring(new)
         w.waitForIPAddresses(ctx, new)
         w.finalizeReconcileAndMarkCompleted(ctx, new)
+
+        metrics.CRReconcilesCompleted(ctx, new)
+        metrics.CRReconcilesTimings(ctx, new, time.Since(startTime).Seconds())
     }

     return nil
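Taken together, the reconcileCR hunks above add an early exit when the generation is unchanged (unless a finalizer was just installed), exclude the CR from monitoring for the duration of the run, and bracket the whole reconcile with the new chk metrics calls. A minimal, self-contained sketch of that metrics bracket follows; only the CRReconciles* names come from the diff, the stub signatures here are simplified stand-ins, and the sketch counts an abort on any error whereas the diff counts only common.ErrCRUDAbort:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// Simplified stand-ins for the operator's metrics calls; the real ones
// also take the custom resource being reconciled.
func reconcilesStarted(ctx context.Context)            { fmt.Println("reconciles started +1") }
func reconcilesAborted(ctx context.Context)            { fmt.Println("reconciles aborted +1") }
func reconcilesCompleted(ctx context.Context)          { fmt.Println("reconciles completed +1") }
func reconcilesTimings(ctx context.Context, s float64) { fmt.Printf("reconcile took %.3fs\n", s) }

// instrumented applies the same bracket the commit adds to reconcileCR:
// count the start, time the work, and record either an abort or a
// completion plus the elapsed wall-clock seconds.
func instrumented(ctx context.Context, reconcile func(context.Context) error) error {
    reconcilesStarted(ctx)
    startTime := time.Now()
    if err := reconcile(ctx); err != nil {
        reconcilesAborted(ctx)
        return err
    }
    reconcilesCompleted(ctx)
    reconcilesTimings(ctx, time.Since(startTime).Seconds())
    return nil
}

func main() {
    _ = instrumented(context.Background(), func(context.Context) error { return nil })
    _ = instrumented(context.Background(), func(context.Context) error { return errors.New("reconcile failed") })
}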
@@ -182,7 +202,6 @@ func (w *worker) reconcile(ctx context.Context, cr *apiChk.ClickHouseKeeperInsta
         ctx,
         w.reconcileCRAuxObjectsPreliminary,
         w.reconcileCluster,
-        w.reconcileShardsAndHosts,
         w.reconcileCRAuxObjectsFinal,
     )
 }
@@ -282,7 +301,7 @@ func (w *worker) reconcileCRAuxObjectsFinal(ctx context.Context, cr *apiChk.Clic
 }

 func (w *worker) includeAllHostsIntoCluster(ctx context.Context, cr *apiChk.ClickHouseKeeperInstallation) {
-    // Not appropriate
+    // Not applicable
 }

 // reconcileConfigMapCommon reconciles common ConfigMap
@@ -343,16 +362,16 @@ func (w *worker) reconcileHostStatefulSet(ctx context.Context, host *api.Host, o
     log.V(1).M(host).F().S().Info("reconcile StatefulSet start")
     defer log.V(1).M(host).F().E().Info("reconcile StatefulSet end")

-    version := w.getHostSoftwareVersion(ctx, host)
-    host.Runtime.CurStatefulSet, _ = w.c.kube.STS().Get(ctx, host)
+    w.a.V(1).M(host).F().Info("Reconcile host STS: %s. App version: %s", host.GetName(), host.Runtime.Version.Render())

-    w.a.V(1).M(host).F().Info("Reconcile host: %s. App version: %s", host.GetName(), version)
+    // Start with force-restart host
     if w.shouldForceRestartHost(ctx, host) {
         w.a.V(1).M(host).F().Info("Reconcile host STS force restart: %s", host.GetName())
         _ = w.hostForceRestart(ctx, host, opts)
     }

     w.stsReconciler.PrepareHostStatefulSetWithStatus(ctx, host, host.IsStopped())
+    opts = w.prepareStsReconcileOptsWaitSection(host, opts)

     // We are in place, where we can reconcile StatefulSet to desired configuration.
     w.a.V(1).M(host).F().Info("Reconcile host STS: %s. Reconcile StatefulSet", host.GetName())
@@ -380,17 +399,33 @@
 func (w *worker) hostForceRestart(ctx context.Context, host *api.Host, opts *statefulset.ReconcileOptions) error {
     w.a.V(1).M(host).F().Info("Reconcile host. Force restart: %s", host.GetName())

-    // In case we have to force-restart host
-    // We'll do it via replicas: 0 in StatefulSet.
-    w.stsReconciler.PrepareHostStatefulSetWithStatus(ctx, host, true)
-    _ = w.stsReconciler.ReconcileStatefulSet(ctx, host, false, opts)
+    if host.IsStopped() || (w.hostSoftwareRestart(ctx, host) != nil) {
+        _ = w.hostScaleDown(ctx, host, opts)
+    }
+
     metrics.HostReconcilesRestart(ctx, host.GetCR())
-    // At this moment StatefulSet has 0 replicas.
-    // First stage of RollingUpdate completed.

     return nil
 }

+func (w *worker) hostSoftwareRestart(ctx context.Context, host *api.Host) error {
+    return fmt.Errorf("inapplicable so far")
+}
+
+func (w *worker) hostScaleDown(ctx context.Context, host *api.Host, opts *statefulset.ReconcileOptions) error {
+    w.a.V(1).M(host).F().Info("Reconcile host. Host shutdown via scale down: %s", host.GetName())
+
+    w.stsReconciler.PrepareHostStatefulSetWithStatus(ctx, host, true)
+    err := w.stsReconciler.ReconcileStatefulSet(ctx, host, false, opts)
+    if err != nil {
+        w.a.V(1).M(host).F().Info("Host shutdown abort 1. Host: %s err: %v", host.GetName(), err)
+        return err
+    }
+
+    w.a.V(1).M(host).F().Info("Host shutdown success. Host: %s", host.GetName())
+    return nil
+}
+
 // reconcileHostService reconciles host's Service
 func (w *worker) reconcileHostService(ctx context.Context, host *api.Host) error {
     service := w.task.Creator().CreateService(interfaces.ServiceHost, host).First()
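The force-restart path above is now split in two: hostSoftwareRestart is a placeholder that always returns an error ("inapplicable so far"), and hostScaleDown carries the previous behavior of shutting the host down by reconciling its StatefulSet to zero replicas. Because Go's || short-circuits, an already stopped host is scaled down without attempting a software restart at all. A tiny sketch of just that decision, with hypothetical stub closures standing in for the two methods:

package main

import (
    "errors"
    "fmt"
)

// forceRestart mirrors the condition in the new hostForceRestart: scale the
// host down when it is already stopped, or when an in-place software restart
// is unavailable (in this commit the software-restart stub always errors).
func forceRestart(stopped bool, softwareRestart, scaleDown func() error) error {
    if stopped || softwareRestart() != nil {
        return scaleDown()
    }
    return nil
}

func main() {
    softwareRestart := func() error { return errors.New("inapplicable so far") }
    scaleDown := func() error { fmt.Println("replicas -> 0, then reconcile"); return nil }
    _ = forceRestart(false, softwareRestart, scaleDown) // falls through to scale down
}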
@@ -429,6 +464,9 @@ func (w *worker) reconcileCluster(ctx context.Context, cluster *apiChk.Cluster)
     if err := w.reconcileClusterPodDisruptionBudget(ctx, cluster); err != nil {
         return err
     }
+    if err := w.reconcileClusterShardsAndHosts(ctx, cluster); err != nil {
+        return err
+    }

     return nil
 }
@@ -464,15 +502,17 @@ func (w *worker) reconcileClusterPodDisruptionBudget(ctx context.Context, cluste
     return nil
 }

-// reconcileShardsAndHosts reconciles shards and hosts of each shard
-func (w *worker) reconcileShardsAndHosts(ctx context.Context, shards []*apiChk.ChkShard) error {
+// reconcileClusterShardsAndHosts reconciles shards and hosts of each shard
+func (w *worker) reconcileClusterShardsAndHosts(ctx context.Context, cluster *apiChk.Cluster) error {
+    shards := cluster.Layout.Shards[:]
+
     // Sanity check - has to have shard(s)
     if len(shards) == 0 {
         return nil
     }

-    log.V(1).F().S().Info("reconcileShardsAndHosts start")
-    defer log.V(1).F().E().Info("reconcileShardsAndHosts end")
+    log.V(1).F().S().Info("reconcileClusterShardsAndHosts start")
+    defer log.V(1).F().E().Info("reconcileClusterShardsAndHosts end")

     opts := w.reconcileShardsAndHostsFetchOpts(ctx)

@@ -492,45 +532,17 @@
         return err
     }

-        // Since shard with 0 index is already done, we'll proceed with the 1-st
+        // Since shard with 0 index is already done, we'll proceed concurrently starting with the 1-st
         startShard = 1
     }

     // Process shards using specified concurrency level while maintaining specified max concurrency percentage.
     // Loop over shards.
-    workersNum := w.getReconcileShardsWorkersNum(shards, opts)
+    workersNum := w.getReconcileShardsWorkersNum(cluster, opts)
     w.a.V(1).Info("Starting rest of shards on workers. Workers num: %d", workersNum)
-    for startShardIndex := startShard; startShardIndex < len(shards); startShardIndex += workersNum {
-        endShardIndex := startShardIndex + workersNum
-        if endShardIndex > len(shards) {
-            endShardIndex = len(shards)
-        }
-        concurrentlyProcessedShards := shards[startShardIndex:endShardIndex]
-
-        // Processing error protected with mutex
-        var err error
-        var errLock sync.Mutex
-
-        wg := sync.WaitGroup{}
-        wg.Add(len(concurrentlyProcessedShards))
-        // Launch shard concurrent processing
-        for j := range concurrentlyProcessedShards {
-            shard := concurrentlyProcessedShards[j]
-            go func() {
-                defer wg.Done()
-                if e := w.reconcileShardWithHosts(ctx, shard); e != nil {
-                    errLock.Lock()
-                    err = e
-                    errLock.Unlock()
-                    return
-                }
-            }()
-        }
-        wg.Wait()
-        if err != nil {
-            w.a.V(1).Warning("Skipping rest of shards due to an error: %v", err)
-            return err
-        }
+    if err := w.runConcurrently(ctx, workersNum, startShard, shards[startShard:]); err != nil {
+        w.a.V(1).Info("Finished with ERROR rest of shards on workers: %d, err: %v", workersNum, err)
+        return err
     }
     w.a.V(1).Info("Finished successfully rest of shards on workers: %d", workersNum)
     return nil
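The deleted loop above is the whole concurrency story: walk the shards in windows of workersNum, reconcile each window's shards in parallel goroutines, collect the first error under a mutex, and stop scheduling further windows once a window fails. The replacement w.runConcurrently is not shown in this diff, so the sketch below only restates the behavior of the removed code as a self-contained helper:

package main

import (
    "context"
    "fmt"
    "sync"
)

// runBatched processes items in windows of workersNum, running each window
// concurrently and returning the first error once the window has drained,
// exactly as the removed sync.WaitGroup loop did for shards.
func runBatched[T any](ctx context.Context, workersNum int, items []T, f func(context.Context, T) error) error {
    for start := 0; start < len(items); start += workersNum {
        end := start + workersNum
        if end > len(items) {
            end = len(items)
        }

        var (
            err     error
            errLock sync.Mutex // protects err across goroutines
            wg      sync.WaitGroup
        )
        for _, item := range items[start:end] {
            wg.Add(1)
            go func(item T) {
                defer wg.Done()
                if e := f(ctx, item); e != nil {
                    errLock.Lock()
                    err = e
                    errLock.Unlock()
                }
            }(item)
        }
        wg.Wait()
        if err != nil {
            return err // skip remaining windows, as the original loop did
        }
    }
    return nil
}

func main() {
    shards := []int{1, 2, 3, 4, 5}
    err := runBatched(context.Background(), 2, shards, func(_ context.Context, shard int) error {
        fmt.Println("reconciling shard", shard)
        return nil
    })
    fmt.Println("err:", err)
}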
@@ -574,14 +586,14 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error {
     w.a.V(2).M(host).S().P()
     defer w.a.V(2).M(host).E().P()

+    metrics.HostReconcilesStarted(ctx, host.GetCR())
+    startTime := time.Now()
+
     if host.IsFirstInCR() {
         _ = w.reconcileCRServicePreliminary(ctx, host.GetCR())
         defer w.reconcileCRServiceFinal(ctx, host.GetCR())
     }

-    // Create artifacts
-    w.stsReconciler.PrepareHostStatefulSetWithStatus(ctx, host, false)
-
     w.a.V(1).M(host).F().Info("Reconcile host: %s. App version: %s", host.GetName(), host.Runtime.Version.Render())

     if err := w.reconcileHostPrepare(ctx, host); err != nil {
@@ -620,6 +632,10 @@
             },
         },
     })
+
+    metrics.HostReconcilesCompleted(ctx, host.GetCR())
+    metrics.HostReconcilesTimings(ctx, host.GetCR(), time.Since(startTime).Seconds())
+
     return nil
 }

@@ -641,12 +657,9 @@ func (w *worker) reconcileHostMain(ctx context.Context, host *api.Host) error {
         stsReconcileOpts *statefulset.ReconcileOptions
     )

-    //if !host.IsLast() {
-    //    stsReconcileOpts = stsReconcileOpts.SetDoNotWait()
-    //}
-
     // Reconcile ConfigMap
     if err := w.reconcileConfigMapHost(ctx, host); err != nil {
+        metrics.HostReconcilesErrors(ctx, host.GetCR())
         w.a.V(1).
             M(host).F().
             Warning("Reconcile Host Main - unable to reconcile ConfigMap. Host: %s Err: %v", host.GetName(), err)
@@ -682,6 +695,7 @@

     // Reconcile StatefulSet
     if err := w.reconcileHostStatefulSet(ctx, host, stsReconcileOpts); err != nil {
+        metrics.HostReconcilesErrors(ctx, host.GetCR())
         w.a.V(1).
             M(host).F().
             Warning("Reconcile Host Main - unable to reconcile StatefulSet. Host: %s Err: %v", host.GetName(), err)
@@ -705,6 +719,22 @@
     return nil
 }

+func (w *worker) prepareStsReconcileOptsWaitSection(host *api.Host, opts *statefulset.ReconcileOptions) *statefulset.ReconcileOptions {
+    if host.GetCluster().GetReconcile().Host.Wait.Probes.GetStartup().IsTrue() {
+        opts = opts.SetWaitUntilStarted()
+        w.a.V(1).
+            M(host).F().
+            Warning("Setting option SetWaitUntilStarted")
+    }
+    if host.GetCluster().GetReconcile().Host.Wait.Probes.GetReadiness().IsTrue() {
+        opts = opts.SetWaitUntilReady()
+        w.a.V(1).
+            M(host).F().
+            Warning("Setting option SetWaitUntilReady")
+    }
+    return opts
+}
+
 func (w *worker) reconcileHostPVCs(ctx context.Context, host *api.Host) storage.ErrorDataPersistence {
     return storage.NewStorageReconciler(
         w.task,
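The new prepareStsReconcileOptsWaitSection maps two per-cluster reconcile settings onto StatefulSet reconcile options: wait-for-startup-probe turns into SetWaitUntilStarted and wait-for-readiness-probe into SetWaitUntilReady. A simplified sketch of that mapping with placeholder types follows; the nil-tolerant chained setters imitate the operator's option style, which is an assumption here, not its actual statefulset.ReconcileOptions API:

package main

import "fmt"

// opts is a stand-in for statefulset.ReconcileOptions.
type opts struct {
    waitUntilStarted bool
    waitUntilReady   bool
}

// Setters tolerate a nil receiver so callers can chain onto a zero value.
func (o *opts) setWaitUntilStarted() *opts {
    if o == nil {
        o = &opts{}
    }
    o.waitUntilStarted = true
    return o
}

func (o *opts) setWaitUntilReady() *opts {
    if o == nil {
        o = &opts{}
    }
    o.waitUntilReady = true
    return o
}

// waitSettings is a stand-in for the cluster's reconcile.host.wait.probes section.
type waitSettings struct {
    startupProbe   bool
    readinessProbe bool
}

// applyWaitSection mirrors prepareStsReconcileOptsWaitSection: translate the
// cluster's wait settings into reconcile options, leaving opts untouched otherwise.
func applyWaitSection(s waitSettings, o *opts) *opts {
    if s.startupProbe {
        o = o.setWaitUntilStarted()
    }
    if s.readinessProbe {
        o = o.setWaitUntilReady()
    }
    return o
}

func main() {
    o := applyWaitSection(waitSettings{startupProbe: true}, nil)
    fmt.Printf("%+v\n", o)
}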
