@@ -133,7 +133,7 @@ where
133133 last_id : AtomicUsize ,
134134 waiting_map : Mutex < HashMap < usize , Sender < ChannelMessage > > > ,
135135
136- headers : Mutex < VecDeque < RawHeaderNotification > > ,
136+ headers : Mutex < Option < VecDeque < RawHeaderNotification > > > ,
137137 script_notifications : Mutex < HashMap < ScriptHash , VecDeque < ScriptStatus > > > ,
138138
139139 #[ cfg( feature = "debug-calls" ) ]
@@ -154,7 +154,7 @@ where
154154 last_id : AtomicUsize :: new ( 0 ) ,
155155 waiting_map : Mutex :: new ( HashMap :: new ( ) ) ,
156156
157- headers : Mutex :: new ( VecDeque :: new ( ) ) ,
157+ headers : Mutex :: new ( None ) ,
158158 script_notifications : Mutex :: new ( HashMap :: new ( ) ) ,
159159
160160 #[ cfg( feature = "debug-calls" ) ]
@@ -648,11 +648,17 @@ impl<S: Read + Write> RawClient<S> {
648648
649649 fn handle_notification ( & self , method : & str , result : serde_json:: Value ) -> Result < ( ) , Error > {
650650 match method {
651- "blockchain.headers.subscribe" => self . headers . lock ( ) ?. append (
652- & mut serde_json:: from_value :: < Vec < RawHeaderNotification > > ( result) ?
653- . into_iter ( )
654- . collect ( ) ,
655- ) ,
651+ "blockchain.headers.subscribe" => {
652+ let mut queue = self . headers . lock ( ) ?;
653+ match queue. as_mut ( ) {
654+ None => return Err ( Error :: NotSubscribedToHeaders ) ,
655+ Some ( queue) => queue. append (
656+ & mut serde_json:: from_value :: < Vec < RawHeaderNotification > > ( result) ?
657+ . into_iter ( )
658+ . collect ( ) ,
659+ ) ,
660+ }
661+ }
656662 "blockchain.scripthash.subscribe" => {
657663 let unserialized: ScriptNotification = serde_json:: from_value ( result) ?;
658664 let mut script_notifications = self . script_notifications . lock ( ) ?;
@@ -762,6 +768,11 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
762768 }
763769
764770 fn block_headers_subscribe_raw ( & self ) -> Result < RawHeaderNotification , Error > {
771+ let mut headers = self . headers . lock ( ) ?;
772+ if headers. is_none ( ) {
773+ * headers = Some ( VecDeque :: new ( ) ) ;
774+ }
775+
765776 let req = Request :: new_id (
766777 self . last_id . fetch_add ( 1 , Ordering :: SeqCst ) ,
767778 "blockchain.headers.subscribe" ,
@@ -773,7 +784,11 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
773784 }
774785
775786 fn block_headers_pop_raw ( & self ) -> Result < Option < RawHeaderNotification > , Error > {
776- Ok ( self . headers . lock ( ) ?. pop_front ( ) )
787+ let mut queue = self . headers . lock ( ) ?;
788+ match queue. as_mut ( ) {
789+ None => Err ( Error :: NotSubscribedToHeaders ) ,
790+ Some ( queue) => Ok ( queue. pop_front ( ) ) ,
791+ }
777792 }
778793
779794 fn block_header_raw ( & self , height : usize ) -> Result < Vec < u8 > , Error > {
@@ -1333,6 +1348,16 @@ mod test {
13331348 assert ! ( resp. height >= 639000 ) ;
13341349 }
13351350
1351+ #[ test]
1352+ fn test_block_headers_subscribe_pop ( ) {
1353+ let client = RawClient :: new ( get_test_server ( ) , None ) . unwrap ( ) ;
1354+ let resp = client. block_headers_pop ( ) ;
1355+ assert_eq ! ( format!( "{:?}" , resp) , "Err(NotSubscribedToHeaders)" ) ;
1356+ client. block_headers_subscribe ( ) . unwrap ( ) ;
1357+ let resp = client. block_headers_pop ( ) ;
1358+ assert ! ( resp. is_ok( ) ) ;
1359+ }
1360+
13361361 #[ test]
13371362 fn test_script_subscribe ( ) {
13381363 use std:: str:: FromStr ;
0 commit comments