Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/components/eventStream/EventList.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { Box, List, ListItem, Typography } from "@mui/material";
import dayjs from "dayjs";
import utc from "dayjs/plugin/utc";

import { DATE_FORMAT, TIME_FORMAT } from "../../constants/datetimes";
import { useEventStream } from "../../state/eventStream";
import { EventMessage } from "../eventMessages/EventMessage";

dayjs.extend(utc);

/**
* Displays a list of events sorted by timestamp (newest first)
*/
export const EventList = () => {
const { events } = useEventStream();

if (events.length === 0) {
return (
<Box sx={{ p: 2, textAlign: "center" }}>
<Typography color="text.secondary" variant="body2">
No events to display
</Typography>
</Box>
);
}

return (
<Box sx={{ maxHeight: 400, overflow: "auto" }}>
<Typography sx={{ p: 1, fontWeight: "bold" }} variant="subtitle2">
Events ({events.length})
</Typography>

<List dense>
{events.map((event) => {
const eventTime = dayjs.utc(event.timestamp);

return (
<ListItem key={`${event.timestamp}-${event.ordinal}`} sx={{ mb: 1, borderRadius: 1 }}>
<Box sx={{ width: "100%" }}>
<Typography
color="text.secondary"
sx={{ display: "block", mb: 0.5 }}
variant="caption"
>
{eventTime.local().format(TIME_FORMAT)} • {eventTime.local().format(DATE_FORMAT)}
</Typography>
<EventMessage message={event} />
</Box>
</ListItem>
);
})}
</List>
</Box>
);
};
131 changes: 74 additions & 57 deletions src/components/eventStream/EventStream.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,79 @@ import {
useGetEventStream,
} from "@squonk/account-server-client/event-stream";

import dayjs from "dayjs";
import utc from "dayjs/plugin/utc";
import { useAtom } from "jotai";
import { useSnackbar } from "notistack";

import { useASAuthorizationStatus } from "../../hooks/useIsAuthorized";
import { getMessageFromEvent, protoBlobToText } from "../../protobuf/protobuf";
import { eventStreamEnabledAtom } from "../../state/eventStream";
import { getMessageFromEvent } from "../../protobuf/protobuf";
import {
eventStreamEnabledAtom,
useEventStream,
webSocketStatusAtom,
} from "../../state/eventStream";
import { useUnreadEventCount } from "../../state/notifications";
import { EventMessage } from "../eventMessages/EventMessage";
import { useIsEventStreamInstalled } from "./useIsEventStreamInstalled";

dayjs.extend(utc);

/**
* Builds WebSocket URL
*/
const buildWebSocketUrl = (location: string): string => {
const url = new URL(location);
url.protocol = "wss:";

// Add ordinal parameter to get all historical messages
url.searchParams.set("stream_from_ordinal", "1");

return url.toString();
};

/**
* Manages WebSocket connection for event stream and displays toast notifications
*/
export const EventStream = () => {
const isEventStreamInstalled = useIsEventStreamInstalled();
const [location, setLocation] = useState<string | null>(null);
const { enqueueSnackbar } = useSnackbar();
const { incrementCount } = useUnreadEventCount();
const asRole = useASAuthorizationStatus();
const { addEvent, isEventNewerThanSession, initializeSession } = useEventStream();

const { data, error: streamError } = useGetEventStream({
query: { select: (data) => data.location, enabled: !!asRole && isEventStreamInstalled },
});

const { mutate: createEventStream } = useCreateEventStream({
mutation: {
onSuccess: (eventStreamResponse) => {
setLocation(eventStreamResponse.location);
},
},
mutation: { onSuccess: (eventStreamResponse) => setLocation(eventStreamResponse.location) },
});

const [eventStreamEnabled] = useAtom(eventStreamEnabledAtom);
const [, setWebSocketStatus] = useAtom(webSocketStatusAtom);

const handleWebSocketMessage = useCallback(
(event: MessageEvent) => {
const message = getMessageFromEvent(JSON.parse(event.data));

if (
message &&
addEvent(message) && // Only show toast for events newer than session start
isEventNewerThanSession(message)
) {
enqueueSnackbar(<EventMessage message={message} />, {
variant: "default",
anchorOrigin: { horizontal: "right", vertical: "bottom" },
autoHideDuration: 10_000,
});
incrementCount();
}
},
[enqueueSnackbar, incrementCount, addEvent, isEventNewerThanSession],
);

// Define callbacks *before* useWebSocket hook
const handleWebSocketOpen = useCallback(() => {
enqueueSnackbar("Connected to event stream", {
variant: "success",
Expand All @@ -43,19 +88,14 @@ export const EventStream = () => {

const handleWebSocketClose = useCallback(
(event: CloseEvent) => {
console.log(event);
if (event.wasClean) {
enqueueSnackbar("Disconnected from event stream", {
variant: "info",
anchorOrigin: { horizontal: "right", vertical: "bottom" },
});
} else {
console.warn("EventStream: WebSocket closed unexpectedly.");
enqueueSnackbar("Event stream disconnected unexpectedly. Attempting to reconnect...", {
variant: "warning",
anchorOrigin: { horizontal: "right", vertical: "bottom" },
});
}
const message = event.wasClean
? "Disconnected from event stream"
: "Event stream disconnected unexpectedly. Attempting to reconnect...";

enqueueSnackbar(message, {
variant: event.wasClean ? "info" : "warning",
anchorOrigin: { horizontal: "right", vertical: "bottom" },
});
},
[enqueueSnackbar],
);
Expand All @@ -67,42 +107,10 @@ export const EventStream = () => {
});
}, [enqueueSnackbar]);

const handleWebSocketMessage = useCallback(
(event: MessageEvent) => {
if (event.data instanceof Blob) {
protoBlobToText(event.data)
.then((textData) => {
const message = getMessageFromEvent(textData);
if (message) {
enqueueSnackbar(<EventMessage message={message} />, {
variant: "default",
anchorOrigin: { horizontal: "right", vertical: "bottom" },
autoHideDuration: 10_000,
});
} else {
console.warn(
"Received event data could not be parsed into a known message type:",
textData,
);
}
})
.catch((error) => {
console.error("Error processing protobuf message:", error);
enqueueSnackbar("Error processing incoming event", {
variant: "error",
anchorOrigin: { horizontal: "right", vertical: "bottom" },
});
});
} else {
console.warn("Received non-Blob WebSocket message:", event.data);
}
},
[enqueueSnackbar],
);

const wsUrl = eventStreamEnabled && asRole ? (location?.replace("ws", "wss") ?? null) : null;
// Build WebSocket URL
const wsUrl = eventStreamEnabled && asRole && location ? buildWebSocketUrl(location) : null;

useWebSocket(wsUrl, {
const { readyState } = useWebSocket(wsUrl, {
onOpen: handleWebSocketOpen,
onClose: handleWebSocketClose,
onError: handleWebSocketError,
Expand All @@ -113,7 +121,11 @@ export const EventStream = () => {
reconnectInterval: 3000,
});

// Effects can now safely use the hook results or return early based on auth
// Expose connection status for status indicator
useEffect(() => {
setWebSocketStatus(readyState);
}, [readyState, setWebSocketStatus]);

useEffect(() => {
if (asRole && data) {
setLocation(data);
Expand All @@ -126,5 +138,10 @@ export const EventStream = () => {
}
}, [asRole, streamError, createEventStream]);

// Initialize session on client side only
useEffect(() => {
initializeSession();
}, [initializeSession]);

return null;
};
25 changes: 25 additions & 0 deletions src/components/eventStream/EventStreamMessages.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Box, Divider, Typography } from "@mui/material";

import { EventList } from "./EventList";
import { EventStreamToggle } from "./EventStreamToggle";
import { WebSocketStatusIndicator } from "./WebSocketStatusIndicator";

/**
* Main event stream interface in the user menu popover
*/
export const EventStreamMessages = () => (
<Box sx={{ minWidth: 300 }}>
<Typography sx={{ mb: 2 }} variant="h6">
Event Stream
</Typography>

<Box sx={{ display: "flex", alignItems: "center", justifyContent: "space-between", mb: 1 }}>
<EventStreamToggle />
<WebSocketStatusIndicator />
</Box>

<Divider sx={{ my: 2 }} />

<EventList />
</Box>
);
6 changes: 5 additions & 1 deletion src/components/eventStream/EventStreamToggle.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ export const EventStreamToggle = () => {
/>
}
label={`Event stream ${isEventStreamInstalled ? "(alpha)" : "(not available)"}`}
sx={{ mb: 2 }}
sx={{
margin: 0,
alignItems: "center",
"& .MuiFormControlLabel-label": { fontSize: "0.875rem", lineHeight: 1.2 },
}}
/>
);
};
90 changes: 90 additions & 0 deletions src/components/eventStream/WebSocketStatusIndicator.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { FiberManualRecord } from "@mui/icons-material";
import { Box, Tooltip, Typography } from "@mui/material";
import { useAtom } from "jotai";

import { getWebSocketStatusFlags, webSocketStatusAtom } from "../../state/eventStream";

const STATUS_CONFIG = {
connected: {
color: "success.main",
text: "Connected",
tooltip: "Event stream is connected and receiving messages",
},
connecting: {
color: "warning.main",
text: "Connecting...",
tooltip: "Connecting to event stream...",
},
reconnecting: {
color: "warning.main",
text: "Reconnecting...",
tooltip: "Reconnecting to event stream...",
},
disconnected: {
color: "error.main",
text: "Disconnected",
tooltip: "Event stream is disconnected",
},
} as const;

/**
* WebSocket connection status indicator
*/
export const WebSocketStatusIndicator = () => {
const [readyState] = useAtom(webSocketStatusAtom);
const status = getWebSocketStatusFlags(readyState);

const getStatusKey = () => {
if (status.isConnected) {
return "connected";
}
if (status.isConnecting) {
return "connecting";
}
if (status.isReconnecting) {
return "reconnecting";
}
return "disconnected";
};

const statusKey = getStatusKey();
const config = STATUS_CONFIG[statusKey];

return (
<Tooltip arrow title={config.tooltip}>
<Box
sx={{
display: "flex",
alignItems: "center",
gap: 0.5,
height: "fit-content",
minHeight: "40px", // Match switch height
justifyContent: "center",
}}
>
<FiberManualRecord
sx={{
fontSize: 10,
color: config.color,
animation:
status.isConnecting || status.isReconnecting
? "pulse 1.5s ease-in-out infinite"
: "none",
"@keyframes pulse": {
"0%": { opacity: 1 },
"50%": { opacity: 0.5 },
"100%": { opacity: 1 },
},
}}
/>
<Typography
color="text.secondary"
sx={{ fontSize: "0.75rem", lineHeight: 1.2, whiteSpace: "nowrap" }}
variant="caption"
>
{config.text}
</Typography>
</Box>
</Tooltip>
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ export const useGetJobInputs = (instance: InstanceGetResponse | InstanceSummary)
{ query: { enabled: inputsEnabled, retry: instance.job_id === TEST_JOB_ID ? 1 : 3 } },
);

console.log(instance);

// Parse application specification
const applicationSpecification: ApplicationSpecification = instance.application_specification
? JSON.parse(instance.application_specification)
Expand Down
Loading