@@ -3,6 +3,7 @@ package process
33import (
44 "context"
55 "fmt"
6+ "maps"
67 "math/rand"
78 "slices"
89 "sort"
@@ -143,23 +144,23 @@ func (p *namespaceProcessor) Terminate(ctx context.Context) error {
143144 return nil
144145}
145146
146- // runProcess launches and manages the independent processing loops.
147+ // runProcess launches and manages the processing loops.
147148func (p * namespaceProcessor ) runProcess (ctx context.Context ) {
148149 defer p .wg .Done ()
149150
150151 var loopWg sync.WaitGroup
151152 loopWg .Add (2 ) // We have two loops to manage.
152153
153- // Launch the rebalancing process in its own goroutine.
154+ // Launch the assignment and executor cleanup process in its own goroutine.
154155 go func () {
155156 defer loopWg .Done ()
156157 p .runRebalancingLoop (ctx )
157158 }()
158159
159- // Launch the heartbeat cleanup process in its own goroutine.
160+ // Launch the shard stats cleanup process in its own goroutine.
160161 go func () {
161162 defer loopWg .Done ()
162- p .runCleanupLoop (ctx )
163+ p .runShardStatsCleanupLoop (ctx )
163164 }()
164165
165166 // Wait for both loops to exit.
@@ -218,63 +219,52 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
218219 }
219220}
220221
221- // runCleanupLoop periodically removes stale executors .
222- func (p * namespaceProcessor ) runCleanupLoop (ctx context.Context ) {
222+ // runShardStatsCleanupLoop periodically removes stale shard statistics .
223+ func (p * namespaceProcessor ) runShardStatsCleanupLoop (ctx context.Context ) {
223224 ticker := p .timeSource .NewTicker (p .cfg .HeartbeatTTL )
224225 defer ticker .Stop ()
225226
226227 for {
227228 select {
228229 case <- ctx .Done ():
229- p .logger .Info ("Cleanup loop cancelled." )
230+ p .logger .Info ("Shard stats cleanup loop cancelled." )
230231 return
231232 case <- ticker .Chan ():
232- p .logger .Info ("Periodic heartbeat cleanup triggered." )
233+ p .logger .Info ("Periodic shard stats cleanup triggered." )
233234 namespaceState , err := p .shardStore .GetState (ctx , p .namespaceCfg .Name )
234235 if err != nil {
235- p .logger .Error ("Failed to get state for cleanup" , tag .Error (err ))
236+ p .logger .Error ("Failed to get state for shard stats cleanup" , tag .Error (err ))
236237 continue
237238 }
238- p .cleanupStaleExecutors (ctx , namespaceState )
239- p .cleanupStaleShardStats (ctx , namespaceState )
239+ staleShardStats := p .identifyStaleShardStats (namespaceState )
240+ if len (staleShardStats ) == 0 {
241+ // No stale shard stats to delete
242+ continue
243+ }
244+ if err := p .shardStore .DeleteShardStats (ctx , p .namespaceCfg .Name , staleShardStats , p .election .Guard ()); err != nil {
245+ p .logger .Error ("Failed to delete stale shard stats" , tag .Error (err ))
246+ }
240247 }
241248 }
242249}
243250
244- // cleanupStaleExecutors removes executors who have not reported a heartbeat recently.
245- func (p * namespaceProcessor ) cleanupStaleExecutors (ctx context.Context , namespaceState * store.NamespaceState ) {
246- if namespaceState == nil {
247- p .logger .Error ("Namespace state missing for heartbeat cleanup" )
248- return
249- }
250-
251- var expiredExecutors []string
251+ // identifyStaleExecutors returns a list of executors who have not reported a heartbeat recently.
252+ func (p * namespaceProcessor ) identifyStaleExecutors (namespaceState * store.NamespaceState ) map [string ]int64 {
253+ expiredExecutors := make (map [string ]int64 )
252254 now := p .timeSource .Now ().Unix ()
253255 heartbeatTTL := int64 (p .cfg .HeartbeatTTL .Seconds ())
254256
255257 for executorID , state := range namespaceState .Executors {
256258 if (now - state .LastHeartbeat ) > heartbeatTTL {
257- expiredExecutors = append ( expiredExecutors , executorID )
259+ expiredExecutors [ executorID ] = namespaceState . ShardAssignments [ executorID ]. ModRevision
258260 }
259261 }
260262
261- if len (expiredExecutors ) == 0 {
262- return // Nothing to do.
263- }
264-
265- p .logger .Info ("Removing stale executors" , tag .ShardExecutors (expiredExecutors ))
266- // Use the leader guard for the delete operation.
267- if err := p .shardStore .DeleteExecutors (ctx , p .namespaceCfg .Name , expiredExecutors , p .election .Guard ()); err != nil {
268- p .logger .Error ("Failed to delete stale executors" , tag .Error (err ))
269- }
263+ return expiredExecutors
270264}
271265
272- func (p * namespaceProcessor ) cleanupStaleShardStats (ctx context.Context , namespaceState * store.NamespaceState ) {
273- if namespaceState == nil {
274- p .logger .Error ("Namespace state missing for shard stats cleanup" )
275- return
276- }
277-
266+ // identifyStaleShardStats returns a list of shard statistics that are no longer relevant.
267+ func (p * namespaceProcessor ) identifyStaleShardStats (namespaceState * store.NamespaceState ) []string {
278268 activeShards := make (map [string ]struct {})
279269 now := p .timeSource .Now ().Unix ()
280270 shardStatsTTL := int64 (p .cfg .HeartbeatTTL .Seconds ())
@@ -325,15 +315,7 @@ func (p *namespaceProcessor) cleanupStaleShardStats(ctx context.Context, namespa
325315 staleShardStats = append (staleShardStats , shardID )
326316 }
327317
328- if len (staleShardStats ) == 0 {
329- return
330- }
331-
332- p .logger .Info ("Removing stale shard stats" )
333- // Use the leader guard for the delete operation.
334- if err := p .shardStore .DeleteShardStats (ctx , p .namespaceCfg .Name , staleShardStats , p .election .Guard ()); err != nil {
335- p .logger .Error ("Failed to delete stale shard stats" , tag .Error (err ))
336- }
318+ return staleShardStats
337319}
338320
339321// rebalanceShards is the core logic for distributing shards among active executors.
@@ -369,22 +351,29 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
369351 }
370352 p .lastAppliedRevision = namespaceState .GlobalRevision
371353
372- activeExecutors := p .getActiveExecutors (namespaceState )
354+ // Identify stale executors that need to be removed
355+ staleExecutors := p .identifyStaleExecutors (namespaceState )
356+ if len (staleExecutors ) > 0 {
357+ p .logger .Info ("Identified stale executors for removal" , tag .ShardExecutors (slices .Collect (maps .Keys (staleExecutors ))))
358+ }
359+
360+ activeExecutors := p .getActiveExecutors (namespaceState , staleExecutors )
373361 if len (activeExecutors ) == 0 {
374362 p .logger .Warn ("No active executors found. Cannot assign shards." )
375363 return nil
376364 }
365+ p .logger .Info ("Active executors" , tag .ShardExecutors (activeExecutors ))
377366
378367 deletedShards := p .findDeletedShards (namespaceState )
379- shardsToReassign , currentAssignments := p .findShardsToReassign (activeExecutors , namespaceState , deletedShards )
368+ shardsToReassign , currentAssignments := p .findShardsToReassign (activeExecutors , namespaceState , deletedShards , staleExecutors )
380369
381370 metricsLoopScope .UpdateGauge (metrics .ShardDistributorAssignLoopNumRebalancedShards , float64 (len (shardsToReassign )))
382371
383- // If there are deleted shards, we have removed them from the shard assignments, so the distribution has changed.
384- distributionChanged := len (deletedShards ) > 0
385- distributionChanged = distributionChanged || assignShardsToEmptyExecutors (currentAssignments )
386- distributionChanged = distributionChanged || p .updateAssignments (shardsToReassign , activeExecutors , currentAssignments )
372+ // If there are deleted shards or stale executors, the distribution has changed.
373+ assignedToEmptyExecutors := assignShardsToEmptyExecutors (currentAssignments )
374+ updatedAssignments := p .updateAssignments (shardsToReassign , activeExecutors , currentAssignments )
387375
376+ distributionChanged := len (deletedShards ) > 0 || len (staleExecutors ) > 0 || assignedToEmptyExecutors || updatedAssignments
388377 if ! distributionChanged {
389378 p .logger .Debug ("No changes to distribution detected. Skipping rebalance." )
390379 return nil
@@ -393,9 +382,10 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
393382 p .addAssignmentsToNamespaceState (namespaceState , currentAssignments )
394383
395384 p .logger .Info ("Applying new shard distribution." )
396- // Use the leader guard for the assign operation.
385+ // Use the leader guard for the assign and delete operation.
397386 err = p .shardStore .AssignShards (ctx , p .namespaceCfg .Name , store.AssignShardsRequest {
398- NewState : namespaceState ,
387+ NewState : namespaceState ,
388+ ExecutorsToDelete : staleExecutors ,
399389 }, p .election .Guard ())
400390 if err != nil {
401391 return fmt .Errorf ("assign shards: %w" , err )
@@ -428,7 +418,12 @@ func (p *namespaceProcessor) findDeletedShards(namespaceState *store.NamespaceSt
428418 return deletedShards
429419}
430420
431- func (p * namespaceProcessor ) findShardsToReassign (activeExecutors []string , namespaceState * store.NamespaceState , deletedShards map [string ]store.ShardState ) ([]string , map [string ][]string ) {
421+ func (p * namespaceProcessor ) findShardsToReassign (
422+ activeExecutors []string ,
423+ namespaceState * store.NamespaceState ,
424+ deletedShards map [string ]store.ShardState ,
425+ staleExecutors map [string ]int64 ,
426+ ) ([]string , map [string ][]string ) {
432427 allShards := make (map [string ]struct {})
433428 for _ , shardID := range getShards (p .namespaceCfg , namespaceState , deletedShards ) {
434429 allShards [shardID ] = struct {}{}
@@ -443,12 +438,16 @@ func (p *namespaceProcessor) findShardsToReassign(activeExecutors []string, name
443438
444439 for executorID , state := range namespaceState .ShardAssignments {
445440 isActive := namespaceState .Executors [executorID ].Status == types .ExecutorStatusACTIVE
441+ _ , isStale := staleExecutors [executorID ]
442+
446443 for shardID := range state .AssignedShards {
447444 if _ , ok := allShards [shardID ]; ok {
448445 delete (allShards , shardID )
449- if isActive {
446+ // If executor is active AND not stale, keep the assignment
447+ if isActive && ! isStale {
450448 currentAssignments [executorID ] = append (currentAssignments [executorID ], shardID )
451449 } else {
450+ // Otherwise, reassign the shard (executor is either inactive or stale)
452451 shardsToReassign = append (shardsToReassign , shardID )
453452 }
454453 }
@@ -497,11 +496,14 @@ func (p *namespaceProcessor) addAssignmentsToNamespaceState(namespaceState *stor
497496 namespaceState .ShardAssignments = newState
498497}
499498
500- func (* namespaceProcessor ) getActiveExecutors (namespaceState * store.NamespaceState ) []string {
499+ func (* namespaceProcessor ) getActiveExecutors (namespaceState * store.NamespaceState , staleExecutors map [ string ] int64 ) []string {
501500 var activeExecutors []string
502501 for id , state := range namespaceState .Executors {
502+ // Executor must be ACTIVE and not stale
503503 if state .Status == types .ExecutorStatusACTIVE {
504- activeExecutors = append (activeExecutors , id )
504+ if _ , ok := staleExecutors [id ]; ! ok {
505+ activeExecutors = append (activeExecutors , id )
506+ }
505507 }
506508 }
507509
0 commit comments