From 35b12d71be6d9608cdd6ae955005dec6c8e61202 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Tue, 2 Dec 2025 10:00:28 +0100 Subject: [PATCH 01/15] Add loki websocket connection --- ui/README.md | 29 +++++- ui/public/scenarios.json | 6 ++ ui/public/topologies/prototype.yaml | 23 +++++ .../components/Sim/hooks/useLokiWebSocket.ts | 98 +++++++++++++++++++ ui/src/components/Sim/modules/Scenario.tsx | 63 ++++++++---- ui/src/contexts/SimContext/context.ts | 2 + ui/src/contexts/SimContext/reducer.ts | 12 ++- ui/src/contexts/SimContext/types.ts | 8 +- 8 files changed, 215 insertions(+), 26 deletions(-) create mode 100644 ui/public/topologies/prototype.yaml create mode 100644 ui/src/components/Sim/hooks/useLokiWebSocket.ts diff --git a/ui/README.md b/ui/README.md index 5f9eebdb6..f91aede01 100644 --- a/ui/README.md +++ b/ui/README.md @@ -66,4 +66,31 @@ Then update `public/scenarios.json` accordingly: } ``` -Now add that +## Add a live Loki streaming scenario + +For live visualization of node logs, you can configure scenarios that connect to a Loki instance via WebSocket. This allows real-time monitoring of running Cardano nodes. + +First, ensure your Loki instance is running and accessible, for example by following the [leios-demo](https://github.com/input-output-hk/leios-demo/) instructions. +Then add a scenario with a `loki` field instead of `trace` to `public/scenarios.json`: + +```json +{ + "scenarios": [ + { + "name": "Leios Demo 202511", + "topology": "topologies/prototype.yaml", + "duration": 300, + "loki": "localhost:3100" + } + ] +} +``` + +## Configuration + +Scenarios support two modes: + +- **Stored traces**: Use the `trace` field pointing to a JSONL file (optionally gzipped) +- **Live streaming**: Use the `loki` field with host:port of your Loki instance + +Both modes require a `topology` field specifying the network topology YAML file and a `duration` defining the amount of loaded data. diff --git a/ui/public/scenarios.json b/ui/public/scenarios.json index d2cde4ed3..7e4d13853 100644 --- a/ui/public/scenarios.json +++ b/ui/public/scenarios.json @@ -11,6 +11,12 @@ "topology": "topologies/small.yaml", "duration": 120, "trace": "traces/small-1txkbs-nocpu.jsonl.gz" + }, + { + "name": "Leios Demo 202511", + "topology": "topologies/prototype.yaml", + "duration": 300, + "loki": "localhost:3100" } ] } diff --git a/ui/public/topologies/prototype.yaml b/ui/public/topologies/prototype.yaml new file mode 100644 index 000000000..0d25baaa2 --- /dev/null +++ b/ui/public/topologies/prototype.yaml @@ -0,0 +1,23 @@ +nodes: + node-0: + stake: 1 + location: + - 0 + - 0 + producers: {} + node-1: + stake: 0 + location: + - 0 + - 100 + producers: + node-0: + latency-ms: 17.0 + node-2: + stake: 0 + location: + - 0 + - 200 + producers: + node-1: + latency-ms: 32.0 diff --git a/ui/src/components/Sim/hooks/useLokiWebSocket.ts b/ui/src/components/Sim/hooks/useLokiWebSocket.ts new file mode 100644 index 000000000..cfa317dd5 --- /dev/null +++ b/ui/src/components/Sim/hooks/useLokiWebSocket.ts @@ -0,0 +1,98 @@ +import { useSimContext } from "@/contexts/SimContext/context"; +import { useCallback, useEffect, useRef, useState } from "react"; + +export const useLokiWebSocket = () => { + const { + state: { lokiHost }, + dispatch, + } = useSimContext(); + const [connecting, setConnecting] = useState(false); + const [connected, setConnected] = useState(false); + const wsRef = useRef(null); + + const connect = useCallback(() => { + if (!lokiHost || connecting || connected) return; + + setConnecting(true); + dispatch({ type: "RESET_TIMELINE" }); + + try { + // TODO: find a better query + const query = encodeURIComponent('{service="cardano-node"} |= "blockNo"'); + const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${query}`; + console.log("Connecting to ", wsUrl); + const ws = new WebSocket(wsUrl); + wsRef.current = ws; + + ws.onopen = () => { + setConnecting(false); + setConnected(true); + dispatch({ type: "SET_LOKI_CONNECTED", payload: true }); + }; + + ws.onmessage = (event) => { + try { + const data = JSON.parse(event.data); + console.log("Received Loki stream data:", data); + + if (data.streams && Array.isArray(data.streams)) { + data.streams.forEach((stream: any) => { + if (stream.values && Array.isArray(stream.values)) { + stream.values.forEach( + ([timestamp, logLine]: [string, string]) => { + console.log("Stream value:", { timestamp, logLine }); + }, + ); + } + }); + } + } catch (error) { + console.error("Error processing Loki message:", error); + } + }; + + ws.onerror = (error) => { + console.error("WebSocket error:", error); + setConnecting(false); + setConnected(false); + dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); + }; + + ws.onclose = () => { + setConnecting(false); + setConnected(false); + dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); + wsRef.current = null; + }; + } catch (error) { + console.error("Failed to create WebSocket connection:", error); + setConnecting(false); + setConnected(false); + } + }, [lokiHost, connecting, connected, dispatch]); + + const disconnect = useCallback(() => { + if (wsRef.current) { + wsRef.current.close(); + wsRef.current = null; + } + setConnecting(false); + setConnected(false); + dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); + }, [dispatch]); + + useEffect(() => { + return () => { + if (wsRef.current) { + wsRef.current.close(); + } + }; + }, []); + + return { + connect, + disconnect, + connecting, + connected, + }; +}; diff --git a/ui/src/components/Sim/modules/Scenario.tsx b/ui/src/components/Sim/modules/Scenario.tsx index cc9f9fdfb..97bdc0ef8 100644 --- a/ui/src/components/Sim/modules/Scenario.tsx +++ b/ui/src/components/Sim/modules/Scenario.tsx @@ -5,16 +5,20 @@ import { import { IScenario } from "@/contexts/SimContext/types"; import { ChangeEvent, FC, useCallback, useEffect, useState } from "react"; import { useStreamMessagesHandler } from "../hooks/useStreamMessagesHandler"; +import { useLokiWebSocket } from "../hooks/useLokiWebSocket"; import { Button } from "@/components/Button"; export const Scenario: FC = () => { const { - state: { allScenarios, activeScenario, events }, + state: { allScenarios, activeScenario, events, lokiHost, lokiConnected }, dispatch, } = useSimContext(); const { startStream, streaming, stopStream } = useStreamMessagesHandler(); + const { connect: connectLoki, disconnect: disconnectLoki, connecting: lokiConnecting } = useLokiWebSocket(); const [includeTransactions, setIncludeTransactions] = useState(true); + const isLokiMode = !!lokiHost; + useEffect(() => { (async () => { const response = await fetch("scenarios.json"); @@ -25,7 +29,7 @@ export const Scenario: FC = () => { scenario.topology, window.location.toString(), ).toString(), - trace: new URL(scenario.trace, window.location.toString()).toString(), + trace: scenario.trace ? new URL(scenario.trace, window.location.toString()).toString() : undefined, })); dispatch({ type: "SET_SCENARIOS", payload: scenarios }); })(); @@ -39,7 +43,11 @@ export const Scenario: FC = () => { ); const handleUnloadScenario = useCallback(() => { - stopStream(); + if (isLokiMode) { + disconnectLoki(); + } else { + stopStream(); + } dispatch({ type: "RESET_TIMELINE" }); dispatch({ type: "BATCH_UPDATE", @@ -47,13 +55,18 @@ export const Scenario: FC = () => { aggregatedData: defaultAggregatedData, }, }); - }, [stopStream, dispatch]); + }, [isLokiMode, disconnectLoki, stopStream, dispatch]); const handleStartStream = useCallback(() => { - startStream(includeTransactions); - }, [startStream, includeTransactions]); + if (isLokiMode) { + connectLoki(); + } else { + startStream(includeTransactions); + } + }, [isLokiMode, connectLoki, startStream, includeTransactions]); - const isLoaded = events.length > 0 || streaming; + const isLoaded = events.length > 0 || streaming || lokiConnected; + const isConnecting = streaming || lokiConnecting; return (
@@ -73,25 +86,30 @@ export const Scenario: FC = () => {
-
- -
+ {!isLokiMode && ( +
+ +
+ )}
{isLoaded && ( )}
diff --git a/ui/src/contexts/SimContext/context.ts b/ui/src/contexts/SimContext/context.ts index d91e86fa3..fe1c7b9db 100644 --- a/ui/src/contexts/SimContext/context.ts +++ b/ui/src/contexts/SimContext/context.ts @@ -32,6 +32,8 @@ export const defaultState: ISimContextState = { }, aggregatedData: defaultAggregatedData, tracePath: "", + lokiHost: undefined, + lokiConnected: false, topography: { links: new Map(), nodes: new Map() }, topologyPath: "", topologyLoaded: false, diff --git a/ui/src/contexts/SimContext/reducer.ts b/ui/src/contexts/SimContext/reducer.ts index 9803fcf5d..3471c761f 100644 --- a/ui/src/contexts/SimContext/reducer.ts +++ b/ui/src/contexts/SimContext/reducer.ts @@ -18,7 +18,7 @@ export const reducer = ( allScenarios, activeScenario: scenario.name, maxTime: scenario.duration, - tracePath: scenario.trace, + tracePath: scenario.trace || "", topologyPath: scenario.topology, }; } @@ -35,7 +35,9 @@ export const reducer = ( aggregatedData: defaultAggregatedData, activeScenario: scenario.name, maxTime: scenario.duration, - tracePath: scenario.trace, + tracePath: scenario.trace || "", + lokiHost: scenario.loki, + lokiConnected: false, topologyPath: scenario.topology, topologyLoaded: state.topologyLoaded && scenario.topology === state.topologyPath, @@ -160,6 +162,12 @@ export const reducer = ( speedMultiplier: 1, }; + case "SET_LOKI_CONNECTED": + return { + ...state, + lokiConnected: action.payload, + }; + default: return state; } diff --git a/ui/src/contexts/SimContext/types.ts b/ui/src/contexts/SimContext/types.ts index 28f2dd901..201004f5c 100644 --- a/ui/src/contexts/SimContext/types.ts +++ b/ui/src/contexts/SimContext/types.ts @@ -67,7 +67,8 @@ export interface IScenario { name: string; topology: string; duration: number; - trace: string; + trace?: string; + loki?: string; } export interface ISimContextState { @@ -76,6 +77,8 @@ export interface ISimContextState { graph: IGraphContextState; aggregatedData: ISimulationAggregatedDataState; tracePath: string; + lokiHost?: string; + lokiConnected: boolean; topography: ITransformedNodeMap; topologyPath: string; topologyLoaded: boolean; @@ -108,7 +111,8 @@ export type TSimContextActions = | { type: "SET_TIMELINE_TIME"; payload: number } | { type: "SET_TIMELINE_PLAYING"; payload: boolean } | { type: "SET_TIMELINE_SPEED"; payload: number } - | { type: "RESET_TIMELINE" }; + | { type: "RESET_TIMELINE" } + | { type: "SET_LOKI_CONNECTED"; payload: boolean }; export interface ISimContext { state: ISimContextState; From c5ea10dd197a0847277979f1b25533c60f28a6b0 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Tue, 2 Dec 2025 10:14:19 +0100 Subject: [PATCH 02/15] Auto-start scenarios using query param --- ui/README.md | 16 +++++++++++ ui/src/components/Sim/modules/Scenario.tsx | 31 ++++++++++++++++++---- ui/src/contexts/SimContext/context.ts | 1 + ui/src/contexts/SimContext/reducer.ts | 1 + ui/src/contexts/SimContext/types.ts | 3 ++- 5 files changed, 46 insertions(+), 6 deletions(-) diff --git a/ui/README.md b/ui/README.md index f91aede01..2dff0faaa 100644 --- a/ui/README.md +++ b/ui/README.md @@ -94,3 +94,19 @@ Scenarios support two modes: - **Live streaming**: Use the `loki` field with host:port of your Loki instance Both modes require a `topology` field specifying the network topology YAML file and a `duration` defining the amount of loaded data. + +### Auto-starting scenarios + +Scenarios can be auto-loaded/-connected using a URL query parameter: + +``` +?scenario= +``` + +Where `` is the zero-based index of the scenario in the scenarios.json array. For example: + +- `?scenario=0` - Auto-loads the first scenario (e.g., "200 TxkB/s") +- `?scenario=1` - Auto-loads the second scenario (e.g., "1 TxkB/s") +- `?scenario=2` - Auto-connects to the third scenario (e.g., "Leios Demo 202511") + +This is useful for direct links, bookmarking, or embedding specific scenarios. diff --git a/ui/src/components/Sim/modules/Scenario.tsx b/ui/src/components/Sim/modules/Scenario.tsx index 97bdc0ef8..d3f42d9e1 100644 --- a/ui/src/components/Sim/modules/Scenario.tsx +++ b/ui/src/components/Sim/modules/Scenario.tsx @@ -10,7 +10,7 @@ import { Button } from "@/components/Button"; export const Scenario: FC = () => { const { - state: { allScenarios, activeScenario, events, lokiHost, lokiConnected }, + state: { allScenarios, activeScenario, events, lokiHost, lokiConnected, autoStart }, dispatch, } = useSimContext(); const { startStream, streaming, stopStream } = useStreamMessagesHandler(); @@ -32,9 +32,33 @@ export const Scenario: FC = () => { trace: scenario.trace ? new URL(scenario.trace, window.location.toString()).toString() : undefined, })); dispatch({ type: "SET_SCENARIOS", payload: scenarios }); + + // Check for scenario URL parameter + const urlParams = new URLSearchParams(window.location.search); + const scenarioParam = urlParams.get("scenario"); + if (scenarioParam) { + const scenarioIndex = parseInt(scenarioParam, 10); + if (scenarioIndex >= 0 && scenarioIndex < scenarios.length) { + const targetScenario = scenarios[scenarioIndex]; + dispatch({ type: "SET_SCENARIO", payload: targetScenario.name, autoStart: true }); + } + } })(); }, []); + // Auto-load/connect when autoStart is true + useEffect(() => { + if (autoStart && activeScenario) { + if (isLokiMode) { + connectLoki(); + } else { + startStream(true); // Include transactions by default for auto-load + } + // Reset autoStart flag after triggering + dispatch({ type: "SET_SCENARIO", payload: activeScenario, autoStart: false }); + } + }, [autoStart, activeScenario, isLokiMode, connectLoki, startStream, dispatch]); + const chooseScenario = useCallback( (event: ChangeEvent) => { dispatch({ type: "SET_SCENARIO", payload: event.target.value }); @@ -117,10 +141,7 @@ export const Scenario: FC = () => { onClick={handleUnloadScenario} className="w-[80px]" > - {isLokiMode - ? "Disconnect" - : (streaming ? "Cancel" : "Reset") - } + {isConnecting ? "Cancel" : "Reset"} )} diff --git a/ui/src/contexts/SimContext/context.ts b/ui/src/contexts/SimContext/context.ts index fe1c7b9db..37b19a4ee 100644 --- a/ui/src/contexts/SimContext/context.ts +++ b/ui/src/contexts/SimContext/context.ts @@ -24,6 +24,7 @@ export const defaultAggregatedData: ISimulationAggregatedDataState = { export const defaultState: ISimContextState = { allScenarios: [], activeScenario: "", + autoStart: false, graph: { canvasRef: { current: null }, canvasOffsetX: 0, diff --git a/ui/src/contexts/SimContext/reducer.ts b/ui/src/contexts/SimContext/reducer.ts index 3471c761f..7745cb0ec 100644 --- a/ui/src/contexts/SimContext/reducer.ts +++ b/ui/src/contexts/SimContext/reducer.ts @@ -34,6 +34,7 @@ export const reducer = ( ...state, aggregatedData: defaultAggregatedData, activeScenario: scenario.name, + autoStart: action.autoStart || false, maxTime: scenario.duration, tracePath: scenario.trace || "", lokiHost: scenario.loki, diff --git a/ui/src/contexts/SimContext/types.ts b/ui/src/contexts/SimContext/types.ts index 201004f5c..bd011ae7f 100644 --- a/ui/src/contexts/SimContext/types.ts +++ b/ui/src/contexts/SimContext/types.ts @@ -74,6 +74,7 @@ export interface IScenario { export interface ISimContextState { allScenarios: IScenario[]; activeScenario: string; + autoStart: boolean; graph: IGraphContextState; aggregatedData: ISimulationAggregatedDataState; tracePath: string; @@ -91,7 +92,7 @@ export interface ISimContextState { export type TSimContextActions = | { type: "SET_SCENARIOS"; payload: IScenario[] } - | { type: "SET_SCENARIO"; payload: string } + | { type: "SET_SCENARIO"; payload: string; autoStart?: boolean } | { type: "SET_CURRENT_NODE"; payload: string | undefined } | { type: "SET_CANVAS_PROPS"; From 87e5224d50aa555d247f0a76856c78f7a94307b6 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Tue, 2 Dec 2025 10:46:31 +0100 Subject: [PATCH 03/15] Parse immdb-server logs as RBSent events --- ui/public/topologies/prototype.yaml | 10 ++-- .../components/Sim/hooks/useLokiWebSocket.ts | 54 ++++++++++++++++++- .../components/Sim/modules/TimelineSlider.tsx | 14 ++--- ui/src/contexts/SimContext/context.ts | 1 + ui/src/contexts/SimContext/reducer.ts | 44 +++++++++++++-- ui/src/contexts/SimContext/types.ts | 1 + 6 files changed, 107 insertions(+), 17 deletions(-) diff --git a/ui/public/topologies/prototype.yaml b/ui/public/topologies/prototype.yaml index 0d25baaa2..b07bb950e 100644 --- a/ui/public/topologies/prototype.yaml +++ b/ui/public/topologies/prototype.yaml @@ -1,23 +1,23 @@ nodes: - node-0: + UpstreamNode: stake: 1 location: - 0 - 0 producers: {} - node-1: + Node0: stake: 0 location: - 0 - 100 producers: - node-0: + UpstreamNode: latency-ms: 17.0 - node-2: + DownstreamNode: stake: 0 location: - 0 - 200 producers: - node-1: + Node0: latency-ms: 32.0 diff --git a/ui/src/components/Sim/hooks/useLokiWebSocket.ts b/ui/src/components/Sim/hooks/useLokiWebSocket.ts index cfa317dd5..7e5b2f49b 100644 --- a/ui/src/components/Sim/hooks/useLokiWebSocket.ts +++ b/ui/src/components/Sim/hooks/useLokiWebSocket.ts @@ -1,4 +1,9 @@ import { useSimContext } from "@/contexts/SimContext/context"; +import { + IServerMessage, + EServerMessageType, + IRankingBlockSent, +} from "@/components/Sim/types"; import { useCallback, useEffect, useRef, useState } from "react"; export const useLokiWebSocket = () => { @@ -10,6 +15,29 @@ export const useLokiWebSocket = () => { const [connected, setConnected] = useState(false); const wsRef = useRef(null); + const parseUpstreamNodeLog = ( + logLine: string, + timestamp: number, + ): IServerMessage | null => { + if (logLine.trim() === "MsgBlock") { + // Hard-coded IRankingBlockSent for UpstreamNode -> Node0 + const message: IRankingBlockSent = { + type: EServerMessageType.RBSent, + slot: 0, // TODO: extract from actual data when available + id: `rb-${timestamp}`, + sender: "UpstreamNode", + recipient: "Node0", + }; + + return { + time_s: timestamp, + message, + }; + } + + return null; + }; + const connect = useCallback(() => { if (!lokiHost || connecting || connected) return; @@ -17,8 +45,10 @@ export const useLokiWebSocket = () => { dispatch({ type: "RESET_TIMELINE" }); try { - // TODO: find a better query - const query = encodeURIComponent('{service="cardano-node"} |= "blockNo"'); + // TODO: find better queries + const query = encodeURIComponent( + '{service="immdb-server"} |= "MsgBlock"', + ); const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${query}`; console.log("Connecting to ", wsUrl); const ws = new WebSocket(wsUrl); @@ -36,15 +66,35 @@ export const useLokiWebSocket = () => { console.log("Received Loki stream data:", data); if (data.streams && Array.isArray(data.streams)) { + const events: IServerMessage[] = []; + data.streams.forEach((stream: any) => { if (stream.values && Array.isArray(stream.values)) { stream.values.forEach( ([timestamp, logLine]: [string, string]) => { console.log("Stream value:", { timestamp, logLine }); + + const timestampSeconds = parseFloat(timestamp) / 1000000000; + const event = parseUpstreamNodeLog( + logLine, + timestampSeconds, + ); + + if (event) { + events.push(event); + } }, ); } }); + + if (events.length > 0) { + console.log("Dispatching events:", events); + dispatch({ + type: "ADD_TIMELINE_EVENT_BATCH", + payload: events, + }); + } } } catch (error) { console.error("Error processing Loki message:", error); diff --git a/ui/src/components/Sim/modules/TimelineSlider.tsx b/ui/src/components/Sim/modules/TimelineSlider.tsx index 3cf993a85..721af704e 100644 --- a/ui/src/components/Sim/modules/TimelineSlider.tsx +++ b/ui/src/components/Sim/modules/TimelineSlider.tsx @@ -3,7 +3,7 @@ import { type FC, useCallback } from "react"; export const TimelineSlider: FC = () => { const { - state: { events, currentTime }, + state: { events, currentTime, minTime, maxTime }, dispatch, } = useSimContext(); @@ -16,15 +16,17 @@ export const TimelineSlider: FC = () => { ); const hasEvents = events.length > 0; - const maxTime = hasEvents ? events[events.length - 1].time_s : 100; // Default duration when no events + const timeRange = maxTime - minTime; const formatTime = (timeInSeconds: number, highResolution = false) => { + // Show relative time from minTime + const relativeTime = timeInSeconds - minTime; return highResolution - ? `${timeInSeconds.toFixed(3)}s` - : `${timeInSeconds.toFixed(1)}s`; + ? `${relativeTime.toFixed(3)}s` + : `${relativeTime.toFixed(1)}s`; }; - const currentPercent = maxTime > 0 ? (currentTime / maxTime) * 100 : 0; + const currentPercent = timeRange > 0 ? ((currentTime - minTime) / timeRange) * 100 : 0; return (
{ {/* Interactive slider */} event.time_s); + const minEventTime = Math.min(...timestamps); + const maxEventTime = Math.max(...timestamps); + + // Update timeline bounds and clamp current time + const newMinTime = + state.minTime == 0 + ? minEventTime + : Math.min(state.minTime, minEventTime); + const newMaxTime = Math.max(state.maxTime, maxEventTime); + + const clampedCurrentTime = Math.max( + newMinTime, + Math.min(state.currentTime, newMaxTime), + ); + return { ...state, - events: [...state.events, ...action.payload], + events: newEvents, + minTime: newMinTime, + maxTime: newMaxTime, + currentTime: clampedCurrentTime, }; + } case "SET_TIMELINE_TIME": { - const newTime = action.payload; + const newTime = Math.max( + state.minTime, + Math.min(action.payload, state.maxTime), + ); // Recompute complete aggregated data based on new timeline position const nodeIds = Array.from(state.topography.nodes.keys()); @@ -159,6 +193,8 @@ export const reducer = ( ...state, events: [], currentTime: 0, + minTime: 0, + maxTime: 0, isPlaying: false, speedMultiplier: 1, }; diff --git a/ui/src/contexts/SimContext/types.ts b/ui/src/contexts/SimContext/types.ts index bd011ae7f..82eb15475 100644 --- a/ui/src/contexts/SimContext/types.ts +++ b/ui/src/contexts/SimContext/types.ts @@ -85,6 +85,7 @@ export interface ISimContextState { topologyLoaded: boolean; events: IServerMessage[]; currentTime: number; + minTime: number; maxTime: number; isPlaying: boolean; speedMultiplier: number; From 9c8013ff3c8595977c52de3c133cd1bf4c0a0f26 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Tue, 2 Dec 2025 11:10:50 +0100 Subject: [PATCH 04/15] Improve RBSent parsing from immdb-server output --- .../components/Sim/hooks/useLokiWebSocket.ts | 88 +++++++++++++------ 1 file changed, 61 insertions(+), 27 deletions(-) diff --git a/ui/src/components/Sim/hooks/useLokiWebSocket.ts b/ui/src/components/Sim/hooks/useLokiWebSocket.ts index 7e5b2f49b..6596bc5c7 100644 --- a/ui/src/components/Sim/hooks/useLokiWebSocket.ts +++ b/ui/src/components/Sim/hooks/useLokiWebSocket.ts @@ -6,27 +6,48 @@ import { } from "@/components/Sim/types"; import { useCallback, useEffect, useRef, useState } from "react"; -export const useLokiWebSocket = () => { - const { - state: { lokiHost }, - dispatch, - } = useSimContext(); - const [connecting, setConnecting] = useState(false); - const [connected, setConnected] = useState(false); - const wsRef = useRef(null); +// TODO: Replace with topology-based mapping +const HOST_PORT_TO_NODE: Record = { + "127.0.0.1:3001": "UpstreamNode", + "127.0.0.1:3002": "Node0", + "127.0.0.1:3003": "Node1", + "127.0.0.1:3004": "Node2", + // Add more mappings as needed +}; + +const parseCardanoNodeLog = ( + streamLabels: any, + timestamp: number, + logLine: string, +): IServerMessage | null => { + try { + const logData = JSON.parse(logLine); + + if (logData.msg === "MsgBlock" && logData.direction === "Send") { + // Extract sender from stream labels (process name) + const sender = streamLabels.process; + + // Parse connection to extract recipient + // connectionId format: "127.0.0.1:3001 127.0.0.1:3002" + const connectionId = logData.connectionId; + let recipient = "Node0"; // fallback + + if (connectionId) { + // Split connectionId to get both endpoints + const endpoints = connectionId.split(" "); + if (endpoints.length === 2) { + // Second endpoint is the recipient + const recipientEndpoint = endpoints[1]; + recipient = HOST_PORT_TO_NODE[recipientEndpoint] || recipient; + } + } - const parseUpstreamNodeLog = ( - logLine: string, - timestamp: number, - ): IServerMessage | null => { - if (logLine.trim() === "MsgBlock") { - // Hard-coded IRankingBlockSent for UpstreamNode -> Node0 const message: IRankingBlockSent = { type: EServerMessageType.RBSent, - slot: 0, // TODO: extract from actual data when available - id: `rb-${timestamp}`, - sender: "UpstreamNode", - recipient: "Node0", + slot: logData.prevCount || 0, // FIXME: Use proper slot number + id: `rb-${logData.prevCount + 1}`, + sender, + recipient, }; return { @@ -34,9 +55,21 @@ export const useLokiWebSocket = () => { message, }; } + } catch (error) { + console.warn("Failed to parse log line:", logLine, error); + } - return null; - }; + return null; +}; + +export const useLokiWebSocket = () => { + const { + state: { lokiHost }, + dispatch, + } = useSimContext(); + const [connecting, setConnecting] = useState(false); + const [connected, setConnected] = useState(false); + const wsRef = useRef(null); const connect = useCallback(() => { if (!lokiHost || connecting || connected) return; @@ -47,7 +80,7 @@ export const useLokiWebSocket = () => { try { // TODO: find better queries const query = encodeURIComponent( - '{service="immdb-server"} |= "MsgBlock"', + '{service="cardano-node"} |= "MsgBlock"', ); const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${query}`; console.log("Connecting to ", wsUrl); @@ -63,24 +96,26 @@ export const useLokiWebSocket = () => { ws.onmessage = (event) => { try { const data = JSON.parse(event.data); - console.log("Received Loki stream data:", data); + console.debug("Received Loki streams:", data); if (data.streams && Array.isArray(data.streams)) { const events: IServerMessage[] = []; data.streams.forEach((stream: any) => { + console.debug("Stream labels:", stream.stream); if (stream.values && Array.isArray(stream.values)) { stream.values.forEach( ([timestamp, logLine]: [string, string]) => { - console.log("Stream value:", { timestamp, logLine }); + console.debug("Stream value:", { timestamp, logLine }); const timestampSeconds = parseFloat(timestamp) / 1000000000; - const event = parseUpstreamNodeLog( - logLine, + const event = parseCardanoNodeLog( + stream.stream, timestampSeconds, + logLine, ); - if (event) { + console.debug("Parsed", event.time_s, event.message); events.push(event); } }, @@ -89,7 +124,6 @@ export const useLokiWebSocket = () => { }); if (events.length > 0) { - console.log("Dispatching events:", events); dispatch({ type: "ADD_TIMELINE_EVENT_BATCH", payload: events, From 05c7b30ac9344bd0f6272053dbaa8b095dbd2541 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Thu, 4 Dec 2025 12:38:02 +0100 Subject: [PATCH 05/15] Parse RBSent from SendBlock namespace --- .../components/Sim/hooks/useLokiWebSocket.ts | 55 ++++++++++++++++--- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/ui/src/components/Sim/hooks/useLokiWebSocket.ts b/ui/src/components/Sim/hooks/useLokiWebSocket.ts index 6596bc5c7..64bc82f77 100644 --- a/ui/src/components/Sim/hooks/useLokiWebSocket.ts +++ b/ui/src/components/Sim/hooks/useLokiWebSocket.ts @@ -10,8 +10,7 @@ import { useCallback, useEffect, useRef, useState } from "react"; const HOST_PORT_TO_NODE: Record = { "127.0.0.1:3001": "UpstreamNode", "127.0.0.1:3002": "Node0", - "127.0.0.1:3003": "Node1", - "127.0.0.1:3004": "Node2", + "127.0.0.1:3003": "DownstreamNode", // Add more mappings as needed }; @@ -23,6 +22,7 @@ const parseCardanoNodeLog = ( try { const logData = JSON.parse(logLine); + // Handle MsgBlock with Send direction if (logData.msg === "MsgBlock" && logData.direction === "Send") { // Extract sender from stream labels (process name) const sender = streamLabels.process; @@ -45,7 +45,41 @@ const parseCardanoNodeLog = ( const message: IRankingBlockSent = { type: EServerMessageType.RBSent, slot: logData.prevCount || 0, // FIXME: Use proper slot number - id: `rb-${logData.prevCount + 1}`, + id: `rb-${logData.prevCount + 1}`, // FIXME: use proper block hash + sender, + recipient, + }; + + return { + time_s: timestamp, + message, + }; + } + + // Handle BlockFetchServer kind + if (logData.kind === "BlockFetchServer" && logData.peer && logData.block) { + // Extract sender from stream labels (process name) + const sender = streamLabels.process; + + // Parse connection to extract recipient + // connectionId format: "127.0.0.1:3002 127.0.0.1:3003" + const connectionId = logData.peer.connectionId; + let recipient = "Node0"; // fallback + + if (connectionId) { + // Split connectionId to get both endpoints + const endpoints = connectionId.split(" "); + if (endpoints.length === 2) { + // Second endpoint is the recipient + const recipientEndpoint = endpoints[1]; + recipient = HOST_PORT_TO_NODE[recipientEndpoint] || recipient; + } + } + + const message: IRankingBlockSent = { + type: EServerMessageType.RBSent, + slot: 0, // FIXME: Use proper slot number + id: `rb-${logData.block.substring(0, 8)}`, sender, recipient, }; @@ -78,15 +112,14 @@ export const useLokiWebSocket = () => { dispatch({ type: "RESET_TIMELINE" }); try { - // TODO: find better queries - const query = encodeURIComponent( - '{service="cardano-node"} |= "MsgBlock"', - ); - const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${query}`; + // TODO: Multiple websockets instead? e.g. query={ns="BlockFetch.Client.CompletedBlockFetch"} + const query = encodeURIComponent('{service="cardano-node"}'); + const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${query}&limit=10000`; console.log("Connecting to ", wsUrl); const ws = new WebSocket(wsUrl); wsRef.current = ws; + let count = 0; ws.onopen = () => { setConnecting(false); setConnected(true); @@ -106,7 +139,11 @@ export const useLokiWebSocket = () => { if (stream.values && Array.isArray(stream.values)) { stream.values.forEach( ([timestamp, logLine]: [string, string]) => { - console.debug("Stream value:", { timestamp, logLine }); + count++; + console.debug("Stream value:", count, { + timestamp, + logLine, + }); const timestampSeconds = parseFloat(timestamp) / 1000000000; const event = parseCardanoNodeLog( From 51bbe8684ee71d705adf36dcca9ecc703f1297f7 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Thu, 4 Dec 2025 14:45:35 +0100 Subject: [PATCH 06/15] Maintain multiple websockets with smaller queries --- ui/public/topologies/prototype.yaml | 4 +- .../components/Sim/hooks/useLokiWebSocket.ts | 276 ++++++++++-------- 2 files changed, 160 insertions(+), 120 deletions(-) diff --git a/ui/public/topologies/prototype.yaml b/ui/public/topologies/prototype.yaml index b07bb950e..fdc2ca4c6 100644 --- a/ui/public/topologies/prototype.yaml +++ b/ui/public/topologies/prototype.yaml @@ -12,7 +12,7 @@ nodes: - 100 producers: UpstreamNode: - latency-ms: 17.0 + latency-ms: 200.0 DownstreamNode: stake: 0 location: @@ -20,4 +20,4 @@ nodes: - 200 producers: Node0: - latency-ms: 32.0 + latency-ms: 200.0 diff --git a/ui/src/components/Sim/hooks/useLokiWebSocket.ts b/ui/src/components/Sim/hooks/useLokiWebSocket.ts index 64bc82f77..36fc1229f 100644 --- a/ui/src/components/Sim/hooks/useLokiWebSocket.ts +++ b/ui/src/components/Sim/hooks/useLokiWebSocket.ts @@ -4,7 +4,18 @@ import { EServerMessageType, IRankingBlockSent, } from "@/components/Sim/types"; -import { useCallback, useEffect, useRef, useState } from "react"; +import { useRef } from "react"; + +interface QueryConfig { + query: string; + parser: ( + streamLabels: any, + timestamp: number, + logLine: string, + ) => IServerMessage | null; +} + +// FIXME: latency in topology is wrong // TODO: Replace with topology-based mapping const HOST_PORT_TO_NODE: Record = { @@ -14,7 +25,7 @@ const HOST_PORT_TO_NODE: Record = { // Add more mappings as needed }; -const parseCardanoNodeLog = ( +const parseBlockFetchServerLog = ( streamLabels: any, timestamp: number, logLine: string, @@ -22,14 +33,14 @@ const parseCardanoNodeLog = ( try { const logData = JSON.parse(logLine); - // Handle MsgBlock with Send direction - if (logData.msg === "MsgBlock" && logData.direction === "Send") { + // Handle BlockFetchServer kind + if (logData.kind === "BlockFetchServer" && logData.peer && logData.block) { // Extract sender from stream labels (process name) const sender = streamLabels.process; // Parse connection to extract recipient - // connectionId format: "127.0.0.1:3001 127.0.0.1:3002" - const connectionId = logData.connectionId; + // connectionId format: "127.0.0.1:3002 127.0.0.1:3003" + const connectionId = logData.peer.connectionId; let recipient = "Node0"; // fallback if (connectionId) { @@ -44,8 +55,8 @@ const parseCardanoNodeLog = ( const message: IRankingBlockSent = { type: EServerMessageType.RBSent, - slot: logData.prevCount || 0, // FIXME: Use proper slot number - id: `rb-${logData.prevCount + 1}`, // FIXME: use proper block hash + slot: 0, // FIXME: Use proper slot number + id: `rb-blockfetch-${logData.block.substring(0, 8)}`, sender, recipient, }; @@ -55,15 +66,29 @@ const parseCardanoNodeLog = ( message, }; } + } catch (error) { + console.warn("Failed to parse BlockFetchServer log line:", logLine, error); + } - // Handle BlockFetchServer kind - if (logData.kind === "BlockFetchServer" && logData.peer && logData.block) { + return null; +}; + +const parseUpstreamNodeLog = ( + streamLabels: any, + timestamp: number, + logLine: string, +): IServerMessage | null => { + try { + const logData = JSON.parse(logLine); + + // Handle MsgBlock with Send direction + if (logData.msg === "MsgBlock" && logData.direction === "Send") { // Extract sender from stream labels (process name) const sender = streamLabels.process; // Parse connection to extract recipient - // connectionId format: "127.0.0.1:3002 127.0.0.1:3003" - const connectionId = logData.peer.connectionId; + // connectionId format: "127.0.0.1:3001 127.0.0.1:3002" + const connectionId = logData.connectionId; let recipient = "Node0"; // fallback if (connectionId) { @@ -78,8 +103,8 @@ const parseCardanoNodeLog = ( const message: IRankingBlockSent = { type: EServerMessageType.RBSent, - slot: 0, // FIXME: Use proper slot number - id: `rb-${logData.block.substring(0, 8)}`, + slot: logData.prevCount || 0, // FIXME: Use proper slot number + id: `rb-upstream-${logData.prevCount + 1}`, // FIXME: use proper block hash sender, recipient, }; @@ -90,130 +115,145 @@ const parseCardanoNodeLog = ( }; } } catch (error) { - console.warn("Failed to parse log line:", logLine, error); + console.warn("Failed to parse UpstreamNode log line:", logLine, error); } return null; }; -export const useLokiWebSocket = () => { - const { - state: { lokiHost }, - dispatch, - } = useSimContext(); - const [connecting, setConnecting] = useState(false); - const [connected, setConnected] = useState(false); - const wsRef = useRef(null); +// Query configurations +const QUERY_CONFIGS: QueryConfig[] = [ + { + query: '{service="cardano-node", ns="BlockFetch.Server.SendBlock"}', + parser: parseBlockFetchServerLog, + }, + { + query: '{service="cardano-node", process="UpstreamNode"} |= `MsgBlock`', + parser: parseUpstreamNodeLog, + }, +]; - const connect = useCallback(() => { - if (!lokiHost || connecting || connected) return; +function connectLokiWebSockets(lokiHost: string, dispatch: any): () => void { + const websockets: WebSocket[] = []; + let connectedCount = 0; - setConnecting(true); - dispatch({ type: "RESET_TIMELINE" }); + dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); - try { - // TODO: Multiple websockets instead? e.g. query={ns="BlockFetch.Client.CompletedBlockFetch"} - const query = encodeURIComponent('{service="cardano-node"}'); - const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${query}&limit=10000`; - console.log("Connecting to ", wsUrl); - const ws = new WebSocket(wsUrl); - wsRef.current = ws; - - let count = 0; - ws.onopen = () => { - setConnecting(false); - setConnected(true); + const createWebSocket = (config: QueryConfig, index: number): WebSocket => { + const query = encodeURIComponent(config.query); + const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${query}`; + console.log(`Connecting with query ${index}:`, wsUrl); + const ws = new WebSocket(wsUrl); + + let count = 0; + ws.onopen = () => { + connectedCount += 1; + if (connectedCount === QUERY_CONFIGS.length) { dispatch({ type: "SET_LOKI_CONNECTED", payload: true }); - }; + } + }; + + ws.onmessage = (event) => { + try { + const data = JSON.parse(event.data); + console.debug(`Received Loki streams from query ${index}:`, data); - ws.onmessage = (event) => { - try { - const data = JSON.parse(event.data); - console.debug("Received Loki streams:", data); - - if (data.streams && Array.isArray(data.streams)) { - const events: IServerMessage[] = []; - - data.streams.forEach((stream: any) => { - console.debug("Stream labels:", stream.stream); - if (stream.values && Array.isArray(stream.values)) { - stream.values.forEach( - ([timestamp, logLine]: [string, string]) => { - count++; - console.debug("Stream value:", count, { - timestamp, - logLine, - }); - - const timestampSeconds = parseFloat(timestamp) / 1000000000; - const event = parseCardanoNodeLog( - stream.stream, - timestampSeconds, - logLine, - ); - if (event) { - console.debug("Parsed", event.time_s, event.message); - events.push(event); - } - }, - ); - } - }); - - if (events.length > 0) { - dispatch({ - type: "ADD_TIMELINE_EVENT_BATCH", - payload: events, - }); + if (data.streams && Array.isArray(data.streams)) { + const events: IServerMessage[] = []; + + data.streams.forEach((stream: any) => { + console.debug("Stream labels:", stream.stream); + if (stream.values && Array.isArray(stream.values)) { + stream.values.forEach( + ([timestamp, logLine]: [string, string]) => { + count++; + console.debug(`Stream value from query ${index}:`, count, { + timestamp, + logLine, + }); + + const timestampSeconds = parseFloat(timestamp) / 1000000000; + const event = config.parser( + stream.stream, + timestampSeconds, + logLine, + ); + if (event) { + console.debug("Parsed", event.time_s, event.message); + events.push(event); + } + }, + ); } + }); + + if (events.length > 0) { + dispatch({ type: "ADD_TIMELINE_EVENT_BATCH", payload: events }); } - } catch (error) { - console.error("Error processing Loki message:", error); } - }; + } catch (error) { + console.error( + `Error processing Loki message from query ${index}:`, + error, + ); + } + }; - ws.onerror = (error) => { - console.error("WebSocket error:", error); - setConnecting(false); - setConnected(false); - dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); - }; + ws.onerror = (error) => { + console.error(`WebSocket error for query ${index}:`, error); + connectedCount = 0; + dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); + }; - ws.onclose = () => { - setConnecting(false); - setConnected(false); + ws.onclose = () => { + connectedCount = Math.max(0, connectedCount - 1); + if (connectedCount === 0) { dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); - wsRef.current = null; - }; - } catch (error) { - console.error("Failed to create WebSocket connection:", error); - setConnecting(false); - setConnected(false); - } - }, [lokiHost, connecting, connected, dispatch]); + } + }; - const disconnect = useCallback(() => { - if (wsRef.current) { - wsRef.current.close(); - wsRef.current = null; - } - setConnecting(false); - setConnected(false); + return ws; + }; + + try { + QUERY_CONFIGS.forEach((config, index) => { + websockets.push(createWebSocket(config, index)); + }); + } catch (error) { + console.error("Failed to create WebSocket connections:", error); dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); - }, [dispatch]); + } - useEffect(() => { - return () => { - if (wsRef.current) { - wsRef.current.close(); + // Return cleanup function + return () => { + websockets.forEach((ws) => { + if (ws) { + ws.close(); } - }; - }, []); + }); + }; +} - return { - connect, - disconnect, - connecting, - connected, +export const useLokiWebSocket = () => { + const { + state: { lokiHost, lokiConnected }, + dispatch, + } = useSimContext(); + const cleanupRef = useRef<(() => void) | null>(null); + + const connect = () => { + if (!lokiHost || lokiConnected) return; + + dispatch({ type: "RESET_TIMELINE" }); + + cleanupRef.current = connectLokiWebSockets(lokiHost, dispatch); }; + + const disconnect = () => { + cleanupRef.current?.(); + cleanupRef.current = null; + dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); + }; + + return { connect, disconnect }; }; From 9f075f37c1a54e36f1b889b8e6229739c846e691 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Thu, 4 Dec 2025 14:59:23 +0100 Subject: [PATCH 07/15] Parse RBReceived events --- .../components/Sim/hooks/useLokiWebSocket.ts | 64 ++++++++++++++++++- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/ui/src/components/Sim/hooks/useLokiWebSocket.ts b/ui/src/components/Sim/hooks/useLokiWebSocket.ts index 36fc1229f..5065ff1fe 100644 --- a/ui/src/components/Sim/hooks/useLokiWebSocket.ts +++ b/ui/src/components/Sim/hooks/useLokiWebSocket.ts @@ -3,6 +3,7 @@ import { IServerMessage, EServerMessageType, IRankingBlockSent, + IRankingBlockReceived, } from "@/components/Sim/types"; import { useRef } from "react"; @@ -56,7 +57,7 @@ const parseBlockFetchServerLog = ( const message: IRankingBlockSent = { type: EServerMessageType.RBSent, slot: 0, // FIXME: Use proper slot number - id: `rb-blockfetch-${logData.block.substring(0, 8)}`, + id: `rb-${logData.block.substring(0, 8)}`, sender, recipient, }; @@ -121,6 +122,60 @@ const parseUpstreamNodeLog = ( return null; }; +const parseCompletedBlockFetchLog = ( + streamLabels: any, + timestamp: number, + logLine: string, +): IServerMessage | null => { + try { + const logData = JSON.parse(logLine); + + // Handle CompletedBlockFetch kind + if ( + logData.kind === "CompletedBlockFetch" && + logData.peer && + logData.block + ) { + // Extract recipient from stream labels (process name) + const recipient = streamLabels.process; + + // Parse connection to extract sender + // connectionId format: "127.0.0.1:3003 127.0.0.1:3002" + const connectionId = logData.peer.connectionId; + let sender = "Node0"; // fallback + if (connectionId) { + // Split connectionId to get both endpoints + const endpoints = connectionId.split(" "); + if (endpoints.length === 2) { + const senderEndpoint = endpoints[1]; + sender = HOST_PORT_TO_NODE[senderEndpoint] || sender; + } + } + + const message: IRankingBlockReceived = { + type: EServerMessageType.RBReceived, + slot: 0, // FIXME: Use proper slot number + id: `rb-${logData.block.substring(0, 8)}`, + sender, + recipient, + }; + + return { + time_s: timestamp, + message, + }; + } + } catch (error) { + console.warn( + "Failed to parse CompletedBlockFetch log line:", + logLine, + error, + ); + } + + return null; +}; + // Query configurations const QUERY_CONFIGS: QueryConfig[] = [ { @@ -131,6 +186,11 @@ const QUERY_CONFIGS: QueryConfig[] = [ query: '{service="cardano-node", process="UpstreamNode"} |= `MsgBlock`', parser: parseUpstreamNodeLog, }, + { + query: + '{service="cardano-node", ns="BlockFetch.Client.CompletedBlockFetch"}', + parser: parseCompletedBlockFetchLog, + }, ]; function connectLokiWebSockets(lokiHost: string, dispatch: any): () => void { @@ -179,7 +239,7 @@ function connectLokiWebSockets(lokiHost: string, dispatch: any): () => void { logLine, ); if (event) { - console.debug("Parsed", event.time_s, event.message); + console.warn("Parsed", event.time_s, event.message); events.push(event); } }, From 660eb8435429ef9dce1351ff8001236c5bee4480 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Thu, 4 Dec 2025 15:19:55 +0100 Subject: [PATCH 08/15] Use a single websocket again for ordered events --- .../components/Sim/hooks/useLokiWebSocket.ts | 205 ++++++++---------- 1 file changed, 88 insertions(+), 117 deletions(-) diff --git a/ui/src/components/Sim/hooks/useLokiWebSocket.ts b/ui/src/components/Sim/hooks/useLokiWebSocket.ts index 5065ff1fe..00dfd3c73 100644 --- a/ui/src/components/Sim/hooks/useLokiWebSocket.ts +++ b/ui/src/components/Sim/hooks/useLokiWebSocket.ts @@ -7,15 +7,6 @@ import { } from "@/components/Sim/types"; import { useRef } from "react"; -interface QueryConfig { - query: string; - parser: ( - streamLabels: any, - timestamp: number, - logLine: string, - ) => IServerMessage | null; -} - // FIXME: latency in topology is wrong // TODO: Replace with topology-based mapping @@ -176,122 +167,102 @@ const parseCompletedBlockFetchLog = ( return null; }; -// Query configurations -const QUERY_CONFIGS: QueryConfig[] = [ - { - query: '{service="cardano-node", ns="BlockFetch.Server.SendBlock"}', - parser: parseBlockFetchServerLog, - }, - { - query: '{service="cardano-node", process="UpstreamNode"} |= `MsgBlock`', - parser: parseUpstreamNodeLog, - }, - { - query: - '{service="cardano-node", ns="BlockFetch.Client.CompletedBlockFetch"}', - parser: parseCompletedBlockFetchLog, - }, -]; - -function connectLokiWebSockets(lokiHost: string, dispatch: any): () => void { - const websockets: WebSocket[] = []; - let connectedCount = 0; - - dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); - - const createWebSocket = (config: QueryConfig, index: number): WebSocket => { - const query = encodeURIComponent(config.query); - const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${query}`; - console.log(`Connecting with query ${index}:`, wsUrl); - const ws = new WebSocket(wsUrl); - - let count = 0; - ws.onopen = () => { - connectedCount += 1; - if (connectedCount === QUERY_CONFIGS.length) { - dispatch({ type: "SET_LOKI_CONNECTED", payload: true }); - } - }; - - ws.onmessage = (event) => { - try { - const data = JSON.parse(event.data); - console.debug(`Received Loki streams from query ${index}:`, data); - - if (data.streams && Array.isArray(data.streams)) { - const events: IServerMessage[] = []; - - data.streams.forEach((stream: any) => { - console.debug("Stream labels:", stream.stream); - if (stream.values && Array.isArray(stream.values)) { - stream.values.forEach( - ([timestamp, logLine]: [string, string]) => { - count++; - console.debug(`Stream value from query ${index}:`, count, { - timestamp, - logLine, - }); - - const timestampSeconds = parseFloat(timestamp) / 1000000000; - const event = config.parser( - stream.stream, - timestampSeconds, - logLine, - ); - if (event) { - console.warn("Parsed", event.time_s, event.message); - events.push(event); - } - }, - ); - } - }); +// Combined parser that handles all event types +const parseCardanoNodeLog = ( + streamLabels: any, + timestamp: number, + logLine: string, +): IServerMessage | null => { + try { + const logData = JSON.parse(logLine); + + // Try each parser in order + if (logData.kind === "BlockFetchServer") { + return parseBlockFetchServerLog(streamLabels, timestamp, logLine); + } + + if (logData.kind === "CompletedBlockFetch") { + return parseCompletedBlockFetchLog(streamLabels, timestamp, logLine); + } + + if (logData.msg === "MsgBlock" && logData.direction === "Send") { + return parseUpstreamNodeLog(streamLabels, timestamp, logLine); + } + } catch (error) { + console.warn("Failed to parse log line:", logLine, error); + } - if (events.length > 0) { - dispatch({ type: "ADD_TIMELINE_EVENT_BATCH", payload: events }); + return null; +}; + +function connectLokiWebSocket(lokiHost: string, dispatch: any): () => void { + // NOTE: Single websocket is essential because: + // 1. Timeline aggregation assumes events are chronologically ordered + // 2. Multiple websockets deliver events out of order across different queries + // 3. Loki naturally returns results in chronological order within a single stream + // 4. Sorting large event arrays in the reducer is too expensive for dense simulation data + const query = + '{service="cardano-node"} |~ "SendBlock|MsgBlock|CompletedBlockFetch"'; + const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${encodeURIComponent(query)}&limit=5000`; + console.log("Connecting to Loki:", wsUrl); + + const ws = new WebSocket(wsUrl); + + ws.onopen = () => { + dispatch({ type: "SET_LOKI_CONNECTED", payload: true }); + }; + + let count = 0; + ws.onmessage = (event) => { + try { + const data = JSON.parse(event.data); + console.debug("Received Loki streams:", data); + + if (data.streams && Array.isArray(data.streams)) { + const events: IServerMessage[] = []; + + data.streams.forEach((stream: any) => { + console.debug("Stream labels:", stream.stream); + if (stream.values && Array.isArray(stream.values)) { + stream.values.forEach(([timestamp, logLine]: [string, string]) => { + count++; + console.debug(`Stream value:`, count, { + timestamp, + logLine, + }); + const timestampSeconds = parseFloat(timestamp) / 1000000000; + const event = parseCardanoNodeLog( + stream.stream, + timestampSeconds, + logLine, + ); + if (event) { + console.warn("Parsed", event.time_s, event.message); + events.push(event); + } + }); } + }); + + if (events.length > 0) { + dispatch({ type: "ADD_TIMELINE_EVENT_BATCH", payload: events }); } - } catch (error) { - console.error( - `Error processing Loki message from query ${index}:`, - error, - ); - } - }; - - ws.onerror = (error) => { - console.error(`WebSocket error for query ${index}:`, error); - connectedCount = 0; - dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); - }; - - ws.onclose = () => { - connectedCount = Math.max(0, connectedCount - 1); - if (connectedCount === 0) { - dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); } - }; - - return ws; + } catch (error) { + console.error("Error processing Loki message:", error); + } }; - try { - QUERY_CONFIGS.forEach((config, index) => { - websockets.push(createWebSocket(config, index)); - }); - } catch (error) { - console.error("Failed to create WebSocket connections:", error); + ws.onerror = (error) => { + console.error("WebSocket error:", error); dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); - } + }; - // Return cleanup function - return () => { - websockets.forEach((ws) => { - if (ws) { - ws.close(); - } - }); + ws.onclose = () => { + dispatch({ type: "SET_LOKI_CONNECTED", payload: false }); }; + + return () => ws.close(); } export const useLokiWebSocket = () => { @@ -306,7 +277,7 @@ export const useLokiWebSocket = () => { dispatch({ type: "RESET_TIMELINE" }); - cleanupRef.current = connectLokiWebSockets(lokiHost, dispatch); + cleanupRef.current = connectLokiWebSocket(lokiHost, dispatch); }; const disconnect = () => { From 676ab331ce7508f9ffc103ec919e2cd4e339e5ed Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Thu, 4 Dec 2025 17:47:07 +0100 Subject: [PATCH 09/15] Parse EBSent and re-organize parsers --- .../components/Sim/hooks/useLokiWebSocket.ts | 176 ++++++++++-------- 1 file changed, 94 insertions(+), 82 deletions(-) diff --git a/ui/src/components/Sim/hooks/useLokiWebSocket.ts b/ui/src/components/Sim/hooks/useLokiWebSocket.ts index 00dfd3c73..ac5eb5308 100644 --- a/ui/src/components/Sim/hooks/useLokiWebSocket.ts +++ b/ui/src/components/Sim/hooks/useLokiWebSocket.ts @@ -4,6 +4,7 @@ import { EServerMessageType, IRankingBlockSent, IRankingBlockReceived, + IEndorserBlockSent, } from "@/components/Sim/types"; import { useRef } from "react"; @@ -17,29 +18,24 @@ const HOST_PORT_TO_NODE: Record = { // Add more mappings as needed }; -const parseBlockFetchServerLog = ( +const parseRankingBlockSent = ( streamLabels: any, timestamp: number, logLine: string, ): IServerMessage | null => { try { - const logData = JSON.parse(logLine); + const log = JSON.parse(logLine); - // Handle BlockFetchServer kind - if (logData.kind === "BlockFetchServer" && logData.peer && logData.block) { - // Extract sender from stream labels (process name) + // From cardano-node ns=BlockFetch.Server.SendBlock + // {"block":"56515bfd5751ca2c1ca0f21050cdb1cd020e396c623a16a2274528f643d4b5fd","kind":"BlockFetchServer","peer":{"connectionId":"127.0.0.1:3002 127.0.0.1:3003"}} + if (log.kind === "BlockFetchServer" && log.peer && log.block) { const sender = streamLabels.process; - - // Parse connection to extract recipient - // connectionId format: "127.0.0.1:3002 127.0.0.1:3003" - const connectionId = logData.peer.connectionId; - let recipient = "Node0"; // fallback + const connectionId = log.peer.connectionId; + let recipient = "Node0"; if (connectionId) { - // Split connectionId to get both endpoints const endpoints = connectionId.split(" "); if (endpoints.length === 2) { - // Second endpoint is the recipient const recipientEndpoint = endpoints[1]; recipient = HOST_PORT_TO_NODE[recipientEndpoint] || recipient; } @@ -47,8 +43,8 @@ const parseBlockFetchServerLog = ( const message: IRankingBlockSent = { type: EServerMessageType.RBSent, - slot: 0, // FIXME: Use proper slot number - id: `rb-${logData.block.substring(0, 8)}`, + slot: 0, + id: `rb-${log.block.substring(0, 8)}`, sender, recipient, }; @@ -58,36 +54,17 @@ const parseBlockFetchServerLog = ( message, }; } - } catch (error) { - console.warn("Failed to parse BlockFetchServer log line:", logLine, error); - } - return null; -}; - -const parseUpstreamNodeLog = ( - streamLabels: any, - timestamp: number, - logLine: string, -): IServerMessage | null => { - try { - const logData = JSON.parse(logLine); - - // Handle MsgBlock with Send direction - if (logData.msg === "MsgBlock" && logData.direction === "Send") { - // Extract sender from stream labels (process name) + // From immdb-server (no ns) + // {"at":"2025-12-05T12:45:21.0021Z","connectionId":"127.0.0.1:3001 127.0.0.1:3002","direction":"Send","msg":"MsgBlock","mux_at":"2025-12-05T12:45:21.0020Z","prevCount":13} + if (log.msg === "MsgBlock" && log.direction === "Send") { const sender = streamLabels.process; - - // Parse connection to extract recipient - // connectionId format: "127.0.0.1:3001 127.0.0.1:3002" - const connectionId = logData.connectionId; - let recipient = "Node0"; // fallback + const connectionId = log.connectionId; + let recipient = "Node0"; if (connectionId) { - // Split connectionId to get both endpoints const endpoints = connectionId.split(" "); if (endpoints.length === 2) { - // Second endpoint is the recipient const recipientEndpoint = endpoints[1]; recipient = HOST_PORT_TO_NODE[recipientEndpoint] || recipient; } @@ -95,8 +72,8 @@ const parseUpstreamNodeLog = ( const message: IRankingBlockSent = { type: EServerMessageType.RBSent, - slot: logData.prevCount || 0, // FIXME: Use proper slot number - id: `rb-upstream-${logData.prevCount + 1}`, // FIXME: use proper block hash + slot: log.prevCount || 0, + id: `rb-upstream-${log.prevCount + 1}`, sender, recipient, }; @@ -107,35 +84,28 @@ const parseUpstreamNodeLog = ( }; } } catch (error) { - console.warn("Failed to parse UpstreamNode log line:", logLine, error); + console.warn("Failed to parse RankingBlockSent log line:", logLine, error); } return null; }; -const parseCompletedBlockFetchLog = ( +const parseRankingBlockReceived = ( streamLabels: any, timestamp: number, logLine: string, ): IServerMessage | null => { try { - const logData = JSON.parse(logLine); - - // Handle CompletedBlockFetch kind - if ( - logData.kind === "CompletedBlockFetch" && - logData.peer && - logData.block - ) { - // Extract recipient from stream labels (process name) + const log = JSON.parse(logLine); + + // ns=BlockFetch.Client.CompletedBlockFetch + // {"block":"56515bfd5751ca2c1ca0f21050cdb1cd020e396c623a16a2274528f643d4b5fd","delay":4985924.003937032,"kind":"CompletedBlockFetch","peer":{"connectionId":"127.0.0.1:3003 127.0.0.1:3002"},"size":862} + if (log.kind === "CompletedBlockFetch" && log.peer && log.block) { const recipient = streamLabels.process; + const connectionId = log.peer.connectionId; + let sender = "Node0"; - // Parse connection to extract sender - // connectionId format: "127.0.0.1:3003 127.0.0.1:3002" - const connectionId = logData.peer.connectionId; - let sender = "Node0"; // fallback if (connectionId) { - // Split connectionId to get both endpoints const endpoints = connectionId.split(" "); if (endpoints.length === 2) { const senderEndpoint = endpoints[1]; @@ -145,8 +115,8 @@ const parseCompletedBlockFetchLog = ( const message: IRankingBlockReceived = { type: EServerMessageType.RBReceived, - slot: 0, // FIXME: Use proper slot number - id: `rb-${logData.block.substring(0, 8)}`, + slot: 0, // FIXME: use proper slot + id: `rb-${log.block.substring(0, 8)}`, sender, recipient, }; @@ -158,7 +128,7 @@ const parseCompletedBlockFetchLog = ( } } catch (error) { console.warn( - "Failed to parse CompletedBlockFetch log line:", + "Failed to parse RankingBlockReceived log line:", logLine, error, ); @@ -167,29 +137,73 @@ const parseCompletedBlockFetchLog = ( return null; }; -// Combined parser that handles all event types -const parseCardanoNodeLog = ( +const parseEndorserBlockSent = ( streamLabels: any, timestamp: number, logLine: string, ): IServerMessage | null => { try { - const logData = JSON.parse(logLine); + const log = JSON.parse(logLine); - // Try each parser in order - if (logData.kind === "BlockFetchServer") { - return parseBlockFetchServerLog(streamLabels, timestamp, logLine); - } + // From immdb-server (no ns) + // {"at":"2025-12-05T12:45:20.9134Z","connectionId":"127.0.0.1:3001 127.0.0.1:3002","direction":"Send","msg":"MsgLeiosBlock","mux_at":"2025-12-05T12:45:20.9131Z","prevCount":0} + if (log.msg === "MsgLeiosBlock" && log.direction === "Send") { + const sender = streamLabels.process; + const connectionId = log.connectionId; + let recipient = "Node0"; + + if (connectionId) { + const endpoints = connectionId.split(" "); + if (endpoints.length === 2) { + const recipientEndpoint = endpoints[1]; + recipient = HOST_PORT_TO_NODE[recipientEndpoint] || recipient; + } + } + + const message: IEndorserBlockSent = { + type: EServerMessageType.EBSent, + slot: 0, // FIXME: use correct slot + id: `eb-${log.prevCount || 0}`, + sender, + recipient, + }; - if (logData.kind === "CompletedBlockFetch") { - return parseCompletedBlockFetchLog(streamLabels, timestamp, logLine); + return { + time_s: timestamp, + message, + }; } - if (logData.msg === "MsgBlock" && logData.direction === "Send") { - return parseUpstreamNodeLog(streamLabels, timestamp, logLine); + // From cardano-node ns=LeiosFetch.Remote.Send.Block + // {"kind":"Send","msg":{"eb":"\u003celided\u003e","ebBytesSize":27471,"kind":"MsgLeiosBlock"},"mux_at":"2025-12-05T12:45:20.93446848Z","peer":{"connectionId":"127.0.0.1:3002 127.0.0.1:3003"}} + if (log.kind === "Send" && log.msg && log.msg.kind === "MsgLeiosBlock") { + const sender = streamLabels.process; + const connectionId = log.peer?.connectionId; + let recipient = "Node0"; + + if (connectionId) { + const endpoints = connectionId.split(" "); + if (endpoints.length === 2) { + const recipientEndpoint = endpoints[1]; + recipient = HOST_PORT_TO_NODE[recipientEndpoint] || recipient; + } + } + + const message: IEndorserBlockSent = { + type: EServerMessageType.EBSent, + slot: 0, // FIXME: use correct slot + id: `eb-${log.msg.eb}`, // FIXME: msg.eb is always elided + sender, + recipient, + }; + + return { + time_s: timestamp, + message, + }; } } catch (error) { - console.warn("Failed to parse log line:", logLine, error); + console.warn("Failed to parse EndorserBlockSent log line:", logLine, error); } return null; @@ -202,7 +216,7 @@ function connectLokiWebSocket(lokiHost: string, dispatch: any): () => void { // 3. Loki naturally returns results in chronological order within a single stream // 4. Sorting large event arrays in the reducer is too expensive for dense simulation data const query = - '{service="cardano-node"} |~ "SendBlock|MsgBlock|CompletedBlockFetch"'; + '{service="cardano-node"} |~ "SendBlock|MsgBlock|CompletedBlockFetch|MsgLeiosBlock"'; const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${encodeURIComponent(query)}&limit=5000`; console.log("Connecting to Loki:", wsUrl); @@ -226,16 +240,14 @@ function connectLokiWebSocket(lokiHost: string, dispatch: any): () => void { if (stream.values && Array.isArray(stream.values)) { stream.values.forEach(([timestamp, logLine]: [string, string]) => { count++; - console.debug(`Stream value:`, count, { - timestamp, - logLine, - }); - const timestampSeconds = parseFloat(timestamp) / 1000000000; - const event = parseCardanoNodeLog( - stream.stream, - timestampSeconds, - logLine, - ); + console.debug(`Stream value:`, count, { timestamp, logLine }); + const ts = parseFloat(timestamp) / 1000000000; + + // TODO: simplify and push further upstream (e.g. into alloy) + const event = + parseRankingBlockSent(stream.stream, ts, logLine) || + parseRankingBlockReceived(stream.stream, ts, logLine) || + parseEndorserBlockSent(stream.stream, ts, logLine); if (event) { console.warn("Parsed", event.time_s, event.message); events.push(event); From dea4e18679d09dc1ebafe1d958303ef214e85e87 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Fri, 5 Dec 2025 14:34:53 +0100 Subject: [PATCH 10/15] Parse EBReceived events --- .../components/Sim/hooks/useLokiWebSocket.ts | 51 ++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/ui/src/components/Sim/hooks/useLokiWebSocket.ts b/ui/src/components/Sim/hooks/useLokiWebSocket.ts index ac5eb5308..a868954f4 100644 --- a/ui/src/components/Sim/hooks/useLokiWebSocket.ts +++ b/ui/src/components/Sim/hooks/useLokiWebSocket.ts @@ -5,6 +5,7 @@ import { IRankingBlockSent, IRankingBlockReceived, IEndorserBlockSent, + IEndorserBlockReceived, } from "@/components/Sim/types"; import { useRef } from "react"; @@ -209,6 +210,53 @@ const parseEndorserBlockSent = ( return null; }; +const parseEndorserBlockReceived = ( + streamLabels: any, + timestamp: number, + logLine: string, +): IServerMessage | null => { + try { + const log = JSON.parse(logLine); + + // From cardano-node ns=LeiosFetch.Remote.Receive.Block + // {"mux_at":"2025-12-05T12:45:21.98320066Z","peer":{"connectionId":"127.0.0.1:3003 127.0.0.1:3002"},"kind":"Recv","msg":{"kind":"MsgLeiosBlock","eb":"\u003celided\u003e","ebBytesSize":27471}} + if (log.kind === "Recv" && log.msg && log.msg.kind === "MsgLeiosBlock") { + const recipient = streamLabels.process; + const connectionId = log.peer?.connectionId; + let sender = "Node0"; + + if (connectionId) { + const endpoints = connectionId.split(" "); + if (endpoints.length === 2) { + const senderEndpoint = endpoints[1]; + sender = HOST_PORT_TO_NODE[senderEndpoint] || sender; + } + } + + const message: IEndorserBlockReceived = { + type: EServerMessageType.EBReceived, + slot: 0, // FIXME: use correct slot + id: `eb-${log.msg.eb}`, // FIXME: msg.eb is always elided + sender, + recipient, + }; + + return { + time_s: timestamp, + message, + }; + } + } catch (error) { + console.warn( + "Failed to parse EndorserBlockReceived log line:", + logLine, + error, + ); + } + + return null; +}; + function connectLokiWebSocket(lokiHost: string, dispatch: any): () => void { // NOTE: Single websocket is essential because: // 1. Timeline aggregation assumes events are chronologically ordered @@ -247,7 +295,8 @@ function connectLokiWebSocket(lokiHost: string, dispatch: any): () => void { const event = parseRankingBlockSent(stream.stream, ts, logLine) || parseRankingBlockReceived(stream.stream, ts, logLine) || - parseEndorserBlockSent(stream.stream, ts, logLine); + parseEndorserBlockSent(stream.stream, ts, logLine) || + parseEndorserBlockReceived(stream.stream, ts, logLine); if (event) { console.warn("Parsed", event.time_s, event.message); events.push(event); From e435c3f5d4ee59bd0fecfd505d78e9d6f7035c07 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Fri, 5 Dec 2025 15:13:55 +0100 Subject: [PATCH 11/15] Fix double aggregation when seeking playback --- ui/src/components/Sim/modules/Playback.tsx | 59 +++++++++++----------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/ui/src/components/Sim/modules/Playback.tsx b/ui/src/components/Sim/modules/Playback.tsx index f3359a8e0..8966cd442 100644 --- a/ui/src/components/Sim/modules/Playback.tsx +++ b/ui/src/components/Sim/modules/Playback.tsx @@ -13,9 +13,9 @@ export const Playback: FC = () => { // Timeline playback refs const intervalRef = useRef(null); const lastUpdateRef = useRef(0); - const currentTimeRef = useRef(currentTime); - // Refs for seeking functionality + // Refs for stable seeking callbacks + const currentTimeRef = useRef(currentTime); const eventsRef = useRef(events); const maxTimeRef = useRef(maxTime); @@ -47,15 +47,28 @@ export const Playback: FC = () => { }, []); const handleStep = useCallback( + (stepAmount: number) => { + const maxEventTime = + events.length > 0 ? events[events.length - 1].time_s : maxTime; + const newTime = Math.max( + 0, + Math.min(currentTime + stepAmount, maxEventTime), + ); + dispatch({ type: "SET_TIMELINE_TIME", payload: newTime }); + }, + [dispatch, events, maxTime, currentTime], + ); + + // Stable version for seeking intervals (uses refs) + const handleStepForSeeking = useCallback( (stepAmount: number) => { const maxEventTime = eventsRef.current.length > 0 ? eventsRef.current[eventsRef.current.length - 1].time_s : maxTimeRef.current; - const currentTime = currentTimeRef.current; const newTime = Math.max( 0, - Math.min(currentTime + stepAmount, maxEventTime), + Math.min(currentTimeRef.current + stepAmount, maxEventTime), ); currentTimeRef.current = newTime; dispatch({ type: "SET_TIMELINE_TIME", payload: newTime }); @@ -68,17 +81,17 @@ export const Playback: FC = () => { // Clear any existing seeking first stopSeeking(); - // Initial step using current ref values + // Initial step using current context values handleStep(stepAmount); - // Start continuous seeking after delay + // Start continuous seeking after delay using stable callback stepTimeoutRef.current = window.setTimeout(() => { stepIntervalRef.current = window.setInterval(() => { - handleStep(stepAmount); + handleStepForSeeking(stepAmount); }, 33); // ~30 FPS smooth seeking }, 300); // initial delay }, - [handleStep, stopSeeking], + [handleStep, handleStepForSeeking, stopSeeking], ); // Timeline playback effect - handles automatic advancement when playing @@ -91,6 +104,9 @@ export const Playback: FC = () => { clearInterval(intervalRef.current); } + // Capture current time at interval start to avoid stale closure + let localCurrentTime = currentTime; + // Start playback interval intervalRef.current = window.setInterval(() => { const now = performance.now(); @@ -99,16 +115,10 @@ export const Playback: FC = () => { ((now - lastUpdateRef.current) / 1000) * speedMultiplier; lastUpdateRef.current = now; - const newTime = Math.min( - currentTimeRef.current + deltaTime, - maxEventTime, - ); - currentTimeRef.current = newTime; + const newTime = Math.min(localCurrentTime + deltaTime, maxEventTime); + localCurrentTime = newTime; - dispatch({ - type: "SET_TIMELINE_TIME", - payload: newTime, - }); + dispatch({ type: "SET_TIMELINE_TIME", payload: newTime }); // Auto-pause at the end if (newTime >= maxEventTime) { @@ -124,16 +134,9 @@ export const Playback: FC = () => { intervalRef.current = null; } } - }, [ - isPlaying, - events.length, - currentTime, - speedMultiplier, - dispatch, - stopSeeking, - ]); + }, [isPlaying, events.length, speedMultiplier, dispatch]); - // Keep refs in sync when values change externally + // Keep refs in sync with context values useEffect(() => { currentTimeRef.current = currentTime; lastUpdateRef.current = performance.now(); @@ -245,7 +248,6 @@ export const Playback: FC = () => { {isLoaded && (