Skip to content

Commit 80804e1

Browse files
committed
Implement DevAddr and JoinEUI prefix filters.
Closes #25.
1 parent 1059996 commit 80804e1

File tree

7 files changed

+135
-24
lines changed

7 files changed

+135
-24
lines changed

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ clap = { version = "4.1", default-features = false, features = [
1919
chirpstack_api = { version = "4.3.1", default-features = false, features = [
2020
"json",
2121
] }
22+
lrwn_filters = { version = "4.3.3-test.1", features = ["serde"] }
2223
serde_json = "1.0"
2324
serde = { version = "1.0", features = ["derive"] }
2425
log = "0.4"

src/backend/concentratord.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,18 @@ impl Backend {
5656
info!("Received gateway id, gateway_id: {}", gateway_id);
5757

5858
tokio::spawn({
59-
let forward_crc_ok = conf.backend.forward_crc_ok;
60-
let forward_crc_invalid = conf.backend.forward_crc_invalid;
61-
let forward_crc_missing = conf.backend.forward_crc_missing;
59+
let forward_crc_ok = conf.backend.filters.forward_crc_ok;
60+
let forward_crc_invalid = conf.backend.filters.forward_crc_invalid;
61+
let forward_crc_missing = conf.backend.filters.forward_crc_missing;
62+
let filters = lrwn_filters::Filters {
63+
dev_addr_prefixes: conf.backend.filters.dev_addr_prefixes.clone(),
64+
join_eui_prefixes: conf.backend.filters.join_eui_prefixes.clone(),
65+
};
6266

6367
async move {
6468
event_loop(
6569
event_sock,
70+
filters,
6671
forward_crc_ok,
6772
forward_crc_invalid,
6873
forward_crc_missing,
@@ -148,6 +153,7 @@ impl BackendTrait for Backend {
148153

149154
async fn event_loop(
150155
event_sock: zmq::Socket,
156+
filters: lrwn_filters::Filters,
151157
forward_crc_ok: bool,
152158
forward_crc_invalid: bool,
153159
forward_crc_missing: bool,
@@ -187,6 +193,7 @@ async fn event_loop(
187193
if let Err(err) = handle_event_msg(
188194
&msg[0],
189195
&msg[1],
196+
&filters,
190197
forward_crc_ok,
191198
forward_crc_invalid,
192199
forward_crc_missing,
@@ -212,6 +219,7 @@ async fn event_loop(
212219
async fn handle_event_msg(
213220
event: &[u8],
214221
pl: &[u8],
222+
filters: &lrwn_filters::Filters,
215223
forward_crc_ok: bool,
216224
forward_crc_invalid: bool,
217225
forward_crc_missing: bool,
@@ -235,11 +243,18 @@ async fn handle_event_msg(
235243
}
236244
}
237245

238-
info!(
239-
"Received uplink frame, uplink_id: {}",
240-
pl.rx_info.as_ref().map(|v| v.uplink_id).unwrap_or_default(),
241-
);
242-
send_uplink_frame(&pl).await?;
246+
if lrwn_filters::matches(&pl.phy_payload, filters) {
247+
info!(
248+
"Received uplink frame, uplink_id: {}",
249+
pl.rx_info.as_ref().map(|v| v.uplink_id).unwrap_or_default(),
250+
);
251+
send_uplink_frame(&pl).await?;
252+
} else {
253+
debug!(
254+
"Ignoring uplink frame because of dev_addr and join_eui filters, uplink_id: {}",
255+
pl.rx_info.as_ref().map(|v| v.uplink_id).unwrap_or_default()
256+
);
257+
}
243258
}
244259
"stats" => {
245260
let pl = gw::GatewayStats::decode(pl)?;

src/backend/semtech_udp/mod.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::time::{Duration, SystemTime};
77
use anyhow::Result;
88
use async_trait::async_trait;
99
use chirpstack_api::gw;
10-
use log::{info, trace, warn};
10+
use log::{debug, info, trace, warn};
1111
use prost::Message;
1212
use tokio::net::UdpSocket;
1313
use tokio::sync::{Mutex, RwLock};
@@ -26,6 +26,10 @@ struct State {
2626
pull_addr: RwLock<Option<SocketAddr>>,
2727
stats: Mutex<Stats>,
2828
time_fallback_enabled: bool,
29+
forward_crc_ok: bool,
30+
forward_crc_invalid: bool,
31+
forward_crc_missing: bool,
32+
filters: lrwn_filters::Filters,
2933
}
3034

3135
#[derive(Clone)]
@@ -238,6 +242,13 @@ impl Backend {
238242
pull_addr: RwLock::new(None),
239243
stats: Mutex::new(Stats::default()),
240244
time_fallback_enabled: conf.backend.semtech_udp.time_fallback_enabled,
245+
forward_crc_invalid: conf.backend.filters.forward_crc_invalid,
246+
forward_crc_missing: conf.backend.filters.forward_crc_missing,
247+
forward_crc_ok: conf.backend.filters.forward_crc_ok,
248+
filters: lrwn_filters::Filters {
249+
dev_addr_prefixes: conf.backend.filters.dev_addr_prefixes.clone(),
250+
join_eui_prefixes: conf.backend.filters.join_eui_prefixes.clone(),
251+
},
241252
};
242253
let state = Arc::new(state);
243254

@@ -348,8 +359,29 @@ async fn handle_push_data(state: &Arc<State>, data: &[u8], remote: &SocketAddr)
348359
let gateway_stats = pl.to_proto_gateway_stats()?;
349360

350361
for uf in &uplink_frames {
351-
state.count_uplink(uf).await?;
352-
send_uplink_frame(uf).await?;
362+
if let Some(rx_info) = &uf.rx_info {
363+
if !((rx_info.crc_status() == gw::CrcStatus::CrcOk && state.forward_crc_ok)
364+
|| (rx_info.crc_status() == gw::CrcStatus::BadCrc && state.forward_crc_invalid)
365+
|| (rx_info.crc_status() == gw::CrcStatus::NoCrc && state.forward_crc_missing))
366+
{
367+
debug!(
368+
"Ignoring uplink frame because of forward_crc_ flags, uplink_id: {}",
369+
uf.rx_info.as_ref().map(|v| v.uplink_id).unwrap_or_default(),
370+
);
371+
372+
continue;
373+
}
374+
}
375+
376+
if lrwn_filters::matches(&uf.phy_payload, &state.filters) {
377+
state.count_uplink(uf).await?;
378+
send_uplink_frame(uf).await?;
379+
} else {
380+
debug!(
381+
"Ignoring uplink frame because of dev_addr and join_eui filters, uplink_id: {}",
382+
uf.rx_info.as_ref().map(|v| v.uplink_id).unwrap_or_default()
383+
);
384+
}
353385
}
354386

355387
if let Some(mut stats) = gateway_stats {

src/cmd/configfile.rs

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,47 @@ pub fn run(config: &Configuration) {
106106
# * semtech_udp
107107
enabled="{{ backend.enabled }}"
108108
109-
# Forward CRC ok.
110-
forward_crc_ok={{ backend.forward_crc_ok }}
109+
# Filters.
110+
[backend.filters]
111111
112-
# Forward CRC invalid.
113-
forward_crc_invalid={{ backend.forward_crc_invalid }}
112+
# Forward CRC ok.
113+
forward_crc_ok={{ backend.filters.forward_crc_ok }}
114114
115-
# Forward CRC missing.
116-
forward_crc_missing={{ backend.forward_crc_missing }}
115+
# Forward CRC invalid.
116+
forward_crc_invalid={{ backend.filters.forward_crc_invalid }}
117+
118+
# Forward CRC missing.
119+
forward_crc_missing={{ backend.filters.forward_crc_missing }}
120+
121+
# DevAddr prefix filters.
122+
#
123+
# Example configuration:
124+
# dev_addr_prefixes=["0000ff00/24"]
125+
#
126+
# The above filter means that the 24MSB of 0000ff00 will be used to
127+
# filter DevAddrs. Uplinks with DevAddrs that do not match any of the
128+
# configured filters will not be forwarded. Leaving this option empty
129+
# disables filtering on DevAddr.
130+
dev_addr_prefixes=[
131+
{{#each backend.filters.dev_addr_prefixes}}
132+
"{{this}}",
133+
{{/each}}
134+
]
135+
136+
# JoinEUI prefix filters.
137+
#
138+
# Example configuration:
139+
# join_eui_prefixes=["0000ff0000000000/24"]
140+
#
141+
# The above filter means that the 24MSB of 0000ff0000000000 will be used
142+
# to filter JoinEUIs. Uplinks with JoinEUIs that do not match any of the
143+
# configured filters will not be forwarded. Leaving this option empty
144+
# disables filtering on JoinEUI.
145+
join_eui_prefixes=[
146+
{{#each backend.filters.join_eui_prefixes}}
147+
"{{this}}",
148+
{{/each}}
149+
]
117150
118151
119152
# ChirpStack Concentratord backend configuration.

src/config.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,7 @@ impl Default for Mqtt {
7979
#[serde(default)]
8080
pub struct Backend {
8181
pub enabled: String,
82-
pub forward_crc_ok: bool,
83-
pub forward_crc_invalid: bool,
84-
pub forward_crc_missing: bool,
82+
pub filters: Filters,
8583
pub gateway_id: String,
8684
pub semtech_udp: SemtechUdp,
8785
pub concentratord: Concentratord,
@@ -91,16 +89,36 @@ impl Default for Backend {
9189
fn default() -> Self {
9290
Backend {
9391
enabled: "semtech_udp".to_string(),
94-
forward_crc_ok: true,
95-
forward_crc_invalid: false,
96-
forward_crc_missing: false,
92+
filters: Filters::default(),
9793
gateway_id: "".into(),
9894
semtech_udp: SemtechUdp::default(),
9995
concentratord: Concentratord::default(),
10096
}
10197
}
10298
}
10399

100+
#[derive(Serialize, Deserialize)]
101+
#[serde(default)]
102+
pub struct Filters {
103+
pub forward_crc_ok: bool,
104+
pub forward_crc_invalid: bool,
105+
pub forward_crc_missing: bool,
106+
pub dev_addr_prefixes: Vec<lrwn_filters::DevAddrPrefix>,
107+
pub join_eui_prefixes: Vec<lrwn_filters::EuiPrefix>,
108+
}
109+
110+
impl Default for Filters {
111+
fn default() -> Self {
112+
Filters {
113+
forward_crc_ok: true,
114+
forward_crc_invalid: false,
115+
forward_crc_missing: false,
116+
dev_addr_prefixes: vec![],
117+
join_eui_prefixes: vec![],
118+
}
119+
}
120+
}
121+
104122
#[derive(Serialize, Deserialize)]
105123
#[serde(default)]
106124
pub struct Concentratord {

src/logging.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub fn setup(name: &str, level: log::Level, syslog: bool) -> Result<()> {
99
facility: Facility::LOG_USER,
1010
hostname: None,
1111
process: name.to_string(),
12-
pid: process::id() as u32,
12+
pid: process::id(),
1313
};
1414
let logger = syslog::unix(formatter).map_err(|e| anyhow!("{}", e))?;
1515
log::set_boxed_logger(Box::new(BasicLogger::new(logger)))

0 commit comments

Comments
 (0)