Skip to content

Commit f923bef

Browse files
committed
Parse EBSent and re-organize parsers
1 parent a571a4c commit f923bef

File tree

1 file changed

+94
-82
lines changed

1 file changed

+94
-82
lines changed

ui/src/components/Sim/hooks/useLokiWebSocket.ts

Lines changed: 94 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
EServerMessageType,
55
IRankingBlockSent,
66
IRankingBlockReceived,
7+
IEndorserBlockSent,
78
} from "@/components/Sim/types";
89
import { useRef } from "react";
910

@@ -17,38 +18,33 @@ const HOST_PORT_TO_NODE: Record<string, string> = {
1718
// Add more mappings as needed
1819
};
1920

20-
const parseBlockFetchServerLog = (
21+
const parseRankingBlockSent = (
2122
streamLabels: any,
2223
timestamp: number,
2324
logLine: string,
2425
): IServerMessage | null => {
2526
try {
26-
const logData = JSON.parse(logLine);
27+
const log = JSON.parse(logLine);
2728

28-
// Handle BlockFetchServer kind
29-
if (logData.kind === "BlockFetchServer" && logData.peer && logData.block) {
30-
// Extract sender from stream labels (process name)
29+
// From cardano-node ns=BlockFetch.Server.SendBlock
30+
// {"block":"56515bfd5751ca2c1ca0f21050cdb1cd020e396c623a16a2274528f643d4b5fd","kind":"BlockFetchServer","peer":{"connectionId":"127.0.0.1:3002 127.0.0.1:3003"}}
31+
if (log.kind === "BlockFetchServer" && log.peer && log.block) {
3132
const sender = streamLabels.process;
32-
33-
// Parse connection to extract recipient
34-
// connectionId format: "127.0.0.1:3002 127.0.0.1:3003"
35-
const connectionId = logData.peer.connectionId;
36-
let recipient = "Node0"; // fallback
33+
const connectionId = log.peer.connectionId;
34+
let recipient = "Node0";
3735

3836
if (connectionId) {
39-
// Split connectionId to get both endpoints
4037
const endpoints = connectionId.split(" ");
4138
if (endpoints.length === 2) {
42-
// Second endpoint is the recipient
4339
const recipientEndpoint = endpoints[1];
4440
recipient = HOST_PORT_TO_NODE[recipientEndpoint] || recipient;
4541
}
4642
}
4743

4844
const message: IRankingBlockSent = {
4945
type: EServerMessageType.RBSent,
50-
slot: 0, // FIXME: Use proper slot number
51-
id: `rb-${logData.block.substring(0, 8)}`,
46+
slot: 0,
47+
id: `rb-${log.block.substring(0, 8)}`,
5248
sender,
5349
recipient,
5450
};
@@ -58,45 +54,26 @@ const parseBlockFetchServerLog = (
5854
message,
5955
};
6056
}
61-
} catch (error) {
62-
console.warn("Failed to parse BlockFetchServer log line:", logLine, error);
63-
}
6457

65-
return null;
66-
};
67-
68-
const parseUpstreamNodeLog = (
69-
streamLabels: any,
70-
timestamp: number,
71-
logLine: string,
72-
): IServerMessage | null => {
73-
try {
74-
const logData = JSON.parse(logLine);
75-
76-
// Handle MsgBlock with Send direction
77-
if (logData.msg === "MsgBlock" && logData.direction === "Send") {
78-
// Extract sender from stream labels (process name)
58+
// From immdb-server (no ns)
59+
// {"at":"2025-12-05T12:45:21.0021Z","connectionId":"127.0.0.1:3001 127.0.0.1:3002","direction":"Send","msg":"MsgBlock","mux_at":"2025-12-05T12:45:21.0020Z","prevCount":13}
60+
if (log.msg === "MsgBlock" && log.direction === "Send") {
7961
const sender = streamLabels.process;
80-
81-
// Parse connection to extract recipient
82-
// connectionId format: "127.0.0.1:3001 127.0.0.1:3002"
83-
const connectionId = logData.connectionId;
84-
let recipient = "Node0"; // fallback
62+
const connectionId = log.connectionId;
63+
let recipient = "Node0";
8564

8665
if (connectionId) {
87-
// Split connectionId to get both endpoints
8866
const endpoints = connectionId.split(" ");
8967
if (endpoints.length === 2) {
90-
// Second endpoint is the recipient
9168
const recipientEndpoint = endpoints[1];
9269
recipient = HOST_PORT_TO_NODE[recipientEndpoint] || recipient;
9370
}
9471
}
9572

9673
const message: IRankingBlockSent = {
9774
type: EServerMessageType.RBSent,
98-
slot: logData.prevCount || 0, // FIXME: Use proper slot number
99-
id: `rb-upstream-${logData.prevCount + 1}`, // FIXME: use proper block hash
75+
slot: log.prevCount || 0,
76+
id: `rb-upstream-${log.prevCount + 1}`,
10077
sender,
10178
recipient,
10279
};
@@ -107,35 +84,28 @@ const parseUpstreamNodeLog = (
10784
};
10885
}
10986
} catch (error) {
110-
console.warn("Failed to parse UpstreamNode log line:", logLine, error);
87+
console.warn("Failed to parse RankingBlockSent log line:", logLine, error);
11188
}
11289

11390
return null;
11491
};
11592

116-
const parseCompletedBlockFetchLog = (
93+
const parseRankingBlockReceived = (
11794
streamLabels: any,
11895
timestamp: number,
11996
logLine: string,
12097
): IServerMessage | null => {
12198
try {
122-
const logData = JSON.parse(logLine);
123-
124-
// Handle CompletedBlockFetch kind
125-
if (
126-
logData.kind === "CompletedBlockFetch" &&
127-
logData.peer &&
128-
logData.block
129-
) {
130-
// Extract recipient from stream labels (process name)
99+
const log = JSON.parse(logLine);
100+
101+
// ns=BlockFetch.Client.CompletedBlockFetch
102+
// {"block":"56515bfd5751ca2c1ca0f21050cdb1cd020e396c623a16a2274528f643d4b5fd","delay":4985924.003937032,"kind":"CompletedBlockFetch","peer":{"connectionId":"127.0.0.1:3003 127.0.0.1:3002"},"size":862}
103+
if (log.kind === "CompletedBlockFetch" && log.peer && log.block) {
131104
const recipient = streamLabels.process;
105+
const connectionId = log.peer.connectionId;
106+
let sender = "Node0";
132107

133-
// Parse connection to extract sender
134-
// connectionId format: "127.0.0.1:3003 127.0.0.1:3002"
135-
const connectionId = logData.peer.connectionId;
136-
let sender = "Node0"; // fallback
137108
if (connectionId) {
138-
// Split connectionId to get both endpoints
139109
const endpoints = connectionId.split(" ");
140110
if (endpoints.length === 2) {
141111
const senderEndpoint = endpoints[1];
@@ -145,8 +115,8 @@ const parseCompletedBlockFetchLog = (
145115

146116
const message: IRankingBlockReceived = {
147117
type: EServerMessageType.RBReceived,
148-
slot: 0, // FIXME: Use proper slot number
149-
id: `rb-${logData.block.substring(0, 8)}`,
118+
slot: 0, // FIXME: use proper slot
119+
id: `rb-${log.block.substring(0, 8)}`,
150120
sender,
151121
recipient,
152122
};
@@ -158,7 +128,7 @@ const parseCompletedBlockFetchLog = (
158128
}
159129
} catch (error) {
160130
console.warn(
161-
"Failed to parse CompletedBlockFetch log line:",
131+
"Failed to parse RankingBlockReceived log line:",
162132
logLine,
163133
error,
164134
);
@@ -167,29 +137,73 @@ const parseCompletedBlockFetchLog = (
167137
return null;
168138
};
169139

170-
// Combined parser that handles all event types
171-
const parseCardanoNodeLog = (
140+
const parseEndorserBlockSent = (
172141
streamLabels: any,
173142
timestamp: number,
174143
logLine: string,
175144
): IServerMessage | null => {
176145
try {
177-
const logData = JSON.parse(logLine);
146+
const log = JSON.parse(logLine);
178147

179-
// Try each parser in order
180-
if (logData.kind === "BlockFetchServer") {
181-
return parseBlockFetchServerLog(streamLabels, timestamp, logLine);
182-
}
148+
// From immdb-server (no ns)
149+
// {"at":"2025-12-05T12:45:20.9134Z","connectionId":"127.0.0.1:3001 127.0.0.1:3002","direction":"Send","msg":"MsgLeiosBlock","mux_at":"2025-12-05T12:45:20.9131Z","prevCount":0}
150+
if (log.msg === "MsgLeiosBlock" && log.direction === "Send") {
151+
const sender = streamLabels.process;
152+
const connectionId = log.connectionId;
153+
let recipient = "Node0";
154+
155+
if (connectionId) {
156+
const endpoints = connectionId.split(" ");
157+
if (endpoints.length === 2) {
158+
const recipientEndpoint = endpoints[1];
159+
recipient = HOST_PORT_TO_NODE[recipientEndpoint] || recipient;
160+
}
161+
}
162+
163+
const message: IEndorserBlockSent = {
164+
type: EServerMessageType.EBSent,
165+
slot: 0, // FIXME: use correct slot
166+
id: `eb-${log.prevCount || 0}`,
167+
sender,
168+
recipient,
169+
};
183170

184-
if (logData.kind === "CompletedBlockFetch") {
185-
return parseCompletedBlockFetchLog(streamLabels, timestamp, logLine);
171+
return {
172+
time_s: timestamp,
173+
message,
174+
};
186175
}
187176

188-
if (logData.msg === "MsgBlock" && logData.direction === "Send") {
189-
return parseUpstreamNodeLog(streamLabels, timestamp, logLine);
177+
// From cardano-node ns=LeiosFetch.Remote.Send.Block
178+
// {"kind":"Send","msg":{"eb":"\u003celided\u003e","ebBytesSize":27471,"kind":"MsgLeiosBlock"},"mux_at":"2025-12-05T12:45:20.93446848Z","peer":{"connectionId":"127.0.0.1:3002 127.0.0.1:3003"}}
179+
if (log.kind === "Send" && log.msg && log.msg.kind === "MsgLeiosBlock") {
180+
const sender = streamLabels.process;
181+
const connectionId = log.peer?.connectionId;
182+
let recipient = "Node0";
183+
184+
if (connectionId) {
185+
const endpoints = connectionId.split(" ");
186+
if (endpoints.length === 2) {
187+
const recipientEndpoint = endpoints[1];
188+
recipient = HOST_PORT_TO_NODE[recipientEndpoint] || recipient;
189+
}
190+
}
191+
192+
const message: IEndorserBlockSent = {
193+
type: EServerMessageType.EBSent,
194+
slot: 0, // FIXME: use correct slot
195+
id: `eb-${log.msg.eb}`, // FIXME: msg.eb is always elided
196+
sender,
197+
recipient,
198+
};
199+
200+
return {
201+
time_s: timestamp,
202+
message,
203+
};
190204
}
191205
} catch (error) {
192-
console.warn("Failed to parse log line:", logLine, error);
206+
console.warn("Failed to parse EndorserBlockSent log line:", logLine, error);
193207
}
194208

195209
return null;
@@ -202,7 +216,7 @@ function connectLokiWebSocket(lokiHost: string, dispatch: any): () => void {
202216
// 3. Loki naturally returns results in chronological order within a single stream
203217
// 4. Sorting large event arrays in the reducer is too expensive for dense simulation data
204218
const query =
205-
'{service="cardano-node"} |~ "SendBlock|MsgBlock|CompletedBlockFetch"';
219+
'{service="cardano-node"} |~ "SendBlock|MsgBlock|CompletedBlockFetch|MsgLeiosBlock"';
206220
const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${encodeURIComponent(query)}&limit=5000`;
207221
console.log("Connecting to Loki:", wsUrl);
208222

@@ -226,16 +240,14 @@ function connectLokiWebSocket(lokiHost: string, dispatch: any): () => void {
226240
if (stream.values && Array.isArray(stream.values)) {
227241
stream.values.forEach(([timestamp, logLine]: [string, string]) => {
228242
count++;
229-
console.debug(`Stream value:`, count, {
230-
timestamp,
231-
logLine,
232-
});
233-
const timestampSeconds = parseFloat(timestamp) / 1000000000;
234-
const event = parseCardanoNodeLog(
235-
stream.stream,
236-
timestampSeconds,
237-
logLine,
238-
);
243+
console.debug(`Stream value:`, count, { timestamp, logLine });
244+
const ts = parseFloat(timestamp) / 1000000000;
245+
246+
// TODO: simplify and push further upstream (e.g. into alloy)
247+
const event =
248+
parseRankingBlockSent(stream.stream, ts, logLine) ||
249+
parseRankingBlockReceived(stream.stream, ts, logLine) ||
250+
parseEndorserBlockSent(stream.stream, ts, logLine);
239251
if (event) {
240252
console.warn("Parsed", event.time_s, event.message);
241253
events.push(event);

0 commit comments

Comments
 (0)