1- use std:: io:: Cursor ;
21use std:: sync:: { Arc , Mutex } ;
32use std:: thread:: sleep;
43use std:: time:: Duration ;
54
65use anyhow:: Result ;
76use async_trait:: async_trait;
8- use chirpstack_api:: gw ;
7+ use chirpstack_api:: { gw , prost :: Message } ;
98use log:: { debug, error, info, trace, warn} ;
10- use prost:: Message ;
119use tokio:: task;
1210
1311use super :: Backend as BackendTrait ;
1412use crate :: config:: Configuration ;
1513use crate :: metadata;
16- use crate :: mqtt:: { send_gateway_stats, send_mesh_heartbeat , send_tx_ack, send_uplink_frame} ;
14+ use crate :: mqtt:: { send_gateway_stats, send_mesh_event , send_tx_ack, send_uplink_frame} ;
1715
1816pub struct Backend {
1917 gateway_id : String ,
@@ -45,26 +43,31 @@ impl Backend {
4543
4644 info ! ( "Reading gateway id" ) ;
4745
48- // send 'gateway_id' command with empty payload.
49- cmd_sock. send ( "gateway_id" , zmq:: SNDMORE ) ?;
50- cmd_sock. send ( "" , 0 ) ?;
46+ // Request Gateway ID.
47+ let req = gw:: Command {
48+ command : Some ( gw:: command:: Command :: GetGatewayId (
49+ gw:: GetGatewayIdRequest { } ,
50+ ) ) ,
51+ } ;
52+ cmd_sock. send ( req. encode_to_vec ( ) , 0 ) ?;
5153
5254 // set poller so that we can timeout after 100ms
5355 let mut items = [ cmd_sock. as_poll_item ( zmq:: POLLIN ) ] ;
5456 zmq:: poll ( & mut items, 100 ) ?;
5557 if !items[ 0 ] . is_readable ( ) {
5658 return Err ( anyhow ! ( "Could not read gateway id" ) ) ;
5759 }
58- let gateway_id = cmd_sock. recv_bytes ( 0 ) ?;
59- if gateway_id. len ( ) != 8 {
60+
61+ // Read response.
62+ let resp = cmd_sock. recv_bytes ( 0 ) ?;
63+ let resp = gw:: GetGatewayIdResponse :: decode ( resp. as_slice ( ) ) ?;
64+ if resp. gateway_id . len ( ) != 16 {
6065 return Err ( anyhow ! (
61- "Invalid gateway id, expected 8 bytes, received {}" ,
62- gateway_id . len ( )
66+ "Invalid Gateway ID length, gateway_id: {}" ,
67+ resp . gateway_id
6368 ) ) ;
6469 }
65- let gateway_id = hex:: encode ( gateway_id) ;
66-
67- info ! ( "Received gateway id, gateway_id: {}" , gateway_id) ;
70+ info ! ( "Received gateway id, gateway_id: {}" , resp. gateway_id) ;
6871
6972 tokio:: spawn ( {
7073 let forward_crc_ok = conf. backend . filters . forward_crc_ok ;
@@ -88,18 +91,17 @@ impl Backend {
8891 } ) ;
8992
9093 Ok ( Backend {
91- gateway_id,
94+ gateway_id : resp . gateway_id ,
9295 ctx : zmq_ctx,
9396 cmd_url : conf. backend . concentratord . command_url . clone ( ) ,
9497 cmd_sock : Mutex :: new ( cmd_sock) ,
9598 } )
9699 }
97100
98- fn send_command ( & self , cmd : & str , b : & [ u8 ] ) -> Result < Vec < u8 > > {
101+ fn send_command ( & self , cmd : gw :: Command ) -> Result < Vec < u8 > > {
99102 let res = || -> Result < Vec < u8 > > {
100103 let cmd_sock = self . cmd_sock . lock ( ) . unwrap ( ) ;
101- cmd_sock. send ( cmd, zmq:: SNDMORE ) ?;
102- cmd_sock. send ( b, 0 ) ?;
104+ cmd_sock. send ( cmd. encode_to_vec ( ) , 0 ) ?;
103105
104106 // set poller so that we can timeout after 100ms
105107 let mut items = [ cmd_sock. as_poll_item ( zmq:: POLLIN ) ] ;
@@ -109,8 +111,8 @@ impl Backend {
109111 }
110112
111113 // red tx ack response
112- let resp_b: & [ u8 ] = & cmd_sock. recv_bytes ( 0 ) ?;
113- Ok ( resp_b. to_vec ( ) )
114+ let resp_b = cmd_sock. recv_bytes ( 0 ) ?;
115+ Ok ( resp_b)
114116 } ( ) ;
115117
116118 if res. is_err ( ) {
@@ -153,13 +155,16 @@ impl BackendTrait for Backend {
153155 Ok ( self . gateway_id . clone ( ) )
154156 }
155157
156- async fn send_downlink_frame ( & self , pl : & gw:: DownlinkFrame ) -> Result < ( ) > {
158+ async fn send_downlink_frame ( & self , pl : gw:: DownlinkFrame ) -> Result < ( ) > {
157159 info ! ( "Sending downlink frame, downlink_id: {}" , pl. downlink_id) ;
160+ let downlink_id = pl. downlink_id ;
158161
159162 let tx_ack = {
160- let b = pl. encode_to_vec ( ) ;
161- let resp_b = self . send_command ( "down" , & b) ?;
162- gw:: DownlinkTxAck :: decode ( & mut Cursor :: new ( resp_b) ) ?
163+ let cmd = gw:: Command {
164+ command : Some ( gw:: command:: Command :: SendDownlinkFrame ( pl) ) ,
165+ } ;
166+ let resp_b = self . send_command ( cmd) ?;
167+ gw:: DownlinkTxAck :: decode ( resp_b. as_slice ( ) ) ?
163168 } ;
164169
165170 let ack_items: Vec < String > = tx_ack
@@ -170,17 +175,30 @@ impl BackendTrait for Backend {
170175
171176 info ! (
172177 "Received ack, items: {:?}, downlink_id: {}" ,
173- ack_items, pl . downlink_id
178+ ack_items, downlink_id
174179 ) ;
175180
176181 send_tx_ack ( & tx_ack) . await
177182 }
178183
179- async fn send_configuration_command ( & self , pl : & gw:: GatewayConfiguration ) -> Result < ( ) > {
184+ async fn send_configuration_command ( & self , pl : gw:: GatewayConfiguration ) -> Result < ( ) > {
180185 info ! ( "Sending configuration command, version: {}" , pl. version) ;
181186
182- let b = pl. encode_to_vec ( ) ;
183- let _ = self . send_command ( "config" , & b) ?;
187+ let cmd = gw:: Command {
188+ command : Some ( gw:: command:: Command :: SetGatewayConfiguration ( pl) ) ,
189+ } ;
190+ let _ = self . send_command ( cmd) ?;
191+
192+ Ok ( ( ) )
193+ }
194+
195+ async fn send_mesh_command ( & self , pl : gw:: MeshCommand ) -> Result < ( ) > {
196+ info ! ( "Sending mesh command" ) ;
197+
198+ let cmd = gw:: Command {
199+ command : Some ( gw:: command:: Command :: Mesh ( pl) ) ,
200+ } ;
201+ let _ = self . send_command ( cmd) ?;
184202
185203 Ok ( ( ) )
186204 }
@@ -197,37 +215,37 @@ async fn event_loop(
197215 let event_sock = Arc :: new ( Mutex :: new ( event_sock) ) ;
198216
199217 loop {
200- let res = task:: spawn_blocking ( {
218+ let event = task:: spawn_blocking ( {
201219 let event_sock = event_sock. clone ( ) ;
202220
203- move || -> Result < Vec < Vec < u8 > > > {
221+ move || -> Result < Option < gw :: Event > > {
204222 let event_sock = event_sock. lock ( ) . unwrap ( ) ;
205223
206224 // set poller so that we can timeout after 100ms
207225 let mut items = [ event_sock. as_poll_item ( zmq:: POLLIN ) ] ;
208226 zmq:: poll ( & mut items, 100 ) ?;
209227 if !items[ 0 ] . is_readable ( ) {
210- return Ok ( vec ! [ ] ) ;
228+ return Ok ( None ) ;
211229 }
212230
213- let msg = event_sock. recv_multipart ( 0 ) ?;
214- if msg. len ( ) != 2 {
215- return Err ( anyhow ! ( "Event must have two frames" ) ) ;
216- }
217- Ok ( msg)
231+ let msg = event_sock. recv_bytes ( 0 ) ?;
232+ Ok ( Some ( gw:: Event :: decode ( msg. as_slice ( ) ) ?) )
218233 }
219234 } )
220235 . await ;
221236
222- match res {
223- Ok ( Ok ( msg) ) => {
224- if msg. len ( ) != 2 {
225- continue ;
226- }
237+ let event = match event {
238+ Ok ( v) => v,
239+ Err ( e) => {
240+ error ! ( "Task error: {}" , e) ;
241+ continue ;
242+ }
243+ } ;
227244
245+ match event {
246+ Ok ( Some ( v) ) => {
228247 if let Err ( err) = handle_event_msg (
229- & msg[ 0 ] ,
230- & msg[ 1 ] ,
248+ v,
231249 & filters,
232250 forward_crc_ok,
233251 forward_crc_invalid,
@@ -239,72 +257,60 @@ async fn event_loop(
239257 continue ;
240258 }
241259 }
242- Ok ( Err ( err) ) => {
243- error ! ( "Receive event error, error: {}" , err) ;
244- continue ;
245- }
246- Err ( err) => {
247- error ! ( "{}" , err) ;
260+ Ok ( None ) => continue ,
261+ Err ( e) => {
262+ error ! ( "Error reading event, error: {}" , e) ;
248263 continue ;
249264 }
250265 }
251266 }
252267}
253268
254269async fn handle_event_msg (
255- event : & [ u8 ] ,
256- pl : & [ u8 ] ,
270+ event : gw:: Event ,
257271 filters : & lrwn_filters:: Filters ,
258272 forward_crc_ok : bool ,
259273 forward_crc_invalid : bool ,
260274 forward_crc_missing : bool ,
261275) -> Result < ( ) > {
262- let event = String :: from_utf8 ( event. to_vec ( ) ) ?;
263- let pl = Cursor :: new ( pl. to_vec ( ) ) ;
264-
265- match event. as_str ( ) {
266- "up" => {
267- let pl = gw:: UplinkFrame :: decode ( pl) ?;
268- if let Some ( rx_info) = & pl. rx_info {
276+ match event. event {
277+ Some ( gw:: event:: Event :: UplinkFrame ( v) ) => {
278+ if let Some ( rx_info) = & v. rx_info {
269279 if !( ( rx_info. crc_status ( ) == gw:: CrcStatus :: CrcOk && forward_crc_ok)
270280 || ( rx_info. crc_status ( ) == gw:: CrcStatus :: BadCrc && forward_crc_invalid)
271281 || ( rx_info. crc_status ( ) == gw:: CrcStatus :: NoCrc && forward_crc_missing) )
272282 {
273283 debug ! (
274284 "Ignoring uplink frame because of forward_crc_ flags, uplink_id: {}" ,
275- pl . rx_info. as_ref( ) . map( |v| v. uplink_id) . unwrap_or_default( ) ,
285+ v . rx_info. as_ref( ) . map( |v| v. uplink_id) . unwrap_or_default( ) ,
276286 ) ;
277287 return Ok ( ( ) ) ;
278288 }
279289 }
280290
281- if lrwn_filters:: matches ( & pl . phy_payload , filters) {
291+ if lrwn_filters:: matches ( & v . phy_payload , filters) {
282292 info ! (
283293 "Received uplink frame, uplink_id: {}" ,
284- pl . rx_info. as_ref( ) . map( |v| v. uplink_id) . unwrap_or_default( ) ,
294+ v . rx_info. as_ref( ) . map( |v| v. uplink_id) . unwrap_or_default( ) ,
285295 ) ;
286- send_uplink_frame ( & pl ) . await ?;
296+ send_uplink_frame ( & v ) . await ?;
287297 } else {
288298 debug ! (
289299 "Ignoring uplink frame because of dev_addr and join_eui filters, uplink_id: {}" ,
290- pl . rx_info. as_ref( ) . map( |v| v. uplink_id) . unwrap_or_default( )
300+ v . rx_info. as_ref( ) . map( |v| v. uplink_id) . unwrap_or_default( )
291301 ) ;
292302 }
293303 }
294- "stats" => {
295- let mut pl = gw:: GatewayStats :: decode ( pl) ?;
296- info ! ( "Received gateway stats" ) ;
297- pl. metadata . extend ( metadata:: get ( ) . await ?) ;
298- send_gateway_stats ( & pl) . await ?;
299- }
300- "mesh_heartbeat" => {
301- let pl = gw:: MeshHeartbeat :: decode ( pl) ?;
302- info ! ( "Received mesh heartbeat" ) ;
303- send_mesh_heartbeat ( & pl) . await ?;
304+ Some ( gw:: event:: Event :: GatewayStats ( mut v) ) => {
305+ info ! ( "received gateway stats" ) ;
306+ v. metadata . extend ( metadata:: get ( ) . await ?) ;
307+ send_gateway_stats ( & v) . await ?;
304308 }
305- _ => {
306- return Err ( anyhow ! ( "Unexpected event: {}" , event) ) ;
309+ Some ( gw:: event:: Event :: Mesh ( v) ) => {
310+ info ! ( "Received mesh event" ) ;
311+ send_mesh_event ( & v) . await ?;
307312 }
313+ None => { }
308314 }
309315
310316 Ok ( ( ) )
0 commit comments