Skip to content

Commit 9912828

Browse files
committed
RAHHHHHH frontend receives from socket
1 parent 29259f7 commit 9912828

File tree

13 files changed

+320
-286
lines changed

13 files changed

+320
-286
lines changed

software/tracksight/rust_backend/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

software/tracksight/rust_backend/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ socketioxide = "0.18.1"
1414
influxdb2 = "0.5"
1515
dotenv = "0.15"
1616
crc = "3.0.0"
17+
serde_json = "1.0.145"
1718

1819
jsoncan_rust = { path = "../../../scripts/code_generation/jsoncan-rust" }

software/tracksight/rust_backend/src/main.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use ctrlc;
2+
use std::sync::Arc;
23
use std::sync::atomic::{AtomicUsize, Ordering};
34
use tokio::sync::RwLock;
45
use tokio::sync::broadcast;
@@ -7,10 +8,10 @@ use tokio::task::{JoinSet};
78
mod config;
89
mod tasks;
910
use tasks::telem_message::CanPayload;
10-
use tasks::client_api::subscriptions::Subscriptions;
11+
use tasks::client_api::clients::Clients;
1112

1213
use tasks::serial_handler::run_serial_task;
13-
use tasks::can_data_handler::run_can_data_handler;
14+
use tasks::can_data::can_data_handler::run_can_data_handler;
1415

1516
use crate::tasks::api_handler::run_api_handler;
1617

@@ -41,11 +42,11 @@ async fn main() {
4142

4243
// track which clients subscribe to which signals
4344
// maps signal name to client ids
44-
let subscriptions = RwLock::new(Subscriptions::new());
45+
let clients = Arc::new(RwLock::new(Clients::new()));
4546

4647
// start tasks
47-
tasks.spawn(run_api_handler(shutdown_rx.resubscribe()));
48-
tasks.spawn(run_can_data_handler(shutdown_rx.resubscribe(), can_queue_rx, subscriptions));
48+
tasks.spawn(run_api_handler(shutdown_rx.resubscribe(), clients.clone()));
49+
tasks.spawn(run_can_data_handler(shutdown_rx.resubscribe(), can_queue_rx, clients.clone()));
4950
tasks.spawn(run_serial_task(shutdown_rx.resubscribe(), can_queue_tx));
5051

5152
// wait for tasks to clean up
Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,40 @@
1+
2+
use std::sync::Arc;
13
use axum::response::IntoResponse;
24
use axum::serve::Listener;
35
use tokio::select;
4-
use tokio::sync::broadcast;
6+
use tokio::sync::{RwLock, broadcast};
57
use tokio::net::TcpListener;
68
use axum::{Router, routing::get};
79
use axum::extract::ws::{WebSocket, WebSocketUpgrade};
810
use socketioxide::{SocketIo, extract::SocketRef};
911
use crate::config::CONFIG;
12+
use crate::tasks::client_api::clients::Clients;
1013

1114

1215

13-
pub async fn run_api_handler(mut shutdown_rx: broadcast::Receiver<()>) {
16+
pub async fn run_api_handler(mut shutdown_rx: broadcast::Receiver<()>, clients: Arc<RwLock<Clients>>) {
1417
let addr = format!("0.0.0.0:{}", CONFIG.backend_port);
1518
let listener = TcpListener::bind(addr).await.unwrap();
1619

1720
let (socket_layer, io) = SocketIo::new_layer();
1821

22+
// default socketio endpoint
1923
io.ns("/", |socket: SocketRef| async move {
20-
println!("connected sid={}", socket.id);
24+
let client_id = socket.id.to_string();
25+
println!("{} connected", client_id);
26+
27+
clients.write().await.add_client(&client_id, &socket);
2128

22-
socket.on_disconnect(|| async {
23-
println!("disconnected");
29+
socket.on_disconnect(async move || {
30+
clients.write().await.remove_client(&client_id);
31+
println!("{} disconnected", client_id);
2432
});
33+
2534
});
2635

2736
let app: Router<> = Router::new()
2837
.layer(socket_layer);
29-
// .route("/socket.io", get(handle_connection));
3038

3139
select! {
3240
_ = shutdown_rx.recv() => {
@@ -37,15 +45,3 @@ pub async fn run_api_handler(mut shutdown_rx: broadcast::Receiver<()>) {
3745
}
3846
}
3947
}
40-
41-
// async fn handle_connection(ws: WebSocketUpgrade) -> impl IntoResponse {
42-
// let _ = ws.on_upgrade(async |mut socket: WebSocket| {
43-
// println!("client connected");
44-
45-
// while let Some(Ok(_)) = socket.recv().await {
46-
// // ignore
47-
// }
48-
49-
// println!("client disconnected")
50-
// });
51-
// }

software/tracksight/rust_backend/src/tasks/can_data_handler.rs renamed to software/tracksight/rust_backend/src/tasks/can_data/can_data_handler.rs

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
1+
use std::sync::Arc;
12

2-
use tokio::{select};
3+
use tokio::{select, spawn};
34
use tokio::sync::{RwLock, broadcast};
45

5-
use super::can_data::influx_handler::run_influx_handler;
6-
use super::can_data::live_data_handler::run_live_data_handler;
7-
use super::telem_message::CanPayload;
6+
use super::influx_handler::run_influx_handler;
7+
use super::live_data_handler::run_live_data_handler;
8+
use crate::tasks::telem_message::CanPayload;
89
use crate::config::CONFIG;
9-
use crate::tasks::client_api::subscriptions::Subscriptions;
10+
use crate::tasks::client_api::clients::Clients;
1011

1112
use jsoncan_rust::parsing::JsonCanParser;
1213
use jsoncan_rust::can_database::CanDatabase;
14+
use jsoncan_rust::can_database::DecodedSignal;
1315

1416
/**
1517
* Consumes from serial handler
@@ -18,7 +20,7 @@ use jsoncan_rust::can_database::CanDatabase;
1820
pub async fn run_can_data_handler(
1921
mut shutdown_rx: broadcast::Receiver<()>,
2022
mut can_queue_rx: broadcast::Receiver<CanPayload>,
21-
subscriptions: RwLock<Subscriptions>
23+
clients: Arc<RwLock<Clients>>
2224
) {
2325
println!("CAN data handler task started.");
2426

@@ -31,11 +33,11 @@ pub async fn run_can_data_handler(
3133
}
3234
};
3335

34-
// let (parsed_can_tx, _) = broadcast::channel::<can_database::CanMessage>(32);
36+
let (decoded_signal_tx, _) = broadcast::channel::<DecodedSignal>(32);
3537

3638
// parsed can signal consumers
37-
// let influx_handler_task = spawn(run_influx_handler(shutdown_rx.resubscribe(), parsed_can_tx.subscribe()));
38-
// let live_data_handler_task = spawn(run_live_data_handler(shutdown_rx.resubscribe(), parsed_can_tx.subscribe()));
39+
let influx_handler_task = spawn(run_influx_handler(shutdown_rx.resubscribe(), decoded_signal_tx.subscribe()));
40+
let live_data_handler_task = spawn(run_live_data_handler(shutdown_rx.resubscribe(), decoded_signal_tx.subscribe(), clients));
3941

4042
loop {
4143
select! {
@@ -44,13 +46,6 @@ pub async fn run_can_data_handler(
4446
break;
4547
}
4648
Ok(can_payload) = can_queue_rx.recv() => {
47-
// let can_message = match can_db.get_message_by_id(can_payload.can_id) {
48-
// Ok(c) => c,
49-
// Err(_) => {
50-
// eprintln!("Unknown CAN ID: {:X}", can_payload.can_id);
51-
// continue;
52-
// },
53-
// };
5449

5550
let decoded_signals = can_db.unpack(
5651
can_payload.can_id,
@@ -59,16 +54,17 @@ pub async fn run_can_data_handler(
5954
);
6055

6156
for signal in decoded_signals {
62-
for client_id in subscriptions.read().await.get_clients_of_signal(signal.name) {
63-
// todo emit to client
57+
if !decoded_signal_tx.send(signal).is_ok() {
58+
eprintln!("Parsed can data signal consumers are all closed");
59+
break;
6460
}
6561
}
6662
}
6763
}
6864
}
65+
66+
let _ = influx_handler_task.await;
67+
let _ = live_data_handler_task.await;
6968
println!("CAN data handler task ended.");
70-
71-
// influx_handler_task.await;
72-
// live_data_handler_task.await;
7369
}
7470

software/tracksight/rust_backend/src/tasks/can_data/influx_handler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ use tokio::select;
66
use crate::config::CONFIG;
77
use crate::tasks::telem_message::CanPayload;
88

9-
use jsoncan_rust::can_database::CanMessage;
9+
use jsoncan_rust::can_database::{CanMessage, DecodedSignal};
1010

1111
/**
1212
* After serial_handler parses the can messages,
1313
* this task consumes the messages and writes them to influxdb
1414
*/
15-
pub async fn run_influx_handler(mut shutdown_signal: Receiver<()>, mut can_queue_receiver: Receiver<()>) {
15+
pub async fn run_influx_handler(mut shutdown_signal: Receiver<()>, mut can_queue_receiver: Receiver<DecodedSignal>) {
1616
println!("Influx task started.");
1717

1818
let influx_client = Client::new(

software/tracksight/rust_backend/src/tasks/can_data/live_data_handler.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
1-
use tokio::sync::broadcast::Receiver;
2-
use influxdb2::Client;
1+
use std::sync::Arc;
2+
3+
use tokio::sync::{RwLock, broadcast::Receiver};
34
use tokio::select;
45

5-
use crate::config::CONFIG;
6-
use crate::tasks::telem_message::CanPayload;
6+
use crate::tasks::client_api::clients::Clients;
77

8-
use jsoncan_rust::can_database::CanMessage;
8+
use jsoncan_rust::can_database::DecodedSignal;
99

1010
/**
1111
* After serial_handler parses the can messages,
1212
* this task will handle "forwarding" live data to clients via sockets
1313
*/
14-
pub async fn run_live_data_handler(mut shutdown_signal: Receiver<()>, mut can_signals_rx: Receiver<()>) {
14+
pub async fn run_live_data_handler(
15+
mut shutdown_signal: Receiver<()>,
16+
mut can_signals_rx: Receiver<DecodedSignal>,
17+
clients: Arc<RwLock<Clients>>
18+
) {
1519
println!("Live data task started.");
1620

1721
loop {
@@ -20,9 +24,21 @@ pub async fn run_live_data_handler(mut shutdown_signal: Receiver<()>, mut can_si
2024
println!("Shutting down live data handler task.");
2125
break;
2226
}
23-
_ = can_signals_rx.recv() => {
27+
Ok(signal) = can_signals_rx.recv() => {
2428
// todo should also probably check for closed channels and close thread
2529
// todo handle
30+
let c = clients.read().await;
31+
for client_id in c.get_clients_of_signal(&signal.name) {
32+
// todo unwrap or handle none case, which would be really weird
33+
let _ = c.get_client_socket(client_id).clone().unwrap().emit(
34+
"data",
35+
&serde_json::json!({
36+
"name": signal.name,
37+
"value": signal.value,
38+
"timestamp": signal.timestamp,
39+
})
40+
);
41+
}
2642
}
2743
}
2844
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pub mod live_data_handler;
2-
pub mod influx_handler;
2+
pub mod influx_handler;
3+
pub mod can_data_handler;

0 commit comments

Comments
 (0)