Skip to content

Commit 838d4bb

Browse files
committed
subscriptions, todo app
1 parent b872ba3 commit 838d4bb

File tree

11 files changed

+875
-59
lines changed

11 files changed

+875
-59
lines changed

software/tracksight/rust_backend/Cargo.lock

Lines changed: 651 additions & 17 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

software/tracksight/rust_backend/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ tokio-serial = "5"
1111
influxdb2 = "0.5"
1212
dotenv = "0.15"
1313
crc = "3.0.0"
14+
rust_socketio = "0.6.0"
1415

1516
jsoncan_rust = { path = "../../../scripts/code_generation/jsoncan-rust" }

software/tracksight/rust_backend/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ todo for backend
1414
[ ] socket clients to publish data
1515
[ ] proper error handling in backend and using optimal threading or whatnot
1616
[ ] implementing api endpoints
17+
[ ] figure out the time offset basetime thing
1718
[ ] implementing ntp and base time syncing
1819
...
1920

software/tracksight/rust_backend/src/main.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use ctrlc;
22
use std::sync::atomic::{AtomicUsize, Ordering};
3-
use std::sync::Arc;
4-
use dashmap::DashMap;
3+
use tokio::sync::RwLock;
54
use tokio::sync::broadcast::channel;
65
use tokio::task::{JoinSet};
76

87

98
mod config;
109
mod tasks;
1110
use tasks::telem_message::CanPayload;
11+
use tasks::api::subscriptions::Subscriptions;
1212

1313
use tasks::serial_handler::run_serial_task;
1414
use tasks::can_data_handler::run_can_data_handler;
@@ -40,10 +40,10 @@ async fn main() {
4040

4141
// track which clients subscribe to which signals
4242
// maps signal name to client ids
43-
let subscribers = Arc::new(DashMap<String, Vec<String>>);
43+
let subscriptions = RwLock::new(Subscriptions::new());
4444

4545
// start tasks
46-
tasks.spawn(run_can_data_handler(shutdown_rx.resubscribe(), can_queue_rx));
46+
tasks.spawn(run_can_data_handler(shutdown_rx.resubscribe(), can_queue_rx, subscriptions));
4747
tasks.spawn(run_serial_task(shutdown_rx.resubscribe(), can_queue_tx));
4848

4949
// wait for tasks to clean up
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod subscriptions;
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
use std::collections::{HashMap, HashSet};
2+
3+
pub struct Subscriptions {
4+
// Clients and their set of signals subscribed
5+
// client -> signal
6+
client_to_signal: HashMap<String, HashSet<String>>,
7+
// Signals and its subscribed clients
8+
// signal -> client
9+
signal_to_client: HashMap<String, HashSet<String>>
10+
}
11+
12+
impl Subscriptions {
13+
pub fn new() -> Self {
14+
return Self {
15+
client_to_signal: HashMap::new(),
16+
signal_to_client: HashMap::new()
17+
};
18+
}
19+
20+
/**
21+
* Subscribe client to signal
22+
*/
23+
pub fn subscribe_client_to_signal(&mut self, client: String, signal: String) {
24+
// check if client exists, and add
25+
if let Some(client_signals) = self.client_to_signal.get_mut(&client) {
26+
client_signals.insert(signal.clone());
27+
} else {
28+
let mut new_client_signals = HashSet::new();
29+
new_client_signals.insert(signal.clone());
30+
self.client_to_signal.insert(client.clone(), new_client_signals);
31+
}
32+
33+
// check if signal exists and add signal
34+
if let Some(signal_clients) = self.signal_to_client.get_mut(&signal) {
35+
signal_clients.insert(client.clone());
36+
} else {
37+
let mut new_signal_clients = HashSet::new();
38+
new_signal_clients.insert(client.clone());
39+
self.signal_to_client.insert(signal.clone(), new_signal_clients);
40+
}
41+
}
42+
43+
/**
44+
* Remove client from signal
45+
*/
46+
pub fn unsubscribe_client_from_signal(&mut self, client: String, signal: String) {
47+
// check if client exists
48+
if let Some(client_signals) = self.client_to_signal.get_mut(&client) {
49+
client_signals.remove(&signal);
50+
51+
if client_signals.len() <= 0 {
52+
self.client_to_signal.remove(&client);
53+
}
54+
}
55+
56+
// check if signal exists and add signal
57+
if let Some(signal_clients) = self.signal_to_client.get_mut(&signal) {
58+
signal_clients.remove(&client);
59+
60+
if signal_clients.len() <= 0 {
61+
self.signal_to_client.remove(&signal);
62+
}
63+
}
64+
}
65+
66+
pub fn get_clients_of_signal(&self, signal: String) -> Vec<&String> {
67+
return self.signal_to_client
68+
.get(&signal)
69+
.map(|clients| clients.iter().collect())
70+
.unwrap_or_default();
71+
}
72+
73+
/**
74+
* Check if client is subscribed to signal
75+
*/
76+
pub fn is_client_subscribed_to(&self, client: String, signal: String) -> bool {
77+
self.client_to_signal
78+
.get(&client)
79+
.map_or(false, |signals| signals.contains(&signal))
80+
}
81+
82+
/**
83+
* Check if signal is subscribed to by client
84+
*/
85+
pub fn is_signal_subscribed_by(&self, signal: String, client: String) -> bool {
86+
self.signal_to_client
87+
.get(&signal)
88+
.map_or(false, |clients| clients.contains(&client))
89+
}
90+
}
91+
92+
/**
93+
* Unit tests
94+
*/
95+
#[cfg(test)]
96+
mod tests {
97+
use super::*;
98+
99+
#[test]
100+
fn test_subscribe_client_to_signal() {
101+
let mut subs = Subscriptions::new();
102+
subs.subscribe_client_to_signal("client1".to_string(), "signal1".to_string());
103+
104+
assert!(subs.is_client_subscribed_to("client1".to_string(), "signal1".to_string()));
105+
assert!(subs.is_signal_subscribed_by("signal1".to_string(), "client1".to_string()));
106+
}
107+
108+
#[test]
109+
fn test_subscribe_multiple_signals_to_client() {
110+
let mut subs = Subscriptions::new();
111+
subs.subscribe_client_to_signal("client1".to_string(), "signal1".to_string());
112+
subs.subscribe_client_to_signal("client1".to_string(), "signal2".to_string());
113+
114+
assert!(subs.is_client_subscribed_to("client1".to_string(), "signal1".to_string()));
115+
assert!(subs.is_client_subscribed_to("client1".to_string(), "signal2".to_string()));
116+
}
117+
118+
#[test]
119+
fn test_subscribe_multiple_clients_to_signal() {
120+
let mut subs = Subscriptions::new();
121+
subs.subscribe_client_to_signal("client1".to_string(), "signal1".to_string());
122+
subs.subscribe_client_to_signal("client2".to_string(), "signal1".to_string());
123+
124+
let clients = subs.get_clients_of_signal("signal1".to_string());
125+
assert_eq!(clients.len(), 2);
126+
assert!(clients.contains(&&"client1".to_string()));
127+
assert!(clients.contains(&&"client2".to_string()));
128+
}
129+
130+
#[test]
131+
fn test_unsubscribe_client_from_signal() {
132+
let mut subs = Subscriptions::new();
133+
subs.subscribe_client_to_signal("client1".to_string(), "signal1".to_string());
134+
subs.unsubscribe_client_from_signal("client1".to_string(), "signal1".to_string());
135+
136+
assert!(!subs.is_client_subscribed_to("client1".to_string(), "signal1".to_string()));
137+
assert!(!subs.is_signal_subscribed_by("signal1".to_string(), "client1".to_string()));
138+
}
139+
140+
#[test]
141+
fn test_unsubscribe_removes_empty_entries() {
142+
let mut subs = Subscriptions::new();
143+
subs.subscribe_client_to_signal("client1".to_string(), "signal1".to_string());
144+
subs.unsubscribe_client_from_signal("client1".to_string(), "signal1".to_string());
145+
146+
assert!(subs.get_clients_of_signal("signal1".to_string()).is_empty());
147+
}
148+
149+
#[test]
150+
fn test_unsubscribe_one_of_multiple_signals() {
151+
let mut subs = Subscriptions::new();
152+
subs.subscribe_client_to_signal("client1".to_string(), "signal1".to_string());
153+
subs.subscribe_client_to_signal("client1".to_string(), "signal2".to_string());
154+
subs.unsubscribe_client_from_signal("client1".to_string(), "signal1".to_string());
155+
156+
assert!(!subs.is_client_subscribed_to("client1".to_string(), "signal1".to_string()));
157+
assert!(subs.is_client_subscribed_to("client1".to_string(), "signal2".to_string()));
158+
}
159+
160+
#[test]
161+
fn test_get_clients_of_nonexistent_signal() {
162+
let subs = Subscriptions::new();
163+
let clients = subs.get_clients_of_signal("nonexistent".to_string());
164+
assert!(clients.is_empty());
165+
}
166+
167+
#[test]
168+
fn test_double_subscribe_same_client_signal() {
169+
let mut subs = Subscriptions::new();
170+
subs.subscribe_client_to_signal("client1".to_string(), "signal1".to_string());
171+
subs.subscribe_client_to_signal("client1".to_string(), "signal1".to_string());
172+
173+
let clients = subs.get_clients_of_signal("signal1".to_string());
174+
assert_eq!(clients.len(), 1);
175+
}
176+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
use tokio::sync::broadcast;
2+
3+
pub fn run_api_handler(mut shutdown_rx: broadcast::Receiver<()>) {
4+
5+
}

software/tracksight/rust_backend/src/tasks/can_data_handler.rs

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,25 @@
11

2-
use tokio::sync::broadcast;
3-
use tokio::{select, spawn};
2+
use tokio::{select};
3+
use tokio::sync::{RwLock, broadcast};
44

55
use super::can_data::influx_handler::run_influx_handler;
66
use super::can_data::live_data_handler::run_live_data_handler;
77
use super::telem_message::CanPayload;
88
use crate::config::CONFIG;
9+
use crate::tasks::api::subscriptions::Subscriptions;
910

1011
use jsoncan_rust::parsing::JsonCanParser;
1112
use jsoncan_rust::can_database::CanDatabase;
12-
use jsoncan_rust::can_database; // todo fix the naming conflict with canmessage bruhhh
1313

1414
/**
1515
* Consumes from serial handler
1616
* Uses JsonCan config to parse CAN messages and broadcasts to other tasks
1717
*/
18-
pub async fn run_can_data_handler(mut shutdown_rx: broadcast::Receiver<()>, mut can_queue_rx: broadcast::Receiver<CanPayload>) {
18+
pub async fn run_can_data_handler(
19+
mut shutdown_rx: broadcast::Receiver<()>,
20+
mut can_queue_rx: broadcast::Receiver<CanPayload>,
21+
subscriptions: RwLock<Subscriptions>
22+
) {
1923
println!("CAN data handler task started.");
2024

2125
let parser = JsonCanParser::new(CONFIG.jsoncan_config_path.clone());
@@ -40,16 +44,24 @@ pub async fn run_can_data_handler(mut shutdown_rx: broadcast::Receiver<()>, mut
4044
break;
4145
}
4246
Ok(can_payload) = can_queue_rx.recv() => {
43-
let can_message = match can_db.get_message_by_id(can_payload.can_id) {
44-
Ok(c) => c,
45-
Err(_) => {
46-
eprintln!("Unknown CAN ID: {:X}", can_payload.can_id);
47-
continue;
48-
},
49-
};
50-
51-
for signal in can_message.unpack(can_payload.payload) {
52-
// iterate through subscribers and send signal
47+
// let can_message = match can_db.get_message_by_id(can_payload.can_id) {
48+
// Ok(c) => c,
49+
// Err(_) => {
50+
// eprintln!("Unknown CAN ID: {:X}", can_payload.can_id);
51+
// continue;
52+
// },
53+
// };
54+
55+
let decoded_signals = can_db.unpack(
56+
can_payload.can_id,
57+
can_payload.payload,
58+
Some(can_payload.can_timestamp),
59+
);
60+
61+
for signal in decoded_signals {
62+
for client_id in subscriptions.read().await.get_clients_of_signal(signal.name) {
63+
// todo emit to client
64+
}
5365
}
5466
}
5567
}
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
pub mod api_handler;
12
pub mod serial_handler;
23
pub mod telem_message;
34
pub mod can_data;
4-
pub mod can_data_handler;
5+
pub mod can_data_handler;
6+
pub mod api;

software/tracksight/rust_backend/src/tasks/serial_handler.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use tokio::select;
33
use tokio::sync::{broadcast, mpsc};
44
use std::io::{Error, ErrorKind};
55
use std::sync::{Arc, atomic::AtomicBool, atomic::Ordering};
6-
use std::time::Duration;
6+
use std::time::{Duration, SystemTime};
77
use crc::{Crc, CRC_32_ISO_HDLC};
88

99
use crate::config::CONFIG;
@@ -163,11 +163,13 @@ fn parse_telem_message(payload: Vec<u8>) -> Result<TelemetryMessage, ()> {
163163
let can_id = u32::from_le_bytes([payload[1], payload[2], payload[3], payload[4]]);
164164
// TODO use RTC and NTP time instead of whatever this is
165165
let can_time_offset = f32::from_le_bytes([payload[5], payload[6], payload[7], payload[8]]);
166+
let timestamp = SystemTime::now(); // something with can_time_offset
167+
166168
let can_payload = payload[9..].to_vec();
167169
TelemetryMessage::Can {
168170
body: CanPayload {
169171
can_id,
170-
can_time_offset,
172+
can_timestamp: timestamp,
171173
payload: can_payload,
172174
}
173175
}

0 commit comments

Comments
 (0)