Skip to content

Commit 50d289f

Browse files
feat: add event stream history (#1691)
1 parent fd1ae2f commit 50d289f

File tree

11 files changed

+420
-101
lines changed

11 files changed

+420
-101
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import { Box, List, ListItem, Typography } from "@mui/material";
2+
import dayjs from "dayjs";
3+
import utc from "dayjs/plugin/utc";
4+
5+
import { DATE_FORMAT, TIME_FORMAT } from "../../constants/datetimes";
6+
import { useEventStream } from "../../state/eventStream";
7+
import { EventMessage } from "../eventMessages/EventMessage";
8+
9+
dayjs.extend(utc);
10+
11+
/**
12+
* Displays a list of events sorted by timestamp (newest first)
13+
*/
14+
export const EventList = () => {
15+
const { events } = useEventStream();
16+
17+
if (events.length === 0) {
18+
return (
19+
<Box sx={{ p: 2, textAlign: "center" }}>
20+
<Typography color="text.secondary" variant="body2">
21+
No events to display
22+
</Typography>
23+
</Box>
24+
);
25+
}
26+
27+
return (
28+
<Box sx={{ maxHeight: 400, overflow: "auto" }}>
29+
<Typography sx={{ p: 1, fontWeight: "bold" }} variant="subtitle2">
30+
Events ({events.length})
31+
</Typography>
32+
33+
<List dense>
34+
{events.map((event) => {
35+
const eventTime = dayjs.utc(event.timestamp);
36+
37+
return (
38+
<ListItem key={`${event.timestamp}-${event.ordinal}`} sx={{ mb: 1, borderRadius: 1 }}>
39+
<Box sx={{ width: "100%" }}>
40+
<Typography
41+
color="text.secondary"
42+
sx={{ display: "block", mb: 0.5 }}
43+
variant="caption"
44+
>
45+
{eventTime.local().format(TIME_FORMAT)}{eventTime.local().format(DATE_FORMAT)}
46+
</Typography>
47+
<EventMessage message={event} />
48+
</Box>
49+
</ListItem>
50+
);
51+
})}
52+
</List>
53+
</Box>
54+
);
55+
};

src/components/eventStream/EventStream.tsx

Lines changed: 74 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -6,34 +6,79 @@ import {
66
useGetEventStream,
77
} from "@squonk/account-server-client/event-stream";
88

9+
import dayjs from "dayjs";
10+
import utc from "dayjs/plugin/utc";
911
import { useAtom } from "jotai";
1012
import { useSnackbar } from "notistack";
1113

1214
import { useASAuthorizationStatus } from "../../hooks/useIsAuthorized";
13-
import { getMessageFromEvent, protoBlobToText } from "../../protobuf/protobuf";
14-
import { eventStreamEnabledAtom } from "../../state/eventStream";
15+
import { getMessageFromEvent } from "../../protobuf/protobuf";
16+
import {
17+
eventStreamEnabledAtom,
18+
useEventStream,
19+
webSocketStatusAtom,
20+
} from "../../state/eventStream";
21+
import { useUnreadEventCount } from "../../state/notifications";
1522
import { EventMessage } from "../eventMessages/EventMessage";
1623
import { useIsEventStreamInstalled } from "./useIsEventStreamInstalled";
1724

25+
dayjs.extend(utc);
26+
27+
/**
28+
* Builds WebSocket URL
29+
*/
30+
const buildWebSocketUrl = (location: string): string => {
31+
const url = new URL(location);
32+
url.protocol = "wss:";
33+
34+
// Add ordinal parameter to get all historical messages
35+
url.searchParams.set("stream_from_ordinal", "1");
36+
37+
return url.toString();
38+
};
39+
40+
/**
41+
* Manages WebSocket connection for event stream and displays toast notifications
42+
*/
1843
export const EventStream = () => {
1944
const isEventStreamInstalled = useIsEventStreamInstalled();
2045
const [location, setLocation] = useState<string | null>(null);
2146
const { enqueueSnackbar } = useSnackbar();
47+
const { incrementCount } = useUnreadEventCount();
2248
const asRole = useASAuthorizationStatus();
49+
const { addEvent, isEventNewerThanSession, initializeSession } = useEventStream();
2350

2451
const { data, error: streamError } = useGetEventStream({
2552
query: { select: (data) => data.location, enabled: !!asRole && isEventStreamInstalled },
2653
});
54+
2755
const { mutate: createEventStream } = useCreateEventStream({
28-
mutation: {
29-
onSuccess: (eventStreamResponse) => {
30-
setLocation(eventStreamResponse.location);
31-
},
32-
},
56+
mutation: { onSuccess: (eventStreamResponse) => setLocation(eventStreamResponse.location) },
3357
});
58+
3459
const [eventStreamEnabled] = useAtom(eventStreamEnabledAtom);
60+
const [, setWebSocketStatus] = useAtom(webSocketStatusAtom);
61+
62+
const handleWebSocketMessage = useCallback(
63+
(event: MessageEvent) => {
64+
const message = getMessageFromEvent(JSON.parse(event.data));
65+
66+
if (
67+
message &&
68+
addEvent(message) && // Only show toast for events newer than session start
69+
isEventNewerThanSession(message)
70+
) {
71+
enqueueSnackbar(<EventMessage message={message} />, {
72+
variant: "default",
73+
anchorOrigin: { horizontal: "right", vertical: "bottom" },
74+
autoHideDuration: 10_000,
75+
});
76+
incrementCount();
77+
}
78+
},
79+
[enqueueSnackbar, incrementCount, addEvent, isEventNewerThanSession],
80+
);
3581

36-
// Define callbacks *before* useWebSocket hook
3782
const handleWebSocketOpen = useCallback(() => {
3883
enqueueSnackbar("Connected to event stream", {
3984
variant: "success",
@@ -43,19 +88,14 @@ export const EventStream = () => {
4388

4489
const handleWebSocketClose = useCallback(
4590
(event: CloseEvent) => {
46-
console.log(event);
47-
if (event.wasClean) {
48-
enqueueSnackbar("Disconnected from event stream", {
49-
variant: "info",
50-
anchorOrigin: { horizontal: "right", vertical: "bottom" },
51-
});
52-
} else {
53-
console.warn("EventStream: WebSocket closed unexpectedly.");
54-
enqueueSnackbar("Event stream disconnected unexpectedly. Attempting to reconnect...", {
55-
variant: "warning",
56-
anchorOrigin: { horizontal: "right", vertical: "bottom" },
57-
});
58-
}
91+
const message = event.wasClean
92+
? "Disconnected from event stream"
93+
: "Event stream disconnected unexpectedly. Attempting to reconnect...";
94+
95+
enqueueSnackbar(message, {
96+
variant: event.wasClean ? "info" : "warning",
97+
anchorOrigin: { horizontal: "right", vertical: "bottom" },
98+
});
5999
},
60100
[enqueueSnackbar],
61101
);
@@ -67,42 +107,10 @@ export const EventStream = () => {
67107
});
68108
}, [enqueueSnackbar]);
69109

70-
const handleWebSocketMessage = useCallback(
71-
(event: MessageEvent) => {
72-
if (event.data instanceof Blob) {
73-
protoBlobToText(event.data)
74-
.then((textData) => {
75-
const message = getMessageFromEvent(textData);
76-
if (message) {
77-
enqueueSnackbar(<EventMessage message={message} />, {
78-
variant: "default",
79-
anchorOrigin: { horizontal: "right", vertical: "bottom" },
80-
autoHideDuration: 10_000,
81-
});
82-
} else {
83-
console.warn(
84-
"Received event data could not be parsed into a known message type:",
85-
textData,
86-
);
87-
}
88-
})
89-
.catch((error) => {
90-
console.error("Error processing protobuf message:", error);
91-
enqueueSnackbar("Error processing incoming event", {
92-
variant: "error",
93-
anchorOrigin: { horizontal: "right", vertical: "bottom" },
94-
});
95-
});
96-
} else {
97-
console.warn("Received non-Blob WebSocket message:", event.data);
98-
}
99-
},
100-
[enqueueSnackbar],
101-
);
102-
103-
const wsUrl = eventStreamEnabled && asRole ? (location?.replace("ws", "wss") ?? null) : null;
110+
// Build WebSocket URL
111+
const wsUrl = eventStreamEnabled && asRole && location ? buildWebSocketUrl(location) : null;
104112

105-
useWebSocket(wsUrl, {
113+
const { readyState } = useWebSocket(wsUrl, {
106114
onOpen: handleWebSocketOpen,
107115
onClose: handleWebSocketClose,
108116
onError: handleWebSocketError,
@@ -113,7 +121,11 @@ export const EventStream = () => {
113121
reconnectInterval: 3000,
114122
});
115123

116-
// Effects can now safely use the hook results or return early based on auth
124+
// Expose connection status for status indicator
125+
useEffect(() => {
126+
setWebSocketStatus(readyState);
127+
}, [readyState, setWebSocketStatus]);
128+
117129
useEffect(() => {
118130
if (asRole && data) {
119131
setLocation(data);
@@ -126,5 +138,10 @@ export const EventStream = () => {
126138
}
127139
}, [asRole, streamError, createEventStream]);
128140

141+
// Initialize session on client side only
142+
useEffect(() => {
143+
initializeSession();
144+
}, [initializeSession]);
145+
129146
return null;
130147
};
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { Box, Divider, Typography } from "@mui/material";
2+
3+
import { EventList } from "./EventList";
4+
import { EventStreamToggle } from "./EventStreamToggle";
5+
import { WebSocketStatusIndicator } from "./WebSocketStatusIndicator";
6+
7+
/**
8+
* Main event stream interface in the user menu popover
9+
*/
10+
export const EventStreamMessages = () => (
11+
<Box sx={{ minWidth: 300 }}>
12+
<Typography sx={{ mb: 2 }} variant="h6">
13+
Event Stream
14+
</Typography>
15+
16+
<Box sx={{ display: "flex", alignItems: "center", justifyContent: "space-between", mb: 1 }}>
17+
<EventStreamToggle />
18+
<WebSocketStatusIndicator />
19+
</Box>
20+
21+
<Divider sx={{ my: 2 }} />
22+
23+
<EventList />
24+
</Box>
25+
);

src/components/eventStream/EventStreamToggle.tsx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ export const EventStreamToggle = () => {
2323
/>
2424
}
2525
label={`Event stream ${isEventStreamInstalled ? "(alpha)" : "(not available)"}`}
26-
sx={{ mb: 2 }}
26+
sx={{
27+
margin: 0,
28+
alignItems: "center",
29+
"& .MuiFormControlLabel-label": { fontSize: "0.875rem", lineHeight: 1.2 },
30+
}}
2731
/>
2832
);
2933
};
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import { FiberManualRecord } from "@mui/icons-material";
2+
import { Box, Tooltip, Typography } from "@mui/material";
3+
import { useAtom } from "jotai";
4+
5+
import { getWebSocketStatusFlags, webSocketStatusAtom } from "../../state/eventStream";
6+
7+
const STATUS_CONFIG = {
8+
connected: {
9+
color: "success.main",
10+
text: "Connected",
11+
tooltip: "Event stream is connected and receiving messages",
12+
},
13+
connecting: {
14+
color: "warning.main",
15+
text: "Connecting...",
16+
tooltip: "Connecting to event stream...",
17+
},
18+
reconnecting: {
19+
color: "warning.main",
20+
text: "Reconnecting...",
21+
tooltip: "Reconnecting to event stream...",
22+
},
23+
disconnected: {
24+
color: "error.main",
25+
text: "Disconnected",
26+
tooltip: "Event stream is disconnected",
27+
},
28+
} as const;
29+
30+
/**
31+
* WebSocket connection status indicator
32+
*/
33+
export const WebSocketStatusIndicator = () => {
34+
const [readyState] = useAtom(webSocketStatusAtom);
35+
const status = getWebSocketStatusFlags(readyState);
36+
37+
const getStatusKey = () => {
38+
if (status.isConnected) {
39+
return "connected";
40+
}
41+
if (status.isConnecting) {
42+
return "connecting";
43+
}
44+
if (status.isReconnecting) {
45+
return "reconnecting";
46+
}
47+
return "disconnected";
48+
};
49+
50+
const statusKey = getStatusKey();
51+
const config = STATUS_CONFIG[statusKey];
52+
53+
return (
54+
<Tooltip arrow title={config.tooltip}>
55+
<Box
56+
sx={{
57+
display: "flex",
58+
alignItems: "center",
59+
gap: 0.5,
60+
height: "fit-content",
61+
minHeight: "40px", // Match switch height
62+
justifyContent: "center",
63+
}}
64+
>
65+
<FiberManualRecord
66+
sx={{
67+
fontSize: 10,
68+
color: config.color,
69+
animation:
70+
status.isConnecting || status.isReconnecting
71+
? "pulse 1.5s ease-in-out infinite"
72+
: "none",
73+
"@keyframes pulse": {
74+
"0%": { opacity: 1 },
75+
"50%": { opacity: 0.5 },
76+
"100%": { opacity: 1 },
77+
},
78+
}}
79+
/>
80+
<Typography
81+
color="text.secondary"
82+
sx={{ fontSize: "0.75rem", lineHeight: 1.2, whiteSpace: "nowrap" }}
83+
variant="caption"
84+
>
85+
{config.text}
86+
</Typography>
87+
</Box>
88+
</Tooltip>
89+
);
90+
};

src/components/instances/JobDetails/JobInputSection/useGetJobInputs.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ export const useGetJobInputs = (instance: InstanceGetResponse | InstanceSummary)
2727
{ query: { enabled: inputsEnabled, retry: instance.job_id === TEST_JOB_ID ? 1 : 3 } },
2828
);
2929

30-
console.log(instance);
31-
3230
// Parse application specification
3331
const applicationSpecification: ApplicationSpecification = instance.application_specification
3432
? JSON.parse(instance.application_specification)

0 commit comments

Comments
 (0)