Skip to content

Commit af3b6fa

Browse files
handle context cancelled
Signed-off-by: Gaziza Yestemirova <[email protected]>
1 parent b94764d commit af3b6fa

File tree

2 files changed

+33
-0
lines changed

2 files changed

+33
-0
lines changed

service/sharddistributor/leader/process/processor.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package process
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"maps"
78
"math/rand"
@@ -214,6 +215,9 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
214215
err = p.rebalanceShards(ctx)
215216
}
216217
if err != nil {
218+
if isCancelledOrDeadlineExceeded(err) {
219+
return
220+
}
217221
p.logger.Error("rebalance failed", tag.Error(err))
218222
}
219223
}
@@ -233,6 +237,9 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) {
233237
p.logger.Info("Periodic shard stats cleanup triggered.")
234238
namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
235239
if err != nil {
240+
if isCancelledOrDeadlineExceeded(err) {
241+
return
242+
}
236243
p.logger.Error("Failed to get state for shard stats cleanup", tag.Error(err))
237244
continue
238245
}
@@ -242,6 +249,9 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) {
242249
continue
243250
}
244251
if err := p.shardStore.DeleteShardStats(ctx, p.namespaceCfg.Name, staleShardStats, p.election.Guard()); err != nil {
252+
if isCancelledOrDeadlineExceeded(err) {
253+
return
254+
}
245255
p.logger.Error("Failed to delete stale shard stats", tag.Error(err))
246256
}
247257
}
@@ -340,6 +350,9 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
340350

341351
namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
342352
if err != nil {
353+
if isCancelledOrDeadlineExceeded(err) {
354+
return err
355+
}
343356
return fmt.Errorf("get state: %w", err)
344357
}
345358

@@ -386,6 +399,9 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
386399
ExecutorsToDelete: staleExecutors,
387400
}, p.election.Guard())
388401
if err != nil {
402+
if isCancelledOrDeadlineExceeded(err) {
403+
return err
404+
}
389405
return fmt.Errorf("assign shards: %w", err)
390406
}
391407

@@ -586,3 +602,8 @@ func makeShards(num int64) []string {
586602
}
587603
return shards
588604
}
605+
606+
func isCancelledOrDeadlineExceeded(err error) bool {
607+
return errors.Is(err, context.Canceled) ||
608+
errors.Is(err, context.DeadlineExceeded)
609+
}

service/sharddistributor/store/etcd/executorstore/etcdstore.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,14 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
233233
executorPrefix := etcdkeys.BuildExecutorsPrefix(s.prefix, namespace)
234234
resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix())
235235
if err != nil {
236+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
237+
return nil, ctx.Err()
238+
}
236239
return nil, fmt.Errorf("get executor data: %w", err)
237240
}
241+
if ctxErr := ctx.Err(); errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) {
242+
return nil, ctxErr
243+
}
238244

239245
for _, kv := range resp.Kvs {
240246
key := string(kv.Key)
@@ -276,8 +282,14 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
276282
shardsPrefix := etcdkeys.BuildShardsPrefix(s.prefix, namespace)
277283
shardResp, err := s.client.Get(ctx, shardsPrefix, clientv3.WithPrefix())
278284
if err != nil {
285+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
286+
return nil, ctx.Err()
287+
}
279288
return nil, fmt.Errorf("get shard data: %w", err)
280289
}
290+
if ctxErr := ctx.Err(); errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) {
291+
return nil, ctxErr
292+
}
281293
for _, kv := range shardResp.Kvs {
282294
shardID, shardKeyType, err := etcdkeys.ParseShardKey(s.prefix, namespace, string(kv.Key))
283295
if err != nil {

0 commit comments

Comments
 (0)