Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions backend/shared-logic/src/bc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ use tokio_util::sync::CancellationToken;
use log::{info, error};

// starts the broadcast by spawning async sender and receiver tasks.
pub async fn start_broadcast(write: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>, cancel_token: CancellationToken) {
pub async fn start_broadcast(
write: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
cancel_token: CancellationToken,
processing_config: ProcessingConfig, // takes in signal processing configuration from frontend
) {
let (tx, _rx) = broadcast::channel::<Arc<EEGDataPacket>>(1000); // size of the broadcast buffer, not recommand below 500, websocket will miss messages
let rx_ws = tx.subscribe();
let rx_db = tx.subscribe();
Expand All @@ -37,7 +41,8 @@ pub async fn start_broadcast(write: Arc<Mutex<SplitSink<WebSocketStream<TcpStrea
let tx_clone = tx.clone();
let sender_token = cancel_token.clone();
let sender = tokio::spawn(async move {
receive_eeg(tx_clone, sender_token, ProcessingConfig::default()).await;
// use the ProcessingConfig provided by the client instead of default
receive_eeg(tx_clone, sender_token, processing_config).await;
});

// Subscribe for websocket Receiver
Expand Down Expand Up @@ -141,4 +146,4 @@ pub async fn db_receiver(mut rx_db: Receiver<Arc<EEGDataPacket>>){
}
}
info!("database got {} packets ({} total samples), and dropped {} msg", packet_count, sample_count, dropped)
}
}
2 changes: 1 addition & 1 deletion backend/shared-logic/src/lsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct EEGDataPacket {
pub signals: Vec<Vec<f64>>,
}

#[derive(Clone)]
#[derive(Clone, Deserialize)]
pub struct ProcessingConfig {
pub apply_bandpass: bool,
pub use_iir: bool, // true for IIR, false for FIR
Expand Down
4 changes: 3 additions & 1 deletion backend/websocket-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ env_logger = "0.11" # Added for env_logger::init()
tokio-util = "0.7.15"

# Shared logic crate
shared-logic = { path = "../shared-logic" }
shared-logic = { path = "../shared-logic" }
serde = "1.0.228"
serde_json = "1"
41 changes: 39 additions & 2 deletions backend/websocket-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::os::windows::process;
use std::{sync::Arc};
use futures_util::stream::SplitSink;
use futures_util::{SinkExt, StreamExt};
Expand All @@ -11,8 +12,10 @@ use tokio_tungstenite::{
use tokio_util::sync::CancellationToken;
use shared_logic::bc::{start_broadcast};
use shared_logic::db::{initialize_connection};
use shared_logic::lsl::{ProcessingConfig}; // get ProcessingConfig from lsl.rs
use dotenvy::dotenv;
use log::{info, error};
use serde_json; // used to parse ProcessingConfig from JSON sent by frontend


#[tokio::main]
Expand Down Expand Up @@ -81,9 +84,44 @@ async fn handle_connection(ws_stream: WebSocketStream<TcpStream>) {
let write_clone = write.clone();
let cancel_token = CancellationToken::new();
let cancel_clone = cancel_token.clone();

// setup registration for signal processing configuration
let signal_config = read.next().await;

// we have the ProcessingConfig struct
// check if we received a message (two layers of unwrapping needed)
let processing_config: ProcessingConfig = match signal_config {

Some(Ok(config_json)) => {

// here, we parse the json into a signal config struct using serde_json
let config_text = config_json.to_text().unwrap();

match serde_json::from_str(config_text) {
Ok(config) => config,
Err(e) => {
error!("Error parsing signal configuration JSON: {}", e);
return;
}
}

}

Some(Err(e)) => {
error!("Error receiving signal configuration: {}", e);
return;
}

None => {
error!("No signal configuration received from client. Closing connection.");
return;
}
};

// spawns the broadcast task
let mut broadcast = Some(tokio::spawn(async move {
start_broadcast(write_clone, cancel_clone).await;
// pass ProcessingConfig into broadcast so it reaches receive_eeg
start_broadcast(write_clone, cancel_clone, processing_config).await;
}));


Expand Down Expand Up @@ -137,4 +175,3 @@ async fn handle_prep_close(
info!("Notified client prep close is complete.");
}
}

68 changes: 65 additions & 3 deletions frontend/components/nodes/filter-node/filter-node.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import React from 'react';
import { Handle, Position, useReactFlow } from '@xyflow/react';
import { useGlobalContext } from '@/context/GlobalContext';
import ComboBox from './combo-box';
import useWebsocket from '@/hooks/useWebsocket';
import { ProcessingConfig } from '@/lib/processing';

interface FilterNodeProps {
id?: string;
Expand All @@ -12,15 +14,52 @@ interface FilterNodeProps {
export default function FilterNode({ id }: FilterNodeProps) {
const [selectedFilter, setSelectedFilter] = React.useState('lowpass');
const [isConnected, setIsConnected] = React.useState(false);
const [frequency, setFrequency] = React.useState(75);

const [cutoff, setCutoff] = React.useState(50);

// Get React Flow instance
const reactFlowInstance = useReactFlow();

// Get data stream status from global context
const { dataStreaming } = useGlobalContext();


const { sendProcessingConfig } = useWebsocket(0, 0)

const buildConfig = (): ProcessingConfig => {
if (!isConnected) {
return {
apply_bandpass: false,
use_iir: false,
l_freq: null,
h_freq: null,
downsample_factor: null,
sfreq: 256,
n_channels: 4,
}
}

if (selectedFilter === 'lowpass') {
return {
apply_bandpass: true,
use_iir: false,
l_freq: null,
h_freq: cutoff,
downsample_factor: null,
sfreq: 256,
n_channels: 4,
}
}

return {
apply_bandpass: true,
use_iir: false,
l_freq: cutoff,
h_freq: null,
downsample_factor: null,
sfreq: 256,
n_channels: 4,
}
}

// Check connection status and update state
const checkConnectionStatus = React.useCallback(() => {
try {
Expand Down Expand Up @@ -81,6 +120,11 @@ export default function FilterNode({ id }: FilterNodeProps) {
};
}, [checkConnectionStatus]);

React.useEffect(() => {
if (!dataStreaming) return
sendProcessingConfig(buildConfig())
}, [selectedFilter, cutoff, isConnected, dataStreaming])

return (
<div className="relative">
{/* Input Handle - positioned to align with left circle */}
Expand Down Expand Up @@ -130,6 +174,24 @@ export default function FilterNode({ id }: FilterNodeProps) {
isConnected={isConnected}
isDataStreamOn={dataStreaming}
/>

{isConnected && (
<>
<input
type="range"
min={1}
max={100}
value={cutoff}
onChange={(e) => setCutoff(Number(e.target.value))}
/>

<p className="text-xs text-muted-foreground">
{selectedFilter === 'lowpass'
? 'Frequencies below cutoff will pass through'
: 'Frequencies above cutoff will pass through'}
</p>
</>
)}
</div>
);
}
17 changes: 16 additions & 1 deletion frontend/hooks/useWebsocket.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { useEffect, useState, useRef } from 'react';
import { useGlobalContext } from '@/context/GlobalContext';
import { ProcessingConfig } from '@/lib/processing';

export default function useWebsocket(
chartSize: number,
Expand All @@ -11,9 +12,19 @@ export default function useWebsocket(
const wsRef = useRef<WebSocket | null>(null);
const closingTimeoutRef = useRef<NodeJS.Timeout | null>(null);
const [isClosingGracefully, setIsClosingGracefully] = useState(false);
const processingConfigRef = useRef<ProcessingConfig | null>(null);

const intervalTime = 1000 / batchesPerSecond;

const sendProcessingConfig = (config: ProcessingConfig) => {
processingConfigRef.current = config

if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify(config))
console.log('Sent processing config:', config)
}
}

useEffect(() => {
console.log('data streaming:', dataStreaming);

Expand Down Expand Up @@ -45,6 +56,10 @@ export default function useWebsocket(

ws.onopen = () => {
console.log('WebSocket connection opened.');

if (processingConfigRef.current) {
ws.send(JSON.stringify(processingConfigRef.current))
}
};

ws.onmessage = (event) => {
Expand Down Expand Up @@ -128,5 +143,5 @@ export default function useWebsocket(
};
}, [chartSize, batchesPerSecond, dataStreaming, isClosingGracefully]);

return { renderData };
return { renderData, sendProcessingConfig };
}
10 changes: 10 additions & 0 deletions frontend/lib/processing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export type ProcessingConfig = {
apply_bandpass: boolean
use_iir: boolean
l_freq: number | null
h_freq: number | null
downsample_factor: number | null
sfreq: number
n_channels: number
}