diff --git a/frontend/components/nodes/filter-node/filter-node.tsx b/frontend/components/nodes/filter-node/filter-node.tsx
index a11075a..5377b03 100644
--- a/frontend/components/nodes/filter-node/filter-node.tsx
+++ b/frontend/components/nodes/filter-node/filter-node.tsx
@@ -1,10 +1,6 @@
'use client';
import { useGlobalContext } from '@/context/GlobalContext';
-import { ProcessingConfig } from '@/lib/processing';
-const dispatchProcessingConfig = (config: ProcessingConfig) => {
- window.dispatchEvent(new CustomEvent('processing-config-update', { detail: config }));
-};
import { Handle, Position, useReactFlow } from '@xyflow/react';
import React from 'react';
import ComboBox from './combo-box';
@@ -25,7 +21,7 @@ export default function FilterNode({ id }: FilterNodeProps) {
const { dataStreaming } = useGlobalContext();
- const buildConfig = (): ProcessingConfig => {
+ const buildConfig = () => {
if (!isConnected) {
return {
apply_bandpass: false,
@@ -77,6 +73,19 @@ export default function FilterNode({ id }: FilterNodeProps) {
}
}
+ // write config to node data for later retrieval when dispatching pipeline payload
+ const pushConfigToNodeData = React.useCallback(() => {
+ if (!id) return;
+ if (!isConnected) return; // Don't push config if not connected
+ const config = buildConfig();
+ reactFlowInstance.setNodes((nds) =>
+ nds.map((n) =>
+ n.id === id ? { ...n, data: { ...n.data, config } } : n
+ )
+ );
+ }, [id, reactFlowInstance, selectedFilter, lowCutoff, highCutoff, isConnected]);
+
+
// Check connection status and update state
const checkConnectionStatus = React.useCallback(() => {
try {
@@ -137,14 +146,10 @@ export default function FilterNode({ id }: FilterNodeProps) {
};
}, [checkConnectionStatus]);
+ // Push config to node data and dispatch processing config when relevant state changes
React.useEffect(() => {
- if (!dataStreaming) return
- dispatchProcessingConfig(buildConfig())
- }, [selectedFilter, lowCutoff, highCutoff, isConnected, dataStreaming])
-
- React.useEffect(() => {
- dispatchProcessingConfig(buildConfig());
- }, []);
+ pushConfigToNodeData();
+ }, [pushConfigToNodeData]);
return (
@@ -202,4 +207,4 @@ export default function FilterNode({ id }: FilterNodeProps) {
);
-}
\ No newline at end of file
+}
diff --git a/frontend/components/nodes/window-node/window-node.tsx b/frontend/components/nodes/window-node/window-node.tsx
index a84cc27..438e41b 100644
--- a/frontend/components/nodes/window-node/window-node.tsx
+++ b/frontend/components/nodes/window-node/window-node.tsx
@@ -3,7 +3,6 @@ import { useGlobalContext } from '@/context/GlobalContext';
import { Handle, Position, useReactFlow } from '@xyflow/react';
import React from 'react';
import WindowComboBox, { type WindowOption } from './window-combo-box';
-import { WindowingConfig } from '@/lib/processing';
interface WindowNodeProps {
id?: string;
@@ -25,15 +24,32 @@ export default function WindowNode({ id }: WindowNodeProps) {
const { dataStreaming } = useGlobalContext();
- const dispatchWindowingConfig = (config: WindowingConfig) => {
- window.dispatchEvent(new CustomEvent('windowing-config-update', { detail: config }));
- };
-
- const buildConfig = (): WindowingConfig => ({
+ const buildConfig = () => ({
chunk_size: windowSize,
overlap_size: overlapSize,
});
+ // Validate config values
+ const isValidConfig =
+ Number.isInteger(windowSize) &&
+ windowSize > 0 &&
+ Number.isInteger(overlapSize) &&
+ overlapSize >= 0 &&
+ overlapSize < windowSize;
+
+ // Push config to node data and dispatch processing config when relevant state changes
+ const pushConfigToNodeData = React.useCallback(() => {
+ if (!id) return;
+ if (!isValidConfig) return;
+ const config = buildConfig();
+ reactFlowInstance.setNodes((nds) =>
+ nds.map((n) =>
+ n.id === id ? { ...n, data: { ...n.data, config } } : n
+ )
+ );
+ }, [id, reactFlowInstance, windowSize, overlapSize, isValidConfig]);
+
+
// Check connection status and update state
const checkConnectionStatus = React.useCallback(() => {
try {
@@ -73,17 +89,6 @@ export default function WindowNode({ id }: WindowNodeProps) {
setIsConnected(false);
}
}, [id, reactFlowInstance]);
-
- const isValidConfig =
- Number.isInteger(windowSize) &&
- windowSize > 0 &&
- Number.isInteger(overlapSize) &&
- overlapSize >= 0 &&
- overlapSize < windowSize;
-
- React.useEffect(() => {
- dispatchWindowingConfig(buildConfig());
- }, []);
// Check connection status on mount and when edges might change
React.useEffect(() => {
@@ -105,10 +110,11 @@ export default function WindowNode({ id }: WindowNodeProps) {
};
}, [checkConnectionStatus]);
+ // Push config to node data and dispatch processing config when relevant state changes
React.useEffect(() => {
if(!isValidConfig) return;
- dispatchWindowingConfig(buildConfig());
- }, [windowSize, overlapSize, selectedOption, isConnected, dataStreaming]);
+ pushConfigToNodeData();
+ }, [pushConfigToNodeData, isValidConfig]);
return (
@@ -165,4 +171,4 @@ export default function WindowNode({ id }: WindowNodeProps) {
/>
);
-}
\ No newline at end of file
+}
diff --git a/frontend/components/ui-react-flow/react-flow-view.tsx b/frontend/components/ui-react-flow/react-flow-view.tsx
index 3f25c97..9407a9d 100644
--- a/frontend/components/ui-react-flow/react-flow-view.tsx
+++ b/frontend/components/ui-react-flow/react-flow-view.tsx
@@ -37,6 +37,9 @@ import {
import { useEffect, useState } from 'react';
import { X, Ellipsis, RotateCw, RotateCcw } from 'lucide-react';
+import { useWebSocketContext } from '@/context/WebSocketContext';
+import { PipelinePayload } from '@/lib/pipeline';
+import { useGlobalContext } from '@/context/GlobalContext';
const nodeTypes = {
'source-node': SourceNode,
@@ -46,6 +49,202 @@ const nodeTypes = {
'window-node': WindowNode,
};
+// defines backend types for React Flow types
+const typeMap: Record = {
+ 'filter-node': 'preprocessing',
+ 'window-node': 'window',
+ 'machine-learning-node': 'ml',
+};
+
+// allow for defaults of the filtering node to still be applied if user doesn't specify them in the UI
+const DEFAULT_PROCESSING = {
+ apply_bandpass: false,
+ use_iir: false,
+ l_freq: null,
+ h_freq: null,
+ downsample_factor: null,
+ sfreq: 256,
+ n_channels: 4,
+};
+const DEFAULT_WINDOWING = {
+ chunk_size: 64,
+ overlap_size: 0,
+};
+
+// Only these nodes are executed by the backend pipeline
+const PIPELINE_NODE_TYPES = new Set([
+ 'window-node',
+ 'filter-node',
+ 'machine-learning-node',
+]);
+
+const topoSort = (nodes: Node[], edges: Edge[]) => {
+ const incoming = new Map(); // count of incoming edges for each node
+ const outgoing = new Map(); // list of target nodes for each node
+
+ // Initialize maps to 0 incoming and empty outgoing
+ nodes.forEach((n) => {
+ incoming.set(n.id, 0);
+ outgoing.set(n.id, []);
+ });
+
+ edges.forEach((e) => {
+ if (!outgoing.has(e.source)) return;
+ outgoing.get(e.source)!.push(e.target); // Add target to outgoing list of source
+ incoming.set(e.target, (incoming.get(e.target) ?? 0) + 1); // Increment incoming count for target
+ });
+
+ const queue = nodes
+ .filter((n) => (incoming.get(n.id) ?? 0) === 0)
+ .map((n) => n.id);
+
+ const ordered: string[] = [];
+
+ // Kahn's algorithm for topological sorting
+ while (queue.length > 0) {
+ const id = queue.shift()!;
+ ordered.push(id);
+
+ for (const target of outgoing.get(id) ?? []) {
+ const next = (incoming.get(target) ?? 0) - 1;
+ incoming.set(target, next);
+ if (next === 0) queue.push(target);
+ }
+ }
+
+ // In case of cycles or disconnected nodes
+ nodes.forEach((n) => {
+ if (!ordered.includes(n.id)) ordered.push(n.id);
+ });
+
+ return ordered.map((id) => nodes.find((n) => n.id === id)!).filter(Boolean);
+};
+
+const validatePipeline = (nodes: Node[], edges: Edge[]) => {
+ const errors: string[] = [];
+ const warnings: string[] = [];
+
+ // maps to track nodes and their connections
+ const byId = new Map(nodes.map((n) => [n.id, n]));
+ const incoming = new Map();
+ const outgoing = new Map();
+
+ nodes.forEach((n) => {
+ incoming.set(n.id, []);
+ outgoing.set(n.id, []);
+ });
+
+ // Build connection maps
+ edges.forEach((e) => {
+ if (!incoming.has(e.target) || !outgoing.has(e.source)) return;
+ incoming.get(e.target)!.push(e.source);
+ outgoing.get(e.source)!.push(e.target);
+ });
+
+ // Require 1 source node
+ const sourceNodes = nodes.filter((n) => n.type === 'source-node');
+ if (sourceNodes.length === 0) errors.push('Missing Source node.');
+ if (sourceNodes.length > 1) errors.push('Multiple Source nodes are not allowed.');
+
+ // Require 1 window node
+ const windowNodes = nodes.filter((n) => n.type === 'window-node');
+ if (windowNodes.length === 0) errors.push('Missing Window node.');
+ if (windowNodes.length > 1) errors.push('Multiple Window nodes are not allowed.');
+
+ // Require window node must connect directly to source
+ windowNodes.forEach((win) => {
+ const ins = incoming.get(win.id) ?? [];
+ if (ins.length === 0 || ins.some((id) => byId.get(id)?.type !== 'source-node')) {
+ errors.push('Window node must connect directly from Source.');
+ }
+ });
+
+ // If ML node exists, window is required for the ML pipeline
+ const mlNodes = nodes.filter((n) => n.type === 'machine-learning-node');
+ if (mlNodes.length > 0 && windowNodes.length === 0) {
+ errors.push('A Window node is required for ML pipelines.');
+ }
+
+ // Require that output nodes are terminal
+ const outputTypes = new Set(['signal-graph-node', 'machine-learning-node']);
+ nodes.forEach((n) => {
+ if (outputTypes.has(n.type ?? '')) {
+ const outs = outgoing.get(n.id) ?? [];
+ if (outs.length > 0) errors.push('Output nodes must be terminal.');
+ }
+ });
+
+ // Require that ML node doesnt connect directly to source
+ nodes
+ .filter((n) => n.type === 'machine-learning-node')
+ .forEach((ml) => {
+ const ins = incoming.get(ml.id) ?? [];
+ if (ins.some((id) => byId.get(id)?.type === 'source-node')) {
+ errors.push('ML nodes cannot connect directly to Source.');
+ }
+ });
+
+ // Warn if filter node appears before window when window exists
+ const filterNodes = nodes.filter((n) => n.type === 'filter-node');
+ const filterDirectFromSource = filterNodes.some((fn) => {
+ const ins = incoming.get(fn.id) ?? [];
+ return ins.some((id) => byId.get(id)?.type === 'source-node');
+ });
+ if (windowNodes.length > 0 && filterDirectFromSource) {
+ warnings.push('Filter nodes should come after Window nodes when Window nodes are present.');
+ }
+
+ // Warn if output nodes are connected directly to Source while preprocessing exists
+ const hasPreprocessing = filterNodes.length > 0 || windowNodes.length > 0;
+ const outputDirectFromSource = nodes.some((n) => {
+ if (!outputTypes.has(n.type ?? '')) return false;
+ const ins = incoming.get(n.id) ?? [];
+ return ins.some((id) => byId.get(id)?.type === 'source-node');
+ });
+ if (hasPreprocessing && outputDirectFromSource) {
+ warnings.push('Outputs should come after preprocessing when preprocessing exists.');
+ }
+
+ // Cycle detection: if topoSort doesn't include all nodes
+ const ordered = topoSort(nodes, edges);
+ if (ordered.length !== nodes.length) {
+ errors.push('Pipeline contains a cycle.');
+ }
+
+ return { errors, warnings };
+};
+
+// Converts React Flow state to backend pipeline format
+const buildPipelinePayload = (
+ nodes: Node[],
+ edges: Edge[],
+ sessionId: string
+): PipelinePayload => {
+ const orderedNodes = topoSort(nodes, edges); // Ensure nodes are in execution order
+
+ return {
+ session_id: sessionId,
+ nodes: orderedNodes
+ .filter((n) => PIPELINE_NODE_TYPES.has(n.type ?? ''))
+ .map((n) => {
+ const type = typeMap[n.type ?? ''] ?? n.type ?? 'unknown'; // Map to backend type
+ const config = (n.data as { config?: Record })?.config ?? {};
+
+ if(type == 'preprocessing') {
+ return {type, config: {...DEFAULT_PROCESSING, ...config}}; //apply defaults if not specified by user
+ }
+
+ if(type == 'window') {
+ return {type, config: {...DEFAULT_WINDOWING, ...config}};
+ }
+
+ return {type,config};
+ }),
+ };
+};
+
+
+
let id = 0;
const getId = () => `node_${id++}`;
@@ -54,6 +253,8 @@ const ReactFlowInterface = () => {
const [edges, setEdges] = useEdgesState([]);
const { screenToFlowPosition } = useReactFlow();
const [isControlsOpen, setIsControlsOpen] = useState(false);
+ const { sendPipelinePayload } = useWebSocketContext();
+ const {activeSessionId} = useGlobalContext();
// Listen for global pipeline reset to clear nodes/edges
useEffect(() => {
@@ -159,6 +360,22 @@ const ReactFlowInterface = () => {
[nodes, setEdges]
);
+ // Send updated pipeline to backend on any changes to nodes, edges, or active session
+ useEffect(() => {
+ if (nodes.length === 0) return;
+ if(activeSessionId== null) return; //no session yet
+
+ // Validate pipeline before sending
+ const { errors, warnings} = validatePipeline(nodes, edges);
+ if (errors.length > 0) {
+ console.error('Pipeline validation errors:', errors);
+ return; // Don't send invalid pipeline
+ }
+
+ const payload = buildPipelinePayload(nodes, edges, String(activeSessionId));
+ sendPipelinePayload(payload);
+ }, [nodes, edges, activeSessionId, sendPipelinePayload]);
+
const onNodesChange: OnNodesChange = useCallback(
(changes) => {
setNodes((nds) => applyNodeChanges(changes, nds));
@@ -209,23 +426,35 @@ const ReactFlowInterface = () => {
setNodes((nds) => [...nds, newNode]);
};
- const isValidConnection = useCallback(
+ const isValidConnection = useCallback(
(connection: Connection | Edge) => {
if (connection.source === connection.target) return false;
const sourceNode = nodes.find((n) => n.id === connection.source);
const targetNode = nodes.find((n) => n.id === connection.target);
if (!sourceNode || !targetNode) return false;
- // Enforce: ML node requires Filter as immediate predecessor
- if (targetNode.type === 'machine-learning-node') {
- return sourceNode.type === 'filter-node';
+ // Source must be the first node: block any incoming edge into Source
+ if (targetNode.type === 'source-node') {
+ return false;
+ }
+
+ // Block if source node already has an outgoing edge (Source can have only one)
+ if (sourceNode.type === 'source-node') {
+ const hasOutgoing = edges.some((e) => e.source === sourceNode.id);
+ if (hasOutgoing) return false;
}
- // Allow Source → Filter; block Source → ML handled above
- if (targetNode.type === 'filter-node') {
+ // Block window node not directly connecting to source
+ if (targetNode.type === 'window-node') {
return sourceNode.type === 'source-node';
}
+ // Output nodes are terminal: block any outgoing edge from them
+ if (sourceNode.type === 'machine-learning-node' || sourceNode.type === 'signal-graph-node') {
+ return false;
+ }
+
+
return true;
},
[nodes]
diff --git a/frontend/context/WebSocketContext.tsx b/frontend/context/WebSocketContext.tsx
index 7b22d55..a3aaf74 100644
--- a/frontend/context/WebSocketContext.tsx
+++ b/frontend/context/WebSocketContext.tsx
@@ -2,7 +2,7 @@
import React, { createContext, useContext, useEffect, useRef, useCallback, ReactNode } from 'react';
import { useGlobalContext } from './GlobalContext';
-import { ProcessingConfig, WindowingConfig } from '@/lib/processing';
+import { PipelinePayload } from '@/lib/pipeline';
export type DataPoint = {
time: string;
@@ -16,22 +16,11 @@ type Subscriber = (points: DataPoint[]) => void;
type WebSocketContextType = {
subscribe: (fn: Subscriber) => () => void;
- sendProcessingConfig: (config: ProcessingConfig) => void;
- sendWindowingConfig: (config: WindowingConfig) => void;
+ sendPipelinePayload: (payload: PipelinePayload) => void;
};
const WebSocketContext = createContext(undefined);
-const DEFAULT_PROCESSING_CONFIG: ProcessingConfig = {
- apply_bandpass: false,
- use_iir: false,
- l_freq: null,
- h_freq: null,
- downsample_factor: null,
- sfreq: 256,
- n_channels: 4,
-};
-
function formatTimestamp(raw: any): string {
const s = String(raw);
// ISO 8601 with T: "2026-03-11T03:55:22.715574979Z"
@@ -55,8 +44,7 @@ function normalizeBatch(batch: any): DataPoint[] {
export function WebSocketProvider({ children }: { children: ReactNode }) {
const { dataStreaming } = useGlobalContext();
const wsRef = useRef(null);
- const processingConfigRef = useRef(null);
- const windowingConfigRef = useRef(null);
+ const pipelinePayloadRef = useRef(null);
const subscribersRef = useRef>(new Set());
const closingTimeoutRef = useRef(null);
const isClosingGracefullyRef = useRef(false);
@@ -66,38 +54,14 @@ export function WebSocketProvider({ children }: { children: ReactNode }) {
return () => subscribersRef.current.delete(fn);
}, []);
- const sendProcessingConfig = useCallback((config: ProcessingConfig) => {
- processingConfigRef.current = config;
- if (wsRef.current?.readyState === WebSocket.OPEN) {
- wsRef.current.send(JSON.stringify(config));
- console.log('Sent processing config:', config);
- }
- }, []);
-
- const sendWindowingConfig = useCallback((config: WindowingConfig) => {
- windowingConfigRef.current = config;
+ const sendPipelinePayload = useCallback((payload: PipelinePayload) => {
+ pipelinePayloadRef.current = payload;
if (wsRef.current?.readyState === WebSocket.OPEN) {
- wsRef.current.send(JSON.stringify(config));
- console.log('Sent windowing config:', config);
+ wsRef.current.send(JSON.stringify(payload));
+ console.log('Sent pipeline payload:', payload);
}
}, []);
- // Forward config events from nodes to backend
- useEffect(() => {
- const processingHandler = (event: Event) => {
- sendProcessingConfig((event as CustomEvent).detail);
- };
- const windowingHandler = (event: Event) => {
- sendWindowingConfig((event as CustomEvent).detail);
- };
- window.addEventListener('processing-config-update', processingHandler);
- window.addEventListener('windowing-config-update', windowingHandler);
- return () => {
- window.removeEventListener('processing-config-update', processingHandler);
- window.removeEventListener('windowing-config-update', windowingHandler);
- };
- }, [sendProcessingConfig, sendWindowingConfig]);
-
// Manage WebSocket lifecycle
useEffect(() => {
if (!dataStreaming) {
@@ -121,9 +85,8 @@ export function WebSocketProvider({ children }: { children: ReactNode }) {
ws.onopen = () => {
console.log('WebSocket connection opened.');
- ws.send(JSON.stringify(processingConfigRef.current ?? DEFAULT_PROCESSING_CONFIG));
- if (windowingConfigRef.current) {
- ws.send(JSON.stringify(windowingConfigRef.current));
+ if (pipelinePayloadRef.current) {
+ ws.send(JSON.stringify(pipelinePayloadRef.current));
}
};
@@ -168,7 +131,7 @@ export function WebSocketProvider({ children }: { children: ReactNode }) {
}, [dataStreaming]);
return (
-
+
{children}
);
diff --git a/frontend/lib/pipeline.ts b/frontend/lib/pipeline.ts
new file mode 100644
index 0000000..0596631
--- /dev/null
+++ b/frontend/lib/pipeline.ts
@@ -0,0 +1,9 @@
+export type PipelineNode = {
+ type: string;
+ config: Record;
+};
+
+export type PipelinePayload = {
+ session_id: string;
+ nodes: PipelineNode[];
+};