Skip to content

Commit da814ce

Browse files
committed
Parse TxSent and TxReceived events from EB fetch requests
1 parent e70b1b6 commit da814ce

File tree

1 file changed

+124
-2
lines changed

1 file changed

+124
-2
lines changed

ui/src/components/Sim/hooks/useLokiWebSocket.ts

Lines changed: 124 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import {
66
IRankingBlockReceived,
77
IEndorserBlockSent,
88
IEndorserBlockReceived,
9+
ITransactionSent,
10+
ITransactionReceived,
911
} from "@/components/Sim/types";
1012
import { useRef } from "react";
1113

@@ -257,14 +259,132 @@ const parseEndorserBlockReceived = (
257259
return null;
258260
};
259261

262+
const parseTransactionSent = (
263+
streamLabels: any,
264+
timestamp: number,
265+
logLine: string,
266+
): IServerMessage | null => {
267+
try {
268+
const log = JSON.parse(logLine);
269+
270+
// TODO: indicate this is many transactions or visualize as a very big transaction
271+
272+
// From immdb-server (no ns)
273+
// {"at":"2025-12-05T14:06:12.4254Z","connectionId":"127.0.0.1:3001 127.0.0.1:3002","direction":"Send","msg":"MsgLeiosBlockTxs","mux_at":"2025-12-05T14:06:12.4254Z","prevCount":265}
274+
if (log.msg === "MsgLeiosBlockTxs" && log.direction === "Send") {
275+
const sender = streamLabels.process;
276+
const connectionId = log.connectionId;
277+
let recipient = "Node0";
278+
279+
if (connectionId) {
280+
const endpoints = connectionId.split(" ");
281+
if (endpoints.length === 2) {
282+
const recipientEndpoint = endpoints[1];
283+
recipient = HOST_PORT_TO_NODE[recipientEndpoint] || recipient;
284+
}
285+
}
286+
287+
const message: ITransactionSent = {
288+
type: EServerMessageType.TransactionSent,
289+
id: `tx-batch-${log.prevCount}`,
290+
sender,
291+
recipient,
292+
};
293+
294+
return {
295+
time_s: timestamp,
296+
message,
297+
};
298+
}
299+
300+
// From cardano-node ns=LeiosFetch.Remote.Send.BlockTxs
301+
// {"kind":"Send","msg":{"kind":"MsgLeiosBlockTxs","numTxs":30,"txs":"\u003celided\u003e","txsBytesSize":491520},"mux_at":"2025-12-05T14:06:12.52467535Z","peer":{"connectionId":"127.0.0.1:3002 127.0.0.1:3003"}}
302+
if (log.kind === "Send" && log.msg && log.msg.kind === "MsgLeiosBlockTxs") {
303+
const sender = streamLabels.process;
304+
const connectionId = log.peer?.connectionId;
305+
let recipient = "Node0";
306+
307+
if (connectionId) {
308+
const endpoints = connectionId.split(" ");
309+
if (endpoints.length === 2) {
310+
const recipientEndpoint = endpoints[1];
311+
recipient = HOST_PORT_TO_NODE[recipientEndpoint] || recipient;
312+
}
313+
}
314+
315+
const message: ITransactionSent = {
316+
type: EServerMessageType.TransactionSent,
317+
id: `tx-batch-${log.msg.txs}`, // FIXME: msg.txs is always elided
318+
sender,
319+
recipient,
320+
};
321+
322+
return {
323+
time_s: timestamp,
324+
message,
325+
};
326+
}
327+
} catch (error) {
328+
console.warn("Failed to parse TransactionSent log line:", logLine, error);
329+
}
330+
331+
return null;
332+
};
333+
334+
const parseTransactionReceived = (
335+
streamLabels: any,
336+
timestamp: number,
337+
logLine: string,
338+
): IServerMessage | null => {
339+
try {
340+
const log = JSON.parse(logLine);
341+
342+
// From cardano-node ns=LeiosFetch.Remote.Receive.BlockTxs
343+
// {"mux_at":"2025-12-05T14:06:12.52499731Z","peer":{"connectionId":"127.0.0.1:3003 127.0.0.1:3002"},"kind":"Recv","msg":{"txsBytesSize":491520,"kind":"MsgLeiosBlockTxs","numTxs":30,"txs":"\u003celided\u003e"}}
344+
if (log.kind === "Recv" && log.msg && log.msg.kind === "MsgLeiosBlockTxs") {
345+
const recipient = streamLabels.process;
346+
const connectionId = log.peer?.connectionId;
347+
let sender = "Node0";
348+
349+
if (connectionId) {
350+
const endpoints = connectionId.split(" ");
351+
if (endpoints.length === 2) {
352+
const senderEndpoint = endpoints[1];
353+
sender = HOST_PORT_TO_NODE[senderEndpoint] || sender;
354+
}
355+
}
356+
357+
const message: ITransactionReceived = {
358+
type: EServerMessageType.TransactionReceived,
359+
id: `tx-${log.msg.txs}`, // FIXME: msg.txs is always elided
360+
sender,
361+
recipient,
362+
};
363+
364+
return {
365+
time_s: timestamp,
366+
message,
367+
};
368+
}
369+
} catch (error) {
370+
console.warn(
371+
"Failed to parse TransactionReceived log line:",
372+
logLine,
373+
error,
374+
);
375+
}
376+
377+
return null;
378+
};
379+
260380
function connectLokiWebSocket(lokiHost: string, dispatch: any): () => void {
261381
// NOTE: Single websocket is essential because:
262382
// 1. Timeline aggregation assumes events are chronologically ordered
263383
// 2. Multiple websockets deliver events out of order across different queries
264384
// 3. Loki naturally returns results in chronological order within a single stream
265385
// 4. Sorting large event arrays in the reducer is too expensive for dense simulation data
266386
const query =
267-
'{service="cardano-node"} |~ "BlockFetchServer|MsgBlock|CompletedBlockFetch|MsgLeiosBlock"';
387+
'{service="cardano-node"} |~ "BlockFetchServer|MsgBlock|CompletedBlockFetch|MsgLeiosBlock|MsgLeiosBlockTxs"';
268388
const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${encodeURIComponent(query)}&limit=5000`;
269389
console.log("Connecting to Loki:", wsUrl);
270390

@@ -296,7 +416,9 @@ function connectLokiWebSocket(lokiHost: string, dispatch: any): () => void {
296416
parseRankingBlockSent(stream.stream, ts, logLine) ||
297417
parseRankingBlockReceived(stream.stream, ts, logLine) ||
298418
parseEndorserBlockSent(stream.stream, ts, logLine) ||
299-
parseEndorserBlockReceived(stream.stream, ts, logLine);
419+
parseEndorserBlockReceived(stream.stream, ts, logLine) ||
420+
parseTransactionSent(stream.stream, ts, logLine) ||
421+
parseTransactionReceived(stream.stream, ts, logLine);
300422
if (event) {
301423
console.warn("Parsed", event.time_s, event.message);
302424
events.push(event);

0 commit comments

Comments
 (0)