
Commit 218ed9d

fix: remove backoff retry from coordinator metadata status operations
The status resource was using backoff.NewExponentialBackOff() from the cenkalti library directly, which has a default MaxElapsedTime of 15 minutes and is not context-aware. This caused several issues:

- the mutex lock was held during the entire retry loop (up to 15 minutes)
- retries were not context-aware, so they couldn't be cancelled on shutdown
- errors were silently discarded when retries were exhausted
- it was inconsistent with the rest of the codebase, which uses oxiatime.NewBackOff(ctx)

The callers (shard controllers, elections, periodic tasks) already have their own retry mechanisms, making the status resource retry redundant. This change removes the backoff retry, returns errors to callers, and lets them handle failures appropriately.
1 parent 9ce1311 commit 218ed9d
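
For context on what was removed, the old and new shapes of a status-resource write looked roughly like the sketch below. This is a minimal sketch, not code from the repository: ClusterStatus, statusResource, and the store hook are stand-ins, and only backoff.Retry and backoff.NewExponentialBackOff are real github.com/cenkalti/backoff/v4 API.

package status

import (
	"sync"

	"github.com/cenkalti/backoff/v4"
)

// ClusterStatus and statusResource are minimal stand-ins for illustration.
type ClusterStatus struct{}

type statusResource struct {
	mutex sync.Mutex
	store func(*ClusterStatus) error // hypothetical persistence hook
}

// Before (sketch): retry inside the resource, lock held across every attempt.
// backoff.NewExponentialBackOff() defaults MaxElapsedTime to 15 minutes, never
// consults a context, and the final error is thrown away here.
func (r *statusResource) updateWithRetry(newStatus *ClusterStatus) {
	r.mutex.Lock()
	defer r.mutex.Unlock() // held for the whole retry loop
	_ = backoff.Retry(func() error { return r.store(newStatus) },
		backoff.NewExponentialBackOff())
}

// After (sketch): no internal retry; the error surfaces to the caller, whose
// own backoff loop is context-aware and can be cancelled on shutdown.
func (r *statusResource) update(newStatus *ClusterStatus) error {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	return r.store(newStatus)
}

The second form is what this commit converges on: every mutation returns an error, and retry policy moves to the call sites shown in the diffs below.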

File tree

6 files changed: +140 -147 lines changed

oxiad/coordinator/balancer/scheduler_test.go

Lines changed: 10 additions & 5 deletions
@@ -43,17 +43,22 @@ func (m *mockStatusResource) LoadWithVersion() (*model.ClusterStatus, metadata.V
 	return m.status, "0"
 }

-func (m *mockStatusResource) Swap(_ *model.ClusterStatus, _ metadata.Version) bool {
-	return true
+func (m *mockStatusResource) Swap(_ *model.ClusterStatus, _ metadata.Version) (bool, error) {
+	return true, nil
 }

-func (m *mockStatusResource) Update(newStatus *model.ClusterStatus) {
+func (m *mockStatusResource) Update(newStatus *model.ClusterStatus) error {
 	m.status = newStatus
+	return nil
 }

-func (m *mockStatusResource) UpdateShardMetadata(_ string, _ int64, _ model.ShardMetadata) {}
+func (m *mockStatusResource) UpdateShardMetadata(_ string, _ int64, _ model.ShardMetadata) error {
+	return nil
+}

-func (m *mockStatusResource) DeleteShardMetadata(_ string, _ int64) {}
+func (m *mockStatusResource) DeleteShardMetadata(_ string, _ int64) error {
+	return nil
+}

 func (m *mockStatusResource) IsReady(_ *model.ClusterConfig) bool {
 	return true
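
The mock above implies the widened interface that the production code below programs against. This is inferred from the mock's method set; the interface name and parameter names are guesses:

type StatusResource interface {
	Load() *model.ClusterStatus
	LoadWithVersion() (*model.ClusterStatus, metadata.Version)
	// Swap is a compare-and-swap: false reports a version conflict,
	// while a non-nil error reports a failed metadata operation.
	Swap(status *model.ClusterStatus, version metadata.Version) (bool, error)
	Update(newStatus *model.ClusterStatus) error
	UpdateShardMetadata(namespace string, shard int64, meta model.ShardMetadata) error
	DeleteShardMetadata(namespace string, shard int64) error
	IsReady(config *model.ClusterConfig) bool
}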

oxiad/coordinator/controller/shard_controller.go

Lines changed: 7 additions & 2 deletions
@@ -427,7 +427,9 @@ func (s *shardController) deleteShard() error {
 		)
 	}

-	s.statusResource.DeleteShardMetadata(s.namespace, s.shard)
+	if err := s.statusResource.DeleteShardMetadata(s.namespace, s.shard); err != nil {
+		return err
+	}
 	s.eventListener.ShardDeleted(s.shard)
 	return s.close()
 }
@@ -508,7 +510,10 @@ func (s *shardController) handlePeriodicTasks() {
 	}

 	// Update the shard status
-	s.statusResource.UpdateShardMetadata(s.namespace, s.shard, mutShardMeta)
+	if err := s.statusResource.UpdateShardMetadata(s.namespace, s.shard, mutShardMeta); err != nil {
+		s.log.Warn("Failed to update shard metadata", "error", err)
+		return
+	}
 	s.metadata.Store(mutShardMeta)
 }
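
handlePeriodicTasks can afford to log and bail because the task runs again on the next tick. A caller that must not give up would instead wrap the call in the codebase's context-aware backoff, roughly as below. A sketch only: oxiatime.NewBackOff(ctx) is the helper named in the commit message (its exact signature is assumed here), backoff.RetryNotify is github.com/cenkalti/backoff/v4, and the shardController fields are used illustratively.

// Hypothetical caller-owned retry: cancellable, and the error stays visible.
func (s *shardController) updateMetaWithRetry(ctx context.Context, meta model.ShardMetadata) error {
	return backoff.RetryNotify(
		func() error {
			return s.statusResource.UpdateShardMetadata(s.namespace, s.shard, meta)
		},
		oxiatime.NewBackOff(ctx), // stops retrying once ctx is cancelled, e.g. on shutdown
		func(err error, d time.Duration) {
			s.log.Warn("Failed to update shard metadata, retrying", "error", err, "retry-after", d)
		},
	)
}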

oxiad/coordinator/controller/shard_controller_election.go

Lines changed: 6 additions & 2 deletions
@@ -434,7 +434,9 @@ func (e *ShardElection) start() (model.Server, error) {
 		metadata.Term++
 		metadata.Ensemble = e.refreshedEnsemble(metadata.Ensemble)
 	})
-	e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta)
+	if err := e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta); err != nil {
+		return model.Server{}, err
+	}

 	if e.changeEnsembleAction != nil {
 		e.prepareIfChangeEnsemble(&mutShardMeta)
@@ -484,7 +486,9 @@ func (e *ShardElection) start() (model.Server, error) {
 	leader := mutShardMeta.Leader
 	leaderEntry := candidatesStatus[*leader]

-	e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta)
+	if err = e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta); err != nil {
+		return model.Server{}, err
+	}
 	e.meta.Store(mutShardMeta)
 	if e.eventListener != nil {
 		e.eventListener.LeaderElected(e.shard, newLeader, maps.Keys(followers))

oxiad/coordinator/controller/split_controller.go

Lines changed: 54 additions & 31 deletions
@@ -215,7 +215,7 @@ func (sc *SplitController) currentPhase() *model.SplitPhase {
 }

 // updatePhase atomically updates the split phase on both parent and children.
-func (sc *SplitController) updatePhase(newPhase model.SplitPhase) {
+func (sc *SplitController) updatePhase(newPhase model.SplitPhase) error {
 	status := sc.statusResource.Load()
 	cloned := status.Clone()

@@ -235,7 +235,7 @@ func (sc *SplitController) updatePhase(newPhase model.SplitPhase) {
 		}
 	}

-	sc.statusResource.Update(cloned)
+	return sc.statusResource.Update(cloned)
 }

 // runBootstrap validates preconditions, fences child ensemble members, elects
@@ -285,13 +285,14 @@ func (sc *SplitController) runBootstrap() error {
 			childLeaders[childId] = childMeta.Leader.Internal
 		}
 	}
-	sc.updateParentMeta(func(meta *model.ShardMetadata) {
+	if err := sc.updateParentMeta(func(meta *model.ShardMetadata) {
 		meta.Split.ParentTermAtBootstrap = parentTerm
 		meta.Split.ChildLeadersAtBootstrap = childLeaders
-	})
+	}); err != nil {
+		return err
+	}

-	sc.updatePhase(model.SplitPhaseCatchUp)
-	return nil
+	return sc.updatePhase(model.SplitPhaseCatchUp)
 }

 // fenceAndElectChild fences a child shard's ensemble and elects a leader.
@@ -318,11 +319,13 @@ func (sc *SplitController) fenceAndElectChild(childId int64) error {

 	childLeader := sc.pickLeader(headEntries)

-	sc.updateChildMeta(childId, func(meta *model.ShardMetadata) {
+	if err := sc.updateChildMeta(childId, func(meta *model.ShardMetadata) {
 		meta.Term = childTerm
 		meta.Leader = &childLeader
 		meta.Status = model.ShardStatusSteadyState
-	})
+	}); err != nil {
+		return err
+	}

 	// Elect the child leader so it replicates to its followers immediately.
 	// Without this, only the single child leader node has the data.
@@ -421,8 +424,7 @@ func (sc *SplitController) runCatchUp() error {
 		}
 		if caughtUp {
 			sc.log.Info("All children caught up")
-			sc.updatePhase(model.SplitPhaseCutover)
-			return nil
+			return sc.updatePhase(model.SplitPhaseCutover)
 		}
 	}
 }
@@ -443,14 +445,18 @@ func (sc *SplitController) checkObserverCursorsStale() (bool, error) {
 			slog.Int64("bootstrap-term", parentMeta.Split.ParentTermAtBootstrap),
 			slog.Int64("current-term", parentMeta.Term),
 		)
-		sc.updatePhase(model.SplitPhaseBootstrap)
+		if err := sc.updatePhase(model.SplitPhaseBootstrap); err != nil {
+			return false, err
+		}
 		return true, nil
 	}

 	// Child leader election: the observer cursor targets the old (dead) leader.
 	// Remove the stale cursor and fall back to Bootstrap to re-add.
 	if sc.removeStaleChildObservers(parentMeta) {
-		sc.updatePhase(model.SplitPhaseBootstrap)
+		if err := sc.updatePhase(model.SplitPhaseBootstrap); err != nil {
+			return false, err
+		}
 		return true, nil
 	}

@@ -562,11 +568,13 @@ func (sc *SplitController) runCutover() error {
 	)

 	// Update parent term in metadata
-	sc.updateParentMeta(func(meta *model.ShardMetadata) {
+	if err := sc.updateParentMeta(func(meta *model.ShardMetadata) {
 		meta.Term = newParentTerm
 		meta.Leader = nil
 		meta.Status = model.ShardStatusElection
-	})
+	}); err != nil {
+		return err
+	}

 	// Step 2: Wait for children to commit parentFinalOffset.
 	// Children were already elected leader in Bootstrap, so commitOffset
@@ -587,14 +595,18 @@ func (sc *SplitController) runCutover() error {
 	// Step 4: Clear split metadata from children and mark parent for deletion.
 	// Children are now independent shards.
 	for _, childId := range []int64{sc.leftChildId, sc.rightChildId} {
-		sc.updateChildMeta(childId, func(meta *model.ShardMetadata) {
+		if err := sc.updateChildMeta(childId, func(meta *model.ShardMetadata) {
 			meta.Split = nil
-		})
+		}); err != nil {
+			return err
+		}
 	}

-	sc.updateParentMeta(func(meta *model.ShardMetadata) {
+	if err := sc.updateParentMeta(func(meta *model.ShardMetadata) {
 		meta.Status = model.ShardStatusDeleting
-	})
+	}); err != nil {
+		return err
+	}

 	// Step 5: Notify the coordinator. This triggers the parent shard
 	// controller's DeleteShard (which retries indefinitely with backoff)
@@ -603,9 +615,11 @@ func (sc *SplitController) runCutover() error {

 	// Clear split metadata from parent — the split controller's job is done.
 	// The parent shard controller handles the actual deletion.
-	sc.updateParentMeta(func(meta *model.ShardMetadata) {
+	if err := sc.updateParentMeta(func(meta *model.ShardMetadata) {
 		meta.Split = nil
-	})
+	}); err != nil {
+		return err
+	}

 	return nil
 }
@@ -650,13 +664,19 @@ func (sc *SplitController) abort() {

 	// Delete child shards from status.
 	for _, childId := range []int64{sc.leftChildId, sc.rightChildId} {
-		sc.statusResource.DeleteShardMetadata(sc.namespace, childId)
+		if err := sc.statusResource.DeleteShardMetadata(sc.namespace, childId); err != nil {
+			sc.log.Warn("Failed to delete child shard metadata during abort",
+				slog.Int64("child-shard", childId),
+				slog.Any("error", err))
+		}
 	}

 	// Clear parent split metadata.
-	sc.updateParentMeta(func(meta *model.ShardMetadata) {
+	if err := sc.updateParentMeta(func(meta *model.ShardMetadata) {
 		meta.Split = nil
-	})
+	}); err != nil {
+		sc.log.Warn("Failed to clear parent split metadata during abort", slog.Any("error", err))
+	}

 	sc.log.Info("Split aborted, parent restored")

@@ -684,23 +704,24 @@ func (sc *SplitController) loadShardMeta(shardId int64) *model.ShardMetadata {
 	return &cloned
 }

-func (sc *SplitController) updateParentMeta(fn func(meta *model.ShardMetadata)) {
-	sc.updateShardMeta(sc.parentShardId, fn)
+func (sc *SplitController) updateParentMeta(fn func(meta *model.ShardMetadata)) error {
+	return sc.updateShardMeta(sc.parentShardId, fn)
 }

-func (sc *SplitController) updateChildMeta(childId int64, fn func(meta *model.ShardMetadata)) {
-	sc.updateShardMeta(childId, fn)
+func (sc *SplitController) updateChildMeta(childId int64, fn func(meta *model.ShardMetadata)) error {
+	return sc.updateShardMeta(childId, fn)
 }

-func (sc *SplitController) updateShardMeta(shardId int64, fn func(meta *model.ShardMetadata)) {
+func (sc *SplitController) updateShardMeta(shardId int64, fn func(meta *model.ShardMetadata)) error {
 	status := sc.statusResource.Load()
 	cloned := status.Clone()
 	ns := cloned.Namespaces[sc.namespace]
 	if meta, exists := ns.Shards[shardId]; exists {
 		fn(&meta)
 		ns.Shards[shardId] = meta
-		sc.statusResource.Update(cloned)
+		return sc.statusResource.Update(cloned)
 	}
+	return nil
 }

 // fenceEnsemble sends NewTerm to all ensemble members and returns the
@@ -860,11 +881,13 @@ func (sc *SplitController) reelectChild(childId int64) error {
 	}

 	// Update child metadata
-	sc.updateChildMeta(childId, func(meta *model.ShardMetadata) {
+	if err := sc.updateChildMeta(childId, func(meta *model.ShardMetadata) {
 		meta.Term = newTerm
 		meta.Leader = &newLeader
 		meta.Status = model.ShardStatusSteadyState
-	})
+	}); err != nil {
+		return err
+	}

 	sc.log.Info("Child re-elected in clean term",
 		slog.Int64("child-shard", childId),

oxiad/coordinator/coordinator.go

Lines changed: 16 additions & 4 deletions
@@ -173,7 +173,13 @@ func (c *coordinator) ConfigChanged(newConfig *model.ClusterConfig) {
 	var shardsToDelete []int64
 	for {
 		clusterStatus, shardsToAdd, shardsToDelete = util.ApplyClusterChanges(newConfig, currentStatus, c.selectNewEnsemble)
-		if !c.statusResource.Swap(clusterStatus, version) {
+		swapped, err := c.statusResource.Swap(clusterStatus, version)
+		if err != nil {
+			c.Warn("Failed to swap cluster status", slog.Any("error", err))
+			currentStatus, version = c.statusResource.LoadWithVersion()
+			continue
+		}
+		if !swapped {
 			currentStatus, version = c.statusResource.LoadWithVersion()
 			continue
 		}
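
With the retry gone from the status resource, this compare-and-swap loop now handles both failure modes itself: a persistence error and a version conflict (swapped == false) take the same recovery path, reloading the latest status and retrying the swap, and the error case is no longer silent because it is logged on each attempt.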
@@ -563,7 +569,9 @@ func (c *coordinator) InitiateSplit(namespace string, parentShardId int64, split
 	}

 	// Persist
-	c.statusResource.Update(cloned)
+	if err = c.statusResource.Update(cloned); err != nil {
+		return 0, 0, errors.Wrap(err, "failed to persist split status")
+	}

 	c.Info("Split initiated",
 		slog.Int64("parent-shard", parentShardId),
@@ -777,7 +785,9 @@ func NewCoordinator(meta metadata.Provider,

 		clusterStatus, _, _ = util.ApplyClusterChanges(clusterConfig, model.NewClusterStatus(), c.selectNewEnsemble)

-		c.statusResource.Update(clusterStatus)
+		if err = c.statusResource.Update(clusterStatus); err != nil {
+			return nil, nil, errors.Wrap(err, "failed to persist initial cluster status")
+		}
 	} else {
 		c.Info("Checking cluster config", slog.Any("clusterConfig", clusterConfig))

@@ -787,7 +797,9 @@ func NewCoordinator(meta metadata.Provider,
 			clusterStatus, c.selectNewEnsemble)

 		if len(shardsToAdd) > 0 || len(shardsToDelete) > 0 {
-			c.statusResource.Update(clusterStatus)
+			if err = c.statusResource.Update(clusterStatus); err != nil {
+				return nil, nil, errors.Wrap(err, "failed to persist cluster status changes")
+			}
 		}
 	}
