Skip to content

Commit 8519cb0

Browse files
authored
Stagger controller join sequencing to optimize etcd quorum formation (#885)
* Stagger controller join sequencing to optimize etcd quorum formation

Signed-off-by: Kimmo Lehto <[email protected]>

---------

Signed-off-by: Kimmo Lehto <[email protected]>
1 parent 0e36086 commit 8519cb0

File tree

1 file changed

+132
-49
lines changed

1 file changed

+132
-49
lines changed

phase/install_controllers.go

Lines changed: 132 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
package phase
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
67
"strings"
@@ -17,8 +18,9 @@ import (
1718
// InstallControllers installs k0s controllers and joins them to the cluster
1819
type InstallControllers struct {
1920
GenericPhase
20-
hosts cluster.Hosts
21-
leader *cluster.Host
21+
hosts cluster.Hosts
22+
leader *cluster.Host
23+
numRunning int
2224
}
2325

2426
// Title for the phase
@@ -30,9 +32,14 @@ func (p *InstallControllers) Title() string {
3032
func (p *InstallControllers) Prepare(config *v1beta1.Cluster) error {
3133
p.Config = config
3234
p.leader = p.Config.Spec.K0sLeader()
35+
var countRunning int
3336
p.hosts = p.Config.Spec.Hosts.Controllers().Filter(func(h *cluster.Host) bool {
37+
if h.Metadata.K0sRunningVersion != nil {
38+
countRunning++
39+
}
3440
return !h.Reset && !h.Metadata.NeedsUpgrade && (h != p.leader && h.Metadata.K0sRunningVersion == nil)
3541
})
42+
p.numRunning = countRunning
3643
return nil
3744
}
3845

@@ -94,7 +101,7 @@ func (p *InstallControllers) Run(ctx context.Context) error {
94101
ctx,
95102
p.leader,
96103
"controller",
97-
time.Duration(10)*time.Minute,
104+
30*time.Minute,
98105
)
99106
if err != nil {
100107
return err
@@ -124,71 +131,147 @@ func (p *InstallControllers) Run(ctx context.Context) error {
124131
if err != nil {
125132
return err
126133
}
127-
return p.parallelDo(ctx, p.hosts, func(ctx context.Context, h *cluster.Host) error {
128-
tokenPath := h.K0sJoinTokenPath()
129-
log.Infof("%s: writing join token to %s", h, tokenPath)
130-
err := p.Wet(h, fmt.Sprintf("write k0s join token to %s", tokenPath), func() error {
131-
return h.Configurer.WriteFile(h, tokenPath, h.Metadata.K0sTokenData.Token, "0600")
132-
})
133-
if err != nil {
134+
135+
// just one controller to install, install it and return
136+
if len(p.hosts) == 1 {
137+
log.Debug("only one controller to install")
138+
return p.installK0s(ctx, p.hosts[0])
139+
}
140+
141+
if p.manager.Concurrency < 2 {
142+
log.Debugf("installing %d controllers sequantially because concurrency is set to 1", len(p.hosts))
143+
return p.hosts.Each(ctx, p.installK0s)
144+
}
145+
146+
var remaining cluster.Hosts
147+
remaining = append(remaining, p.hosts...)
148+
149+
if p.numRunning == 1 && len(remaining) >= 2 {
150+
perBatch := min(2, p.manager.Concurrency)
151+
firstBatch := remaining[:perBatch]
152+
153+
log.Debugf("installing first %d controllers to reach HA state and quorum", perBatch)
154+
if err := firstBatch.BatchedParallelEach(ctx, perBatch, p.installK0s); err != nil {
134155
return err
135156
}
157+
remaining = remaining[perBatch:]
158+
p.numRunning += perBatch
136159

137-
if p.Config.Spec.K0s.DynamicConfig {
138-
h.InstallFlags.AddOrReplace("--enable-dynamic-config")
160+
if len(remaining) == 0 {
161+
log.Debug("all controllers installed")
162+
return nil
139163
}
164+
log.Debugf("remaining %d controllers to install", len(remaining))
165+
}
140166

141-
if Force {
142-
log.Warnf("%s: --force given, using k0s install with --force", h)
143-
h.InstallFlags.AddOrReplace("--force=true")
167+
if p.numRunning%2 == 0 {
168+
log.Debug("even number of running controllers, installing one first to reach quorum")
169+
if err := p.installK0s(ctx, remaining[0]); err != nil {
170+
return err
144171
}
172+
remaining = remaining[1:]
173+
p.numRunning++
174+
}
145175

146-
cmd, err := h.K0sInstallCommand()
147-
if err != nil {
176+
// install the rest in parallel in uneven quorum-optimized batches
177+
for len(remaining) > 0 {
178+
currentTotal := p.numRunning + len(remaining)
179+
quorum := (currentTotal / 2) + 1
180+
safeMax := (quorum / 2)
181+
if safeMax < 1 {
182+
safeMax = 1
183+
}
184+
185+
perBatch := min(safeMax, p.manager.Concurrency, len(remaining))
186+
187+
log.Debugf("installing next %d controllers (quorum=%d, total=%d)", perBatch, quorum, currentTotal)
188+
189+
batch := remaining[:perBatch]
190+
if err := batch.BatchedParallelEach(ctx, perBatch, p.installK0s); err != nil {
148191
return err
149192
}
150-
log.Infof("%s: installing k0s controller", h)
151-
err = p.Wet(h, fmt.Sprintf("install k0s controller using `%s", strings.ReplaceAll(cmd, h.Configurer.K0sBinaryPath(), "k0s")), func() error {
152-
return h.Exec(cmd, exec.Sudo(h))
153-
})
193+
194+
remaining = remaining[perBatch:]
195+
p.numRunning += perBatch
196+
}
197+
log.Debug("all controllers installed")
198+
return nil
199+
}
200+
201+
func (p *InstallControllers) installK0s(ctx context.Context, h *cluster.Host) error {
202+
tokenPath := h.K0sJoinTokenPath()
203+
log.Infof("%s: writing join token to %s", h, tokenPath)
204+
err := p.Wet(h, fmt.Sprintf("write k0s join token to %s", tokenPath), func() error {
205+
return h.Configurer.WriteFile(h, tokenPath, h.Metadata.K0sTokenData.Token, "0600")
206+
})
207+
if err != nil {
208+
return err
209+
}
210+
211+
if p.Config.Spec.K0s.DynamicConfig {
212+
h.InstallFlags.AddOrReplace("--enable-dynamic-config")
213+
}
214+
215+
if Force {
216+
log.Warnf("%s: --force given, using k0s install with --force", h)
217+
h.InstallFlags.AddOrReplace("--force=true")
218+
}
219+
220+
cmd, err := h.K0sInstallCommand()
221+
if err != nil {
222+
return err
223+
}
224+
log.Infof("%s: installing k0s controller", h)
225+
err = p.Wet(h, fmt.Sprintf("install k0s controller using `%s", strings.ReplaceAll(cmd, h.Configurer.K0sBinaryPath(), "k0s")), func() error {
226+
var stdout, stderr bytes.Buffer
227+
runner, err := h.ExecStreams(cmd, nil, &stdout, &stderr, exec.Sudo(h))
154228
if err != nil {
155-
return err
229+
return fmt.Errorf("run k0s install: %w", err)
230+
}
231+
if err := runner.Wait(); err != nil {
232+
log.Errorf("%s: k0s install failed: %s %s", h, stdout.String(), stderr.String())
233+
return fmt.Errorf("k0s install failed: %w", err)
156234
}
157-
h.Metadata.K0sInstalled = true
158-
h.Metadata.K0sRunningVersion = p.Config.Spec.K0s.Version
159235

160-
if p.IsWet() {
161-
if len(h.Environment) > 0 {
162-
log.Infof("%s: updating service environment", h)
163-
if err := h.Configurer.UpdateServiceEnvironment(h, h.K0sServiceName(), h.Environment); err != nil {
164-
return err
165-
}
166-
}
236+
return nil
237+
})
238+
if err != nil {
239+
return err
240+
}
241+
h.Metadata.K0sInstalled = true
242+
h.Metadata.K0sRunningVersion = p.Config.Spec.K0s.Version
167243

168-
log.Infof("%s: starting service", h)
169-
if err := h.Configurer.StartService(h, h.K0sServiceName()); err != nil {
244+
if p.IsWet() {
245+
if len(h.Environment) > 0 {
246+
log.Infof("%s: updating service environment", h)
247+
if err := h.Configurer.UpdateServiceEnvironment(h, h.K0sServiceName(), h.Environment); err != nil {
170248
return err
171249
}
250+
}
172251

173-
log.Infof("%s: waiting for the k0s service to start", h)
174-
if err := retry.AdaptiveTimeout(ctx, retry.DefaultTimeout, node.ServiceRunningFunc(h, h.K0sServiceName())); err != nil {
175-
return err
176-
}
252+
log.Infof("%s: starting service", h)
253+
if err := h.Configurer.StartService(h, h.K0sServiceName()); err != nil {
254+
return err
255+
}
177256

178-
err := retry.AdaptiveTimeout(ctx, retry.DefaultTimeout, func(_ context.Context) error {
179-
out, err := h.ExecOutput(h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "get --raw='/readyz?verbose=true'"), exec.Sudo(h))
180-
if err != nil {
181-
return fmt.Errorf("readiness endpoint reports %q: %w", out, err)
182-
}
183-
return nil
184-
})
257+
log.Infof("%s: waiting for the k0s service to start", h)
258+
if err := retry.AdaptiveTimeout(ctx, retry.DefaultTimeout, node.ServiceRunningFunc(h, h.K0sServiceName())); err != nil {
259+
return err
260+
}
261+
262+
err := retry.AdaptiveTimeout(ctx, retry.DefaultTimeout, func(_ context.Context) error {
263+
out, err := h.ExecOutput(h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "get --raw='/readyz?verbose=true'"), exec.Sudo(h))
185264
if err != nil {
186-
return fmt.Errorf("controller did not reach ready state: %w", err)
265+
return fmt.Errorf("readiness endpoint reports %q: %w", out, err)
187266
}
188-
189-
h.Metadata.Ready = true
267+
return nil
268+
})
269+
if err != nil {
270+
return fmt.Errorf("controller did not reach ready state: %w", err)
190271
}
191272

192-
return nil
193-
})
273+
h.Metadata.Ready = true
274+
}
275+
276+
return nil
194277
}

0 commit comments

Comments
 (0)