Skip to content

Commit c31a9b1

Browse files
fix: close p2p host, cancel subscriptions
1 parent 96052a8 commit c31a9b1

File tree

6 files changed

+38
-13
lines changed

6 files changed

+38
-13
lines changed

rolling-shutter/medley/chainsync/syncer/eonpubkey.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,8 @@ func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) erro
6060
if err != nil {
6161
return err
6262
}
63-
runner.Defer(subs.Unsubscribe)
6463
runner.Go(func() error {
65-
return s.watchNewEonPubkey(ctx, subs.Err())
64+
return s.watchNewEonPubkey(ctx, subs.Err(), subs.Unsubscribe)
6665
})
6766
return nil
6867
}
@@ -126,7 +125,7 @@ func (s *EonPubKeySyncer) GetEonPubKeyForEon(ctx context.Context, opts *bind.Cal
126125
}, nil
127126
}
128127

129-
func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context, subsErr <-chan error) error {
128+
func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context, subsErr <-chan error, unsubscribe func()) error {
130129
for {
131130
select {
132131
case newEonKey, ok := <-s.keyBroadcastCh:
@@ -153,6 +152,7 @@ func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context, subsErr <-chan
153152
return err
154153
}
155154
case <-ctx.Done():
155+
unsubscribe()
156156
return ctx.Err()
157157
}
158158
}

rolling-shutter/medley/chainsync/syncer/keyperset.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,8 @@ func (s *KeyperSetSyncer) Start(ctx context.Context, runner service.Runner) erro
7070
if err != nil {
7171
return err
7272
}
73-
runner.Defer(subs.Unsubscribe)
7473
runner.Go(func() error {
75-
return s.watchNewKeypersService(ctx, subs.Err())
74+
return s.watchNewKeypersService(ctx, subs.Err(), subs.Unsubscribe)
7675
})
7776
return nil
7877
}
@@ -204,7 +203,7 @@ func (s *KeyperSetSyncer) newEvent(
204203
}, nil
205204
}
206205

207-
func (s *KeyperSetSyncer) watchNewKeypersService(ctx context.Context, subsErr <-chan error) error {
206+
func (s *KeyperSetSyncer) watchNewKeypersService(ctx context.Context, subsErr <-chan error, unsubscribe func()) error {
208207
for {
209208
select {
210209
case newKeypers, ok := <-s.keyperAddedCh:
@@ -240,6 +239,7 @@ func (s *KeyperSetSyncer) watchNewKeypersService(ctx context.Context, subsErr <-
240239
return err
241240
}
242241
case <-ctx.Done():
242+
unsubscribe()
243243
return ctx.Err()
244244
}
245245
}

rolling-shutter/medley/chainsync/syncer/shutterstate.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,15 @@ func (s *ShutterStateSyncer) Start(ctx context.Context, runner service.Runner) e
5353
if err != nil {
5454
return err
5555
}
56-
runner.Defer(subs.Unsubscribe)
5756

5857
s.unpausedCh = make(chan *bindings.KeyperSetManagerUnpaused)
5958
subsUnpaused, err := s.Contract.WatchUnpaused(watchOpts, s.unpausedCh)
6059
if err != nil {
6160
return err
6261
}
63-
runner.Defer(subsUnpaused.Unsubscribe)
6462

6563
runner.Go(func() error {
66-
return s.watchPaused(ctx, subs.Err(), subsUnpaused.Err())
64+
return s.watchPaused(ctx, subs.Err(), subsUnpaused.Err(), subs.Unsubscribe, subsUnpaused.Unsubscribe)
6765
})
6866
return nil
6967
}
@@ -87,7 +85,13 @@ func (s *ShutterStateSyncer) handle(ctx context.Context, ev *event.ShutterState)
8785
}
8886
}
8987

90-
func (s *ShutterStateSyncer) watchPaused(ctx context.Context, subsErr <-chan error, subsErrUnpaused <-chan error) error {
88+
func (s *ShutterStateSyncer) watchPaused(
89+
ctx context.Context,
90+
subsErr <-chan error,
91+
subsErrUnpaused <-chan error,
92+
unsubscribe func(),
93+
unsubscribeUnpaused func(),
94+
) error {
9195
isActive, err := s.pollIsActive(ctx)
9296
if err != nil {
9397
// XXX: this will fail everything, do we want that?
@@ -134,6 +138,8 @@ func (s *ShutterStateSyncer) watchPaused(ctx context.Context, subsErr <-chan err
134138
return err
135139
}
136140
case <-ctx.Done():
141+
unsubscribe()
142+
unsubscribeUnpaused()
137143
return ctx.Err()
138144
}
139145
}

rolling-shutter/medley/chainsync/syncer/unsafehead.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,13 @@ func (s *UnsafeHeadSyncer) Start(ctx context.Context, runner service.Runner) err
3131
if err != nil {
3232
return err
3333
}
34-
runner.Defer(subs.Unsubscribe)
3534
runner.Go(func() error {
36-
return s.watchLatestUnsafeHead(ctx, subs.Err())
35+
return s.watchLatestUnsafeHead(ctx, subs.Err(), subs.Unsubscribe)
3736
})
3837
return nil
3938
}
4039

41-
func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context, subsErr <-chan error) error {
40+
func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context, subsErr <-chan error, unsubscribe func()) error {
4241
for {
4342
select {
4443
case newHeader, ok := <-s.newLatestHeadCh:
@@ -64,6 +63,7 @@ func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context, subsErr <-
6463
return err
6564
}
6665
case <-ctx.Done():
66+
unsubscribe()
6767
return ctx.Err()
6868
}
6969
}

rolling-shutter/p2p/p2p.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,19 @@ func (p *P2PNode) Run(
111111
return err
112112
}
113113

114+
runner.Go(func() error {
115+
<-ctx.Done()
116+
log.Debug().Msg("stopping host when context is done")
117+
if err := p.host.Close(); err != nil {
118+
log.Error().Err(err).Msg("error closing host")
119+
}
120+
if err := p.dht.Close(); err != nil {
121+
log.Error().Err(err).Msg("error closing dht")
122+
}
123+
log.Debug().Msg("host closed")
124+
return nil
125+
})
126+
114127
for topicName := range topicValidators {
115128
validator := topicValidators.GetCombinedValidator(topicName)
116129
if err := p.pubSub.RegisterTopicValidator(topicName, validator); err != nil {

rolling-shutter/p2p/topic.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
pubsub "github.com/libp2p/go-libp2p-pubsub"
77
"github.com/libp2p/go-libp2p/core/peer"
8+
"github.com/rs/zerolog/log"
89
)
910

1011
// gossipRoom represents a subscription to a single PubSub topic. Messages
@@ -42,6 +43,11 @@ func (room *gossipRoom) readLoop(ctx context.Context, messages chan *pubsub.Mess
4243
select {
4344
case messages <- msg:
4445
case <-ctx.Done():
46+
log.Debug().Msg("subscription canceled, closing read loop")
47+
room.subscription.Cancel()
48+
if err := room.topic.Close(); err != nil {
49+
return err
50+
}
4551
return ctx.Err()
4652
}
4753
}

0 commit comments

Comments
 (0)