Skip to content

Commit 57d6de8

Browse files
chore: update even unsubscribe to get called in same go routine
1 parent c140b01 commit 57d6de8

File tree

4 files changed

+25
-19
lines changed

4 files changed

+25
-19
lines changed

rolling-shutter/medley/chainsync/syncer/eonpubkey.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) erro
6161
return err
6262
}
6363
runner.Go(func() error {
64-
return s.watchNewEonPubkey(ctx, subs.Err(), subs.Unsubscribe)
64+
if err := s.watchNewEonPubkey(ctx, subs.Err()); err != nil {
65+
s.Log.Error("error watching new eon pubkey", err.Error())
66+
}
67+
subs.Unsubscribe()
68+
return err
6569
})
6670
return nil
6771
}
@@ -125,7 +129,7 @@ func (s *EonPubKeySyncer) GetEonPubKeyForEon(ctx context.Context, opts *bind.Cal
125129
}, nil
126130
}
127131

128-
func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context, subsErr <-chan error, unsubscribe func()) error {
132+
func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context, subsErr <-chan error) error {
129133
for {
130134
select {
131135
case newEonKey, ok := <-s.keyBroadcastCh:
@@ -152,7 +156,6 @@ func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context, subsErr <-chan
152156
return err
153157
}
154158
case <-ctx.Done():
155-
unsubscribe()
156159
return ctx.Err()
157160
}
158161
}

rolling-shutter/medley/chainsync/syncer/keyperset.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,11 @@ func (s *KeyperSetSyncer) Start(ctx context.Context, runner service.Runner) erro
7171
return err
7272
}
7373
runner.Go(func() error {
74-
return s.watchNewKeypersService(ctx, subs.Err(), subs.Unsubscribe)
74+
if err := s.watchNewKeypersService(ctx, subs.Err()); err != nil {
75+
s.Log.Error("error watching new keypers", err.Error())
76+
}
77+
subs.Unsubscribe()
78+
return err
7579
})
7680
return nil
7781
}
@@ -203,7 +207,7 @@ func (s *KeyperSetSyncer) newEvent(
203207
}, nil
204208
}
205209

206-
func (s *KeyperSetSyncer) watchNewKeypersService(ctx context.Context, subsErr <-chan error, unsubscribe func()) error {
210+
func (s *KeyperSetSyncer) watchNewKeypersService(ctx context.Context, subsErr <-chan error) error {
207211
for {
208212
select {
209213
case newKeypers, ok := <-s.keyperAddedCh:
@@ -239,7 +243,6 @@ func (s *KeyperSetSyncer) watchNewKeypersService(ctx context.Context, subsErr <-
239243
return err
240244
}
241245
case <-ctx.Done():
242-
unsubscribe()
243246
return ctx.Err()
244247
}
245248
}

rolling-shutter/medley/chainsync/syncer/shutterstate.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,12 @@ func (s *ShutterStateSyncer) Start(ctx context.Context, runner service.Runner) e
6161
}
6262

6363
runner.Go(func() error {
64-
return s.watchPaused(ctx, subs.Err(), subsUnpaused.Err(), subs.Unsubscribe, subsUnpaused.Unsubscribe)
64+
if err := s.watchPaused(ctx, subs.Err(), subsUnpaused.Err()); err != nil {
65+
s.Log.Error("error watching paused", err.Error())
66+
}
67+
subs.Unsubscribe()
68+
subsUnpaused.Unsubscribe()
69+
return err
6570
})
6671
return nil
6772
}
@@ -85,13 +90,7 @@ func (s *ShutterStateSyncer) handle(ctx context.Context, ev *event.ShutterState)
8590
}
8691
}
8792

88-
func (s *ShutterStateSyncer) watchPaused(
89-
ctx context.Context,
90-
subsErr <-chan error,
91-
subsErrUnpaused <-chan error,
92-
unsubscribe func(),
93-
unsubscribeUnpaused func(),
94-
) error {
93+
func (s *ShutterStateSyncer) watchPaused(ctx context.Context, subsErr <-chan error, subsErrUnpaused <-chan error) error {
9594
isActive, err := s.pollIsActive(ctx)
9695
if err != nil {
9796
// XXX: this will fail everything, do we want that?
@@ -138,8 +137,6 @@ func (s *ShutterStateSyncer) watchPaused(
138137
return err
139138
}
140139
case <-ctx.Done():
141-
unsubscribe()
142-
unsubscribeUnpaused()
143140
return ctx.Err()
144141
}
145142
}

rolling-shutter/medley/chainsync/syncer/unsafehead.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,16 @@ func (s *UnsafeHeadSyncer) Start(ctx context.Context, runner service.Runner) err
3232
return err
3333
}
3434
runner.Go(func() error {
35-
return s.watchLatestUnsafeHead(ctx, subs.Err(), subs.Unsubscribe)
35+
if err := s.watchLatestUnsafeHead(ctx, subs.Err()); err != nil {
36+
s.Log.Error("error watching latest unsafe head", err.Error())
37+
}
38+
subs.Unsubscribe()
39+
return err
3640
})
3741
return nil
3842
}
3943

40-
func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context, subsErr <-chan error, unsubscribe func()) error {
44+
func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context, subsErr <-chan error) error {
4145
for {
4246
select {
4347
case newHeader, ok := <-s.newLatestHeadCh:
@@ -63,7 +67,6 @@ func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context, subsErr <-
6367
return err
6468
}
6569
case <-ctx.Done():
66-
unsubscribe()
6770
return ctx.Err()
6871
}
6972
}

0 commit comments

Comments
 (0)