@@ -2,6 +2,7 @@ package process
22
33import (
44 "context"
5+ "errors"
56 "fmt"
67 "maps"
78 "math/rand"
@@ -214,6 +215,9 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
214215 err = p .rebalanceShards (ctx )
215216 }
216217 if err != nil {
218+ if isCancelledOrDeadlineExceeded (err ) {
219+ return
220+ }
217221 p .logger .Error ("rebalance failed" , tag .Error (err ))
218222 }
219223 }
@@ -233,6 +237,9 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) {
233237 p .logger .Info ("Periodic shard stats cleanup triggered." )
234238 namespaceState , err := p .shardStore .GetState (ctx , p .namespaceCfg .Name )
235239 if err != nil {
240+ if isCancelledOrDeadlineExceeded (err ) {
241+ return
242+ }
236243 p .logger .Error ("Failed to get state for shard stats cleanup" , tag .Error (err ))
237244 continue
238245 }
@@ -242,6 +249,9 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) {
242249 continue
243250 }
244251 if err := p .shardStore .DeleteShardStats (ctx , p .namespaceCfg .Name , staleShardStats , p .election .Guard ()); err != nil {
252+ if isCancelledOrDeadlineExceeded (err ) {
253+ return
254+ }
245255 p .logger .Error ("Failed to delete stale shard stats" , tag .Error (err ))
246256 }
247257 }
@@ -340,6 +350,9 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
340350
341351 namespaceState , err := p .shardStore .GetState (ctx , p .namespaceCfg .Name )
342352 if err != nil {
353+ if isCancelledOrDeadlineExceeded (err ) {
354+ return err
355+ }
343356 return fmt .Errorf ("get state: %w" , err )
344357 }
345358
@@ -386,6 +399,9 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
386399 ExecutorsToDelete : staleExecutors ,
387400 }, p .election .Guard ())
388401 if err != nil {
402+ if isCancelledOrDeadlineExceeded (err ) {
403+ return err
404+ }
389405 return fmt .Errorf ("assign shards: %w" , err )
390406 }
391407
@@ -586,3 +602,8 @@ func makeShards(num int64) []string {
586602 }
587603 return shards
588604}
605+
606+ func isCancelledOrDeadlineExceeded (err error ) bool {
607+ return errors .Is (err , context .Canceled ) ||
608+ errors .Is (err , context .DeadlineExceeded )
609+ }
0 commit comments