Skip to content

Commit 1fb964e

Browse files
committed
refactor websocket to websocket context
1 parent bc5f558 commit 1fb964e

File tree

5 files changed

+201
-175
lines changed

5 files changed

+201
-175
lines changed

frontend/app/home/page.tsx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
'use client';
22
import ReactFlowView from '@/components/ui-react-flow/react-flow-view';
33
import { GlobalProvider } from '@/context/GlobalContext';
4+
import { WebSocketProvider } from '@/context/WebSocketContext';
45
import AppHeader from '@/components/ui-header/app-header';
56
import SettingsBar from '@/components/ui-header/settings-bar';
67
import { NotificationsProvider } from '@/components/notifications';
78

89
export default function Home() {
910
return (
1011
<GlobalProvider>
12+
<WebSocketProvider>
1113
<NotificationsProvider>
1214
<div className="h-screen flex flex-col">
1315
{/* Top section for header and settings bar */}
@@ -22,6 +24,7 @@ export default function Home() {
2224
</div>
2325
</div>
2426
</NotificationsProvider>
27+
</WebSocketProvider>
2528
</GlobalProvider>
2629
);
2730
}

frontend/components/nodes/signal-graph-node/signal-graph-node.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Card } from '@/components/ui/card';
22
import { Handle, Position, useReactFlow } from '@xyflow/react';
3-
import useWebsocket from '@/hooks/useWebsocket';
3+
import useNodeData from '@/hooks/useNodeData';
44
import React from 'react';
55
import { useGlobalContext } from '@/context/GlobalContext';
66
import { ArrowUpRight } from 'lucide-react';
@@ -16,7 +16,7 @@ import {
1616
import SignalGraphView from './signal-graph-full';
1717

1818
export default function SignalGraphNode({ id }: { id?: string }) {
19-
const { renderData } = useWebsocket(20, 10);
19+
const { renderData } = useNodeData(20, 10);
2020

2121
const processedData = renderData;
2222
const reactFlowInstance = useReactFlow();
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
'use client';
2+
3+
import React, { createContext, useContext, useEffect, useRef, useCallback, ReactNode } from 'react';
4+
import { useGlobalContext } from './GlobalContext';
5+
import { ProcessingConfig } from '@/lib/processing';
6+
7+
export type DataPoint = {
8+
time: string;
9+
signal1: number;
10+
signal2: number;
11+
signal3: number;
12+
signal4: number;
13+
};
14+
15+
type Subscriber = (points: DataPoint[]) => void;
16+
17+
type WebSocketContextType = {
18+
subscribe: (fn: Subscriber) => () => void;
19+
sendProcessingConfig: (config: ProcessingConfig) => void;
20+
};
21+
22+
const WebSocketContext = createContext<WebSocketContextType | undefined>(undefined);
23+
24+
const DEFAULT_PROCESSING_CONFIG: ProcessingConfig = {
25+
apply_bandpass: false,
26+
use_iir: false,
27+
l_freq: null,
28+
h_freq: null,
29+
downsample_factor: null,
30+
sfreq: 256,
31+
n_channels: 4,
32+
};
33+
34+
function normalizeBatch(batch: any): DataPoint[] {
35+
return batch.timestamps.map((time: number, i: number) => ({
36+
time: String(time),
37+
signal1: batch.signals[0][i],
38+
signal2: batch.signals[1][i],
39+
signal3: batch.signals[2][i],
40+
signal4: batch.signals[3][i],
41+
}));
42+
}
43+
44+
export function WebSocketProvider({ children }: { children: ReactNode }) {
45+
const { dataStreaming } = useGlobalContext();
46+
const wsRef = useRef<WebSocket | null>(null);
47+
const processingConfigRef = useRef<ProcessingConfig | null>(null);
48+
const subscribersRef = useRef<Set<Subscriber>>(new Set());
49+
const closingTimeoutRef = useRef<NodeJS.Timeout | null>(null);
50+
const isClosingGracefullyRef = useRef(false);
51+
52+
const subscribe = useCallback((fn: Subscriber) => {
53+
subscribersRef.current.add(fn);
54+
return () => subscribersRef.current.delete(fn);
55+
}, []);
56+
57+
const sendProcessingConfig = useCallback((config: ProcessingConfig) => {
58+
processingConfigRef.current = config;
59+
if (wsRef.current?.readyState === WebSocket.OPEN) {
60+
wsRef.current.send(JSON.stringify(config));
61+
console.log('Sent processing config:', config);
62+
}
63+
}, []);
64+
65+
// Forward processing-config-update events from filter node to backend
66+
useEffect(() => {
67+
const handler = (event: Event) => {
68+
sendProcessingConfig((event as CustomEvent<ProcessingConfig>).detail);
69+
};
70+
window.addEventListener('processing-config-update', handler);
71+
return () => window.removeEventListener('processing-config-update', handler);
72+
}, [sendProcessingConfig]);
73+
74+
// Manage WebSocket lifecycle
75+
useEffect(() => {
76+
if (!dataStreaming) {
77+
if (wsRef.current?.readyState === WebSocket.OPEN && !isClosingGracefullyRef.current) {
78+
isClosingGracefullyRef.current = true;
79+
wsRef.current.send('clientClosing');
80+
closingTimeoutRef.current = setTimeout(() => {
81+
console.warn('Timeout: no confirmed closing received. Forcing close.');
82+
wsRef.current?.close();
83+
isClosingGracefullyRef.current = false;
84+
}, 5000);
85+
}
86+
return;
87+
}
88+
89+
if (wsRef.current && wsRef.current.readyState !== WebSocket.CLOSED) return;
90+
91+
console.log('Opening WebSocket connection...');
92+
const ws = new WebSocket('ws://localhost:8080');
93+
wsRef.current = ws;
94+
95+
ws.onopen = () => {
96+
console.log('WebSocket connection opened.');
97+
ws.send(JSON.stringify(processingConfigRef.current ?? DEFAULT_PROCESSING_CONFIG));
98+
};
99+
100+
ws.onmessage = (event) => {
101+
const message = event.data;
102+
if (message === 'confirmed closing') {
103+
console.log("Received 'confirmed closing' from server.");
104+
if (closingTimeoutRef.current) clearTimeout(closingTimeoutRef.current);
105+
ws.close();
106+
isClosingGracefullyRef.current = false;
107+
} else {
108+
try {
109+
const points = normalizeBatch(JSON.parse(message));
110+
subscribersRef.current.forEach((fn) => fn(points));
111+
} catch (e) {
112+
console.error('Failed to parse WebSocket message:', e);
113+
}
114+
}
115+
};
116+
117+
ws.onclose = (event) => {
118+
console.log('WebSocket connection closed:', event.code, event.reason);
119+
wsRef.current = null;
120+
isClosingGracefullyRef.current = false;
121+
};
122+
123+
ws.onerror = () => {
124+
if (closingTimeoutRef.current) clearTimeout(closingTimeoutRef.current);
125+
isClosingGracefullyRef.current = false;
126+
};
127+
128+
return () => {
129+
if (closingTimeoutRef.current) clearTimeout(closingTimeoutRef.current);
130+
if (ws.readyState === WebSocket.OPEN && !isClosingGracefullyRef.current) {
131+
ws.send('clientClosing');
132+
closingTimeoutRef.current = setTimeout(() => ws.close(), 5000);
133+
} else if (ws.readyState !== WebSocket.CLOSED) {
134+
ws.close();
135+
}
136+
wsRef.current = null;
137+
};
138+
}, [dataStreaming]);
139+
140+
return (
141+
<WebSocketContext.Provider value={{ subscribe, sendProcessingConfig }}>
142+
{children}
143+
</WebSocketContext.Provider>
144+
);
145+
}
146+
147+
export function useWebSocketContext() {
148+
const ctx = useContext(WebSocketContext);
149+
if (!ctx) throw new Error('useWebSocketContext must be used within a WebSocketProvider');
150+
return ctx;
151+
}

frontend/hooks/useNodeData.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { useEffect, useRef, useState } from 'react';
2+
import { useWebSocketContext, DataPoint } from '@/context/WebSocketContext';
3+
import { useGlobalContext } from '@/context/GlobalContext';
4+
5+
export default function useNodeData(chartSize: number, batchesPerSecond: number) {
6+
const { subscribe } = useWebSocketContext();
7+
const { dataStreaming } = useGlobalContext();
8+
const [renderData, setRenderData] = useState<DataPoint[]>([]);
9+
const bufferRef = useRef<DataPoint[]>([]);
10+
11+
// Subscribe to incoming data points from the shared WebSocket
12+
useEffect(() => {
13+
const unsubscribe = subscribe((points) => {
14+
bufferRef.current.push(...points);
15+
});
16+
return unsubscribe;
17+
}, [subscribe]);
18+
19+
// Drain the buffer into renderData at the node's own rate
20+
useEffect(() => {
21+
if (!dataStreaming || batchesPerSecond <= 0) return;
22+
23+
const intervalTime = 1000 / batchesPerSecond;
24+
const id = setInterval(() => {
25+
if (bufferRef.current.length > 0) {
26+
const batch = bufferRef.current.splice(
27+
0,
28+
Math.min(bufferRef.current.length, chartSize)
29+
);
30+
setRenderData((prev) => [...prev, ...batch].slice(-chartSize));
31+
}
32+
}, intervalTime);
33+
34+
return () => clearInterval(id);
35+
}, [dataStreaming, batchesPerSecond, chartSize]);
36+
37+
// Always clear the buffer on start/stop to prevent backlog buildup.
38+
// renderData is never cleared so the chart holds its last frame when paused
39+
// and new data naturally replaces it as it arrives.
40+
useEffect(() => {
41+
bufferRef.current = [];
42+
}, [dataStreaming]);
43+
44+
return { renderData };
45+
}

0 commit comments

Comments
 (0)