Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions oxiad/coordinator/balancer/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,22 @@ func (m *mockStatusResource) LoadWithVersion() (*model.ClusterStatus, metadata.V
return m.status, "0"
}

func (m *mockStatusResource) Swap(_ *model.ClusterStatus, _ metadata.Version) bool {
return true
func (m *mockStatusResource) Swap(_ *model.ClusterStatus, _ metadata.Version) (bool, error) {
return true, nil
}

func (m *mockStatusResource) Update(newStatus *model.ClusterStatus) {
func (m *mockStatusResource) Update(newStatus *model.ClusterStatus) error {
m.status = newStatus
return nil
}

func (m *mockStatusResource) UpdateShardMetadata(_ string, _ int64, _ model.ShardMetadata) {}
func (m *mockStatusResource) UpdateShardMetadata(_ string, _ int64, _ model.ShardMetadata) error {
return nil
}

func (m *mockStatusResource) DeleteShardMetadata(_ string, _ int64) {}
func (m *mockStatusResource) DeleteShardMetadata(_ string, _ int64) error {
return nil
}

func (m *mockStatusResource) IsReady(_ *model.ClusterConfig) bool {
return true
Expand Down
9 changes: 7 additions & 2 deletions oxiad/coordinator/controller/shard_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,9 @@ func (s *shardController) deleteShard() error {
)
}

s.statusResource.DeleteShardMetadata(s.namespace, s.shard)
if err := s.statusResource.DeleteShardMetadata(s.namespace, s.shard); err != nil {
return err
}
s.eventListener.ShardDeleted(s.shard)
return s.close()
}
Expand Down Expand Up @@ -508,7 +510,10 @@ func (s *shardController) handlePeriodicTasks() {
}

// Update the shard status
s.statusResource.UpdateShardMetadata(s.namespace, s.shard, mutShardMeta)
if err := s.statusResource.UpdateShardMetadata(s.namespace, s.shard, mutShardMeta); err != nil {
s.log.Warn("Failed to update shard metadata", "error", err)
return
}
s.metadata.Store(mutShardMeta)
}

Expand Down
8 changes: 6 additions & 2 deletions oxiad/coordinator/controller/shard_controller_election.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,9 @@ func (e *ShardElection) start() (model.Server, error) {
metadata.Term++
metadata.Ensemble = e.refreshedEnsemble(metadata.Ensemble)
})
e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta)
if err := e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta); err != nil {
return model.Server{}, err
}

if e.changeEnsembleAction != nil {
e.prepareIfChangeEnsemble(&mutShardMeta)
Expand Down Expand Up @@ -484,7 +486,9 @@ func (e *ShardElection) start() (model.Server, error) {
leader := mutShardMeta.Leader
leaderEntry := candidatesStatus[*leader]

e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta)
if err = e.statusResource.UpdateShardMetadata(e.namespace, e.shard, mutShardMeta); err != nil {
return model.Server{}, err
}
e.meta.Store(mutShardMeta)
if e.eventListener != nil {
e.eventListener.LeaderElected(e.shard, newLeader, maps.Keys(followers))
Expand Down
83 changes: 51 additions & 32 deletions oxiad/coordinator/controller/split_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (sc *SplitController) currentPhase() *model.SplitPhase {
}

// updatePhase atomically updates the split phase on both parent and children.
func (sc *SplitController) updatePhase(newPhase model.SplitPhase) {
func (sc *SplitController) updatePhase(newPhase model.SplitPhase) error {
status := sc.statusResource.Load()
cloned := status.Clone()

Expand All @@ -235,7 +235,7 @@ func (sc *SplitController) updatePhase(newPhase model.SplitPhase) {
}
}

sc.statusResource.Update(cloned)
return sc.statusResource.Update(cloned)
}

// runBootstrap validates preconditions, fences child ensemble members, elects
Expand Down Expand Up @@ -285,13 +285,14 @@ func (sc *SplitController) runBootstrap() error {
childLeaders[childId] = childMeta.Leader.Internal
}
}
sc.updateParentMeta(func(meta *model.ShardMetadata) {
if err := sc.updateParentMeta(func(meta *model.ShardMetadata) {
meta.Split.ParentTermAtBootstrap = parentTerm
meta.Split.ChildLeadersAtBootstrap = childLeaders
})
}); err != nil {
return err
}

sc.updatePhase(model.SplitPhaseCatchUp)
return nil
return sc.updatePhase(model.SplitPhaseCatchUp)
}

// fenceAndElectChild fences a child shard's ensemble and elects a leader.
Expand All @@ -318,11 +319,13 @@ func (sc *SplitController) fenceAndElectChild(childId int64) error {

childLeader := sc.pickLeader(headEntries)

sc.updateChildMeta(childId, func(meta *model.ShardMetadata) {
if err := sc.updateChildMeta(childId, func(meta *model.ShardMetadata) {
meta.Term = childTerm
meta.Leader = &childLeader
meta.Status = model.ShardStatusSteadyState
})
}); err != nil {
return err
}

// Elect the child leader so it replicates to its followers immediately.
// Without this, only the single child leader node has the data.
Expand Down Expand Up @@ -421,8 +424,7 @@ func (sc *SplitController) runCatchUp() error {
}
if caughtUp {
sc.log.Info("All children caught up")
sc.updatePhase(model.SplitPhaseCutover)
return nil
return sc.updatePhase(model.SplitPhaseCutover)
}
}
}
Expand All @@ -443,14 +445,18 @@ func (sc *SplitController) checkObserverCursorsStale() (bool, error) {
slog.Int64("bootstrap-term", parentMeta.Split.ParentTermAtBootstrap),
slog.Int64("current-term", parentMeta.Term),
)
sc.updatePhase(model.SplitPhaseBootstrap)
if err := sc.updatePhase(model.SplitPhaseBootstrap); err != nil {
return false, err
}
return true, nil
}

// Child leader election: the observer cursor targets the old (dead) leader.
// Remove the stale cursor and fall back to Bootstrap to re-add.
if sc.removeStaleChildObservers(parentMeta) {
sc.updatePhase(model.SplitPhaseBootstrap)
if err := sc.updatePhase(model.SplitPhaseBootstrap); err != nil {
return false, err
}
return true, nil
}

Expand Down Expand Up @@ -562,11 +568,13 @@ func (sc *SplitController) runCutover() error {
)

// Update parent term in metadata
sc.updateParentMeta(func(meta *model.ShardMetadata) {
if err := sc.updateParentMeta(func(meta *model.ShardMetadata) {
meta.Term = newParentTerm
meta.Leader = nil
meta.Status = model.ShardStatusElection
})
}); err != nil {
return err
}

// Step 2: Wait for children to commit parentFinalOffset.
// Children were already elected leader in Bootstrap, so commitOffset
Expand All @@ -587,14 +595,18 @@ func (sc *SplitController) runCutover() error {
// Step 4: Clear split metadata from children and mark parent for deletion.
// Children are now independent shards.
for _, childId := range []int64{sc.leftChildId, sc.rightChildId} {
sc.updateChildMeta(childId, func(meta *model.ShardMetadata) {
if err := sc.updateChildMeta(childId, func(meta *model.ShardMetadata) {
meta.Split = nil
})
}); err != nil {
return err
}
}

sc.updateParentMeta(func(meta *model.ShardMetadata) {
if err := sc.updateParentMeta(func(meta *model.ShardMetadata) {
meta.Status = model.ShardStatusDeleting
})
}); err != nil {
return err
}

// Step 5: Notify the coordinator. This triggers the parent shard
// controller's DeleteShard (which retries indefinitely with backoff)
Expand All @@ -603,11 +615,9 @@ func (sc *SplitController) runCutover() error {

// Clear split metadata from parent — the split controller's job is done.
// The parent shard controller handles the actual deletion.
sc.updateParentMeta(func(meta *model.ShardMetadata) {
return sc.updateParentMeta(func(meta *model.ShardMetadata) {
meta.Split = nil
})

return nil
}

// abort cleans up a failed/timed-out split that hasn't reached Cutover.
Expand Down Expand Up @@ -650,13 +660,19 @@ func (sc *SplitController) abort() {

// Delete child shards from status.
for _, childId := range []int64{sc.leftChildId, sc.rightChildId} {
sc.statusResource.DeleteShardMetadata(sc.namespace, childId)
if err := sc.statusResource.DeleteShardMetadata(sc.namespace, childId); err != nil {
sc.log.Warn("Failed to delete child shard metadata during abort",
slog.Int64("child-shard", childId),
slog.Any("error", err))
}
}

// Clear parent split metadata.
sc.updateParentMeta(func(meta *model.ShardMetadata) {
if err := sc.updateParentMeta(func(meta *model.ShardMetadata) {
meta.Split = nil
})
}); err != nil {
sc.log.Warn("Failed to clear parent split metadata during abort", slog.Any("error", err))
}

sc.log.Info("Split aborted, parent restored")

Expand Down Expand Up @@ -684,23 +700,24 @@ func (sc *SplitController) loadShardMeta(shardId int64) *model.ShardMetadata {
return &cloned
}

func (sc *SplitController) updateParentMeta(fn func(meta *model.ShardMetadata)) {
sc.updateShardMeta(sc.parentShardId, fn)
func (sc *SplitController) updateParentMeta(fn func(meta *model.ShardMetadata)) error {
return sc.updateShardMeta(sc.parentShardId, fn)
}

func (sc *SplitController) updateChildMeta(childId int64, fn func(meta *model.ShardMetadata)) {
sc.updateShardMeta(childId, fn)
func (sc *SplitController) updateChildMeta(childId int64, fn func(meta *model.ShardMetadata)) error {
return sc.updateShardMeta(childId, fn)
}

func (sc *SplitController) updateShardMeta(shardId int64, fn func(meta *model.ShardMetadata)) {
func (sc *SplitController) updateShardMeta(shardId int64, fn func(meta *model.ShardMetadata)) error {
status := sc.statusResource.Load()
cloned := status.Clone()
ns := cloned.Namespaces[sc.namespace]
if meta, exists := ns.Shards[shardId]; exists {
fn(&meta)
ns.Shards[shardId] = meta
sc.statusResource.Update(cloned)
return sc.statusResource.Update(cloned)
}
return nil
}

// fenceEnsemble sends NewTerm to all ensemble members and returns the
Expand Down Expand Up @@ -860,11 +877,13 @@ func (sc *SplitController) reelectChild(childId int64) error {
}

// Update child metadata
sc.updateChildMeta(childId, func(meta *model.ShardMetadata) {
if err := sc.updateChildMeta(childId, func(meta *model.ShardMetadata) {
meta.Term = newTerm
meta.Leader = &newLeader
meta.Status = model.ShardStatusSteadyState
})
}); err != nil {
return err
}

sc.log.Info("Child re-elected in clean term",
slog.Int64("child-shard", childId),
Expand Down
14 changes: 7 additions & 7 deletions oxiad/coordinator/controller/split_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func setupSplitTest(t *testing.T, phase model.SplitPhase) (
ShardIdGenerator: 3,
}

statusRes.Update(clusterStatus)
assert.NoError(t, statusRes.Update(clusterStatus))
listener := newMockSplitEventListener()

return rpcMock, statusRes, listener
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestSplitController_ResumeFromCatchUp(t *testing.T) {
rightMeta.Leader = &rs1
rightMeta.Term = 1
ns.Shards[2] = rightMeta
statusRes.Update(cloned)
assert.NoError(t, statusRes.Update(cloned))

queueCatchUpResponses(rpcMock)
queueCutoverResponses(rpcMock)
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestSplitController_TimeoutDuringCatchUp(t *testing.T) {
parentMeta.Split.ParentTermAtBootstrap = 5
ns.Shards[0] = parentMeta

statusRes.Update(cloned)
assert.NoError(t, statusRes.Update(cloned))

// Queue RemoveObserver responses (abort will call these)
rpcMock.GetNode(ps1).RemoveObserverResponse(nil)
Expand Down Expand Up @@ -471,7 +471,7 @@ func TestSplitController_ParentTermChangeDuringCatchUp(t *testing.T) {
parentMeta.Split.ParentTermAtBootstrap = 5 // Was 5 during Bootstrap
ns.Shards[0] = parentMeta

statusRes.Update(cloned)
assert.NoError(t, statusRes.Update(cloned))

// The controller should detect term change and reset to Bootstrap.
// Queue Bootstrap responses -- note: parent leader is now ps2 (after election)
Expand Down Expand Up @@ -676,7 +676,7 @@ func TestSplitController_ChildLeaderDiesTimesOutAndAborts(t *testing.T) {
parentMeta.Split.ParentTermAtBootstrap = 5
ns.Shards[0] = parentMeta

statusRes.Update(cloned)
assert.NoError(t, statusRes.Update(cloned))

// Queue RemoveObserver responses (abort will call these)
rpcMock.GetNode(ps1).RemoveObserverResponse(nil)
Expand Down Expand Up @@ -742,7 +742,7 @@ func TestSplitController_ChildFollowersDeadCommitStalls(t *testing.T) {
parentMeta.Split.ParentTermAtBootstrap = 5
ns.Shards[0] = parentMeta

statusRes.Update(cloned)
assert.NoError(t, statusRes.Update(cloned))

// Queue RemoveObserver responses (abort will call these)
rpcMock.GetNode(ps1).RemoveObserverResponse(nil)
Expand Down Expand Up @@ -819,7 +819,7 @@ func TestSplitController_ChildLeaderChangeDuringCatchUp(t *testing.T) {
}
ns.Shards[0] = parentMeta

statusRes.Update(cloned)
assert.NoError(t, statusRes.Update(cloned))

// CatchUp detects left child leader changed (ls1 -> ls2).
// It RemoveObserver(old leader) on parent, then falls back to Bootstrap.
Expand Down
Loading
Loading