diff --git a/oxiad/coordinator/balancer/scheduler_test.go b/oxiad/coordinator/balancer/scheduler_test.go index 5f3bcd05..e997fa26 100644 --- a/oxiad/coordinator/balancer/scheduler_test.go +++ b/oxiad/coordinator/balancer/scheduler_test.go @@ -43,17 +43,22 @@ func (m *mockStatusResource) LoadWithVersion() (*model.ClusterStatus, metadata.V return m.status, "0" } -func (m *mockStatusResource) Swap(_ *model.ClusterStatus, _ metadata.Version) bool { - return true +func (m *mockStatusResource) Swap(_ *model.ClusterStatus, _ metadata.Version) (bool, error) { + return true, nil } -func (m *mockStatusResource) Update(newStatus *model.ClusterStatus) { +func (m *mockStatusResource) Update(newStatus *model.ClusterStatus) error { m.status = newStatus + return nil } -func (m *mockStatusResource) UpdateShardMetadata(_ string, _ int64, _ model.ShardMetadata) {} +func (m *mockStatusResource) UpdateShardMetadata(_ string, _ int64, _ model.ShardMetadata) error { + return nil +} -func (m *mockStatusResource) DeleteShardMetadata(_ string, _ int64) {} +func (m *mockStatusResource) DeleteShardMetadata(_ string, _ int64) error { + return nil +} func (m *mockStatusResource) IsReady(_ *model.ClusterConfig) bool { return true diff --git a/oxiad/coordinator/controller/shard_controller.go b/oxiad/coordinator/controller/shard_controller.go index 59e55086..887106db 100644 --- a/oxiad/coordinator/controller/shard_controller.go +++ b/oxiad/coordinator/controller/shard_controller.go @@ -427,7 +427,9 @@ func (s *shardController) deleteShard() error { ) } - s.statusResource.DeleteShardMetadata(s.namespace, s.shard) + if err := s.statusResource.DeleteShardMetadata(s.namespace, s.shard); err != nil { + return err + } s.eventListener.ShardDeleted(s.shard) return s.close() } @@ -508,7 +510,10 @@ func (s *shardController) handlePeriodicTasks() { } // Update the shard status - s.statusResource.UpdateShardMetadata(s.namespace, s.shard, mutShardMeta) + if err := 
s.statusResource.UpdateShardMetadata(s.namespace, s.shard, mutShardMeta); err != nil { + s.log.Warn("Failed to update shard metadata", "error", err) + return + } s.metadata.Store(mutShardMeta) } diff --git a/oxiad/coordinator/controller/shard_controller_election.go b/oxiad/coordinator/controller/shard_controller_election.go index d7fda192..b0b003f2 100644 --- a/oxiad/coordinator/controller/shard_controller_election.go +++ b/oxiad/coordinator/controller/shard_controller_election.go @@ -434,7 +434,9 @@ func (e *ShardElection) start() (model.Server, error) { metadata.Term++ metadata.Ensemble = e.refreshedEnsemble(metadata.Ensemble) }) - e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta) + if err := e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta); err != nil { + return model.Server{}, err + } if e.changeEnsembleAction != nil { e.prepareIfChangeEnsemble(&mutShardMeta) @@ -484,7 +486,9 @@ func (e *ShardElection) start() (model.Server, error) { leader := mutShardMeta.Leader leaderEntry := candidatesStatus[*leader] - e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta) + if err = e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta); err != nil { + return model.Server{}, err + } e.meta.Store(mutShardMeta) if e.eventListener != nil { e.eventListener.LeaderElected(e.shard, newLeader, maps.Keys(followers)) diff --git a/oxiad/coordinator/controller/split_controller.go b/oxiad/coordinator/controller/split_controller.go index 1acb82b1..b30c84c8 100644 --- a/oxiad/coordinator/controller/split_controller.go +++ b/oxiad/coordinator/controller/split_controller.go @@ -215,7 +215,7 @@ func (sc *SplitController) currentPhase() *model.SplitPhase { } // updatePhase atomically updates the split phase on both parent and children. 
-func (sc *SplitController) updatePhase(newPhase model.SplitPhase) { +func (sc *SplitController) updatePhase(newPhase model.SplitPhase) error { status := sc.statusResource.Load() cloned := status.Clone() @@ -235,7 +235,7 @@ func (sc *SplitController) updatePhase(newPhase model.SplitPhase) { } } - sc.statusResource.Update(cloned) + return sc.statusResource.Update(cloned) } // runBootstrap validates preconditions, fences child ensemble members, elects @@ -285,13 +285,14 @@ func (sc *SplitController) runBootstrap() error { childLeaders[childId] = childMeta.Leader.Internal } } - sc.updateParentMeta(func(meta *model.ShardMetadata) { + if err := sc.updateParentMeta(func(meta *model.ShardMetadata) { meta.Split.ParentTermAtBootstrap = parentTerm meta.Split.ChildLeadersAtBootstrap = childLeaders - }) + }); err != nil { + return err + } - sc.updatePhase(model.SplitPhaseCatchUp) - return nil + return sc.updatePhase(model.SplitPhaseCatchUp) } // fenceAndElectChild fences a child shard's ensemble and elects a leader. @@ -318,11 +319,13 @@ func (sc *SplitController) fenceAndElectChild(childId int64) error { childLeader := sc.pickLeader(headEntries) - sc.updateChildMeta(childId, func(meta *model.ShardMetadata) { + if err := sc.updateChildMeta(childId, func(meta *model.ShardMetadata) { meta.Term = childTerm meta.Leader = &childLeader meta.Status = model.ShardStatusSteadyState - }) + }); err != nil { + return err + } // Elect the child leader so it replicates to its followers immediately. // Without this, only the single child leader node has the data. 
@@ -421,8 +424,7 @@ func (sc *SplitController) runCatchUp() error { } if caughtUp { sc.log.Info("All children caught up") - sc.updatePhase(model.SplitPhaseCutover) - return nil + return sc.updatePhase(model.SplitPhaseCutover) } } } @@ -443,14 +445,18 @@ func (sc *SplitController) checkObserverCursorsStale() (bool, error) { slog.Int64("bootstrap-term", parentMeta.Split.ParentTermAtBootstrap), slog.Int64("current-term", parentMeta.Term), ) - sc.updatePhase(model.SplitPhaseBootstrap) + if err := sc.updatePhase(model.SplitPhaseBootstrap); err != nil { + return false, err + } return true, nil } // Child leader election: the observer cursor targets the old (dead) leader. // Remove the stale cursor and fall back to Bootstrap to re-add. if sc.removeStaleChildObservers(parentMeta) { - sc.updatePhase(model.SplitPhaseBootstrap) + if err := sc.updatePhase(model.SplitPhaseBootstrap); err != nil { + return false, err + } return true, nil } @@ -562,11 +568,13 @@ func (sc *SplitController) runCutover() error { ) // Update parent term in metadata - sc.updateParentMeta(func(meta *model.ShardMetadata) { + if err := sc.updateParentMeta(func(meta *model.ShardMetadata) { meta.Term = newParentTerm meta.Leader = nil meta.Status = model.ShardStatusElection - }) + }); err != nil { + return err + } // Step 2: Wait for children to commit parentFinalOffset. // Children were already elected leader in Bootstrap, so commitOffset @@ -587,14 +595,18 @@ func (sc *SplitController) runCutover() error { // Step 4: Clear split metadata from children and mark parent for deletion. // Children are now independent shards. 
for _, childId := range []int64{sc.leftChildId, sc.rightChildId} { - sc.updateChildMeta(childId, func(meta *model.ShardMetadata) { + if err := sc.updateChildMeta(childId, func(meta *model.ShardMetadata) { meta.Split = nil - }) + }); err != nil { + return err + } } - sc.updateParentMeta(func(meta *model.ShardMetadata) { + if err := sc.updateParentMeta(func(meta *model.ShardMetadata) { meta.Status = model.ShardStatusDeleting - }) + }); err != nil { + return err + } // Step 5: Notify the coordinator. This triggers the parent shard // controller's DeleteShard (which retries indefinitely with backoff) @@ -603,11 +615,9 @@ func (sc *SplitController) runCutover() error { // Clear split metadata from parent — the split controller's job is done. // The parent shard controller handles the actual deletion. - sc.updateParentMeta(func(meta *model.ShardMetadata) { + return sc.updateParentMeta(func(meta *model.ShardMetadata) { meta.Split = nil }) - - return nil } // abort cleans up a failed/timed-out split that hasn't reached Cutover. @@ -650,13 +660,19 @@ func (sc *SplitController) abort() { // Delete child shards from status. for _, childId := range []int64{sc.leftChildId, sc.rightChildId} { - sc.statusResource.DeleteShardMetadata(sc.namespace, childId) + if err := sc.statusResource.DeleteShardMetadata(sc.namespace, childId); err != nil { + sc.log.Warn("Failed to delete child shard metadata during abort", + slog.Int64("child-shard", childId), + slog.Any("error", err)) + } } // Clear parent split metadata. 
- sc.updateParentMeta(func(meta *model.ShardMetadata) { + if err := sc.updateParentMeta(func(meta *model.ShardMetadata) { meta.Split = nil - }) + }); err != nil { + sc.log.Warn("Failed to clear parent split metadata during abort", slog.Any("error", err)) + } sc.log.Info("Split aborted, parent restored") @@ -684,23 +700,24 @@ func (sc *SplitController) loadShardMeta(shardId int64) *model.ShardMetadata { return &cloned } -func (sc *SplitController) updateParentMeta(fn func(meta *model.ShardMetadata)) { - sc.updateShardMeta(sc.parentShardId, fn) +func (sc *SplitController) updateParentMeta(fn func(meta *model.ShardMetadata)) error { + return sc.updateShardMeta(sc.parentShardId, fn) } -func (sc *SplitController) updateChildMeta(childId int64, fn func(meta *model.ShardMetadata)) { - sc.updateShardMeta(childId, fn) +func (sc *SplitController) updateChildMeta(childId int64, fn func(meta *model.ShardMetadata)) error { + return sc.updateShardMeta(childId, fn) } -func (sc *SplitController) updateShardMeta(shardId int64, fn func(meta *model.ShardMetadata)) { +func (sc *SplitController) updateShardMeta(shardId int64, fn func(meta *model.ShardMetadata)) error { status := sc.statusResource.Load() cloned := status.Clone() ns := cloned.Namespaces[sc.namespace] if meta, exists := ns.Shards[shardId]; exists { fn(&meta) ns.Shards[shardId] = meta - sc.statusResource.Update(cloned) + return sc.statusResource.Update(cloned) } + return nil } // fenceEnsemble sends NewTerm to all ensemble members and returns the @@ -860,11 +877,13 @@ func (sc *SplitController) reelectChild(childId int64) error { } // Update child metadata - sc.updateChildMeta(childId, func(meta *model.ShardMetadata) { + if err := sc.updateChildMeta(childId, func(meta *model.ShardMetadata) { meta.Term = newTerm meta.Leader = &newLeader meta.Status = model.ShardStatusSteadyState - }) + }); err != nil { + return err + } sc.log.Info("Child re-elected in clean term", slog.Int64("child-shard", childId), diff --git 
a/oxiad/coordinator/controller/split_controller_test.go b/oxiad/coordinator/controller/split_controller_test.go index 6da6dbd5..8ff2239c 100644 --- a/oxiad/coordinator/controller/split_controller_test.go +++ b/oxiad/coordinator/controller/split_controller_test.go @@ -136,7 +136,7 @@ func setupSplitTest(t *testing.T, phase model.SplitPhase) ( ShardIdGenerator: 3, } - statusRes.Update(clusterStatus) + assert.NoError(t, statusRes.Update(clusterStatus)) listener := newMockSplitEventListener() return rpcMock, statusRes, listener @@ -283,7 +283,7 @@ func TestSplitController_ResumeFromCatchUp(t *testing.T) { rightMeta.Leader = &rs1 rightMeta.Term = 1 ns.Shards[2] = rightMeta - statusRes.Update(cloned) + assert.NoError(t, statusRes.Update(cloned)) queueCatchUpResponses(rpcMock) queueCutoverResponses(rpcMock) @@ -406,7 +406,7 @@ func TestSplitController_TimeoutDuringCatchUp(t *testing.T) { parentMeta.Split.ParentTermAtBootstrap = 5 ns.Shards[0] = parentMeta - statusRes.Update(cloned) + assert.NoError(t, statusRes.Update(cloned)) // Queue RemoveObserver responses (abort will call these) rpcMock.GetNode(ps1).RemoveObserverResponse(nil) @@ -471,7 +471,7 @@ func TestSplitController_ParentTermChangeDuringCatchUp(t *testing.T) { parentMeta.Split.ParentTermAtBootstrap = 5 // Was 5 during Bootstrap ns.Shards[0] = parentMeta - statusRes.Update(cloned) + assert.NoError(t, statusRes.Update(cloned)) // The controller should detect term change and reset to Bootstrap. 
// Queue Bootstrap responses -- note: parent leader is now ps2 (after election) @@ -676,7 +676,7 @@ func TestSplitController_ChildLeaderDiesTimesOutAndAborts(t *testing.T) { parentMeta.Split.ParentTermAtBootstrap = 5 ns.Shards[0] = parentMeta - statusRes.Update(cloned) + assert.NoError(t, statusRes.Update(cloned)) // Queue RemoveObserver responses (abort will call these) rpcMock.GetNode(ps1).RemoveObserverResponse(nil) @@ -742,7 +742,7 @@ func TestSplitController_ChildFollowersDeadCommitStalls(t *testing.T) { parentMeta.Split.ParentTermAtBootstrap = 5 ns.Shards[0] = parentMeta - statusRes.Update(cloned) + assert.NoError(t, statusRes.Update(cloned)) // Queue RemoveObserver responses (abort will call these) rpcMock.GetNode(ps1).RemoveObserverResponse(nil) @@ -819,7 +819,7 @@ func TestSplitController_ChildLeaderChangeDuringCatchUp(t *testing.T) { } ns.Shards[0] = parentMeta - statusRes.Update(cloned) + assert.NoError(t, statusRes.Update(cloned)) // CatchUp detects left child leader changed (ls1 -> ls2). // It RemoveObserver(old leader) on parent, then falls back to Bootstrap. 
diff --git a/oxiad/coordinator/coordinator.go b/oxiad/coordinator/coordinator.go index a28e9c88..5d7901c9 100644 --- a/oxiad/coordinator/coordinator.go +++ b/oxiad/coordinator/coordinator.go @@ -127,7 +127,7 @@ func (c *coordinator) NodeControllers() map[string]controller.DataServerControll return res } -func (c *coordinator) ConfigChanged(newConfig *model.ClusterConfig) { +func (c *coordinator) ConfigChanged(newConfig *model.ClusterConfig) error { c.ccrWg.Wait() c.Lock() defer c.Unlock() @@ -173,7 +173,11 @@ func (c *coordinator) ConfigChanged(newConfig *model.ClusterConfig) { var shardsToDelete []int64 for { clusterStatus, shardsToAdd, shardsToDelete = util.ApplyClusterChanges(newConfig, currentStatus, c.selectNewEnsemble) - if !c.statusResource.Swap(clusterStatus, version) { + swapped, err := c.statusResource.Swap(clusterStatus, version) + if err != nil { + return fmt.Errorf("failed to swap cluster status: %w", err) + } + if !swapped { currentStatus, version = c.statusResource.LoadWithVersion() continue } @@ -197,6 +201,7 @@ func (c *coordinator) ConfigChanged(newConfig *model.ClusterConfig) { c.computeNewAssignments() c.loadBalancer.Trigger() + return nil } func (c *coordinator) findDataServerFeatures(dataServers []model.Server) map[string][]proto.Feature { @@ -563,7 +568,9 @@ func (c *coordinator) InitiateSplit(namespace string, parentShardId int64, split } // Persist - c.statusResource.Update(cloned) + if err = c.statusResource.Update(cloned); err != nil { + return 0, 0, errors.Wrap(err, "failed to persist split status") + } c.Info("Split initiated", slog.Int64("parent-shard", parentShardId), @@ -777,7 +784,9 @@ func NewCoordinator(meta metadata.Provider, clusterStatus, _, _ = util.ApplyClusterChanges(clusterConfig, model.NewClusterStatus(), c.selectNewEnsemble) - c.statusResource.Update(clusterStatus) + if err = c.statusResource.Update(clusterStatus); err != nil { + return nil, nil, errors.Wrap(err, "failed to persist initial cluster status") + } } else { 
c.Info("Checking cluster config", slog.Any("clusterConfig", clusterConfig)) @@ -787,7 +796,9 @@ func NewCoordinator(meta metadata.Provider, clusterStatus, c.selectNewEnsemble) if len(shardsToAdd) > 0 || len(shardsToDelete) > 0 { - c.statusResource.Update(clusterStatus) + if err = c.statusResource.Update(clusterStatus); err != nil { + return nil, nil, errors.Wrap(err, "failed to persist cluster status changes") + } } } diff --git a/oxiad/coordinator/resource/cluster_config_event_listener.go b/oxiad/coordinator/resource/cluster_config_event_listener.go index b312f2d8..5e46e0c0 100644 --- a/oxiad/coordinator/resource/cluster_config_event_listener.go +++ b/oxiad/coordinator/resource/cluster_config_event_listener.go @@ -17,5 +17,5 @@ package resource import "github.com/oxia-db/oxia/oxiad/coordinator/model" type ClusterConfigEventListener interface { - ConfigChanged(newConfig *model.ClusterConfig) + ConfigChanged(newConfig *model.ClusterConfig) error } diff --git a/oxiad/coordinator/resource/cluster_config_resource.go b/oxiad/coordinator/resource/cluster_config_resource.go index 3a933f05..b499110e 100644 --- a/oxiad/coordinator/resource/cluster_config_resource.go +++ b/oxiad/coordinator/resource/cluster_config_resource.go @@ -27,6 +27,7 @@ import ( "github.com/emirpasic/gods/v2/trees/redblacktree" "github.com/oxia-db/oxia/common/process" + oxiatime "github.com/oxia-db/oxia/common/time" "github.com/oxia-db/oxia/oxiad/coordinator/model" ) @@ -201,22 +202,31 @@ func (ccf *clusterConfig) waitForUpdates() { case <-ccf.clusterConfigNotificationsCh: ccf.Info("Received cluster config change event") - ccf.clusterConfigLock.Lock() - oldClusterConfig := ccf.currentClusterConfig - ccf.currentClusterConfig = nil - ccf.clusterConfigLock.Unlock() - - ccf.loadWithInitSlow() - ccf.clusterConfigLock.RLock() - currentClusterConfig := ccf.currentClusterConfig + oldClusterConfig := ccf.currentClusterConfig ccf.clusterConfigLock.RUnlock() - if reflect.DeepEqual(oldClusterConfig, 
currentClusterConfig) { - ccf.Info("No cluster config changes detected") - return - } - ccf.clusterConfigEventListener.ConfigChanged(currentClusterConfig) + _ = backoff.RetryNotify(func() error { + ccf.clusterConfigLock.Lock() + ccf.currentClusterConfig = nil + ccf.clusterConfigLock.Unlock() + + ccf.loadWithInitSlow() + + ccf.clusterConfigLock.RLock() + currentClusterConfig := ccf.currentClusterConfig + ccf.clusterConfigLock.RUnlock() + + if reflect.DeepEqual(oldClusterConfig, currentClusterConfig) { + ccf.Info("No cluster config changes detected") + return nil + } + return ccf.clusterConfigEventListener.ConfigChanged(currentClusterConfig) + }, oxiatime.NewBackOff(ccf.ctx), func(err error, duration time.Duration) { + ccf.Warn("Failed to apply config change, retrying", + slog.Any("error", err), + slog.Duration("retry-after", duration)) + }) } } } diff --git a/oxiad/coordinator/resource/status_resource.go b/oxiad/coordinator/resource/status_resource.go index a80f2534..30efa44f 100644 --- a/oxiad/coordinator/resource/status_resource.go +++ b/oxiad/coordinator/resource/status_resource.go @@ -19,9 +19,6 @@ import ( "errors" "log/slog" "sync" - "time" - - "github.com/cenkalti/backoff/v4" "github.com/oxia-db/oxia/oxiad/coordinator/metadata" "github.com/oxia-db/oxia/oxiad/coordinator/model" @@ -32,13 +29,13 @@ type StatusResource interface { LoadWithVersion() (*model.ClusterStatus, metadata.Version) - Swap(newStatus *model.ClusterStatus, version metadata.Version) bool + Swap(newStatus *model.ClusterStatus, version metadata.Version) (bool, error) - Update(newStatus *model.ClusterStatus) + Update(newStatus *model.ClusterStatus) error - UpdateShardMetadata(namespace string, shard int64, shardMetadata model.ShardMetadata) + UpdateShardMetadata(namespace string, shard int64, shardMetadata model.ShardMetadata) error - DeleteShardMetadata(namespace string, shard int64) + DeleteShardMetadata(namespace string, shard int64) error IsReady(clusterConfig *model.ClusterConfig) bool @@ 
-72,16 +69,26 @@ type status struct { changeCh chan struct{} } -// handleStoreError handles errors from metadata.Store(). -// ErrMetadataBadVersion is treated as permanent — retrying with a -// re-read version could overwrite valid data written by a new leader. -// The LeadershipLostCh will trigger a full coordinator restart with -// clean state. -func (*status) handleStoreError(err error) error { - if errors.Is(err, metadata.ErrMetadataBadVersion) { - return backoff.Permanent(err) +// refreshOnVersionConflict re-reads the metadata from the remote store +// when a version conflict occurs. Without this, the local versionID stays +// stale and every subsequent Store would fail with ErrMetadataBadVersion +// forever — making the error non-retryable even though the caller retries. +// Must be called while holding s.lock for writing. +func (s *status) refreshOnVersionConflict(err error) { + if !errors.Is(err, metadata.ErrMetadataBadVersion) { + return + } + clusterStatus, version, getErr := s.metadata.Get() + if getErr != nil { + slog.Warn("failed to refresh status from remote", + slog.Any("error", getErr)) + return + } + if clusterStatus != nil { + s.current = clusterStatus + s.currentVersionID = version + s.notifyChange() } - return err } // notifyChange wakes all goroutines waiting on ChangeNotify. 
@@ -123,23 +130,18 @@ func (s *status) loadWithInitSlow() { if s.current != nil { return } - _ = backoff.RetryNotify(func() error { - clusterStatus, version, err := s.metadata.Get() - if err != nil { - return err - } - s.current = clusterStatus - s.currentVersionID = version - return nil - }, backoff.NewExponentialBackOff(), func(err error, duration time.Duration) { + clusterStatus, version, err := s.metadata.Get() + if err != nil { s.Warn( - "failed to load status, retrying later", + "failed to load status", slog.Any("error", err), - slog.Duration("retry-after", duration), ) - }) - if s.current == nil { + } + if clusterStatus == nil { s.current = &model.ClusterStatus{} + } else { + s.current = clusterStatus + s.currentVersionID = version } } @@ -159,111 +161,80 @@ func (s *status) LoadWithVersion() (*model.ClusterStatus, metadata.Version) { return s.current, s.currentVersionID } -func (s *status) Swap(newStatus *model.ClusterStatus, version metadata.Version) bool { +func (s *status) Swap(newStatus *model.ClusterStatus, version metadata.Version) (bool, error) { s.lock.Lock() defer s.lock.Unlock() if s.currentVersionID != version { - return false + return false, nil } - err := backoff.RetryNotify(func() error { - versionID, err := s.metadata.Store(newStatus, s.currentVersionID) - if err != nil { - return s.handleStoreError(err) - } - s.current = newStatus - s.currentVersionID = versionID - return nil - }, backoff.NewExponentialBackOff(), func(err error, duration time.Duration) { - s.Warn( - "failed to swap status, retrying later", - slog.Any("error", err), - slog.Duration("retry-after", duration), - ) - }) - if err == nil { - s.notifyChange() + versionID, err := s.metadata.Store(newStatus, s.currentVersionID) + if err != nil { + s.refreshOnVersionConflict(err) + return false, err } - return err == nil + s.current = newStatus + s.currentVersionID = versionID + s.notifyChange() + return true, nil } -func (s *status) Update(newStatus *model.ClusterStatus) { +func (s 
*status) Update(newStatus *model.ClusterStatus) error { s.lock.Lock() defer s.lock.Unlock() - _ = backoff.RetryNotify(func() error { - versionID, err := s.metadata.Store(newStatus, s.currentVersionID) - if err != nil { - return s.handleStoreError(err) - } - s.current = newStatus - s.currentVersionID = versionID - return nil - }, backoff.NewExponentialBackOff(), func(err error, duration time.Duration) { - s.Warn( - "failed to update status, retrying later", - slog.Any("error", err), - slog.Duration("retry-after", duration), - ) - }) + versionID, err := s.metadata.Store(newStatus, s.currentVersionID) + if err != nil { + s.refreshOnVersionConflict(err) + return err + } + s.current = newStatus + s.currentVersionID = versionID s.notifyChange() + return nil } -func (s *status) UpdateShardMetadata(namespace string, shard int64, shardMetadata model.ShardMetadata) { +func (s *status) UpdateShardMetadata(namespace string, shard int64, shardMetadata model.ShardMetadata) error { s.lock.Lock() defer s.lock.Unlock() clonedStatus := s.current.Clone() ns, exist := clonedStatus.Namespaces[namespace] if !exist { - return + return nil } ns.Shards[shard] = shardMetadata.Clone() - _ = backoff.RetryNotify(func() error { - versionID, err := s.metadata.Store(clonedStatus, s.currentVersionID) - if err != nil { - return s.handleStoreError(err) - } - s.current = clonedStatus - s.currentVersionID = versionID - return nil - }, backoff.NewExponentialBackOff(), func(err error, duration time.Duration) { - s.Warn( - "failed to update shard metadata, retrying later", - slog.Any("error", err), - slog.Duration("retry-after", duration), - ) - }) + versionID, err := s.metadata.Store(clonedStatus, s.currentVersionID) + if err != nil { + s.refreshOnVersionConflict(err) + return err + } + s.current = clonedStatus + s.currentVersionID = versionID s.notifyChange() + return nil } -func (s *status) DeleteShardMetadata(namespace string, shard int64) { +func (s *status) DeleteShardMetadata(namespace string, 
shard int64) error { s.lock.Lock() defer s.lock.Unlock() clonedStatus := s.current.Clone() ns, exist := clonedStatus.Namespaces[namespace] if !exist { - return + return nil } delete(ns.Shards, shard) if len(ns.Shards) == 0 { delete(clonedStatus.Namespaces, namespace) } - _ = backoff.RetryNotify(func() error { - versionID, err := s.metadata.Store(clonedStatus, s.currentVersionID) - if err != nil { - return s.handleStoreError(err) - } - s.current = clonedStatus - s.currentVersionID = versionID - return nil - }, backoff.NewExponentialBackOff(), func(err error, duration time.Duration) { - s.Warn( - "failed to delete shard metadata, retrying later", - slog.Any("error", err), - slog.Duration("retry-after", duration), - ) - }) + versionID, err := s.metadata.Store(clonedStatus, s.currentVersionID) + if err != nil { + s.refreshOnVersionConflict(err) + return err + } + s.current = clonedStatus + s.currentVersionID = versionID s.notifyChange() + return nil } func (s *status) IsReady(clusterConfig *model.ClusterConfig) bool { diff --git a/oxiad/dataserver/controller/shards_director.go b/oxiad/dataserver/controller/shards_director.go index 39f804ff..8bde8ed6 100644 --- a/oxiad/dataserver/controller/shards_director.go +++ b/oxiad/dataserver/controller/shards_director.go @@ -231,6 +231,7 @@ func (s *shardsDirector) DeleteShard(req *proto.DeleteShardRequest) (*proto.Dele return resp, nil } + // TODO: check the database directly to avoid re-initializing the follower controller when the coordinator retries after an internal error fc, err := follow.NewFollowerController(s.storageOptions, req.Namespace, req.Shard, s.walFactory, s.kvFactory, nil) if err != nil { return nil, err diff --git a/tests/coordinator/coordinator_test.go b/tests/coordinator/coordinator_test.go index b6712263..8738f1f5 100644 --- a/tests/coordinator/coordinator_test.go +++ b/tests/coordinator/coordinator_test.go @@ -60,7 +60,7 @@ func TestCoordinatorInitiateLeaderElection(t *testing.T) { Int32HashRange:
model.Int32HashRange{Min: 2000, Max: 100000}, } statusResource := coordinatorInstance.StatusResource() - statusResource.UpdateShardMetadata("default", 1, metadata) + assert.NoError(t, statusResource.UpdateShardMetadata("default", 1, metadata)) status := statusResource.Load() assert.EqualValues(t, status.Namespaces["default"].Shards[1], metadata)