@@ -4,15 +4,18 @@ use std::{
4
4
Arc ,
5
5
atomic:: { AtomicU64 , Ordering } ,
6
6
} ,
7
+ time:: Duration ,
7
8
} ;
8
9
9
10
use crate :: { Config , Height , Watcher } ;
11
+ use either:: Either ;
10
12
use espresso_types:: { Header , NamespaceId } ;
11
13
use futures:: { StreamExt , stream:: SelectAll } ;
12
14
use tokio:: {
13
15
spawn,
14
16
sync:: { Barrier , mpsc} ,
15
17
task:: JoinHandle ,
18
+ time:: sleep,
16
19
} ;
17
20
use tokio_stream:: wrappers:: ReceiverStream ;
18
21
use tracing:: { debug, warn} ;
@@ -66,16 +69,47 @@ impl Multiwatcher {
66
69
let lower_bound = lower_bound. clone ( ) ;
67
70
watchers. push ( spawn ( async move {
68
71
let i = Id ( i) ;
69
- let mut w = Watcher :: new ( c, height, nsid) ;
70
72
loop {
71
- let h = w. next ( ) . await ;
72
- if h. height ( ) <= lower_bound. load ( Ordering :: Relaxed ) {
73
- continue ;
73
+ let height = lower_bound. load ( Ordering :: Relaxed ) ;
74
+ let mut w = Watcher :: new ( c. clone ( ) , height, nsid) ;
75
+ let mut expected = height + 1 ;
76
+ loop {
77
+ match w. next ( ) . await {
78
+ Either :: Right ( hdr) => {
79
+ if hdr. height ( ) > expected {
80
+ warn ! (
81
+ url = %c. wss_base_url,
82
+ height = %hdr. height( ) ,
83
+ expected = %expected,
84
+ "unexpected block height"
85
+ ) ;
86
+ break ;
87
+ }
88
+ expected += 1 ;
89
+ if hdr. height ( ) <= lower_bound. load ( Ordering :: Relaxed ) {
90
+ continue ;
91
+ }
92
+ if tx. send ( ( i, hdr) ) . await . is_err ( ) {
93
+ return ;
94
+ }
95
+ }
96
+ Either :: Left ( height) => {
97
+ if * height > expected {
98
+ warn ! (
99
+ url = %c. wss_base_url,
100
+ height = %height,
101
+ expected = %expected,
102
+ "unexpected block height"
103
+ ) ;
104
+ break ;
105
+ }
106
+ expected += 1 ;
107
+ }
108
+ }
109
+ barrier. wait ( ) . await ;
74
110
}
75
- if tx. send ( ( i, h) ) . await . is_err ( ) {
76
- break ;
77
- }
78
- barrier. wait ( ) . await ;
111
+ drop ( w) ;
112
+ sleep ( Duration :: from_secs ( 3 ) ) . await // wait a little before re-connecting
79
113
}
80
114
} ) ) ;
81
115
}
0 commit comments