From efe788a47e63626e942d92cb6288f241ba0f02a5 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 25 Mar 2026 13:30:17 +0800 Subject: [PATCH 1/4] fix: remove backoff retry from coordinator metadata status operations The status resource was using backoff.NewExponentialBackOff() from the cenkalti library directly, which has a default MaxElapsedTime of 15 minutes and is not context-aware. This caused several issues: - Held mutex lock during entire retry loop (up to 15 minutes) - Not context-aware, so retries couldn't be cancelled on shutdown - Errors silently discarded when retries exhausted - Inconsistent with the rest of the codebase which uses oxiatime.NewBackOff(ctx) The callers (shard controllers, elections, periodic tasks) already have their own retry mechanisms, making the status resource retry redundant. This change removes the backoff retry, returns errors to callers, and lets them handle failures appropriately. Signed-off-by: mattisonchao --- oxiad/coordinator/balancer/scheduler_test.go | 15 +- .../controller/shard_controller.go | 9 +- .../controller/shard_controller_election.go | 8 +- .../controller/split_controller.go | 85 ++++++---- oxiad/coordinator/coordinator.go | 20 ++- oxiad/coordinator/resource/status_resource.go | 150 ++++++------------ 6 files changed, 140 insertions(+), 147 deletions(-) diff --git a/oxiad/coordinator/balancer/scheduler_test.go b/oxiad/coordinator/balancer/scheduler_test.go index 5f3bcd05..e997fa26 100644 --- a/oxiad/coordinator/balancer/scheduler_test.go +++ b/oxiad/coordinator/balancer/scheduler_test.go @@ -43,17 +43,22 @@ func (m *mockStatusResource) LoadWithVersion() (*model.ClusterStatus, metadata.V return m.status, "0" } -func (m *mockStatusResource) Swap(_ *model.ClusterStatus, _ metadata.Version) bool { - return true +func (m *mockStatusResource) Swap(_ *model.ClusterStatus, _ metadata.Version) (bool, error) { + return true, nil } -func (m *mockStatusResource) Update(newStatus *model.ClusterStatus) { +func (m *mockStatusResource) Update(newStatus *model.ClusterStatus) error { m.status = newStatus + return nil } -func (m *mockStatusResource) UpdateShardMetadata(_ string, _ int64, _ model.ShardMetadata) {} +func (m *mockStatusResource) UpdateShardMetadata(_ string, _ int64, _ model.ShardMetadata) error { + return nil +} -func (m *mockStatusResource) DeleteShardMetadata(_ string, _ int64) {} +func (m *mockStatusResource) DeleteShardMetadata(_ string, _ int64) error { + return nil +} func (m *mockStatusResource) IsReady(_ *model.ClusterConfig) bool { return true diff --git a/oxiad/coordinator/controller/shard_controller.go b/oxiad/coordinator/controller/shard_controller.go index 59e55086..887106db 100644 --- a/oxiad/coordinator/controller/shard_controller.go +++ b/oxiad/coordinator/controller/shard_controller.go @@ -427,7 +427,9 @@ func (s *shardController) deleteShard() error { ) } - s.statusResource.DeleteShardMetadata(s.namespace, s.shard) + if err := s.statusResource.DeleteShardMetadata(s.namespace, s.shard); err != nil { + return err + } s.eventListener.ShardDeleted(s.shard) return s.close() } @@ -508,7 +510,10 @@ func (s *shardController) handlePeriodicTasks() { } // Update the shard status - s.statusResource.UpdateShardMetadata(s.namespace, s.shard, mutShardMeta) + if err := s.statusResource.UpdateShardMetadata(s.namespace, s.shard, mutShardMeta); err != nil { + s.log.Warn("Failed to update shard metadata", "error", err) + return + } s.metadata.Store(mutShardMeta) } diff --git a/oxiad/coordinator/controller/shard_controller_election.go b/oxiad/coordinator/controller/shard_controller_election.go index d7fda192..b0b003f2 100644 --- a/oxiad/coordinator/controller/shard_controller_election.go +++ b/oxiad/coordinator/controller/shard_controller_election.go @@ -434,7 +434,9 @@ func (e *ShardElection) start() (model.Server, error) { metadata.Term++ metadata.Ensemble = e.refreshedEnsemble(metadata.Ensemble) }) - e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta) + if err := e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta); err != nil { + return model.Server{}, err + } if e.changeEnsembleAction != nil { e.prepareIfChangeEnsemble(&mutShardMeta) @@ -484,7 +486,9 @@ func (e *ShardElection) start() (model.Server, error) { leader := mutShardMeta.Leader leaderEntry := candidatesStatus[*leader] - e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta) + if err = e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta); err != nil { + return model.Server{}, err + } e.meta.Store(mutShardMeta) if e.eventListener != nil { e.eventListener.LeaderElected(e.shard, newLeader, maps.Keys(followers)) diff --git a/oxiad/coordinator/controller/split_controller.go b/oxiad/coordinator/controller/split_controller.go index 1acb82b1..31150c1b 100644 --- a/oxiad/coordinator/controller/split_controller.go +++ b/oxiad/coordinator/controller/split_controller.go @@ -215,7 +215,7 @@ func (sc *SplitController) currentPhase() *model.SplitPhase { } // updatePhase atomically updates the split phase on both parent and children. -func (sc *SplitController) updatePhase(newPhase model.SplitPhase) { +func (sc *SplitController) updatePhase(newPhase model.SplitPhase) error { status := sc.statusResource.Load() cloned := status.Clone() @@ -235,7 +235,7 @@ func (sc *SplitController) updatePhase(newPhase model.SplitPhase) { } } - sc.statusResource.Update(cloned) + return sc.statusResource.Update(cloned) } // runBootstrap validates preconditions, fences child ensemble members, elects @@ -285,13 +285,14 @@ func (sc *SplitController) runBootstrap() error { childLeaders[childId] = childMeta.Leader.Internal } } - sc.updateParentMeta(func(meta *model.ShardMetadata) { + if err := sc.updateParentMeta(func(meta *model.ShardMetadata) { meta.Split.ParentTermAtBootstrap = parentTerm meta.Split.ChildLeadersAtBootstrap = childLeaders - }) + }); err != nil { + return err + } - sc.updatePhase(model.SplitPhaseCatchUp) - return nil + return sc.updatePhase(model.SplitPhaseCatchUp) } // fenceAndElectChild fences a child shard's ensemble and elects a leader. @@ -318,11 +319,13 @@ func (sc *SplitController) fenceAndElectChild(childId int64) error { childLeader := sc.pickLeader(headEntries) - sc.updateChildMeta(childId, func(meta *model.ShardMetadata) { + if err := sc.updateChildMeta(childId, func(meta *model.ShardMetadata) { meta.Term = childTerm meta.Leader = &childLeader meta.Status = model.ShardStatusSteadyState - }) + }); err != nil { + return err + } // Elect the child leader so it replicates to its followers immediately. // Without this, only the single child leader node has the data. @@ -421,8 +424,7 @@ func (sc *SplitController) runCatchUp() error { } if caughtUp { sc.log.Info("All children caught up") - sc.updatePhase(model.SplitPhaseCutover) - return nil + return sc.updatePhase(model.SplitPhaseCutover) } } } @@ -443,14 +445,18 @@ func (sc *SplitController) checkObserverCursorsStale() (bool, error) { slog.Int64("bootstrap-term", parentMeta.Split.ParentTermAtBootstrap), slog.Int64("current-term", parentMeta.Term), ) - sc.updatePhase(model.SplitPhaseBootstrap) + if err := sc.updatePhase(model.SplitPhaseBootstrap); err != nil { + return false, err + } return true, nil } // Child leader election: the observer cursor targets the old (dead) leader. // Remove the stale cursor and fall back to Bootstrap to re-add. if sc.removeStaleChildObservers(parentMeta) { - sc.updatePhase(model.SplitPhaseBootstrap) + if err := sc.updatePhase(model.SplitPhaseBootstrap); err != nil { + return false, err + } return true, nil } @@ -562,11 +568,13 @@ func (sc *SplitController) runCutover() error { ) // Update parent term in metadata - sc.updateParentMeta(func(meta *model.ShardMetadata) { + if err := sc.updateParentMeta(func(meta *model.ShardMetadata) { meta.Term = newParentTerm meta.Leader = nil meta.Status = model.ShardStatusElection - }) + }); err != nil { + return err + } // Step 2: Wait for children to commit parentFinalOffset. // Children were already elected leader in Bootstrap, so commitOffset @@ -587,14 +595,18 @@ func (sc *SplitController) runCutover() error { // Step 4: Clear split metadata from children and mark parent for deletion. // Children are now independent shards. for _, childId := range []int64{sc.leftChildId, sc.rightChildId} { - sc.updateChildMeta(childId, func(meta *model.ShardMetadata) { + if err := sc.updateChildMeta(childId, func(meta *model.ShardMetadata) { meta.Split = nil - }) + }); err != nil { + return err + } } - sc.updateParentMeta(func(meta *model.ShardMetadata) { + if err := sc.updateParentMeta(func(meta *model.ShardMetadata) { meta.Status = model.ShardStatusDeleting - }) + }); err != nil { + return err + } // Step 5: Notify the coordinator. This triggers the parent shard // controller's DeleteShard (which retries indefinitely with backoff) @@ -603,9 +615,11 @@ func (sc *SplitController) runCutover() error { // Clear split metadata from parent — the split controller's job is done. // The parent shard controller handles the actual deletion. - sc.updateParentMeta(func(meta *model.ShardMetadata) { + if err := sc.updateParentMeta(func(meta *model.ShardMetadata) { meta.Split = nil - }) + }); err != nil { + return err + } return nil } @@ -650,13 +664,19 @@ func (sc *SplitController) abort() { // Delete child shards from status. for _, childId := range []int64{sc.leftChildId, sc.rightChildId} { - sc.statusResource.DeleteShardMetadata(sc.namespace, childId) + if err := sc.statusResource.DeleteShardMetadata(sc.namespace, childId); err != nil { + sc.log.Warn("Failed to delete child shard metadata during abort", + slog.Int64("child-shard", childId), + slog.Any("error", err)) + } } // Clear parent split metadata. - sc.updateParentMeta(func(meta *model.ShardMetadata) { + if err := sc.updateParentMeta(func(meta *model.ShardMetadata) { meta.Split = nil - }) + }); err != nil { + sc.log.Warn("Failed to clear parent split metadata during abort", slog.Any("error", err)) + } sc.log.Info("Split aborted, parent restored") @@ -684,23 +704,24 @@ func (sc *SplitController) loadShardMeta(shardId int64) *model.ShardMetadata { return &cloned } -func (sc *SplitController) updateParentMeta(fn func(meta *model.ShardMetadata)) { - sc.updateShardMeta(sc.parentShardId, fn) +func (sc *SplitController) updateParentMeta(fn func(meta *model.ShardMetadata)) error { + return sc.updateShardMeta(sc.parentShardId, fn) } -func (sc *SplitController) updateChildMeta(childId int64, fn func(meta *model.ShardMetadata)) { - sc.updateShardMeta(childId, fn) +func (sc *SplitController) updateChildMeta(childId int64, fn func(meta *model.ShardMetadata)) error { + return sc.updateShardMeta(childId, fn) } -func (sc *SplitController) updateShardMeta(shardId int64, fn func(meta *model.ShardMetadata)) { +func (sc *SplitController) updateShardMeta(shardId int64, fn func(meta *model.ShardMetadata)) error { status := sc.statusResource.Load() cloned := status.Clone() ns := cloned.Namespaces[sc.namespace] if meta, exists := ns.Shards[shardId]; exists { fn(&meta) ns.Shards[shardId] = meta - sc.statusResource.Update(cloned) + return sc.statusResource.Update(cloned) } + return nil } // fenceEnsemble sends NewTerm to all ensemble members and returns the @@ -860,11 +881,13 @@ func (sc *SplitController) reelectChild(childId int64) error { } // Update child metadata - sc.updateChildMeta(childId, func(meta *model.ShardMetadata) { + if err := sc.updateChildMeta(childId, func(meta *model.ShardMetadata) { meta.Term = newTerm meta.Leader = &newLeader meta.Status = model.ShardStatusSteadyState - }) + }); err != nil { + return err + } sc.log.Info("Child re-elected in clean term", slog.Int64("child-shard", childId), diff --git a/oxiad/coordinator/coordinator.go b/oxiad/coordinator/coordinator.go index a28e9c88..1dbc4720 100644 --- a/oxiad/coordinator/coordinator.go +++ b/oxiad/coordinator/coordinator.go @@ -173,7 +173,13 @@ func (c *coordinator) ConfigChanged(newConfig *model.ClusterConfig) { var shardsToDelete []int64 for { clusterStatus, shardsToAdd, shardsToDelete = util.ApplyClusterChanges(newConfig, currentStatus, c.selectNewEnsemble) - if !c.statusResource.Swap(clusterStatus, version) { + swapped, err := c.statusResource.Swap(clusterStatus, version) + if err != nil { + c.Warn("Failed to swap cluster status", slog.Any("error", err)) + currentStatus, version = c.statusResource.LoadWithVersion() + continue + } + if !swapped { currentStatus, version = c.statusResource.LoadWithVersion() continue } @@ -563,7 +569,9 @@ func (c *coordinator) InitiateSplit(namespace string, parentShardId int64, split } // Persist - c.statusResource.Update(cloned) + if err = c.statusResource.Update(cloned); err != nil { + return 0, 0, errors.Wrap(err, "failed to persist split status") + } c.Info("Split initiated", slog.Int64("parent-shard", parentShardId), @@ -777,7 +785,9 @@ func NewCoordinator(meta metadata.Provider, clusterStatus, _, _ = util.ApplyClusterChanges(clusterConfig, model.NewClusterStatus(), c.selectNewEnsemble) - c.statusResource.Update(clusterStatus) + if err = c.statusResource.Update(clusterStatus); err != nil { + return nil, nil, errors.Wrap(err, "failed to persist initial cluster status") + } } else { c.Info("Checking cluster config", slog.Any("clusterConfig", clusterConfig)) @@ -787,7 +797,9 @@ func NewCoordinator(meta metadata.Provider, clusterStatus, c.selectNewEnsemble) if len(shardsToAdd) > 0 || len(shardsToDelete) > 0 { - c.statusResource.Update(clusterStatus) + if err = c.statusResource.Update(clusterStatus); err != nil { + return nil, nil, errors.Wrap(err, "failed to persist cluster status changes") + } } } diff --git a/oxiad/coordinator/resource/status_resource.go b/oxiad/coordinator/resource/status_resource.go index a80f2534..bd3f2ee5 100644 --- a/oxiad/coordinator/resource/status_resource.go +++ b/oxiad/coordinator/resource/status_resource.go @@ -16,12 +16,8 @@ package resource import ( "context" - "errors" "log/slog" "sync" - "time" - - "github.com/cenkalti/backoff/v4" "github.com/oxia-db/oxia/oxiad/coordinator/metadata" "github.com/oxia-db/oxia/oxiad/coordinator/model" @@ -32,13 +28,13 @@ type StatusResource interface { LoadWithVersion() (*model.ClusterStatus, metadata.Version) - Swap(newStatus *model.ClusterStatus, version metadata.Version) bool + Swap(newStatus *model.ClusterStatus, version metadata.Version) (bool, error) - Update(newStatus *model.ClusterStatus) + Update(newStatus *model.ClusterStatus) error - UpdateShardMetadata(namespace string, shard int64, shardMetadata model.ShardMetadata) + UpdateShardMetadata(namespace string, shard int64, shardMetadata model.ShardMetadata) error - DeleteShardMetadata(namespace string, shard int64) + DeleteShardMetadata(namespace string, shard int64) error IsReady(clusterConfig *model.ClusterConfig) bool @@ -72,18 +68,6 @@ type status struct { changeCh chan struct{} } -// handleStoreError handles errors from metadata.Store(). -// ErrMetadataBadVersion is treated as permanent — retrying with a -// re-read version could overwrite valid data written by a new leader. -// The LeadershipLostCh will trigger a full coordinator restart with -// clean state. -func (*status) handleStoreError(err error) error { - if errors.Is(err, metadata.ErrMetadataBadVersion) { - return backoff.Permanent(err) - } - return err -} - // notifyChange wakes all goroutines waiting on ChangeNotify. // Must be called while holding s.lock for writing. func (s *status) notifyChange() { @@ -123,23 +107,18 @@ func (s *status) loadWithInitSlow() { if s.current != nil { return } - _ = backoff.RetryNotify(func() error { - clusterStatus, version, err := s.metadata.Get() - if err != nil { - return err - } - s.current = clusterStatus - s.currentVersionID = version - return nil - }, backoff.NewExponentialBackOff(), func(err error, duration time.Duration) { + clusterStatus, version, err := s.metadata.Get() + if err != nil { s.Warn( - "failed to load status, retrying later", + "failed to load status", slog.Any("error", err), - slog.Duration("retry-after", duration), ) - }) - if s.current == nil { + } + if clusterStatus == nil { s.current = &model.ClusterStatus{} + } else { + s.current = clusterStatus + s.currentVersionID = version } } @@ -159,111 +138,76 @@ func (s *status) LoadWithVersion() (*model.ClusterStatus, metadata.Version) { return s.current, s.currentVersionID } -func (s *status) Swap(newStatus *model.ClusterStatus, version metadata.Version) bool { +func (s *status) Swap(newStatus *model.ClusterStatus, version metadata.Version) (bool, error) { s.lock.Lock() defer s.lock.Unlock() if s.currentVersionID != version { - return false + return false, nil } - err := backoff.RetryNotify(func() error { - versionID, err := s.metadata.Store(newStatus, s.currentVersionID) - if err != nil { - return s.handleStoreError(err) - } - s.current = newStatus - s.currentVersionID = versionID - return nil - }, backoff.NewExponentialBackOff(), func(err error, duration time.Duration) { - s.Warn( - "failed to swap status, retrying later", - slog.Any("error", err), - slog.Duration("retry-after", duration), - ) - }) - if err == nil { - s.notifyChange() + versionID, err := s.metadata.Store(newStatus, s.currentVersionID) + if err != nil { + return false, err } - return err == nil + s.current = newStatus + s.currentVersionID = versionID + s.notifyChange() + return true, nil } -func (s *status) Update(newStatus *model.ClusterStatus) { +func (s *status) Update(newStatus *model.ClusterStatus) error { s.lock.Lock() defer s.lock.Unlock() - _ = backoff.RetryNotify(func() error { - versionID, err := s.metadata.Store(newStatus, s.currentVersionID) - if err != nil { - return s.handleStoreError(err) - } - s.current = newStatus - s.currentVersionID = versionID - return nil - }, backoff.NewExponentialBackOff(), func(err error, duration time.Duration) { - s.Warn( - "failed to update status, retrying later", - slog.Any("error", err), - slog.Duration("retry-after", duration), - ) - }) + versionID, err := s.metadata.Store(newStatus, s.currentVersionID) + if err != nil { + return err + } + s.current = newStatus + s.currentVersionID = versionID s.notifyChange() + return nil } -func (s *status) UpdateShardMetadata(namespace string, shard int64, shardMetadata model.ShardMetadata) { +func (s *status) UpdateShardMetadata(namespace string, shard int64, shardMetadata model.ShardMetadata) error { s.lock.Lock() defer s.lock.Unlock() clonedStatus := s.current.Clone() ns, exist := clonedStatus.Namespaces[namespace] if !exist { - return + return nil } ns.Shards[shard] = shardMetadata.Clone() - _ = backoff.RetryNotify(func() error { - versionID, err := s.metadata.Store(clonedStatus, s.currentVersionID) - if err != nil { - return s.handleStoreError(err) - } - s.current = clonedStatus - s.currentVersionID = versionID - return nil - }, backoff.NewExponentialBackOff(), func(err error, duration time.Duration) { - s.Warn( - "failed to update shard metadata, retrying later", - slog.Any("error", err), - slog.Duration("retry-after", duration), - ) - }) + versionID, err := s.metadata.Store(clonedStatus, s.currentVersionID) + if err != nil { + return err + } + s.current = clonedStatus + s.currentVersionID = versionID s.notifyChange() + return nil } -func (s *status) DeleteShardMetadata(namespace string, shard int64) { +func (s *status) DeleteShardMetadata(namespace string, shard int64) error { s.lock.Lock() defer s.lock.Unlock() clonedStatus := s.current.Clone() ns, exist := clonedStatus.Namespaces[namespace] if !exist { - return + return nil } delete(ns.Shards, shard) if len(ns.Shards) == 0 { delete(clonedStatus.Namespaces, namespace) } - _ = backoff.RetryNotify(func() error { - versionID, err := s.metadata.Store(clonedStatus, s.currentVersionID) - if err != nil { - return s.handleStoreError(err) - } - s.current = clonedStatus - s.currentVersionID = versionID - return nil - }, backoff.NewExponentialBackOff(), func(err error, duration time.Duration) { - s.Warn( - "failed to delete shard metadata, retrying later", - slog.Any("error", err), - slog.Duration("retry-after", duration), - ) - }) + versionID, err := s.metadata.Store(clonedStatus, s.currentVersionID) + if err != nil { + return err + } + s.current = clonedStatus + s.currentVersionID = versionID s.notifyChange() + return nil } func (s *status) IsReady(clusterConfig *model.ClusterConfig) bool { From d837be695ace7a115d85f388da559974e90bc108 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 25 Mar 2026 13:41:31 +0800 Subject: [PATCH 2/4] fix: address lint errors in tests and split controller - Check error returns from StatusResource methods in tests - Simplify redundant if-return pattern in split_controller.go Signed-off-by: mattisonchao --- oxiad/coordinator/controller/split_controller.go | 8 ++------ .../controller/split_controller_test.go | 14 +++++++------- tests/coordinator/coordinator_test.go | 2 +- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/oxiad/coordinator/controller/split_controller.go b/oxiad/coordinator/controller/split_controller.go index 31150c1b..b30c84c8 100644 --- a/oxiad/coordinator/controller/split_controller.go +++ b/oxiad/coordinator/controller/split_controller.go @@ -615,13 +615,9 @@ func (sc *SplitController) runCutover() error { // Clear split metadata from parent — the split controller's job is done. // The parent shard controller handles the actual deletion. - if err := sc.updateParentMeta(func(meta *model.ShardMetadata) { + return sc.updateParentMeta(func(meta *model.ShardMetadata) { meta.Split = nil - }); err != nil { - return err - } - - return nil + }) } // abort cleans up a failed/timed-out split that hasn't reached Cutover. diff --git a/oxiad/coordinator/controller/split_controller_test.go b/oxiad/coordinator/controller/split_controller_test.go index 6da6dbd5..8ff2239c 100644 --- a/oxiad/coordinator/controller/split_controller_test.go +++ b/oxiad/coordinator/controller/split_controller_test.go @@ -136,7 +136,7 @@ func setupSplitTest(t *testing.T, phase model.SplitPhase) ( ShardIdGenerator: 3, } - statusRes.Update(clusterStatus) + assert.NoError(t, statusRes.Update(clusterStatus)) listener := newMockSplitEventListener() return rpcMock, statusRes, listener @@ -283,7 +283,7 @@ func TestSplitController_ResumeFromCatchUp(t *testing.T) { rightMeta.Leader = &rs1 rightMeta.Term = 1 ns.Shards[2] = rightMeta - statusRes.Update(cloned) + assert.NoError(t, statusRes.Update(cloned)) queueCatchUpResponses(rpcMock) queueCutoverResponses(rpcMock) @@ -406,7 +406,7 @@ func TestSplitController_TimeoutDuringCatchUp(t *testing.T) { parentMeta.Split.ParentTermAtBootstrap = 5 ns.Shards[0] = parentMeta - statusRes.Update(cloned) + assert.NoError(t, statusRes.Update(cloned)) // Queue RemoveObserver responses (abort will call these) rpcMock.GetNode(ps1).RemoveObserverResponse(nil) @@ -471,7 +471,7 @@ func TestSplitController_ParentTermChangeDuringCatchUp(t *testing.T) { parentMeta.Split.ParentTermAtBootstrap = 5 // Was 5 during Bootstrap ns.Shards[0] = parentMeta - statusRes.Update(cloned) + assert.NoError(t, statusRes.Update(cloned)) // The controller should detect term change and reset to Bootstrap. // Queue Bootstrap responses -- note: parent leader is now ps2 (after election) @@ -676,7 +676,7 @@ func TestSplitController_ChildLeaderDiesTimesOutAndAborts(t *testing.T) { parentMeta.Split.ParentTermAtBootstrap = 5 ns.Shards[0] = parentMeta - statusRes.Update(cloned) + assert.NoError(t, statusRes.Update(cloned)) // Queue RemoveObserver responses (abort will call these) rpcMock.GetNode(ps1).RemoveObserverResponse(nil) @@ -742,7 +742,7 @@ func TestSplitController_ChildFollowersDeadCommitStalls(t *testing.T) { parentMeta.Split.ParentTermAtBootstrap = 5 ns.Shards[0] = parentMeta - statusRes.Update(cloned) + assert.NoError(t, statusRes.Update(cloned)) // Queue RemoveObserver responses (abort will call these) rpcMock.GetNode(ps1).RemoveObserverResponse(nil) @@ -819,7 +819,7 @@ func TestSplitController_ChildLeaderChangeDuringCatchUp(t *testing.T) { } ns.Shards[0] = parentMeta - statusRes.Update(cloned) + assert.NoError(t, statusRes.Update(cloned)) // CatchUp detects left child leader changed (ls1 -> ls2). // It RemoveObserver(old leader) on parent, then falls back to Bootstrap. diff --git a/tests/coordinator/coordinator_test.go b/tests/coordinator/coordinator_test.go index b6712263..8738f1f5 100644 --- a/tests/coordinator/coordinator_test.go +++ b/tests/coordinator/coordinator_test.go @@ -60,7 +60,7 @@ func TestCoordinatorInitiateLeaderElection(t *testing.T) { Int32HashRange: model.Int32HashRange{Min: 2000, Max: 100000}, } statusResource := coordinatorInstance.StatusResource() - statusResource.UpdateShardMetadata("default", 1, metadata) + assert.NoError(t, statusResource.UpdateShardMetadata("default", 1, metadata)) status := statusResource.Load() assert.EqualValues(t, status.Namespaces["default"].Shards[1], metadata) From 159daabb19e27c96113ca06722388d0a6cb30113 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 25 Mar 2026 22:22:03 +0800 Subject: [PATCH 3/4] fix: return error from ConfigChanged instead of retrying indefinitely Make ConfigChanged return an error so callers can handle swap failures instead of looping forever when the metadata store is unavailable. --- oxiad/coordinator/coordinator.go | 7 +++-- .../resource/cluster_config_event_listener.go | 2 +- .../resource/cluster_config_resource.go | 5 +++- oxiad/coordinator/resource/status_resource.go | 27 +++++++++++++++++++ .../dataserver/controller/shards_director.go | 1 + 5 files changed, 36 insertions(+), 6 deletions(-) diff --git a/oxiad/coordinator/coordinator.go b/oxiad/coordinator/coordinator.go index 1dbc4720..5d7901c9 100644 --- a/oxiad/coordinator/coordinator.go +++ b/oxiad/coordinator/coordinator.go @@ -127,7 +127,7 @@ func (c *coordinator) NodeControllers() map[string]controller.DataServerControll return res } -func (c *coordinator) ConfigChanged(newConfig *model.ClusterConfig) { +func (c *coordinator) ConfigChanged(newConfig *model.ClusterConfig) error { c.ccrWg.Wait() c.Lock() defer c.Unlock() @@ -175,9 +175,7 @@ func (c *coordinator) ConfigChanged(newConfig *model.ClusterConfig) { clusterStatus, shardsToAdd, shardsToDelete = util.ApplyClusterChanges(newConfig, currentStatus, c.selectNewEnsemble) swapped, err := c.statusResource.Swap(clusterStatus, version) if err != nil { - c.Warn("Failed to swap cluster status", slog.Any("error", err)) - currentStatus, version = c.statusResource.LoadWithVersion() - continue + return fmt.Errorf("failed to swap cluster status: %w", err) } if !swapped { currentStatus, version = c.statusResource.LoadWithVersion() @@ -203,6 +201,7 @@ func (c *coordinator) ConfigChanged(newConfig *model.ClusterConfig) { c.computeNewAssignments() c.loadBalancer.Trigger() + return nil } func (c *coordinator) findDataServerFeatures(dataServers []model.Server) map[string][]proto.Feature { diff --git a/oxiad/coordinator/resource/cluster_config_event_listener.go b/oxiad/coordinator/resource/cluster_config_event_listener.go index b312f2d8..5e46e0c0 100644 --- a/oxiad/coordinator/resource/cluster_config_event_listener.go +++ b/oxiad/coordinator/resource/cluster_config_event_listener.go @@ -17,5 +17,5 @@ package resource import "github.com/oxia-db/oxia/oxiad/coordinator/model" type ClusterConfigEventListener interface { - ConfigChanged(newConfig *model.ClusterConfig) + ConfigChanged(newConfig *model.ClusterConfig) error } diff --git a/oxiad/coordinator/resource/cluster_config_resource.go b/oxiad/coordinator/resource/cluster_config_resource.go index 3a933f05..d57280cd 100644 --- a/oxiad/coordinator/resource/cluster_config_resource.go +++ b/oxiad/coordinator/resource/cluster_config_resource.go @@ -216,7 +216,10 @@ func (ccf *clusterConfig) waitForUpdates() { ccf.Info("No cluster config changes detected") return } - ccf.clusterConfigEventListener.ConfigChanged(currentClusterConfig) + if err := ccf.clusterConfigEventListener.ConfigChanged(currentClusterConfig); err != nil { + ccf.Warn("Failed to apply config change, will retry on next event", + slog.Any("error", err)) + } } } } diff --git a/oxiad/coordinator/resource/status_resource.go b/oxiad/coordinator/resource/status_resource.go index bd3f2ee5..30efa44f 100644 --- a/oxiad/coordinator/resource/status_resource.go +++ b/oxiad/coordinator/resource/status_resource.go @@ -16,6 +16,7 @@ package resource import ( "context" + "errors" "log/slog" "sync" @@ -68,6 +69,28 @@ type status struct { changeCh chan struct{} } +// refreshOnVersionConflict re-reads the metadata from the remote store +// when a version conflict occurs. Without this, the local versionID stays +// stale and every subsequent Store would fail with ErrMetadataBadVersion +// forever — making the error non-retryable even though the caller retries. +// Must be called while holding s.lock for writing. +func (s *status) refreshOnVersionConflict(err error) { + if !errors.Is(err, metadata.ErrMetadataBadVersion) { + return + } + clusterStatus, version, getErr := s.metadata.Get() + if getErr != nil { + slog.Warn("failed to refresh status from remote", + slog.Any("error", getErr)) + return + } + if clusterStatus != nil { + s.current = clusterStatus + s.currentVersionID = version + s.notifyChange() + } +} + // notifyChange wakes all goroutines waiting on ChangeNotify. // Must be called while holding s.lock for writing. func (s *status) notifyChange() { @@ -146,6 +169,7 @@ func (s *status) Swap(newStatus *model.ClusterStatus, version metadata.Version) } versionID, err := s.metadata.Store(newStatus, s.currentVersionID) if err != nil { + s.refreshOnVersionConflict(err) return false, err } s.current = newStatus @@ -159,6 +183,7 @@ func (s *status) Update(newStatus *model.ClusterStatus) error { defer s.lock.Unlock() versionID, err := s.metadata.Store(newStatus, s.currentVersionID) if err != nil { + s.refreshOnVersionConflict(err) return err } s.current = newStatus @@ -179,6 +204,7 @@ func (s *status) UpdateShardMetadata(namespace string, shard int64, shardMetadat ns.Shards[shard] = shardMetadata.Clone() versionID, err := s.metadata.Store(clonedStatus, s.currentVersionID) if err != nil { + s.refreshOnVersionConflict(err) return err } s.current = clonedStatus @@ -202,6 +228,7 @@ func (s *status) DeleteShardMetadata(namespace string, shard int64) error { } versionID, err := s.metadata.Store(clonedStatus, s.currentVersionID) if err != nil { + s.refreshOnVersionConflict(err) return err } s.current = clonedStatus diff --git a/oxiad/dataserver/controller/shards_director.go b/oxiad/dataserver/controller/shards_director.go index 39f804ff..8bde8ed6 100644 --- a/oxiad/dataserver/controller/shards_director.go +++ b/oxiad/dataserver/controller/shards_director.go @@ -231,6 +231,7 @@ func (s *shardsDirector) DeleteShard(req *proto.DeleteShardRequest) (*proto.Dele return resp, nil } + // todo: check the database directly to avoid init follower controller again if coordinator is retrying by some internal error fc, err := follow.NewFollowerController(s.storageOptions, req.Namespace, req.Shard, s.walFactory, s.kvFactory, nil) if err != nil { return nil, err From 0cbd2ade159971897cd7dd99e4d4bc47dcb0f235 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 25 Mar 2026 22:28:09 +0800 Subject: [PATCH 4/4] fix: backoff retry entire config change flow to avoid discarding changes --- .../resource/cluster_config_resource.go | 39 +++++++++++-------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/oxiad/coordinator/resource/cluster_config_resource.go b/oxiad/coordinator/resource/cluster_config_resource.go index d57280cd..b499110e 100644 --- a/oxiad/coordinator/resource/cluster_config_resource.go +++ b/oxiad/coordinator/resource/cluster_config_resource.go @@ -27,6 +27,7 @@ import ( "github.com/emirpasic/gods/v2/trees/redblacktree" "github.com/oxia-db/oxia/common/process" + oxiatime "github.com/oxia-db/oxia/common/time" "github.com/oxia-db/oxia/oxiad/coordinator/model" ) @@ -201,25 +202,31 @@ func (ccf *clusterConfig) waitForUpdates() { case <-ccf.clusterConfigNotificationsCh: ccf.Info("Received cluster config change event") - ccf.clusterConfigLock.Lock() - oldClusterConfig := ccf.currentClusterConfig - ccf.currentClusterConfig = nil - ccf.clusterConfigLock.Unlock() - - ccf.loadWithInitSlow() - ccf.clusterConfigLock.RLock() - currentClusterConfig := ccf.currentClusterConfig + oldClusterConfig := ccf.currentClusterConfig ccf.clusterConfigLock.RUnlock() - if reflect.DeepEqual(oldClusterConfig, currentClusterConfig) { - ccf.Info("No cluster config changes detected") - return - } - if err := ccf.clusterConfigEventListener.ConfigChanged(currentClusterConfig); err != nil { - ccf.Warn("Failed to apply config change, will retry on next event", - slog.Any("error", err)) - } + _ = backoff.RetryNotify(func() error { + ccf.clusterConfigLock.Lock() + ccf.currentClusterConfig = nil + ccf.clusterConfigLock.Unlock() + + ccf.loadWithInitSlow() + + ccf.clusterConfigLock.RLock() + currentClusterConfig := ccf.currentClusterConfig + ccf.clusterConfigLock.RUnlock() + + if reflect.DeepEqual(oldClusterConfig, currentClusterConfig) { + ccf.Info("No cluster config changes detected") + return nil + } + return ccf.clusterConfigEventListener.ConfigChanged(currentClusterConfig) + }, oxiatime.NewBackOff(ccf.ctx), func(err error, duration time.Duration) { + ccf.Warn("Failed to apply config change, retrying", + slog.Any("error", err), + slog.Duration("retry-after", duration)) + }) } } }