11use std:: time:: Duration ;
22
33use anyhow:: Result ;
4- use prost:: Message ;
4+ use chirpstack_api :: { gw , prost:: Message } ;
55
66use super :: socket:: ZMQ_CONTEXT ;
77
8+ #[ derive( thiserror:: Error , Debug ) ]
9+ pub enum Error {
10+ #[ error( "Timeout" ) ]
11+ Timeout ,
12+
13+ #[ error( transparent) ]
14+ Anyhow ( #[ from] anyhow:: Error ) ,
15+ }
16+
817pub fn get_socket ( endpoint : & str ) -> Result < zmq:: Socket , zmq:: Error > {
918 info ! (
1019 "Creating new socket for receiving events, endpoint: {}" ,
@@ -19,23 +28,6 @@ pub fn get_socket(endpoint: &str) -> Result<zmq::Socket, zmq::Error> {
1928 Ok ( sock)
2029}
2130
22- pub enum Event {
23- // Reading event timed out.
24- Timeout ,
25-
26- // Error reading event.
27- Error ( String ) ,
28-
29- // Unknown event.
30- Unknown ( String ) ,
31-
32- // Uplink event.
33- Uplink ( Box < chirpstack_api:: gw:: UplinkFrame > ) ,
34-
35- // Stats event.
36- Stats ( Box < chirpstack_api:: gw:: GatewayStats > ) ,
37- }
38-
3931pub struct Reader < ' a > {
4032 sub_sock : & ' a zmq:: Socket ,
4133 timeout : Duration ,
@@ -48,48 +40,20 @@ impl<'a> Reader<'a> {
4840}
4941
5042impl Iterator for Reader < ' _ > {
51- type Item = Event ;
43+ type Item = Result < gw :: Event , Error > ;
5244
53- fn next ( & mut self ) -> Option < Event > {
45+ fn next ( & mut self ) -> Option < Result < gw :: Event , Error > > {
5446 // set poller so that we can timeout
5547 let mut items = [ self . sub_sock . as_poll_item ( zmq:: POLLIN ) ] ;
5648 zmq:: poll ( & mut items, self . timeout . as_millis ( ) as i64 ) . unwrap ( ) ;
5749 if !items[ 0 ] . is_readable ( ) {
58- return Some ( Event :: Timeout ) ;
50+ return Some ( Err ( Error :: Timeout ) ) ;
5951 }
6052
61- let msg = self . sub_sock . recv_multipart ( 0 ) . unwrap ( ) ;
62- match handle_message ( msg ) {
63- Ok ( v) => Some ( v ) ,
64- Err ( err ) => Some ( Event :: Error ( err . to_string ( ) ) ) ,
53+ let b = self . sub_sock . recv_bytes ( 0 ) . unwrap ( ) ;
54+ match gw :: Event :: decode ( b . as_slice ( ) ) . map_err ( |e| Error :: Anyhow ( anyhow :: Error :: new ( e ) ) ) {
55+ Ok ( v) => Some ( Ok ( v ) ) ,
56+ Err ( e ) => Some ( Err ( e ) ) ,
6557 }
6658 }
6759}
68-
69- fn handle_message ( msg : Vec < Vec < u8 > > ) -> Result < Event > {
70- if msg. len ( ) != 2 {
71- return Err ( anyhow ! ( "Event must have two frames" ) ) ;
72- }
73-
74- let event = String :: from_utf8 ( msg[ 0 ] . clone ( ) ) ?;
75-
76- Ok ( match event. as_str ( ) {
77- "up" => match parse_up ( & msg[ 1 ] ) {
78- Ok ( v) => Event :: Uplink ( Box :: new ( v) ) ,
79- Err ( err) => Event :: Error ( err. to_string ( ) ) ,
80- } ,
81- "stats" => match parse_stats ( & msg[ 1 ] ) {
82- Ok ( v) => Event :: Stats ( Box :: new ( v) ) ,
83- Err ( err) => Event :: Error ( err. to_string ( ) ) ,
84- } ,
85- _ => Event :: Unknown ( event) ,
86- } )
87- }
88-
89- fn parse_up ( msg : & [ u8 ] ) -> Result < chirpstack_api:: gw:: UplinkFrame > {
90- Ok ( chirpstack_api:: gw:: UplinkFrame :: decode ( msg) ?)
91- }
92-
93- fn parse_stats ( msg : & [ u8 ] ) -> Result < chirpstack_api:: gw:: GatewayStats > {
94- Ok ( chirpstack_api:: gw:: GatewayStats :: decode ( msg) ?)
95- }
0 commit comments