Skip to content

Commit a230b32

Browse files
committed
RAHHHHHH frontend receives from socket
1 parent 0b53045 commit a230b32

File tree

10 files changed

+292
-260
lines changed

10 files changed

+292
-260
lines changed

software/tracksight/rust_backend/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

software/tracksight/rust_backend/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ socketioxide = "0.18.1"
1414
influxdb2 = "0.5"
1515
dotenv = "0.15"
1616
crc = "3.0.0"
17+
serde_json = "1.0.145"
1718

1819
jsoncan_rust = { path = "../../../scripts/code_generation/jsoncan-rust" }

software/tracksight/rust_backend/src/main.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use ctrlc;
2+
use std::sync::Arc;
23
use std::sync::atomic::{AtomicUsize, Ordering};
34
use tokio::sync::RwLock;
45
use tokio::sync::broadcast;
@@ -7,7 +8,7 @@ use tokio::task::{JoinSet};
78
mod config;
89
mod tasks;
910
use tasks::telem_message::CanPayload;
10-
use tasks::client_api::subscriptions::Subscriptions;
11+
use tasks::client_api::clients::Clients;
1112

1213
use tasks::serial_handler::run_serial_task;
1314
use tasks::can_data_handler::run_can_data_handler;
@@ -41,11 +42,11 @@ async fn main() {
4142

4243
// track which clients subscribe to which signals
4344
// maps signal name to client ids
44-
let subscriptions = RwLock::new(Subscriptions::new());
45+
let clients = Arc::new(RwLock::new(Clients::new()));
4546

4647
// start tasks
47-
tasks.spawn(run_api_handler(shutdown_rx.resubscribe()));
48-
tasks.spawn(run_can_data_handler(shutdown_rx.resubscribe(), can_queue_rx, subscriptions));
48+
tasks.spawn(run_api_handler(shutdown_rx.resubscribe(), clients.clone()));
49+
tasks.spawn(run_can_data_handler(shutdown_rx.resubscribe(), can_queue_rx, clients.clone()));
4950
tasks.spawn(run_serial_task(shutdown_rx.resubscribe(), can_queue_tx));
5051

5152
// wait for tasks to clean up
Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,40 @@
1+
2+
use std::sync::Arc;
13
use axum::response::IntoResponse;
24
use axum::serve::Listener;
35
use tokio::select;
4-
use tokio::sync::broadcast;
6+
use tokio::sync::{RwLock, broadcast};
57
use tokio::net::TcpListener;
68
use axum::{Router, routing::get};
79
use axum::extract::ws::{WebSocket, WebSocketUpgrade};
810
use socketioxide::{SocketIo, extract::SocketRef};
911
use crate::config::CONFIG;
12+
use crate::tasks::client_api::clients::Clients;
1013

1114

1215

13-
pub async fn run_api_handler(mut shutdown_rx: broadcast::Receiver<()>) {
16+
pub async fn run_api_handler(mut shutdown_rx: broadcast::Receiver<()>, clients: Arc<RwLock<Clients>>) {
1417
let addr = format!("0.0.0.0:{}", CONFIG.backend_port);
1518
let listener = TcpListener::bind(addr).await.unwrap();
1619

1720
let (socket_layer, io) = SocketIo::new_layer();
1821

22+
// default socketio endpoint
1923
io.ns("/", |socket: SocketRef| async move {
20-
println!("connected sid={}", socket.id);
24+
let client_id = socket.id.to_string();
25+
println!("{} connected", client_id);
26+
27+
clients.write().await.add_client(&client_id, &socket);
2128

22-
socket.on_disconnect(|| async {
23-
println!("disconnected");
29+
socket.on_disconnect(async move || {
30+
clients.write().await.remove_client(&client_id);
31+
println!("{} disconnected", client_id);
2432
});
33+
2534
});
2635

2736
let app: Router<> = Router::new()
2837
.layer(socket_layer);
29-
// .route("/socket.io", get(handle_connection));
3038

3139
select! {
3240
_ = shutdown_rx.recv() => {
@@ -37,15 +45,3 @@ pub async fn run_api_handler(mut shutdown_rx: broadcast::Receiver<()>) {
3745
}
3846
}
3947
}
40-
41-
// async fn handle_connection(ws: WebSocketUpgrade) -> impl IntoResponse {
42-
// let _ = ws.on_upgrade(async |mut socket: WebSocket| {
43-
// println!("client connected");
44-
45-
// while let Some(Ok(_)) = socket.recv().await {
46-
// // ignore
47-
// }
48-
49-
// println!("client disconnected")
50-
// });
51-
// }

software/tracksight/rust_backend/src/tasks/can_data/live_data_handler.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use tokio::sync::broadcast::Receiver;
2-
use influxdb2::Client;
32
use tokio::select;
43

54
use crate::config::CONFIG;

software/tracksight/rust_backend/src/tasks/can_data_handler.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::sync::Arc;
12

23
use tokio::{select};
34
use tokio::sync::{RwLock, broadcast};
@@ -6,10 +7,11 @@ use super::can_data::influx_handler::run_influx_handler;
67
use super::can_data::live_data_handler::run_live_data_handler;
78
use super::telem_message::CanPayload;
89
use crate::config::CONFIG;
9-
use crate::tasks::client_api::subscriptions::Subscriptions;
10+
use crate::tasks::client_api::clients::Clients;
1011

1112
use jsoncan_rust::parsing::JsonCanParser;
1213
use jsoncan_rust::can_database::CanDatabase;
14+
use jsoncan_rust::can_database::DecodedSignal;
1315

1416
/**
1517
* Consumes from serial handler
@@ -18,7 +20,7 @@ use jsoncan_rust::can_database::CanDatabase;
1820
pub async fn run_can_data_handler(
1921
mut shutdown_rx: broadcast::Receiver<()>,
2022
mut can_queue_rx: broadcast::Receiver<CanPayload>,
21-
subscriptions: RwLock<Subscriptions>
23+
clients: Arc<RwLock<Clients>>
2224
) {
2325
println!("CAN data handler task started.");
2426

@@ -31,11 +33,11 @@ pub async fn run_can_data_handler(
3133
}
3234
};
3335

34-
// let (parsed_can_tx, _) = broadcast::channel::<can_database::CanMessage>(32);
36+
let (decoded_signal_tx, _) = broadcast::channel::<DecodedSignal>(32);
3537

3638
// parsed can signal consumers
37-
// let influx_handler_task = spawn(run_influx_handler(shutdown_rx.resubscribe(), parsed_can_tx.subscribe()));
38-
// let live_data_handler_task = spawn(run_live_data_handler(shutdown_rx.resubscribe(), parsed_can_tx.subscribe()));
39+
// let influx_handler_task = spawn(run_influx_handler(shutdown_rx.resubscribe(), decoded_signal_tx.subscribe()));
40+
// let live_data_handler_task = spawn(run_live_data_handler(shutdown_rx.resubscribe(), decoded_signal_tx.subscribe()));
3941

4042
loop {
4143
select! {
@@ -59,8 +61,17 @@ pub async fn run_can_data_handler(
5961
);
6062

6163
for signal in decoded_signals {
62-
for client_id in subscriptions.read().await.get_clients_of_signal(signal.name) {
63-
// todo emit to client
64+
let c = clients.read().await;
65+
for client_id in c.get_clients_of_signal(&signal.name) {
66+
// todo unwrap or handle none case, which would be really weird
67+
c.get_client_socket(client_id).clone().unwrap().emit(
68+
"data",
69+
&serde_json::json!({
70+
"name": signal.name,
71+
"value": signal.value,
72+
"timestamp": can_payload.can_timestamp,
73+
})
74+
);
6475
}
6576
}
6677
}

0 commit comments

Comments
 (0)