From 32f50afca056716b1ebf81f1320dba58ca175445 Mon Sep 17 00:00:00 2001 From: amaloney1 Date: Fri, 27 Mar 2026 19:25:12 -0700 Subject: [PATCH 1/2] Frontend websocket change to pipeline --- .../nodes/filter-node/filter-node.tsx | 30 ++--- .../nodes/window-node/window-node.tsx | 46 ++++---- .../ui-react-flow/react-flow-view.tsx | 107 ++++++++++++++++++ frontend/context/WebSocketContext.tsx | 57 ++-------- frontend/lib/pipeline.ts | 9 ++ 5 files changed, 169 insertions(+), 80 deletions(-) create mode 100644 frontend/lib/pipeline.ts diff --git a/frontend/components/nodes/filter-node/filter-node.tsx b/frontend/components/nodes/filter-node/filter-node.tsx index a11075a..a882b97 100644 --- a/frontend/components/nodes/filter-node/filter-node.tsx +++ b/frontend/components/nodes/filter-node/filter-node.tsx @@ -1,10 +1,6 @@ 'use client'; import { useGlobalContext } from '@/context/GlobalContext'; -import { ProcessingConfig } from '@/lib/processing'; -const dispatchProcessingConfig = (config: ProcessingConfig) => { - window.dispatchEvent(new CustomEvent('processing-config-update', { detail: config })); -}; import { Handle, Position, useReactFlow } from '@xyflow/react'; import React from 'react'; import ComboBox from './combo-box'; @@ -25,7 +21,7 @@ export default function FilterNode({ id }: FilterNodeProps) { const { dataStreaming } = useGlobalContext(); - const buildConfig = (): ProcessingConfig => { + const buildConfig = () => { if (!isConnected) { return { apply_bandpass: false, @@ -77,6 +73,18 @@ export default function FilterNode({ id }: FilterNodeProps) { } } + // write config to node data for later retrieval when dispatching pipeline payload + const pushConfigToNodeData = React.useCallback(() => { + if (!id) return; + const config = buildConfig(); + reactFlowInstance.setNodes((nds) => + nds.map((n) => + n.id === id ? { ...n, data: { ...n.data, config } } : n + ) + ); + }, [id, reactFlowInstance, selectedFilter, lowCutoff, highCutoff, isConnected]); + + // Check connection status and update state const checkConnectionStatus = React.useCallback(() => { try { @@ -137,14 +145,10 @@ export default function FilterNode({ id }: FilterNodeProps) { }; }, [checkConnectionStatus]); + // Push config to node data and dispatch processing config when relevant state changes React.useEffect(() => { - if (!dataStreaming) return - dispatchProcessingConfig(buildConfig()) - }, [selectedFilter, lowCutoff, highCutoff, isConnected, dataStreaming]) - - React.useEffect(() => { - dispatchProcessingConfig(buildConfig()); - }, []); + pushConfigToNodeData(); + }, [pushConfigToNodeData]); return (
@@ -202,4 +206,4 @@ export default function FilterNode({ id }: FilterNodeProps) {
); -} \ No newline at end of file +} diff --git a/frontend/components/nodes/window-node/window-node.tsx b/frontend/components/nodes/window-node/window-node.tsx index a84cc27..438e41b 100644 --- a/frontend/components/nodes/window-node/window-node.tsx +++ b/frontend/components/nodes/window-node/window-node.tsx @@ -3,7 +3,6 @@ import { useGlobalContext } from '@/context/GlobalContext'; import { Handle, Position, useReactFlow } from '@xyflow/react'; import React from 'react'; import WindowComboBox, { type WindowOption } from './window-combo-box'; -import { WindowingConfig } from '@/lib/processing'; interface WindowNodeProps { id?: string; @@ -25,15 +24,32 @@ export default function WindowNode({ id }: WindowNodeProps) { const { dataStreaming } = useGlobalContext(); - const dispatchWindowingConfig = (config: WindowingConfig) => { - window.dispatchEvent(new CustomEvent('windowing-config-update', { detail: config })); - }; - - const buildConfig = (): WindowingConfig => ({ + const buildConfig = () => ({ chunk_size: windowSize, overlap_size: overlapSize, }); + // Validate config values + const isValidConfig = + Number.isInteger(windowSize) && + windowSize > 0 && + Number.isInteger(overlapSize) && + overlapSize >= 0 && + overlapSize < windowSize; + + // Push config to node data and dispatch processing config when relevant state changes + const pushConfigToNodeData = React.useCallback(() => { + if (!id) return; + if (!isValidConfig) return; + const config = buildConfig(); + reactFlowInstance.setNodes((nds) => + nds.map((n) => + n.id === id ? { ...n, data: { ...n.data, config } } : n + ) + ); + }, [id, reactFlowInstance, windowSize, overlapSize, isValidConfig]); + + // Check connection status and update state const checkConnectionStatus = React.useCallback(() => { try { @@ -73,17 +89,6 @@ export default function WindowNode({ id }: WindowNodeProps) { setIsConnected(false); } }, [id, reactFlowInstance]); - - const isValidConfig = - Number.isInteger(windowSize) && - windowSize > 0 && - Number.isInteger(overlapSize) && - overlapSize >= 0 && - overlapSize < windowSize; - - React.useEffect(() => { - dispatchWindowingConfig(buildConfig()); - }, []); // Check connection status on mount and when edges might change React.useEffect(() => { @@ -105,10 +110,11 @@ export default function WindowNode({ id }: WindowNodeProps) { }; }, [checkConnectionStatus]); + // Push config to node data and dispatch processing config when relevant state changes React.useEffect(() => { if(!isValidConfig) return; - dispatchWindowingConfig(buildConfig()); - }, [windowSize, overlapSize, selectedOption, isConnected, dataStreaming]); + pushConfigToNodeData(); + }, [pushConfigToNodeData, isValidConfig]); return (
@@ -165,4 +171,4 @@ export default function WindowNode({ id }: WindowNodeProps) { />
); -} \ No newline at end of file +} diff --git a/frontend/components/ui-react-flow/react-flow-view.tsx b/frontend/components/ui-react-flow/react-flow-view.tsx index 3f25c97..bdcbeb9 100644 --- a/frontend/components/ui-react-flow/react-flow-view.tsx +++ b/frontend/components/ui-react-flow/react-flow-view.tsx @@ -37,6 +37,9 @@ import { import { useEffect, useState } from 'react'; import { X, Ellipsis, RotateCw, RotateCcw } from 'lucide-react'; +import { useWebSocketContext } from '@/context/WebSocketContext'; +import { PipelinePayload } from '@/lib/pipeline'; +import { useGlobalContext } from '@/context/GlobalContext'; const nodeTypes = { 'source-node': SourceNode, @@ -46,6 +49,100 @@ const nodeTypes = { 'window-node': WindowNode, }; +// defines backend types for React Flow types +const typeMap: Record = { + 'source-node': 'source', + 'filter-node': 'preprocessing', + 'window-node': 'window', + 'machine-learning-node': 'ml', +}; + +// allow for defaults of the filtering node to still be applied if user doesn't specify them in the UI +const DEFAULT_PROCESSING = { + apply_bandpass: false, + use_iir: false, + l_freq: null, + h_freq: null, + downsample_factor: null, + sfreq: 256, + n_channels: 4, +}; +const DEFAULT_WINDOWING = { + chunk_size: 64, + overlap_size: 0, +}; + +const topoSort = (nodes: Node[], edges: Edge[]) => { + const incoming = new Map(); // count of incoming edges for each node + const outgoing = new Map(); // list of target nodes for each node + + // Initialize maps to 0 incoming and empty outgoing + nodes.forEach((n) => { + incoming.set(n.id, 0); + outgoing.set(n.id, []); + }); + + edges.forEach((e) => { + if (!outgoing.has(e.source)) return; + outgoing.get(e.source)!.push(e.target); // Add target to outgoing list of source + incoming.set(e.target, (incoming.get(e.target) ?? 0) + 1); // Increment incoming count for target + }); + + const queue = nodes + .filter((n) => (incoming.get(n.id) ?? 0) === 0) + .map((n) => n.id); + + const ordered: string[] = []; + + // Kahn's algorithm for topological sorting + while (queue.length > 0) { + const id = queue.shift()!; + ordered.push(id); + + for (const target of outgoing.get(id) ?? []) { + const next = (incoming.get(target) ?? 0) - 1; + incoming.set(target, next); + if (next === 0) queue.push(target); + } + } + + // In case of cycles or disconnected nodes + nodes.forEach((n) => { + if (!ordered.includes(n.id)) ordered.push(n.id); + }); + + return ordered.map((id) => nodes.find((n) => n.id === id)!).filter(Boolean); +}; + +// Converts React Flow state to backend pipeline format +const buildPipelinePayload = ( + nodes: Node[], + edges: Edge[], + sessionId: string +): PipelinePayload => { + const orderedNodes = topoSort(nodes, edges); // Ensure nodes are in execution order + + return { + session_id: sessionId, + nodes: orderedNodes.map((n) => { + const type = typeMap[n.type ?? ''] ?? n.type ?? 'unknown'; // Map to backend type + const config = (n.data as { config?: Record })?.config ?? {}; + + if(type == 'preprocessing') { + return {type, config: {...DEFAULT_PROCESSING, ...config}}; //apply defaults if not specified by user + } + + if(type == 'window') { + return {type, config: {...DEFAULT_WINDOWING, ...config}}; + } + + return {type,config}; + }), + }; +}; + + + let id = 0; const getId = () => `node_${id++}`; @@ -54,6 +151,8 @@ const ReactFlowInterface = () => { const [edges, setEdges] = useEdgesState([]); const { screenToFlowPosition } = useReactFlow(); const [isControlsOpen, setIsControlsOpen] = useState(false); + const { sendPipelinePayload } = useWebSocketContext(); + const {activeSessionId} = useGlobalContext(); // Listen for global pipeline reset to clear nodes/edges useEffect(() => { @@ -159,6 +258,14 @@ const ReactFlowInterface = () => { [nodes, setEdges] ); + // Send updated pipeline to backend on any changes to nodes, edges, or active session + useEffect(() => { + if (nodes.length === 0) return; + if(activeSessionId== null) return; //no session yet + const payload = buildPipelinePayload(nodes, edges, String(activeSessionId)); + sendPipelinePayload(payload); + }, [nodes, edges, activeSessionId, sendPipelinePayload]); + const onNodesChange: OnNodesChange = useCallback( (changes) => { setNodes((nds) => applyNodeChanges(changes, nds)); diff --git a/frontend/context/WebSocketContext.tsx b/frontend/context/WebSocketContext.tsx index 7b22d55..a3aaf74 100644 --- a/frontend/context/WebSocketContext.tsx +++ b/frontend/context/WebSocketContext.tsx @@ -2,7 +2,7 @@ import React, { createContext, useContext, useEffect, useRef, useCallback, ReactNode } from 'react'; import { useGlobalContext } from './GlobalContext'; -import { ProcessingConfig, WindowingConfig } from '@/lib/processing'; +import { PipelinePayload } from '@/lib/pipeline'; export type DataPoint = { time: string; @@ -16,22 +16,11 @@ type Subscriber = (points: DataPoint[]) => void; type WebSocketContextType = { subscribe: (fn: Subscriber) => () => void; - sendProcessingConfig: (config: ProcessingConfig) => void; - sendWindowingConfig: (config: WindowingConfig) => void; + sendPipelinePayload: (payload: PipelinePayload) => void; }; const WebSocketContext = createContext(undefined); -const DEFAULT_PROCESSING_CONFIG: ProcessingConfig = { - apply_bandpass: false, - use_iir: false, - l_freq: null, - h_freq: null, - downsample_factor: null, - sfreq: 256, - n_channels: 4, -}; - function formatTimestamp(raw: any): string { const s = String(raw); // ISO 8601 with T: "2026-03-11T03:55:22.715574979Z" @@ -55,8 +44,7 @@ function normalizeBatch(batch: any): DataPoint[] { export function WebSocketProvider({ children }: { children: ReactNode }) { const { dataStreaming } = useGlobalContext(); const wsRef = useRef(null); - const processingConfigRef = useRef(null); - const windowingConfigRef = useRef(null); + const pipelinePayloadRef = useRef(null); const subscribersRef = useRef>(new Set()); const closingTimeoutRef = useRef(null); const isClosingGracefullyRef = useRef(false); @@ -66,38 +54,14 @@ export function WebSocketProvider({ children }: { children: ReactNode }) { return () => subscribersRef.current.delete(fn); }, []); - const sendProcessingConfig = useCallback((config: ProcessingConfig) => { - processingConfigRef.current = config; - if (wsRef.current?.readyState === WebSocket.OPEN) { - wsRef.current.send(JSON.stringify(config)); - console.log('Sent processing config:', config); - } - }, []); - - const sendWindowingConfig = useCallback((config: WindowingConfig) => { - windowingConfigRef.current = config; + const sendPipelinePayload = useCallback((payload: PipelinePayload) => { + pipelinePayloadRef.current = payload; if (wsRef.current?.readyState === WebSocket.OPEN) { - wsRef.current.send(JSON.stringify(config)); - console.log('Sent windowing config:', config); + wsRef.current.send(JSON.stringify(payload)); + console.log('Sent pipeline payload:', payload); } }, []); - // Forward config events from nodes to backend - useEffect(() => { - const processingHandler = (event: Event) => { - sendProcessingConfig((event as CustomEvent).detail); - }; - const windowingHandler = (event: Event) => { - sendWindowingConfig((event as CustomEvent).detail); - }; - window.addEventListener('processing-config-update', processingHandler); - window.addEventListener('windowing-config-update', windowingHandler); - return () => { - window.removeEventListener('processing-config-update', processingHandler); - window.removeEventListener('windowing-config-update', windowingHandler); - }; - }, [sendProcessingConfig, sendWindowingConfig]); - // Manage WebSocket lifecycle useEffect(() => { if (!dataStreaming) { @@ -121,9 +85,8 @@ export function WebSocketProvider({ children }: { children: ReactNode }) { ws.onopen = () => { console.log('WebSocket connection opened.'); - ws.send(JSON.stringify(processingConfigRef.current ?? DEFAULT_PROCESSING_CONFIG)); - if (windowingConfigRef.current) { - ws.send(JSON.stringify(windowingConfigRef.current)); + if (pipelinePayloadRef.current) { + ws.send(JSON.stringify(pipelinePayloadRef.current)); } }; @@ -168,7 +131,7 @@ export function WebSocketProvider({ children }: { children: ReactNode }) { }, [dataStreaming]); return ( - + {children} ); diff --git a/frontend/lib/pipeline.ts b/frontend/lib/pipeline.ts new file mode 100644 index 0000000..0596631 --- /dev/null +++ b/frontend/lib/pipeline.ts @@ -0,0 +1,9 @@ +export type PipelineNode = { + type: string; + config: Record; +}; + +export type PipelinePayload = { + session_id: string; + nodes: PipelineNode[]; +}; From c70de233913819aa7a2d51cedc4cab32672825d0 Mon Sep 17 00:00:00 2001 From: amaloney1 Date: Thu, 2 Apr 2026 15:41:08 -0700 Subject: [PATCH 2/2] frontend websocket updates --- .../nodes/filter-node/filter-node.tsx | 1 + .../ui-react-flow/react-flow-view.tsx | 160 +++++++++++++++--- 2 files changed, 142 insertions(+), 19 deletions(-) diff --git a/frontend/components/nodes/filter-node/filter-node.tsx b/frontend/components/nodes/filter-node/filter-node.tsx index a882b97..5377b03 100644 --- a/frontend/components/nodes/filter-node/filter-node.tsx +++ b/frontend/components/nodes/filter-node/filter-node.tsx @@ -76,6 +76,7 @@ export default function FilterNode({ id }: FilterNodeProps) { // write config to node data for later retrieval when dispatching pipeline payload const pushConfigToNodeData = React.useCallback(() => { if (!id) return; + if (!isConnected) return; // Don't push config if not connected const config = buildConfig(); reactFlowInstance.setNodes((nds) => nds.map((n) => diff --git a/frontend/components/ui-react-flow/react-flow-view.tsx b/frontend/components/ui-react-flow/react-flow-view.tsx index bdcbeb9..9407a9d 100644 --- a/frontend/components/ui-react-flow/react-flow-view.tsx +++ b/frontend/components/ui-react-flow/react-flow-view.tsx @@ -51,7 +51,6 @@ const nodeTypes = { // defines backend types for React Flow types const typeMap: Record = { - 'source-node': 'source', 'filter-node': 'preprocessing', 'window-node': 'window', 'machine-learning-node': 'ml', @@ -72,6 +71,13 @@ const DEFAULT_WINDOWING = { overlap_size: 0, }; +// Only these nodes are executed by the backend pipeline +const PIPELINE_NODE_TYPES = new Set([ + 'window-node', + 'filter-node', + 'machine-learning-node', +]); + const topoSort = (nodes: Node[], edges: Edge[]) => { const incoming = new Map(); // count of incoming edges for each node const outgoing = new Map(); // list of target nodes for each node @@ -114,6 +120,100 @@ const topoSort = (nodes: Node[], edges: Edge[]) => { return ordered.map((id) => nodes.find((n) => n.id === id)!).filter(Boolean); }; +const validatePipeline = (nodes: Node[], edges: Edge[]) => { + const errors: string[] = []; + const warnings: string[] = []; + + // maps to track nodes and their connections + const byId = new Map(nodes.map((n) => [n.id, n])); + const incoming = new Map(); + const outgoing = new Map(); + + nodes.forEach((n) => { + incoming.set(n.id, []); + outgoing.set(n.id, []); + }); + + // Build connection maps + edges.forEach((e) => { + if (!incoming.has(e.target) || !outgoing.has(e.source)) return; + incoming.get(e.target)!.push(e.source); + outgoing.get(e.source)!.push(e.target); + }); + + // Require 1 source node + const sourceNodes = nodes.filter((n) => n.type === 'source-node'); + if (sourceNodes.length === 0) errors.push('Missing Source node.'); + if (sourceNodes.length > 1) errors.push('Multiple Source nodes are not allowed.'); + + // Require 1 window node + const windowNodes = nodes.filter((n) => n.type === 'window-node'); + if (windowNodes.length === 0) errors.push('Missing Window node.'); + if (windowNodes.length > 1) errors.push('Multiple Window nodes are not allowed.'); + + // Require window node must connect directly to source + windowNodes.forEach((win) => { + const ins = incoming.get(win.id) ?? []; + if (ins.length === 0 || ins.some((id) => byId.get(id)?.type !== 'source-node')) { + errors.push('Window node must connect directly from Source.'); + } + }); + + // If ML node exists, window is required for the ML pipeline + const mlNodes = nodes.filter((n) => n.type === 'machine-learning-node'); + if (mlNodes.length > 0 && windowNodes.length === 0) { + errors.push('A Window node is required for ML pipelines.'); + } + + // Require that output nodes are terminal + const outputTypes = new Set(['signal-graph-node', 'machine-learning-node']); + nodes.forEach((n) => { + if (outputTypes.has(n.type ?? '')) { + const outs = outgoing.get(n.id) ?? []; + if (outs.length > 0) errors.push('Output nodes must be terminal.'); + } + }); + + // Require that ML node doesnt connect directly to source + nodes + .filter((n) => n.type === 'machine-learning-node') + .forEach((ml) => { + const ins = incoming.get(ml.id) ?? []; + if (ins.some((id) => byId.get(id)?.type === 'source-node')) { + errors.push('ML nodes cannot connect directly to Source.'); + } + }); + + // Warn if filter node appears before window when window exists + const filterNodes = nodes.filter((n) => n.type === 'filter-node'); + const filterDirectFromSource = filterNodes.some((fn) => { + const ins = incoming.get(fn.id) ?? []; + return ins.some((id) => byId.get(id)?.type === 'source-node'); + }); + if (windowNodes.length > 0 && filterDirectFromSource) { + warnings.push('Filter nodes should come after Window nodes when Window nodes are present.'); + } + + // Warn if output nodes are connected directly to Source while preprocessing exists + const hasPreprocessing = filterNodes.length > 0 || windowNodes.length > 0; + const outputDirectFromSource = nodes.some((n) => { + if (!outputTypes.has(n.type ?? '')) return false; + const ins = incoming.get(n.id) ?? []; + return ins.some((id) => byId.get(id)?.type === 'source-node'); + }); + if (hasPreprocessing && outputDirectFromSource) { + warnings.push('Outputs should come after preprocessing when preprocessing exists.'); + } + + // Cycle detection: if topoSort doesn't include all nodes + const ordered = topoSort(nodes, edges); + if (ordered.length !== nodes.length) { + errors.push('Pipeline contains a cycle.'); + } + + return { errors, warnings }; +}; + // Converts React Flow state to backend pipeline format const buildPipelinePayload = ( nodes: Node[], @@ -124,20 +224,22 @@ const buildPipelinePayload = ( return { session_id: sessionId, - nodes: orderedNodes.map((n) => { - const type = typeMap[n.type ?? ''] ?? n.type ?? 'unknown'; // Map to backend type - const config = (n.data as { config?: Record })?.config ?? {}; - - if(type == 'preprocessing') { - return {type, config: {...DEFAULT_PROCESSING, ...config}}; //apply defaults if not specified by user - } + nodes: orderedNodes + .filter((n) => PIPELINE_NODE_TYPES.has(n.type ?? '')) + .map((n) => { + const type = typeMap[n.type ?? ''] ?? n.type ?? 'unknown'; // Map to backend type + const config = (n.data as { config?: Record })?.config ?? {}; + + if(type == 'preprocessing') { + return {type, config: {...DEFAULT_PROCESSING, ...config}}; //apply defaults if not specified by user + } - if(type == 'window') { - return {type, config: {...DEFAULT_WINDOWING, ...config}}; - } + if(type == 'window') { + return {type, config: {...DEFAULT_WINDOWING, ...config}}; + } - return {type,config}; - }), + return {type,config}; + }), }; }; @@ -262,6 +364,14 @@ const ReactFlowInterface = () => { useEffect(() => { if (nodes.length === 0) return; if(activeSessionId== null) return; //no session yet + + // Validate pipeline before sending + const { errors, warnings} = validatePipeline(nodes, edges); + if (errors.length > 0) { + console.error('Pipeline validation errors:', errors); + return; // Don't send invalid pipeline + } + const payload = buildPipelinePayload(nodes, edges, String(activeSessionId)); sendPipelinePayload(payload); }, [nodes, edges, activeSessionId, sendPipelinePayload]); @@ -316,23 +426,35 @@ const ReactFlowInterface = () => { setNodes((nds) => [...nds, newNode]); }; - const isValidConnection = useCallback( + const isValidConnection = useCallback( (connection: Connection | Edge) => { if (connection.source === connection.target) return false; const sourceNode = nodes.find((n) => n.id === connection.source); const targetNode = nodes.find((n) => n.id === connection.target); if (!sourceNode || !targetNode) return false; - // Enforce: ML node requires Filter as immediate predecessor - if (targetNode.type === 'machine-learning-node') { - return sourceNode.type === 'filter-node'; + // Source must be the first node: block any incoming edge into Source + if (targetNode.type === 'source-node') { + return false; + } + + // Block if source node already has an outgoing edge (Source can have only one) + if (sourceNode.type === 'source-node') { + const hasOutgoing = edges.some((e) => e.source === sourceNode.id); + if (hasOutgoing) return false; } - // Allow Source → Filter; block Source → ML handled above - if (targetNode.type === 'filter-node') { + // Block window node not directly connecting to source + if (targetNode.type === 'window-node') { return sourceNode.type === 'source-node'; } + // Output nodes are terminal: block any outgoing edge from them + if (sourceNode.type === 'machine-learning-node' || sourceNode.type === 'signal-graph-node') { + return false; + } + + return true; }, [nodes]