Skip to content

Commit 9faed30

Browse files
committed
Implementation of front end and backend for window node
1 parent 9310d29 commit 9faed30

File tree

11 files changed

+373
-108
lines changed

11 files changed

+373
-108
lines changed

backend/shared-logic/src/bc.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use tokio::sync::broadcast;
22
use tokio::sync::broadcast::Receiver;
33

44
use crate::mockeeg::{generate_mock_data};
5-
use crate::lsl::{EEGDataPacket, ProcessingConfig, receive_eeg};
5+
use crate::lsl::{EEGDataPacket, ProcessingConfig, WindowingConfig, receive_eeg};
66
use crate::db::{insert_batch_eeg, get_db_client};
77
use futures_util::stream::SplitSink;
88
use futures_util::{SinkExt};
@@ -15,6 +15,7 @@ use tokio::sync::Mutex;
1515
use std::sync::Arc;
1616
use std::time::Instant;
1717
use tokio_util::sync::CancellationToken;
18+
use tokio::sync::watch;
1819

1920
use log::{info, error};
2021

@@ -23,6 +24,7 @@ pub async fn start_broadcast(
2324
write: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
2425
cancel_token: CancellationToken,
2526
processing_config: ProcessingConfig, // takes in signal processing configuration from frontend
27+
windowing_rx: watch::Receiver<WindowingConfig> // takes in windowing configuration from frontend
2628
) {
2729
let (tx, _rx) = broadcast::channel::<Arc<EEGDataPacket>>(1000); // size of the broadcast buffer; not recommended below 500, or the websocket will miss messages
2830
let rx_ws = tx.subscribe();
@@ -42,7 +44,7 @@ pub async fn start_broadcast(
4244
let sender_token = cancel_token.clone();
4345
let sender = tokio::spawn(async move {
4446
// use the ProcessingConfig provided by the client instead of default
45-
receive_eeg(tx_clone, sender_token, processing_config).await;
47+
receive_eeg(tx_clone, sender_token, processing_config, windowing_rx).await;
4648
});
4749

4850
// Subscribe for websocket Receiver

backend/shared-logic/src/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ pub async fn delete_session(client: &DbClient, session_id: i32) -> Result<(), Er
353353
.execute(&**client)
354354
.await?;
355355

356-
if (res.rows_affected() == 0) {
356+
if res.rows_affected() == 0 {
357357
info!("No rows deleted, session id {} not found", session_id);
358358
return Err(Error::RowNotFound);
359359
} else {

backend/shared-logic/src/lsl.rs

Lines changed: 74 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,23 @@ impl Default for ProcessingConfig {
3939
}
4040
}
4141

42+
#[derive(Clone, Deserialize, Debug)]
43+
pub struct WindowingConfig {
44+
pub chunk_size: usize,
45+
pub overlap_size: usize,
46+
}
47+
48+
impl Default for WindowingConfig {
49+
fn default() -> Self {
50+
Self {
51+
chunk_size: 64,
52+
overlap_size: 0,
53+
}
54+
}
55+
}
56+
4257
// Async entry point for EEG data collection.
43-
pub async fn receive_eeg(tx:Sender<Arc<EEGDataPacket>>, cancel_token: CancellationToken, processing_config: ProcessingConfig) {
58+
pub async fn receive_eeg(tx:Sender<Arc<EEGDataPacket>>, cancel_token: CancellationToken, processing_config: ProcessingConfig, windowing_rx: tokio::sync::watch::Receiver<WindowingConfig>,) {
4459
info!("Starting EEG data receiver");
4560
let python_script_path = std::env::var("SIGNAL_PROCESSING_SCRIPT")
4661
.unwrap_or_else(|_| "../shared-logic/src/signal_processing/signalProcessing.py".to_string());
@@ -67,7 +82,7 @@ pub async fn receive_eeg(tx:Sender<Arc<EEGDataPacket>>, cancel_token: Cancellati
6782
};
6883

6984
// Run collection loop
70-
run_eeg_collection(inlet, tx, cancel_token, processing_config, sig_processor)
85+
run_eeg_collection(inlet, tx, cancel_token, processing_config, sig_processor, windowing_rx)
7186
});
7287

7388
// Handle results
@@ -100,16 +115,39 @@ fn setup_eeg_stream() -> Result<StreamInlet, String> {
100115

101116
// Main EEG data collection loop.
102117
// Returns (successful_count, dropped_count) statistics.
103-
fn run_eeg_collection(inlet: StreamInlet, tx: Sender<Arc<EEGDataPacket>>, cancel_token: CancellationToken, config: ProcessingConfig, processor: SignalProcessor) -> (u32, u32) {
118+
fn run_eeg_collection(inlet: StreamInlet,
119+
tx: Sender<Arc<EEGDataPacket>>,
120+
cancel_token: CancellationToken,
121+
config: ProcessingConfig,
122+
processor: SignalProcessor,
123+
mut windowing_rx: tokio::sync::watch::Receiver<WindowingConfig>,
124+
) -> (u32, u32) {
104125
let mut count = 0;
105126
let mut drop = 0;
127+
128+
129+
let mut windowing = windowing_rx.borrow().clone();
130+
131+
// Creates a buffer that stores overlapping eeg samples
132+
let mut overlap_buffer: Vec<Vec<f64>> = vec![Vec::new(); 4];
133+
106134
let mut packet = EEGDataPacket {
107-
timestamps: Vec::with_capacity(65),
108-
signals: vec![Vec::with_capacity(65); 4],
135+
timestamps: Vec::with_capacity(windowing.chunk_size + 1),
136+
signals: vec![Vec::with_capacity(windowing.chunk_size + 1); 4],
109137
};
138+
110139
// Calculate the offset between LSL clock and Unix epoch
111140
let lsl_to_unix_offset = Utc::now().timestamp_nanos_opt().unwrap() as f64 / 1_000_000_000.0 - lsl::local_clock();
112141
loop {
142+
if windowing_rx.has_changed().unwrap_or(false) {
143+
windowing = windowing_rx.borrow().clone();
144+
info!("Windowing config updated: chunk={}, overlap={}", windowing.chunk_size, windowing.overlap_size);
145+
// Discard old buffer and start fresh with new config
146+
packet.timestamps.clear();
147+
for ch in &mut packet.signals { ch.clear(); }
148+
for ch in &mut overlap_buffer { ch.clear(); }
149+
}
150+
113151
// Check for cancellation
114152
if cancel_token.is_cancelled() {
115153
info!("EEG data receiver cancelled.");
@@ -130,10 +168,34 @@ fn run_eeg_collection(inlet: StreamInlet, tx: Sender<Arc<EEGDataPacket>>, cancel
130168
// Pull sample with timeout of 1 sec. If it does not see data for 1s, it returns.
131169
match inlet.pull_sample(1.0) {
132170
Ok((sample, timestamp)) => {
133-
match accumulate_sample(&sample, timestamp + lsl_to_unix_offset, &mut packet) {
171+
match accumulate_sample(&sample, timestamp + lsl_to_unix_offset, &mut packet, windowing.chunk_size) {
134172
Ok(true) => {
173+
// Window is full. Prepend overlap from previous window if there are any
174+
if windowing.overlap_size > 0 && !overlap_buffer[0].is_empty() {
175+
// Insert overlap samples at the front of the packet
176+
for (ch_idx, ch) in packet.signals.iter_mut().enumerate() {
177+
let mut new_ch = overlap_buffer[ch_idx].clone();
178+
new_ch.extend_from_slice(ch);
179+
*ch = new_ch;
180+
}
181+
182+
// Timestamps: prepend placeholders (or track overlap timestamps)
183+
// For simplicity, pad with copies of the first timestamp
184+
let first_ts = packet.timestamps[0];
185+
let mut new_ts = vec![first_ts; windowing.overlap_size];
186+
new_ts.extend_from_slice(&packet.timestamps);
187+
packet.timestamps = new_ts;
188+
}
189+
190+
// Save the tail as the new overlap_buffer
191+
let n = packet.signals[0].len();
192+
let keep = windowing.overlap_size.min(n);
193+
for (ch_idx, ch) in packet.signals.iter().enumerate() {
194+
overlap_buffer[ch_idx] = ch[n - keep..].to_vec();
195+
}
196+
135197
// Packet is full, send it
136-
info!("Packet is full, send it");
198+
info!("Packet is full, sending window: {} samples (overlap: {})", packet.signals[0].len(), keep);
137199
match process_and_send(&mut packet, &processor, &config, &tx) {
138200
Ok(_) => count += 1,
139201
Err(e) => {
@@ -146,9 +208,7 @@ fn run_eeg_collection(inlet: StreamInlet, tx: Sender<Arc<EEGDataPacket>>, cancel
146208
channel.clear();
147209
}
148210
}
149-
Ok(false) => {
150-
// Sample added, but packet not full yet
151-
}
211+
Ok(false) => {} // Sample added, but packet not full yet
152212
Err(e) => {
153213
let error_msg = e.to_string();
154214
if error_msg.contains("Invalid sample length: got 0 channels") {
@@ -179,7 +239,8 @@ fn run_eeg_collection(inlet: StreamInlet, tx: Sender<Arc<EEGDataPacket>>, cancel
179239
fn accumulate_sample(
180240
sample: &[f32],
181241
timestamp: f64,
182-
packet: &mut EEGDataPacket
242+
packet: &mut EEGDataPacket,
243+
chunk_size: usize,
183244
) -> Result<bool, String> {
184245
// Validate sample length
185246
if sample.len() < 4 {
@@ -188,7 +249,7 @@ fn accumulate_sample(
188249

189250
// Convert timestamp
190251
let timestamp_dt = DateTime::from_timestamp(
191-
timestamp as i64,
252+
timestamp as i64,
192253
(timestamp.fract() * 1_000_000_000.0) as u32
193254
).unwrap_or_else(|| Utc::now());
194255

@@ -202,7 +263,7 @@ fn accumulate_sample(
202263
}
203264

204265
// Check if packet is full
205-
Ok(packet.signals[0].len() >= 65)
266+
Ok(packet.signals[0].len() >= chunk_size)
206267
}
207268

208269
// calls the signalProcessing.py to process the packet and sends it

backend/websocket-server/src/main.rs

Lines changed: 94 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
use std::os::windows::process;
1+
// use std::os::windows::process;
22
use std::{sync::Arc};
33
use futures_util::stream::SplitSink;
44
use futures_util::{SinkExt, StreamExt};
55
use tokio::net::{TcpListener, TcpStream};
66
use tokio::sync::Mutex;
7+
use tokio::sync::watch;
78
use tokio_tungstenite::{
89
accept_async,
910
tungstenite::{Message},
@@ -12,10 +13,11 @@ use tokio_tungstenite::{
1213
use tokio_util::sync::CancellationToken;
1314
use shared_logic::bc::{start_broadcast};
1415
use shared_logic::db::{initialize_connection};
15-
use shared_logic::lsl::{ProcessingConfig}; // get ProcessingConfig from lsl.rs
16+
use shared_logic::lsl::{ProcessingConfig, WindowingConfig}; // get ProcessingConfig and WindowingConfig from lsl.rs
1617
use dotenvy::dotenv;
1718
use log::{info, error};
1819
use serde_json; // used to parse ProcessingConfig from JSON sent by frontend
20+
use serde_json::Value; // used to parse ProcessingConfig from JSON sent by frontend
1921

2022

2123
#[tokio::main]
@@ -75,83 +77,128 @@ async fn handle_ws(stream: TcpStream) {
7577
}
7678
}
7779

78-
// handle_connection, starts a async broadcast task,
79-
// then listens for incoming websocket closing request with the read stream in order to stop the broadcast task.
8080
async fn handle_connection(ws_stream: WebSocketStream<TcpStream>) {
81-
let ( write, mut read) = ws_stream.split();
82-
// set up for the broadcast task
83-
let write = Arc::new(Mutex::new(write));
81+
let (write, mut read) = ws_stream.split();
82+
let write = Arc::new(Mutex::new(write));
8483
let write_clone = write.clone();
8584
let cancel_token = CancellationToken::new();
8685
let cancel_clone = cancel_token.clone();
8786

88-
// setup registration for signal processing configuration
89-
let signal_config = read.next().await;
87+
let mut processing_config = ProcessingConfig::default();
88+
let mut initial_windowing = WindowingConfig::default();
9089

91-
// we have the ProcessingConfig struct
92-
// check if we received a message (two layers of unwrapping needed)
93-
let processing_config: ProcessingConfig = match signal_config {
90+
// Give the frontend a short window to send configs before we start
91+
// Use a timeout so we don't block forever if only one config arrives
92+
let config_timeout = tokio::time::Duration::from_millis(500);
93+
let deadline = tokio::time::Instant::now() + config_timeout;
9494

95-
Some(Ok(config_json)) => {
96-
97-
// here, we parse the json into a signal config struct using serde_json
98-
let config_text = config_json.to_text().unwrap();
95+
loop {
96+
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
97+
if remaining.is_zero() {
98+
info!("Config window elapsed, starting with current configs.");
99+
break;
100+
}
99101

100-
match serde_json::from_str(config_text) {
101-
Ok(config) => config,
102-
Err(e) => {
103-
error!("Error parsing signal configuration JSON: {}", e);
102+
match tokio::time::timeout(remaining, read.next()).await {
103+
Ok(Some(Ok(msg))) if msg.is_text() => {
104+
let text = msg.to_text().unwrap();
105+
if text == "clientClosing" {
106+
info!("Client closing during config phase.");
104107
return;
105108
}
109+
match parse_config_message(text) {
110+
ConfigMessage::Processing(cfg) => {
111+
info!("Received ProcessingConfig");
112+
processing_config = cfg;
113+
}
114+
ConfigMessage::Windowing(cfg) => {
115+
info!("Received WindowingConfig: chunk={}, overlap={}", cfg.chunk_size, cfg.overlap_size);
116+
initial_windowing = cfg;
117+
}
118+
ConfigMessage::Unknown => {
119+
info!("Non-config message during config phase: {}", text);
120+
break;
121+
}
122+
}
106123
}
107-
124+
Ok(Some(Err(e))) => { error!("Error receiving config: {}", e); return; }
125+
Ok(None) => { error!("Connection closed during config phase."); return; }
126+
Err(_) => {
127+
// Timeout elapsed
128+
info!("Config timeout, starting with received configs.");
129+
break;
130+
}
131+
Ok(_) => {}
108132
}
133+
}
109134

110-
Some(Err(e)) => {
111-
error!("Error receiving signal configuration: {}", e);
112-
return;
113-
}
135+
info!("Starting broadcast with chunk={}, overlap={}", initial_windowing.chunk_size, initial_windowing.overlap_size);
114136

115-
None => {
116-
error!("No signal configuration received from client. Closing connection.");
117-
return;
118-
}
119-
};
137+
let (windowing_tx, windowing_rx) = watch::channel(initial_windowing);
120138

121-
// spawns the broadcast task
122139
let mut broadcast = Some(tokio::spawn(async move {
123-
// pass ProcessingConfig into broadcast so it reaches receive_eeg
124-
start_broadcast(write_clone, cancel_clone, processing_config).await;
140+
start_broadcast(write_clone, cancel_clone, processing_config, windowing_rx).await;
125141
}));
126142

127-
128-
//listens for incoming messages
129143
while let Some(msg) = read.next().await {
130144
match msg {
131-
Ok(msg) if msg.is_text() => { //prep for closing, this currently will not be called, waiting for frontend
145+
Ok(msg) if msg.is_text() => {
132146
let text = msg.to_text().unwrap();
133-
info!("Received request: {}", text);
134147
if text == "clientClosing" {
135-
handle_prep_close(&mut broadcast,&cancel_token, &write.clone()).await;
148+
handle_prep_close(&mut broadcast, &cancel_token, &write).await;
149+
break;
150+
}
151+
match parse_config_message(text) {
152+
ConfigMessage::Windowing(cfg) => {
153+
info!("Windowing update: chunk={}, overlap={}", cfg.chunk_size, cfg.overlap_size);
154+
let _ = windowing_tx.send(cfg);
155+
}
156+
ConfigMessage::Processing(_) => {
157+
info!("Processing config update received");
158+
}
159+
ConfigMessage::Unknown => {
160+
error!("Unknown mid-stream message: {}", text);
161+
}
136162
}
137163
}
138-
Ok(Message::Close(frame)) => { //handles closing.
139-
info!("Received a close request from the client");
140-
// cancel_token.cancel(); // remove after frontend updates
141-
let mut write = write.lock().await;
142-
let _ = write.send(Message::Close(frame)).await;
164+
Ok(Message::Close(frame)) => {
165+
let mut w = write.lock().await;
166+
let _ = w.send(Message::Close(frame)).await;
143167
break;
144168
}
145169
Ok(_) => continue,
146-
Err(e) => {
147-
error!("Read error (client likely disconnected): {}", e);
148-
break;
149-
}
170+
Err(e) => { error!("Read error: {}", e); break; }
150171
}
151172
}
152173
info!("Client disconnected.");
153174
}
154175

176+
// Discriminate config type by which fields are present
177+
enum ConfigMessage {
178+
Processing(ProcessingConfig),
179+
Windowing(WindowingConfig),
180+
Unknown,
181+
}
182+
183+
fn parse_config_message(text: &str) -> ConfigMessage {
184+
let Ok(value) = serde_json::from_str::<Value>(text) else {
185+
return ConfigMessage::Unknown;
186+
};
187+
if value.get("chunk_size").is_some() {
188+
match serde_json::from_value::<WindowingConfig>(value) {
189+
Ok(cfg) => ConfigMessage::Windowing(cfg),
190+
Err(e) => { error!("Failed to parse WindowingConfig: {}", e); ConfigMessage::Unknown }
191+
}
192+
} else if value.get("apply_bandpass").is_some() {
193+
match serde_json::from_value::<ProcessingConfig>(value) {
194+
Ok(cfg) => ConfigMessage::Processing(cfg),
195+
Err(e) => { error!("Failed to parse ProcessingConfig: {}", e); ConfigMessage::Unknown }
196+
}
197+
} else {
198+
ConfigMessage::Unknown
199+
}
200+
}
201+
155202
// handle_prep_close uses the cancel_token to stop the broadcast sender task, and sends a "prep close complete" message to the client
156203
async fn handle_prep_close(
157204
broadcast_task: &mut Option<tokio::task::JoinHandle<()>>,

0 commit comments

Comments
 (0)