Skip to content

Commit d8dca83

Browse files
authored
fix downtime-causing bug during rolling deployments (#3817)
* fix bug causing downtime during rolling deployments
* fix bug in recoverable deploys
* fix bug causing test to fail
* refactor statuslogger to machine id mapper
* refactor pool sizing
* use errgroup instead of waitgroups and errchan
1 parent 0e5d2d1 commit d8dca83

File tree

2 files changed

+154
-53
lines changed

2 files changed

+154
-53
lines changed

internal/command/deploy/machines_deploymachinesapp.go

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -758,39 +758,74 @@ func (md *machineDeployment) updateUsingRollingStrategy(parentCtx context.Contex
758758

759759
for group, entries := range entriesByGroup {
760760
entries := entries
761-
startIdx += len(entries)
761+
762+
warmMachines := lo.Filter(entries, func(e *machineUpdateEntry, i int) bool {
763+
return e.leasableMachine.Machine().State == "started"
764+
})
765+
coldMachines := lo.Filter(entries, func(e *machineUpdateEntry, i int) bool {
766+
return e.leasableMachine.Machine().State != "started"
767+
})
768+
762769
groupsPool.Go(func(ctx context.Context) error {
763-
return md.updateEntriesGroup(ctx, group, entries, sl, startIdx-len(entries))
770+
eg, ctx := errgroup.WithContext(ctx)
771+
772+
eg.Go(func() (err error) {
773+
poolSize := len(coldMachines)
774+
if poolSize >= STOPPED_MACHINES_POOL_SIZE {
775+
poolSize = STOPPED_MACHINES_POOL_SIZE
776+
}
777+
778+
if len(coldMachines) > 0 {
779+
// for cold machines, we can update all of them at once.
780+
// there's no need for protection against downtime since the machines are already stopped
781+
startIdx += len(coldMachines)
782+
return md.updateEntriesGroup(ctx, group, coldMachines, sl, startIdx-len(coldMachines), poolSize)
783+
}
784+
785+
return nil
786+
})
787+
788+
eg.Go(func() (err error) {
789+
// for warm machines, we update them in chunks of size, md.maxUnavailable.
790+
// this is to prevent downtime/low-latency during deployments
791+
startIdx += len(warmMachines)
792+
poolSize := md.getPoolSize(len(warmMachines))
793+
if len(warmMachines) > 0 {
794+
return md.updateEntriesGroup(ctx, group, warmMachines, sl, startIdx-len(warmMachines), poolSize)
795+
}
796+
return nil
797+
})
798+
799+
return eg.Wait()
764800
})
765801
}
766802

767803
err := groupsPool.Wait()
768804
if err != nil {
769805
span.RecordError(err)
770806
}
807+
771808
return err
772809
}
773810

774-
func (md *machineDeployment) updateEntriesGroup(parentCtx context.Context, group string, entries []*machineUpdateEntry, sl statuslogger.StatusLogger, startIdx int) error {
811+
func (md *machineDeployment) getPoolSize(totalMachines int) int {
812+
switch mu := md.maxUnavailable; {
813+
case mu >= 1:
814+
return int(mu)
815+
default:
816+
return int(math.Ceil(float64(totalMachines) * mu))
817+
}
818+
}
819+
820+
func (md *machineDeployment) updateEntriesGroup(parentCtx context.Context, group string, entries []*machineUpdateEntry, sl statuslogger.StatusLogger, startIdx int, poolSize int) error {
775821
parentCtx, span := tracing.GetTracer().Start(parentCtx, "update_entries_in_group", trace.WithAttributes(
776822
attribute.Int("start_id", startIdx),
777823
attribute.String("group", group),
778824
attribute.Int("max_unavailable", int(md.maxUnavailable)),
825+
attribute.Int("pool_size", poolSize),
779826
))
780827
defer span.End()
781828

782-
var poolSize int
783-
switch mu := md.maxUnavailable; {
784-
case mu >= 1:
785-
poolSize = int(mu)
786-
case mu > 0:
787-
poolSize = int(math.Ceil(float64(len(entries)) * mu))
788-
default:
789-
return fmt.Errorf("Invalid --max-unavailable value: %v", mu)
790-
}
791-
792-
span.SetAttributes(attribute.Int("pool_size", poolSize))
793-
794829
updatePool := pool.New().
795830
WithErrors().
796831
WithMaxGoroutines(poolSize).
@@ -801,6 +836,7 @@ func (md *machineDeployment) updateEntriesGroup(parentCtx context.Context, group
801836
e := e
802837
eCtx := statuslogger.NewContext(parentCtx, sl.Line(startIdx+idx))
803838
fmtID := e.leasableMachine.FormattedMachineId()
839+
span.SetAttributes(attribute.String("state", e.leasableMachine.Machine().State))
804840

805841
statusRunning := func() {
806842
statuslogger.LogfStatus(eCtx,

internal/command/deploy/plan.go

Lines changed: 103 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"math"
87
"strings"
98
"sync"
109
"time"
@@ -22,6 +21,36 @@ import (
2221
"golang.org/x/sync/errgroup"
2322
)
2423

24+
const (
	// STOPPED_MACHINES_POOL_SIZE caps how many stopped ("cold") machines are
	// updated concurrently. Cold machines serve no traffic, so they are
	// updated in large batches instead of respecting --max-unavailable.
	// NOTE(review): ALL_CAPS is unconventional for Go constants (MixedCaps
	// is preferred), but renaming would touch every call site.
	STOPPED_MACHINES_POOL_SIZE = 30
)
27+
28+
type MachineLogger struct {
29+
store map[string]statuslogger.StatusLine
30+
sl statuslogger.StatusLogger
31+
}
32+
33+
func NewMachineLogger(store map[string]statuslogger.StatusLine, sl statuslogger.StatusLogger) *MachineLogger {
34+
return &MachineLogger{
35+
store: store,
36+
sl: sl,
37+
}
38+
}
39+
40+
func (m *MachineLogger) initFromMachinePairs(mp []machinePairing) {
41+
for idx, machPair := range mp {
42+
if machPair.oldMachine != nil {
43+
m.store[machPair.oldMachine.ID] = m.sl.Line(idx)
44+
} else if machPair.newMachine != nil {
45+
m.store[machPair.newMachine.ID] = m.sl.Line(idx)
46+
}
47+
}
48+
}
49+
50+
func (m *MachineLogger) getLoggerFromID(id string) statuslogger.StatusLine {
51+
return m.store[id]
52+
}
53+
2554
type AppState struct {
2655
Machines []*fly.Machine
2756
}
@@ -130,6 +159,13 @@ func (md *machineDeployment) updateMachinesWRecovery(ctx context.Context, oldApp
130159
defer sl.Destroy(false)
131160
}
132161

162+
machineLogger := NewMachineLogger(
163+
map[string]statuslogger.StatusLine{},
164+
sl,
165+
)
166+
167+
machineLogger.initFromMachinePairs(machineTuples)
168+
133169
machPairByProcessGroup := lo.GroupBy(machineTuples, func(machPair machinePairing) string {
134170
if machPair.oldMachine != nil {
135171
return machPair.oldMachine.ProcessGroup()
@@ -140,15 +176,7 @@ func (md *machineDeployment) updateMachinesWRecovery(ctx context.Context, oldApp
140176
}
141177
})
142178

143-
var poolSize int
144-
switch mu := md.maxUnavailable; {
145-
case mu >= 1:
146-
poolSize = int(mu)
147-
case mu > 0:
148-
poolSize = int(math.Ceil(float64(len(machineTuples)) * mu))
149-
default:
150-
return fmt.Errorf("Invalid --max-unavailable value: %v", mu)
151-
}
179+
poolSize := md.getPoolSize(len(machineTuples))
152180

153181
if !settings.skipLeaseAcquisition {
154182
attempts := 0
@@ -158,7 +186,7 @@ func (md *machineDeployment) updateMachinesWRecovery(ctx context.Context, oldApp
158186
}()
159187

160188
for {
161-
err := md.acquireLeases(ctx, machineTuples, poolSize, sl)
189+
err := md.acquireLeases(ctx, machineTuples, poolSize, machineLogger)
162190
if err == nil {
163191
break
164192
}
@@ -172,39 +200,78 @@ func (md *machineDeployment) updateMachinesWRecovery(ctx context.Context, oldApp
172200
}
173201

174202
defer func() {
175-
err := md.releaseLeases(ctx, machineTuples, sl)
203+
err := md.releaseLeases(ctx, machineTuples, machineLogger)
176204
if err != nil {
177205
fmt.Fprintln(md.io.ErrOut, "Failed to release leases:", err)
178206
span.RecordError(err)
179207
}
180208
}()
181209
}
182210

183-
statusLines := map[string]statuslogger.StatusLine{}
184-
for idx, machPair := range machineTuples {
185-
if machPair.oldMachine != nil {
186-
statusLines[machPair.oldMachine.ID] = sl.Line(idx)
187-
} else if machPair.newMachine != nil {
188-
statusLines[machPair.newMachine.ID] = sl.Line(idx)
189-
}
190-
}
191-
192211
pgroup := errgroup.Group{}
193212
pgroup.SetLimit(rollingStrategyMaxConcurrentGroups)
194213

195214
// We want to update by process group
196215
for _, machineTuples := range machPairByProcessGroup {
197216
machineTuples := machineTuples
198217
pgroup.Go(func() error {
199-
err := md.updateProcessGroup(ctx, machineTuples, statusLines, poolSize)
200-
if err != nil && strings.Contains(err.Error(), "lease currently held by") {
201-
err := &unrecoverableError{err: err}
218+
eg, ctx := errgroup.WithContext(ctx)
219+
220+
warmMachines := lo.Filter(machineTuples, func(e machinePairing, i int) bool {
221+
if e.oldMachine != nil && e.oldMachine.State == "started" {
222+
return true
223+
}
224+
if e.newMachine != nil && e.newMachine.State == "started" {
225+
return true
226+
}
227+
return false
228+
})
229+
230+
coldMachines := lo.Filter(machineTuples, func(e machinePairing, i int) bool {
231+
if e.oldMachine != nil && e.oldMachine.State != "started" {
232+
return true
233+
}
234+
if e.newMachine != nil && e.newMachine.State != "started" {
235+
return true
236+
}
237+
return false
238+
})
239+
240+
eg.Go(func() (err error) {
241+
poolSize := len(coldMachines)
242+
if poolSize >= STOPPED_MACHINES_POOL_SIZE {
243+
poolSize = STOPPED_MACHINES_POOL_SIZE
244+
}
245+
246+
if len(coldMachines) > 0 {
247+
// for cold machines, we can update all of them at once.
248+
// there's no need for protection against downtime since the machines are already stopped
249+
return md.updateProcessGroup(ctx, coldMachines, machineLogger, poolSize)
250+
}
251+
252+
return nil
253+
})
254+
255+
eg.Go(func() (err error) {
256+
// for warm machines, we update them in chunks of size, md.maxUnavailable.
257+
// this is to prevent downtime/low-latency during deployments
258+
poolSize := md.getPoolSize(len(warmMachines))
259+
if len(warmMachines) > 0 {
260+
return md.updateProcessGroup(ctx, warmMachines, machineLogger, poolSize)
261+
}
262+
return nil
263+
})
264+
265+
err := eg.Wait()
266+
if err != nil {
202267
span.RecordError(err)
268+
if strings.Contains(err.Error(), "lease currently held by") {
269+
err = &unrecoverableError{err: err}
270+
}
203271
return err
204272
}
205273

206-
span.RecordError(err)
207-
return err
274+
return nil
208275
})
209276
}
210277

@@ -259,7 +326,7 @@ func (md *machineDeployment) updateMachinesWRecovery(ctx context.Context, oldApp
259326
return nil
260327
}
261328

262-
func (md *machineDeployment) updateProcessGroup(ctx context.Context, machineTuples []machinePairing, statusLines map[string]statuslogger.StatusLine, poolSize int) error {
329+
func (md *machineDeployment) updateProcessGroup(ctx context.Context, machineTuples []machinePairing, machineLogger *MachineLogger, poolSize int) error {
263330
ctx, span := tracing.GetTracer().Start(ctx, "update_process_group")
264331
defer span.End()
265332

@@ -277,9 +344,9 @@ func (md *machineDeployment) updateProcessGroup(ctx context.Context, machineTupl
277344

278345
var sl statuslogger.StatusLine
279346
if oldMachine != nil {
280-
sl = statusLines[oldMachine.ID]
347+
sl = machineLogger.getLoggerFromID(oldMachine.ID)
281348
} else if newMachine != nil {
282-
sl = statusLines[newMachine.ID]
349+
sl = machineLogger.getLoggerFromID(newMachine.ID)
283350
}
284351

285352
err := md.updateMachineWChecks(ctx, oldMachine, newMachine, sl, md.io, machineCheckResult)
@@ -300,18 +367,15 @@ func (md *machineDeployment) updateProcessGroup(ctx context.Context, machineTupl
300367
return nil
301368
}
302369

303-
func (md *machineDeployment) acquireLeases(ctx context.Context, machineTuples []machinePairing, poolSize int, statusLogger statuslogger.StatusLogger) error {
370+
func (md *machineDeployment) acquireLeases(ctx context.Context, machineTuples []machinePairing, poolSize int, machToLogger *MachineLogger) error {
304371
ctx, span := tracing.GetTracer().Start(ctx, "acquire_leases")
305372

306373
leaseGroup := errgroup.Group{}
307374
leaseGroup.SetLimit(poolSize)
308375

309-
for idx, machineTuple := range machineTuples {
376+
for _, machineTuple := range machineTuples {
310377
machineTuple := machineTuple
311-
idx := idx
312-
313378
leaseGroup.Go(func() error {
314-
sl := statusLogger.Line(idx)
315379

316380
var machine *fly.Machine
317381
if machineTuple.oldMachine != nil {
@@ -321,6 +385,7 @@ func (md *machineDeployment) acquireLeases(ctx context.Context, machineTuples []
321385
} else {
322386
return nil
323387
}
388+
sl := machToLogger.getLoggerFromID(machine.ID)
324389

325390
if machine.LeaseNonce != "" {
326391
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Already have lease for %s", machine.ID))
@@ -351,20 +416,18 @@ func (md *machineDeployment) acquireLeases(ctx context.Context, machineTuples []
351416
return nil
352417
}
353418

354-
func (md *machineDeployment) releaseLeases(ctx context.Context, machineTuples []machinePairing, statusLogger statuslogger.StatusLogger) error {
419+
func (md *machineDeployment) releaseLeases(ctx context.Context, machineTuples []machinePairing, machToLogger *MachineLogger) error {
355420
ctx = context.WithoutCancel(ctx)
356421
ctx, span := tracing.GetTracer().Start(ctx, "release_leases")
357422
defer span.End()
358423

359424
leaseGroup := errgroup.Group{}
360425
leaseGroup.SetLimit(len(machineTuples))
361426

362-
for idx, machineTuple := range machineTuples {
427+
for _, machineTuple := range machineTuples {
363428
machineTuple := machineTuple
364-
idx := idx
365429

366430
leaseGroup.Go(func() error {
367-
sl := statusLogger.Line(idx)
368431

369432
var machine *fly.Machine
370433
if machineTuple.oldMachine != nil {
@@ -375,6 +438,8 @@ func (md *machineDeployment) releaseLeases(ctx context.Context, machineTuples []
375438
return nil
376439
}
377440

441+
sl := machToLogger.getLoggerFromID(machine.ID)
442+
378443
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Clearing lease for %s", machine.ID))
379444
if machine.LeaseNonce == "" {
380445
sl.LogStatus(statuslogger.StatusSuccess, fmt.Sprintf("Cleared lease for %s", machine.ID))

0 commit comments

Comments
 (0)