diff --git a/src/components/eventStream/EventList.tsx b/src/components/eventStream/EventList.tsx new file mode 100644 index 000000000..79fdf13ac --- /dev/null +++ b/src/components/eventStream/EventList.tsx @@ -0,0 +1,55 @@ +import { Box, List, ListItem, Typography } from "@mui/material"; +import dayjs from "dayjs"; +import utc from "dayjs/plugin/utc"; + +import { DATE_FORMAT, TIME_FORMAT } from "../../constants/datetimes"; +import { useEventStream } from "../../state/eventStream"; +import { EventMessage } from "../eventMessages/EventMessage"; + +dayjs.extend(utc); + +/** + * Displays a list of events sorted by timestamp (newest first) + */ +export const EventList = () => { + const { events } = useEventStream(); + + if (events.length === 0) { + return ( + + + No events to display + + + ); + } + + return ( + + + Events ({events.length}) + + + + {events.map((event) => { + const eventTime = dayjs.utc(event.timestamp); + + return ( + + + + {eventTime.local().format(TIME_FORMAT)} • {eventTime.local().format(DATE_FORMAT)} + + + + + ); + })} + + + ); +}; diff --git a/src/components/eventStream/EventStream.tsx b/src/components/eventStream/EventStream.tsx index cbfa1125f..331deace7 100644 --- a/src/components/eventStream/EventStream.tsx +++ b/src/components/eventStream/EventStream.tsx @@ -6,34 +6,79 @@ import { useGetEventStream, } from "@squonk/account-server-client/event-stream"; +import dayjs from "dayjs"; +import utc from "dayjs/plugin/utc"; import { useAtom } from "jotai"; import { useSnackbar } from "notistack"; import { useASAuthorizationStatus } from "../../hooks/useIsAuthorized"; -import { getMessageFromEvent, protoBlobToText } from "../../protobuf/protobuf"; -import { eventStreamEnabledAtom } from "../../state/eventStream"; +import { getMessageFromEvent } from "../../protobuf/protobuf"; +import { + eventStreamEnabledAtom, + useEventStream, + webSocketStatusAtom, +} from "../../state/eventStream"; +import { useUnreadEventCount } from "../../state/notifications"; import { EventMessage } from "../eventMessages/EventMessage"; import { useIsEventStreamInstalled } from "./useIsEventStreamInstalled"; +dayjs.extend(utc); + +/** + * Builds WebSocket URL + */ +const buildWebSocketUrl = (location: string): string => { + const url = new URL(location); + url.protocol = "wss:"; + + // Add ordinal parameter to get all historical messages + url.searchParams.set("stream_from_ordinal", "1"); + + return url.toString(); +}; + +/** + * Manages WebSocket connection for event stream and displays toast notifications + */ export const EventStream = () => { const isEventStreamInstalled = useIsEventStreamInstalled(); const [location, setLocation] = useState(null); const { enqueueSnackbar } = useSnackbar(); + const { incrementCount } = useUnreadEventCount(); const asRole = useASAuthorizationStatus(); + const { addEvent, isEventNewerThanSession, initializeSession } = useEventStream(); const { data, error: streamError } = useGetEventStream({ query: { select: (data) => data.location, enabled: !!asRole && isEventStreamInstalled }, }); + const { mutate: createEventStream } = useCreateEventStream({ - mutation: { - onSuccess: (eventStreamResponse) => { - setLocation(eventStreamResponse.location); - }, - }, + mutation: { onSuccess: (eventStreamResponse) => setLocation(eventStreamResponse.location) }, }); + const [eventStreamEnabled] = useAtom(eventStreamEnabledAtom); + const [, setWebSocketStatus] = useAtom(webSocketStatusAtom); + + const handleWebSocketMessage = useCallback( + (event: MessageEvent) => { + const message = getMessageFromEvent(JSON.parse(event.data)); + + if ( + message && + addEvent(message) && // Only show toast for events newer than session start + isEventNewerThanSession(message) + ) { + enqueueSnackbar(, { + variant: "default", + anchorOrigin: { horizontal: "right", vertical: "bottom" }, + autoHideDuration: 10_000, + }); + incrementCount(); + } + }, + [enqueueSnackbar, incrementCount, addEvent, isEventNewerThanSession], + ); - // Define callbacks *before* useWebSocket hook const handleWebSocketOpen = useCallback(() => { enqueueSnackbar("Connected to event stream", { variant: "success", @@ -43,19 +88,14 @@ export const EventStream = () => { const handleWebSocketClose = useCallback( (event: CloseEvent) => { - console.log(event); - if (event.wasClean) { - enqueueSnackbar("Disconnected from event stream", { - variant: "info", - anchorOrigin: { horizontal: "right", vertical: "bottom" }, - }); - } else { - console.warn("EventStream: WebSocket closed unexpectedly."); - enqueueSnackbar("Event stream disconnected unexpectedly. Attempting to reconnect...", { - variant: "warning", - anchorOrigin: { horizontal: "right", vertical: "bottom" }, - }); - } + const message = event.wasClean + ? "Disconnected from event stream" + : "Event stream disconnected unexpectedly. Attempting to reconnect..."; + + enqueueSnackbar(message, { + variant: event.wasClean ? "info" : "warning", + anchorOrigin: { horizontal: "right", vertical: "bottom" }, + }); }, [enqueueSnackbar], ); @@ -67,42 +107,10 @@ export const EventStream = () => { }); }, [enqueueSnackbar]); - const handleWebSocketMessage = useCallback( - (event: MessageEvent) => { - if (event.data instanceof Blob) { - protoBlobToText(event.data) - .then((textData) => { - const message = getMessageFromEvent(textData); - if (message) { - enqueueSnackbar(, { - variant: "default", - anchorOrigin: { horizontal: "right", vertical: "bottom" }, - autoHideDuration: 10_000, - }); - } else { - console.warn( - "Received event data could not be parsed into a known message type:", - textData, - ); - } - }) - .catch((error) => { - console.error("Error processing protobuf message:", error); - enqueueSnackbar("Error processing incoming event", { - variant: "error", - anchorOrigin: { horizontal: "right", vertical: "bottom" }, - }); - }); - } else { - console.warn("Received non-Blob WebSocket message:", event.data); - } - }, - [enqueueSnackbar], - ); - - const wsUrl = eventStreamEnabled && asRole ? (location?.replace("ws", "wss") ?? null) : null; + // Build WebSocket URL + const wsUrl = eventStreamEnabled && asRole && location ? buildWebSocketUrl(location) : null; - useWebSocket(wsUrl, { + const { readyState } = useWebSocket(wsUrl, { onOpen: handleWebSocketOpen, onClose: handleWebSocketClose, onError: handleWebSocketError, @@ -113,7 +121,11 @@ export const EventStream = () => { reconnectInterval: 3000, }); - // Effects can now safely use the hook results or return early based on auth + // Expose connection status for status indicator + useEffect(() => { + setWebSocketStatus(readyState); + }, [readyState, setWebSocketStatus]); + useEffect(() => { if (asRole && data) { setLocation(data); @@ -126,5 +138,10 @@ export const EventStream = () => { } }, [asRole, streamError, createEventStream]); + // Initialize session on client side only + useEffect(() => { + initializeSession(); + }, [initializeSession]); + return null; }; diff --git a/src/components/eventStream/EventStreamMessages.tsx b/src/components/eventStream/EventStreamMessages.tsx new file mode 100644 index 000000000..9f8d72ee7 --- /dev/null +++ b/src/components/eventStream/EventStreamMessages.tsx @@ -0,0 +1,25 @@ +import { Box, Divider, Typography } from "@mui/material"; + +import { EventList } from "./EventList"; +import { EventStreamToggle } from "./EventStreamToggle"; +import { WebSocketStatusIndicator } from "./WebSocketStatusIndicator"; + +/** + * Main event stream interface in the user menu popover + */ +export const EventStreamMessages = () => ( + + + Event Stream + + + + + + + + + + + +); diff --git a/src/components/eventStream/EventStreamToggle.tsx b/src/components/eventStream/EventStreamToggle.tsx index bca595953..77e8b7044 100644 --- a/src/components/eventStream/EventStreamToggle.tsx +++ b/src/components/eventStream/EventStreamToggle.tsx @@ -23,7 +23,11 @@ export const EventStreamToggle = () => { /> } label={`Event stream ${isEventStreamInstalled ? "(alpha)" : "(not available)"}`} - sx={{ mb: 2 }} + sx={{ + margin: 0, + alignItems: "center", + "& .MuiFormControlLabel-label": { fontSize: "0.875rem", lineHeight: 1.2 }, + }} /> ); }; diff --git a/src/components/eventStream/WebSocketStatusIndicator.tsx b/src/components/eventStream/WebSocketStatusIndicator.tsx new file mode 100644 index 000000000..4e4f0c9fb --- /dev/null +++ b/src/components/eventStream/WebSocketStatusIndicator.tsx @@ -0,0 +1,90 @@ +import { FiberManualRecord } from "@mui/icons-material"; +import { Box, Tooltip, Typography } from "@mui/material"; +import { useAtom } from "jotai"; + +import { getWebSocketStatusFlags, webSocketStatusAtom } from "../../state/eventStream"; + +const STATUS_CONFIG = { + connected: { + color: "success.main", + text: "Connected", + tooltip: "Event stream is connected and receiving messages", + }, + connecting: { + color: "warning.main", + text: "Connecting...", + tooltip: "Connecting to event stream...", + }, + reconnecting: { + color: "warning.main", + text: "Reconnecting...", + tooltip: "Reconnecting to event stream...", + }, + disconnected: { + color: "error.main", + text: "Disconnected", + tooltip: "Event stream is disconnected", + }, +} as const; + +/** + * WebSocket connection status indicator + */ +export const WebSocketStatusIndicator = () => { + const [readyState] = useAtom(webSocketStatusAtom); + const status = getWebSocketStatusFlags(readyState); + + const getStatusKey = () => { + if (status.isConnected) { + return "connected"; + } + if (status.isConnecting) { + return "connecting"; + } + if (status.isReconnecting) { + return "reconnecting"; + } + return "disconnected"; + }; + + const statusKey = getStatusKey(); + const config = STATUS_CONFIG[statusKey]; + + return ( + + + + + {config.text} + + + + ); +}; diff --git a/src/components/instances/JobDetails/JobInputSection/useGetJobInputs.ts b/src/components/instances/JobDetails/JobInputSection/useGetJobInputs.ts index b1dfa80ad..20ec97f02 100644 --- a/src/components/instances/JobDetails/JobInputSection/useGetJobInputs.ts +++ b/src/components/instances/JobDetails/JobInputSection/useGetJobInputs.ts @@ -27,8 +27,6 @@ export const useGetJobInputs = (instance: InstanceGetResponse | InstanceSummary) { query: { enabled: inputsEnabled, retry: instance.job_id === TEST_JOB_ID ? 1 : 3 } }, ); - console.log(instance); - // Parse application specification const applicationSpecification: ApplicationSpecification = instance.application_specification ? JSON.parse(instance.application_specification) diff --git a/src/layouts/navigation/UserMenu.tsx b/src/layouts/navigation/UserMenu.tsx index 400418525..d483e3971 100644 --- a/src/layouts/navigation/UserMenu.tsx +++ b/src/layouts/navigation/UserMenu.tsx @@ -1,8 +1,9 @@ import { AccountCircle as AccountCircleIcon } from "@mui/icons-material"; -import { Box, Fade, IconButton, Paper, Popper, Tooltip } from "@mui/material"; +import { Badge, Box, Fade, IconButton, Paper, Popper, Tooltip } from "@mui/material"; import { bindPopper, bindToggle, usePopupState } from "material-ui-popup-state/hooks"; import { useKeycloakUser } from "../../hooks/useKeycloakUser"; +import { useUnreadEventCount } from "../../state/notifications"; import { UserMenuContent } from "./UserMenuContent"; /** @@ -11,27 +12,39 @@ import { UserMenuContent } from "./UserMenuContent"; export const UserMenu = () => { const popupState = usePopupState({ variant: "popper", popupId: "user-menu" }); const { isLoading } = useKeycloakUser(); + const { count, resetCount } = useUnreadEventCount(); + + // Reset count when menu is opened + const handleToggle = bindToggle(popupState); + const handleMenuToggle = (event: React.MouseEvent) => { + if (!popupState.isOpen) { + resetCount(); + } + handleToggle.onClick(event); + }; return ( <> - - - + + + + + theme.zIndex.appBar + 1 }} {...bindPopper(popupState)} // anchorOrigin={{ vertical: "bottom", horizontal: "left" }} // transformOrigin={{ vertical: "top", horizontal: "left" }} diff --git a/src/layouts/navigation/UserMenuContent.tsx b/src/layouts/navigation/UserMenuContent.tsx index 00d4f5e71..87324a504 100644 --- a/src/layouts/navigation/UserMenuContent.tsx +++ b/src/layouts/navigation/UserMenuContent.tsx @@ -5,30 +5,10 @@ import { AuthButton } from "../../components/auth/AuthButton"; import { CenterLoader } from "../../components/CenterLoader"; import { Chips } from "../../components/Chips"; import { ColourSchemeSelection } from "../../components/ColourSchemeSelection"; -import { EventStreamToggle } from "../../components/eventStream/EventStreamToggle"; +import { EventStreamMessages } from "../../components/eventStream/EventStreamMessages"; import { useASAuthorizationStatus, useDMAuthorizationStatus } from "../../hooks/useIsAuthorized"; import { useKeycloakUser } from "../../hooks/useKeycloakUser"; -/** - * Content of the user menu - */ -export const UserMenuContent = () => { - const theme = useTheme(); - const biggerThanMd = useMediaQuery(theme.breakpoints.up("md")); - // Removed eventStreamEnabledAtom usage, now handled in EventStreamToggle - - return ( - - - Account - - - - - - ); -}; - const UserMenuContentInner = () => { const asRole = useASAuthorizationStatus(); const dmRole = useDMAuthorizationStatus(); @@ -62,7 +42,7 @@ const UserMenuContentInner = () => { {user.username} Roles: - + @@ -74,3 +54,23 @@ const UserMenuContentInner = () => { return ; }; + +/** + * Content of the user menu + */ +export const UserMenuContent = () => { + const theme = useTheme(); + const biggerThanMd = useMediaQuery(theme.breakpoints.up("md")); + // Removed eventStreamEnabledAtom usage, now handled in EventStreamToggle + + return ( + + + Account + + + + + + ); +}; diff --git a/src/protobuf/protobuf.ts b/src/protobuf/protobuf.ts index 5d2604295..1c8d15406 100644 --- a/src/protobuf/protobuf.ts +++ b/src/protobuf/protobuf.ts @@ -82,19 +82,24 @@ export const storageType = getPrefixedMessageNameFromSchema( ) as StorageTypeName; // --- End Runtime Constants --- -type ProcessingMessagePayload = { +interface MessageBase { + timestamp: string; + ordinal: number; +} + +interface ProcessingMessagePayload extends MessageBase { type: ProcessingTypeName; name: string; coins: string; product: string; -}; +} -type StorageMessagePayload = { +interface StorageMessagePayload extends MessageBase { type: StorageTypeName; name: string; bytes: string; reason: StorageReasonEnum; -}; +} // Discriminated union type representing the possible charge message payloads // derived from Protobuf messages (Processing or Storage). @@ -103,6 +108,8 @@ export type ChargeMessage = ProcessingMessagePayload | StorageMessagePayload; interface EventStreamMessage { message_type: string; message_body: any; + ess_timestamp: string; + ess_ordinal: number; } /** @@ -133,6 +140,8 @@ export const getMessageFromEvent = (event: EventStreamMessage): ChargeMessage | case processingType: { const parsed = fromJson(MerchantProcessingChargeMessageSchema, event.message_body); return { + timestamp: parsed.timestamp, + ordinal: event.ess_ordinal, type: processingType, name: parsed.name, coins: parsed.coins, @@ -141,7 +150,14 @@ export const getMessageFromEvent = (event: EventStreamMessage): ChargeMessage | } case storageType: { const parsed = fromJson(MerchantStorageChargeMessageSchema, event.message_body); - return { type: storageType, name: parsed.name, bytes: parsed.bytes, reason: parsed.reason }; + return { + timestamp: parsed.timestamp, + ordinal: event.ess_ordinal, + type: storageType, + name: parsed.name, + bytes: parsed.bytes, + reason: parsed.reason, + }; } default: return null; diff --git a/src/state/eventStream.ts b/src/state/eventStream.ts index 63056b8ee..1bae321f4 100644 --- a/src/state/eventStream.ts +++ b/src/state/eventStream.ts @@ -1,6 +1,89 @@ -import { atom } from "jotai"; +import { ReadyState } from "react-use-websocket"; + +import dayjs from "dayjs"; +import utc from "dayjs/plugin/utc"; +import { atom, useAtom } from "jotai"; + +import { type ChargeMessage } from "../protobuf/protobuf"; + +dayjs.extend(utc); + +/** + * Atom to store all events + */ +export const eventsAtom = atom([]); + +/** + * Atom to track when the user's session started + */ +export const sessionStartTimeAtom = atom(null); + +/** + * Atom to track WebSocket connection status + */ +export const webSocketStatusAtom = atom(ReadyState.CLOSED); + +/** + * Utility function to derive boolean status flags from ReadyState + */ +export const getWebSocketStatusFlags = (readyState: ReadyState) => ({ + isConnected: readyState === ReadyState.OPEN, + isConnecting: readyState === ReadyState.CONNECTING, + isDisconnected: readyState === ReadyState.CLOSED, + isReconnecting: readyState === ReadyState.CLOSING, +}); + +/** + * Hook to manage events with deduplication and sorting + */ +export const useEventStream = () => { + const [events, setEvents] = useAtom(eventsAtom); + const [sessionStartTime, setSessionStartTime] = useAtom(sessionStartTimeAtom); + + const addEvent = (event: ChargeMessage) => { + const eventId = `${event.timestamp}-${event.ordinal}`; + const existingEventIds = events.map((e) => `${e.timestamp}-${e.ordinal}`); + + if (existingEventIds.includes(eventId)) { + return false; + } + + const newEvents = [...events, event] + .sort((a, b) => dayjs.utc(b.timestamp).valueOf() - dayjs.utc(a.timestamp).valueOf()) + .slice(-100); // Keep last 100 events + + setEvents(newEvents); + return true; + }; + + const isEventNewerThanSession = (event: ChargeMessage) => { + if (!sessionStartTime) { + return false; + } + return dayjs.utc(event.timestamp).isAfter(sessionStartTime); + }; + + const initializeSession = () => { + if (!sessionStartTime) { + setSessionStartTime(dayjs.utc()); + } + }; + + const clearEvents = () => { + setEvents([]); + }; + + return { + events, + addEvent, + isEventNewerThanSession, + initializeSession, + clearEvents, + sessionStartTime, + }; +}; /** - * Atom to toggle the event stream functionality on or off. + * Atom to control event stream enablement */ -export const eventStreamEnabledAtom = atom(false); +export const eventStreamEnabledAtom = atom(true); diff --git a/src/state/notifications.ts b/src/state/notifications.ts new file mode 100644 index 000000000..14a602b84 --- /dev/null +++ b/src/state/notifications.ts @@ -0,0 +1,18 @@ +import { atom, useAtom } from "jotai"; + +/** + * Atom to track the number of unread event notifications + */ +export const unreadEventCountAtom = atom(0); + +/** + * Hook to access and update the unread event count + */ +export const useUnreadEventCount = () => { + const [count, setCount] = useAtom(unreadEventCountAtom); + + const incrementCount = () => setCount((prev) => prev + 1); + const resetCount = () => setCount(0); + + return { count, incrementCount, resetCount }; +};