@@ -63,8 +63,21 @@ pub fn register_protocol_channels(
6363 ( MessageSender :: Sqmr ( sqmr_client) , MessageReceiver :: Sqmr ( sqmr_server) )
6464 }
6565 NetworkProtocol :: ReveresedSqmr => {
66- // TODO(AndrewL): Implement ReveresedSqmr protocol registration
67- todo ! ( "ReveresedSqmr protocol registration not yet implemented" )
66+ let sqmr_client = network_manager
67+ . register_sqmr_protocol_client :: < TopicType , TopicType > (
68+ SQMR_PROTOCOL_NAME . to_string ( ) ,
69+ buffer_size,
70+ ) ;
71+ let sqmr_server = network_manager
72+ . register_sqmr_protocol_server :: < TopicType , TopicType > (
73+ SQMR_PROTOCOL_NAME . to_string ( ) ,
74+ buffer_size,
75+ ) ;
76+
77+ (
78+ MessageSender :: ReveresedSqmr ( ReveresedSqmrSender :: new ( sqmr_server) ) ,
79+ MessageReceiver :: ReveresedSqmr ( sqmr_client) ,
80+ )
6881 }
6982 }
7083}
@@ -154,6 +167,7 @@ impl MessageSender {
154167pub enum MessageReceiver {
155168 Gossipsub ( BroadcastTopicServer < TopicType > ) ,
156169 Sqmr ( SqmrServerReceiver < TopicType , TopicType > ) ,
170+ ReveresedSqmr ( SqmrClientSender < TopicType , TopicType > ) ,
157171}
158172
159173impl MessageReceiver {
@@ -180,6 +194,33 @@ impl MessageReceiver {
180194 } )
181195 . await
182196 }
197+ MessageReceiver :: ReveresedSqmr ( mut client) => loop {
198+ match client. send_new_query ( vec ! [ ] ) . await {
199+ Ok ( mut response_manager) => loop {
200+ let response_result = response_manager. next ( ) . await ;
201+ match response_result {
202+ Some ( Ok ( response_data) ) => {
203+ f ( response_data, None ) ;
204+ }
205+ Some ( Err ( _) ) => {
206+ error ! ( "ReveresedSqmr: Failed to parse response" ) ;
207+ break ;
208+ }
209+ None => {
210+ error ! ( "ReveresedSqmr: Response stream ended" ) ;
211+ break ;
212+ }
213+ }
214+ } ,
215+ Err ( e) => {
216+ error ! (
217+ "Failed to establish ReveresedSqmr connection, keeping client alive, \
218+ error: {:?}",
219+ e
220+ ) ;
221+ }
222+ }
223+ } ,
183224 }
184225 }
185226}
0 commit comments