Skip to content

Commit 1f1b10e

Browse files
committed
Implement time fallback for UDP forwarder.
Closes #24.
1 parent 32ac844 commit 1f1b10e

File tree

4 files changed

+109
-13
lines changed

4 files changed

+109
-13
lines changed

src/backend/semtech_udp/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ struct State {
2525
downlink_cache: RwLock<HashMap<u16, DownlinkCache>>,
2626
pull_addr: RwLock<Option<SocketAddr>>,
2727
stats: Mutex<Stats>,
28+
time_fallback_enabled: bool,
2829
}
2930

3031
#[derive(Clone)]
@@ -236,6 +237,7 @@ impl Backend {
236237
downlink_cache: RwLock::new(HashMap::new()),
237238
pull_addr: RwLock::new(None),
238239
stats: Mutex::new(Stats::default()),
240+
time_fallback_enabled: conf.backend.semtech_udp.time_fallback_enabled,
239241
};
240242
let state = Arc::new(state);
241243

@@ -342,7 +344,7 @@ async fn handle_push_data(state: &Arc<State>, data: &[u8], remote: &SocketAddr)
342344
};
343345
state.socket.send_to(&ack.to_vec(), remote).await?;
344346

345-
let uplink_frames = pl.to_proto_uplink_frames()?;
347+
let uplink_frames = pl.to_proto_uplink_frames(state.time_fallback_enabled)?;
346348
let gateway_stats = pl.to_proto_gateway_stats()?;
347349

348350
for uf in &uplink_frames {

src/backend/semtech_udp/structs.rs

Lines changed: 98 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -287,10 +287,13 @@ impl PushData {
287287
})
288288
}
289289

290-
pub fn to_proto_uplink_frames(&self) -> Result<Vec<gw::UplinkFrame>> {
290+
pub fn to_proto_uplink_frames(
291+
&self,
292+
time_fallback_enabled: bool,
293+
) -> Result<Vec<gw::UplinkFrame>> {
291294
let mut out: Vec<gw::UplinkFrame> = vec![];
292295
for rx in &self.payload.rxpk {
293-
for f in rx.to_proto(&self.gateway_id)? {
296+
for f in rx.to_proto(&self.gateway_id, time_fallback_enabled)? {
294297
out.push(f);
295298
}
296299
}
@@ -377,7 +380,11 @@ pub struct RxPk {
377380
}
378381

379382
impl RxPk {
380-
fn to_proto(&self, gateway_id: &[u8]) -> Result<Vec<gw::UplinkFrame>> {
383+
fn to_proto(
384+
&self,
385+
gateway_id: &[u8],
386+
time_fallback_enabled: bool,
387+
) -> Result<Vec<gw::UplinkFrame>> {
381388
let mut rng = rand::thread_rng();
382389
let uplink_id = if cfg!(test) { 123 } else { rng.gen::<u32>() };
383390

@@ -426,7 +433,16 @@ impl RxPk {
426433
rx_info: Some(gw::UplinkRxInfo {
427434
gateway_id: hex::encode(gateway_id),
428435
uplink_id,
429-
time: self.time.map(pbjson_types::Timestamp::from),
436+
time: match self.time.map(pbjson_types::Timestamp::from) {
437+
Some(v) => Some(v),
438+
None => {
439+
if time_fallback_enabled {
440+
Some(pbjson_types::Timestamp::from(Utc::now()))
441+
} else {
442+
None
443+
}
444+
}
445+
},
430446
time_since_gps_epoch: self
431447
.tmms
432448
.map(|t| pbjson_types::Duration::from(Duration::from_millis(t))),
@@ -1068,7 +1084,7 @@ mod test {
10681084
stat: None,
10691085
},
10701086
};
1071-
assert_eq!(0, pl.to_proto_uplink_frames().unwrap().len());
1087+
assert_eq!(0, pl.to_proto_uplink_frames(false).unwrap().len());
10721088
}
10731089

10741090
#[test]
@@ -1103,7 +1119,7 @@ mod test {
11031119
stat: None,
11041120
},
11051121
};
1106-
let pl = pl.to_proto_uplink_frames().unwrap();
1122+
let pl = pl.to_proto_uplink_frames(false).unwrap();
11071123
assert_eq!(1, pl.len());
11081124
assert_eq!(
11091125
gw::UplinkFrame {
@@ -1173,7 +1189,7 @@ mod test {
11731189
stat: None,
11741190
},
11751191
};
1176-
let pl = pl.to_proto_uplink_frames().unwrap();
1192+
let pl = pl.to_proto_uplink_frames(false).unwrap();
11771193
assert_eq!(1, pl.len());
11781194
assert_eq!(
11791195
gw::UplinkFrame {
@@ -1246,7 +1262,7 @@ mod test {
12461262
stat: None,
12471263
},
12481264
};
1249-
let pl = pl.to_proto_uplink_frames().unwrap();
1265+
let pl = pl.to_proto_uplink_frames(false).unwrap();
12501266
assert_eq!(1, pl.len());
12511267
assert_eq!(
12521268
gw::UplinkFrame {
@@ -1335,7 +1351,7 @@ mod test {
13351351
stat: None,
13361352
},
13371353
};
1338-
let pl = pl.to_proto_uplink_frames().unwrap();
1354+
let pl = pl.to_proto_uplink_frames(false).unwrap();
13391355
assert_eq!(2, pl.len());
13401356
assert_eq!(
13411357
vec![
@@ -1438,7 +1454,7 @@ mod test {
14381454
stat: None,
14391455
},
14401456
};
1441-
let pl = pl.to_proto_uplink_frames().unwrap();
1457+
let pl = pl.to_proto_uplink_frames(false).unwrap();
14421458
assert_eq!(1, pl.len());
14431459
assert_eq!(
14441460
gw::UplinkFrame {
@@ -1503,7 +1519,7 @@ mod test {
15031519
stat: None,
15041520
},
15051521
};
1506-
let pl = pl.to_proto_uplink_frames().unwrap();
1522+
let pl = pl.to_proto_uplink_frames(false).unwrap();
15071523
assert_eq!(1, pl.len());
15081524
assert_eq!(
15091525
gw::UplinkFrame {
@@ -1577,7 +1593,7 @@ mod test {
15771593
stat: None,
15781594
},
15791595
};
1580-
let pl = pl.to_proto_uplink_frames().unwrap();
1596+
let pl = pl.to_proto_uplink_frames(false).unwrap();
15811597
assert_eq!(1, pl.len());
15821598
assert_eq!(
15831599
gw::UplinkFrame {
@@ -1619,6 +1635,76 @@ mod test {
16191635
);
16201636
}
16211637

1638+
#[test]
1639+
fn test_uplink_no_time() {
1640+
let pl = PushData {
1641+
random_token: 123,
1642+
gateway_id: [1, 2, 3, 4, 5, 6, 7, 8],
1643+
payload: PushDataPayload {
1644+
rxpk: vec![RxPk {
1645+
time: None,
1646+
tmms: None,
1647+
tmst: 1234,
1648+
ftime: None,
1649+
freq: 868.1,
1650+
chan: 5,
1651+
rfch: 1,
1652+
brd: 3,
1653+
stat: Crc::Ok,
1654+
modu: Modulation::Lora,
1655+
datr: DataRate::Lora(7, 125000),
1656+
codr: Some(CodeRate::Cr45),
1657+
rssi: 120,
1658+
lsnr: Some(3.5),
1659+
hpw: None,
1660+
size: 4,
1661+
data: vec![4, 3, 2, 1],
1662+
rsig: vec![],
1663+
meta: None,
1664+
}],
1665+
stat: None,
1666+
},
1667+
};
1668+
let pl = pl.to_proto_uplink_frames(false).unwrap();
1669+
assert_eq!(1, pl.len());
1670+
assert!(pl[0].rx_info.as_ref().unwrap().time.is_none());
1671+
}
1672+
1673+
#[test]
1674+
fn test_uplink_no_time_time_fallback_enabled() {
1675+
let pl = PushData {
1676+
random_token: 123,
1677+
gateway_id: [1, 2, 3, 4, 5, 6, 7, 8],
1678+
payload: PushDataPayload {
1679+
rxpk: vec![RxPk {
1680+
time: None,
1681+
tmms: None,
1682+
tmst: 1234,
1683+
ftime: None,
1684+
freq: 868.1,
1685+
chan: 5,
1686+
rfch: 1,
1687+
brd: 3,
1688+
stat: Crc::Ok,
1689+
modu: Modulation::Lora,
1690+
datr: DataRate::Lora(7, 125000),
1691+
codr: Some(CodeRate::Cr45),
1692+
rssi: 120,
1693+
lsnr: Some(3.5),
1694+
hpw: None,
1695+
size: 4,
1696+
data: vec![4, 3, 2, 1],
1697+
rsig: vec![],
1698+
meta: None,
1699+
}],
1700+
stat: None,
1701+
},
1702+
};
1703+
let pl = pl.to_proto_uplink_frames(true).unwrap();
1704+
assert_eq!(1, pl.len());
1705+
assert!(pl[0].rx_info.as_ref().unwrap().time.is_some());
1706+
}
1707+
16221708
#[test]
16231709
fn test_downlink_lora_delay() {
16241710
let pl = gw::DownlinkFrame {

src/cmd/configfile.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,12 @@ pub fn run(config: &Configuration) {
137137
# packet-forwarder matches this port.
138138
bind="{{ backend.semtech_udp.bind }}"
139139
140+
# Time fallback.
141+
#
142+
# In case the UDP packet-forwarder does not set the 'time' field, then the
143+
# server-time will be used as fallback if this option is enabled.
144+
time_fallback_enabled={{ backend.semtech_udp.time_fallback_enabled }}
145+
140146
141147
# Gateway metadata configuration.
142148
[metadata]

src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,14 @@ impl Default for Concentratord {
121121
#[serde(default)]
122122
pub struct SemtechUdp {
123123
pub bind: String,
124+
pub time_fallback_enabled: bool,
124125
}
125126

126127
impl Default for SemtechUdp {
127128
fn default() -> Self {
128129
SemtechUdp {
129130
bind: "0.0.0.0:1700".to_string(),
131+
time_fallback_enabled: false,
130132
}
131133
}
132134
}

0 commit comments

Comments
 (0)