Skip to content

Commit a571a4c

Browse files
committed
Use a single websocket again for ordered events
1 parent eb07648 commit a571a4c

File tree

1 file changed

+88
-117
lines changed

1 file changed

+88
-117
lines changed

ui/src/components/Sim/hooks/useLokiWebSocket.ts

Lines changed: 88 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,6 @@ import {
77
} from "@/components/Sim/types";
88
import { useRef } from "react";
99

10-
interface QueryConfig {
11-
query: string;
12-
parser: (
13-
streamLabels: any,
14-
timestamp: number,
15-
logLine: string,
16-
) => IServerMessage | null;
17-
}
18-
1910
// FIXME: latency in topology is wrong
2011

2112
// TODO: Replace with topology-based mapping
@@ -176,122 +167,102 @@ const parseCompletedBlockFetchLog = (
176167
return null;
177168
};
178169

179-
// Query configurations
180-
const QUERY_CONFIGS: QueryConfig[] = [
181-
{
182-
query: '{service="cardano-node", ns="BlockFetch.Server.SendBlock"}',
183-
parser: parseBlockFetchServerLog,
184-
},
185-
{
186-
query: '{service="cardano-node", process="UpstreamNode"} |= `MsgBlock`',
187-
parser: parseUpstreamNodeLog,
188-
},
189-
{
190-
query:
191-
'{service="cardano-node", ns="BlockFetch.Client.CompletedBlockFetch"}',
192-
parser: parseCompletedBlockFetchLog,
193-
},
194-
];
195-
196-
function connectLokiWebSockets(lokiHost: string, dispatch: any): () => void {
197-
const websockets: WebSocket[] = [];
198-
let connectedCount = 0;
199-
200-
dispatch({ type: "SET_LOKI_CONNECTED", payload: false });
201-
202-
const createWebSocket = (config: QueryConfig, index: number): WebSocket => {
203-
const query = encodeURIComponent(config.query);
204-
const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${query}`;
205-
console.log(`Connecting with query ${index}:`, wsUrl);
206-
const ws = new WebSocket(wsUrl);
207-
208-
let count = 0;
209-
ws.onopen = () => {
210-
connectedCount += 1;
211-
if (connectedCount === QUERY_CONFIGS.length) {
212-
dispatch({ type: "SET_LOKI_CONNECTED", payload: true });
213-
}
214-
};
215-
216-
ws.onmessage = (event) => {
217-
try {
218-
const data = JSON.parse(event.data);
219-
console.debug(`Received Loki streams from query ${index}:`, data);
220-
221-
if (data.streams && Array.isArray(data.streams)) {
222-
const events: IServerMessage[] = [];
223-
224-
data.streams.forEach((stream: any) => {
225-
console.debug("Stream labels:", stream.stream);
226-
if (stream.values && Array.isArray(stream.values)) {
227-
stream.values.forEach(
228-
([timestamp, logLine]: [string, string]) => {
229-
count++;
230-
console.debug(`Stream value from query ${index}:`, count, {
231-
timestamp,
232-
logLine,
233-
});
234-
235-
const timestampSeconds = parseFloat(timestamp) / 1000000000;
236-
const event = config.parser(
237-
stream.stream,
238-
timestampSeconds,
239-
logLine,
240-
);
241-
if (event) {
242-
console.warn("Parsed", event.time_s, event.message);
243-
events.push(event);
244-
}
245-
},
246-
);
247-
}
248-
});
170+
// Combined parser that handles all event types
171+
const parseCardanoNodeLog = (
172+
streamLabels: any,
173+
timestamp: number,
174+
logLine: string,
175+
): IServerMessage | null => {
176+
try {
177+
const logData = JSON.parse(logLine);
178+
179+
// Try each parser in order
180+
if (logData.kind === "BlockFetchServer") {
181+
return parseBlockFetchServerLog(streamLabels, timestamp, logLine);
182+
}
183+
184+
if (logData.kind === "CompletedBlockFetch") {
185+
return parseCompletedBlockFetchLog(streamLabels, timestamp, logLine);
186+
}
187+
188+
if (logData.msg === "MsgBlock" && logData.direction === "Send") {
189+
return parseUpstreamNodeLog(streamLabels, timestamp, logLine);
190+
}
191+
} catch (error) {
192+
console.warn("Failed to parse log line:", logLine, error);
193+
}
249194

250-
if (events.length > 0) {
251-
dispatch({ type: "ADD_TIMELINE_EVENT_BATCH", payload: events });
195+
return null;
196+
};
197+
198+
function connectLokiWebSocket(lokiHost: string, dispatch: any): () => void {
199+
// NOTE: Single websocket is essential because:
200+
// 1. Timeline aggregation assumes events are chronologically ordered
201+
// 2. Multiple websockets deliver events out of order across different queries
202+
// 3. Loki naturally returns results in chronological order within a single stream
203+
// 4. Sorting large event arrays in the reducer is too expensive for dense simulation data
204+
const query =
205+
'{service="cardano-node"} |~ "SendBlock|MsgBlock|CompletedBlockFetch"';
206+
const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${encodeURIComponent(query)}&limit=5000`;
207+
console.log("Connecting to Loki:", wsUrl);
208+
209+
const ws = new WebSocket(wsUrl);
210+
211+
ws.onopen = () => {
212+
dispatch({ type: "SET_LOKI_CONNECTED", payload: true });
213+
};
214+
215+
let count = 0;
216+
ws.onmessage = (event) => {
217+
try {
218+
const data = JSON.parse(event.data);
219+
console.debug("Received Loki streams:", data);
220+
221+
if (data.streams && Array.isArray(data.streams)) {
222+
const events: IServerMessage[] = [];
223+
224+
data.streams.forEach((stream: any) => {
225+
console.debug("Stream labels:", stream.stream);
226+
if (stream.values && Array.isArray(stream.values)) {
227+
stream.values.forEach(([timestamp, logLine]: [string, string]) => {
228+
count++;
229+
console.debug(`Stream value:`, count, {
230+
timestamp,
231+
logLine,
232+
});
233+
const timestampSeconds = parseFloat(timestamp) / 1000000000;
234+
const event = parseCardanoNodeLog(
235+
stream.stream,
236+
timestampSeconds,
237+
logLine,
238+
);
239+
if (event) {
240+
console.warn("Parsed", event.time_s, event.message);
241+
events.push(event);
242+
}
243+
});
252244
}
245+
});
246+
247+
if (events.length > 0) {
248+
dispatch({ type: "ADD_TIMELINE_EVENT_BATCH", payload: events });
253249
}
254-
} catch (error) {
255-
console.error(
256-
`Error processing Loki message from query ${index}:`,
257-
error,
258-
);
259-
}
260-
};
261-
262-
ws.onerror = (error) => {
263-
console.error(`WebSocket error for query ${index}:`, error);
264-
connectedCount = 0;
265-
dispatch({ type: "SET_LOKI_CONNECTED", payload: false });
266-
};
267-
268-
ws.onclose = () => {
269-
connectedCount = Math.max(0, connectedCount - 1);
270-
if (connectedCount === 0) {
271-
dispatch({ type: "SET_LOKI_CONNECTED", payload: false });
272250
}
273-
};
274-
275-
return ws;
251+
} catch (error) {
252+
console.error("Error processing Loki message:", error);
253+
}
276254
};
277255

278-
try {
279-
QUERY_CONFIGS.forEach((config, index) => {
280-
websockets.push(createWebSocket(config, index));
281-
});
282-
} catch (error) {
283-
console.error("Failed to create WebSocket connections:", error);
256+
ws.onerror = (error) => {
257+
console.error("WebSocket error:", error);
284258
dispatch({ type: "SET_LOKI_CONNECTED", payload: false });
285-
}
259+
};
286260

287-
// Return cleanup function
288-
return () => {
289-
websockets.forEach((ws) => {
290-
if (ws) {
291-
ws.close();
292-
}
293-
});
261+
ws.onclose = () => {
262+
dispatch({ type: "SET_LOKI_CONNECTED", payload: false });
294263
};
264+
265+
return () => ws.close();
295266
}
296267

297268
export const useLokiWebSocket = () => {
@@ -306,7 +277,7 @@ export const useLokiWebSocket = () => {
306277

307278
dispatch({ type: "RESET_TIMELINE" });
308279

309-
cleanupRef.current = connectLokiWebSockets(lokiHost, dispatch);
280+
cleanupRef.current = connectLokiWebSocket(lokiHost, dispatch);
310281
};
311282

312283
const disconnect = () => {

0 commit comments

Comments
 (0)