Skip to content

Commit 5e08d65

Browse files
Merge pull request #612 from shutter-network/fix/graceful-close
close p2p host, cancel subscriptions for closing service gracefully when context is cancelled
2 parents 96052a8 + 8cac788 commit 5e08d65

File tree

7 files changed

+46
-11
lines changed

7 files changed

+46
-11
lines changed

rolling-shutter/keyper/eonpkhandler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func (pkh *eonPubKeyHandler) loop(ctx context.Context) error {
6666
}
6767
select {
6868
case <-ctx.Done():
69+
t.Stop()
6970
return ctx.Err()
7071
case <-t.C:
7172
}

rolling-shutter/medley/chainsync/syncer/eonpubkey.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,13 @@ func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) erro
6060
if err != nil {
6161
return err
6262
}
63-
runner.Defer(subs.Unsubscribe)
6463
runner.Go(func() error {
65-
return s.watchNewEonPubkey(ctx, subs.Err())
64+
err := s.watchNewEonPubkey(ctx, subs.Err())
65+
if err != nil {
66+
s.Log.Error("error watching new eon pubkey", err.Error())
67+
}
68+
subs.Unsubscribe()
69+
return err
6670
})
6771
return nil
6872
}

rolling-shutter/medley/chainsync/syncer/keyperset.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,13 @@ func (s *KeyperSetSyncer) Start(ctx context.Context, runner service.Runner) erro
7070
if err != nil {
7171
return err
7272
}
73-
runner.Defer(subs.Unsubscribe)
7473
runner.Go(func() error {
75-
return s.watchNewKeypersService(ctx, subs.Err())
74+
err := s.watchNewKeypersService(ctx, subs.Err())
75+
if err != nil {
76+
s.Log.Error("error watching new keypers", err.Error())
77+
}
78+
subs.Unsubscribe()
79+
return err
7680
})
7781
return nil
7882
}

rolling-shutter/medley/chainsync/syncer/shutterstate.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,21 @@ func (s *ShutterStateSyncer) Start(ctx context.Context, runner service.Runner) e
5353
if err != nil {
5454
return err
5555
}
56-
runner.Defer(subs.Unsubscribe)
5756

5857
s.unpausedCh = make(chan *bindings.KeyperSetManagerUnpaused)
5958
subsUnpaused, err := s.Contract.WatchUnpaused(watchOpts, s.unpausedCh)
6059
if err != nil {
6160
return err
6261
}
63-
runner.Defer(subsUnpaused.Unsubscribe)
6462

6563
runner.Go(func() error {
66-
return s.watchPaused(ctx, subs.Err(), subsUnpaused.Err())
64+
err := s.watchPaused(ctx, subs.Err(), subsUnpaused.Err())
65+
if err != nil {
66+
s.Log.Error("error watching paused", err.Error())
67+
}
68+
subs.Unsubscribe()
69+
subsUnpaused.Unsubscribe()
70+
return err
6771
})
6872
return nil
6973
}

rolling-shutter/medley/chainsync/syncer/unsafehead.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,13 @@ func (s *UnsafeHeadSyncer) Start(ctx context.Context, runner service.Runner) err
3131
if err != nil {
3232
return err
3333
}
34-
runner.Defer(subs.Unsubscribe)
3534
runner.Go(func() error {
36-
return s.watchLatestUnsafeHead(ctx, subs.Err())
35+
err := s.watchLatestUnsafeHead(ctx, subs.Err())
36+
if err != nil {
37+
s.Log.Error("error watching latest unsafe head", err.Error())
38+
}
39+
subs.Unsubscribe()
40+
return err
3741
})
3842
return nil
3943
}

rolling-shutter/p2p/p2p.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,19 @@ func (p *P2PNode) Run(
111111
return err
112112
}
113113

114+
runner.Go(func() error {
115+
<-ctx.Done()
116+
log.Debug().Msg("stopping host when context is done")
117+
if err := p.host.Close(); err != nil {
118+
log.Error().Err(err).Msg("error closing host")
119+
}
120+
if err := p.dht.Close(); err != nil {
121+
log.Error().Err(err).Msg("error closing dht")
122+
}
123+
log.Debug().Msg("host closed")
124+
return nil
125+
})
126+
114127
for topicName := range topicValidators {
115128
validator := topicValidators.GetCombinedValidator(topicName)
116129
if err := p.pubSub.RegisterTopicValidator(topicName, validator); err != nil {
@@ -158,8 +171,7 @@ func (p *P2PNode) Run(
158171
runner.Go(func() error {
159172
log.Info().Str("namespace", p.config.DiscoveryNamespace).Msg("starting advertizing discovery node")
160173
util.Advertise(ctx, p.discovery, p.config.DiscoveryNamespace)
161-
<-ctx.Done()
162-
return ctx.Err()
174+
return nil
163175
})
164176
runner.Go(func() error {
165177
return findPeers(ctx, p.host, p.discovery, p.config.DiscoveryNamespace)

rolling-shutter/p2p/topic.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
pubsub "github.com/libp2p/go-libp2p-pubsub"
77
"github.com/libp2p/go-libp2p/core/peer"
8+
"github.com/rs/zerolog/log"
89
)
910

1011
// gossipRoom represents a subscription to a single PubSub topic. Messages
@@ -42,6 +43,11 @@ func (room *gossipRoom) readLoop(ctx context.Context, messages chan *pubsub.Mess
4243
select {
4344
case messages <- msg:
4445
case <-ctx.Done():
46+
log.Debug().Msg("subscription canceled, closing read loop")
47+
room.subscription.Cancel()
48+
if err := room.topic.Close(); err != nil {
49+
return err
50+
}
4551
return ctx.Err()
4652
}
4753
}

0 commit comments

Comments
 (0)