@@ -6,36 +6,70 @@ import {
66 useGetEventStream ,
77} from "@squonk/account-server-client/event-stream" ;
88
9+ import dayjs from "dayjs" ;
10+ import utc from "dayjs/plugin/utc" ;
911import { useAtom } from "jotai" ;
1012import { useSnackbar } from "notistack" ;
1113
1214import { useASAuthorizationStatus } from "../../hooks/useIsAuthorized" ;
13- import { getMessageFromEvent , protoBlobToText } from "../../protobuf/protobuf" ;
14- import { eventStreamEnabledAtom } from "../../state/eventStream" ;
15+ import { getMessageFromEvent } from "../../protobuf/protobuf" ;
16+ import { eventStreamEnabledAtom , useEventStream } from "../../state/eventStream" ;
1517import { useUnreadEventCount } from "../../state/notifications" ;
1618import { EventMessage } from "../eventMessages/EventMessage" ;
1719import { useIsEventStreamInstalled } from "./useIsEventStreamInstalled" ;
1820
21+ dayjs . extend ( utc ) ;
22+
23+ /**
24+ * Builds WebSocket URL
25+ */
26+ const buildWebSocketUrl = ( location : string ) : string => {
27+ const url = new URL ( location ) ;
28+ url . protocol = "wss:" ;
29+ return url . toString ( ) ;
30+ } ;
31+
32+ /**
33+ * Manages WebSocket connection for event stream and displays toast notifications
34+ */
1935export const EventStream = ( ) => {
2036 const isEventStreamInstalled = useIsEventStreamInstalled ( ) ;
2137 const [ location , setLocation ] = useState < string | null > ( null ) ;
2238 const { enqueueSnackbar } = useSnackbar ( ) ;
2339 const { incrementCount } = useUnreadEventCount ( ) ;
2440 const asRole = useASAuthorizationStatus ( ) ;
41+ const { addEvent, isEventNewerThanSession, initializeSession } = useEventStream ( ) ;
2542
2643 const { data, error : streamError } = useGetEventStream ( {
2744 query : { select : ( data ) => data . location , enabled : ! ! asRole && isEventStreamInstalled } ,
2845 } ) ;
46+
2947 const { mutate : createEventStream } = useCreateEventStream ( {
30- mutation : {
31- onSuccess : ( eventStreamResponse ) => {
32- setLocation ( eventStreamResponse . location ) ;
33- } ,
34- } ,
48+ mutation : { onSuccess : ( eventStreamResponse ) => setLocation ( eventStreamResponse . location ) } ,
3549 } ) ;
50+
3651 const [ eventStreamEnabled ] = useAtom ( eventStreamEnabledAtom ) ;
3752
38- // Define callbacks *before* useWebSocket hook
53+ const handleWebSocketMessage = useCallback (
54+ ( event : MessageEvent ) => {
55+ const message = getMessageFromEvent ( JSON . parse ( event . data ) ) ;
56+
57+ if (
58+ message &&
59+ addEvent ( message ) && // Only show toast for events newer than session start
60+ isEventNewerThanSession ( message )
61+ ) {
62+ enqueueSnackbar ( < EventMessage message = { message } /> , {
63+ variant : "default" ,
64+ anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
65+ autoHideDuration : 10_000 ,
66+ } ) ;
67+ incrementCount ( ) ;
68+ }
69+ } ,
70+ [ enqueueSnackbar , incrementCount , addEvent , isEventNewerThanSession ] ,
71+ ) ;
72+
3973 const handleWebSocketOpen = useCallback ( ( ) => {
4074 enqueueSnackbar ( "Connected to event stream" , {
4175 variant : "success" ,
@@ -45,19 +79,14 @@ export const EventStream = () => {
4579
4680 const handleWebSocketClose = useCallback (
4781 ( event : CloseEvent ) => {
48- console . log ( event ) ;
49- if ( event . wasClean ) {
50- enqueueSnackbar ( "Disconnected from event stream" , {
51- variant : "info" ,
52- anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
53- } ) ;
54- } else {
55- console . warn ( "EventStream: WebSocket closed unexpectedly." ) ;
56- enqueueSnackbar ( "Event stream disconnected unexpectedly. Attempting to reconnect..." , {
57- variant : "warning" ,
58- anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
59- } ) ;
60- }
82+ const message = event . wasClean
83+ ? "Disconnected from event stream"
84+ : "Event stream disconnected unexpectedly. Attempting to reconnect..." ;
85+
86+ enqueueSnackbar ( message , {
87+ variant : event . wasClean ? "info" : "warning" ,
88+ anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
89+ } ) ;
6190 } ,
6291 [ enqueueSnackbar ] ,
6392 ) ;
@@ -69,51 +98,8 @@ export const EventStream = () => {
6998 } ) ;
7099 } , [ enqueueSnackbar ] ) ;
71100
72- const handleWebSocketMessage = useCallback (
73- ( event : MessageEvent ) => {
74- console . log ( "message" ) ;
75- if ( event . data instanceof Blob ) {
76- protoBlobToText ( event . data )
77- . then ( ( textData ) => {
78- const message = getMessageFromEvent ( textData ) ;
79- if ( message ) {
80- incrementCount ( ) ;
81- enqueueSnackbar ( < EventMessage message = { message } /> , {
82- variant : "default" ,
83- anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
84- autoHideDuration : 10_000 ,
85- } ) ;
86- } else {
87- console . warn (
88- "Received event data could not be parsed into a known message type:" ,
89- textData ,
90- ) ;
91- }
92- } )
93- . catch ( ( error ) => {
94- console . error ( "Error processing protobuf message:" , error ) ;
95- enqueueSnackbar ( "Error processing incoming event" , {
96- variant : "error" ,
97- anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
98- } ) ;
99- } ) ;
100- } else {
101- console . warn ( "Received non-Blob WebSocket message:" , event . data ) ;
102- }
103- } ,
104- [ enqueueSnackbar , incrementCount ] ,
105- ) ;
106-
107- let wsUrl = null ;
108- if ( eventStreamEnabled && asRole && location ) {
109- const url = new URL ( location ) ;
110- url . protocol = "wss:" ;
111- url . search = new URLSearchParams ( {
112- // stream_from_timestamp: encodeURIComponent("2025-07-1T12:00:00Z"),
113- stream_from_ordinal : "1" ,
114- } ) . toString ( ) ;
115- wsUrl = url . toString ( ) ;
116- }
101+ // Build WebSocket URL
102+ const wsUrl = eventStreamEnabled && asRole && location ? buildWebSocketUrl ( location ) : null ;
117103
118104 useWebSocket ( wsUrl , {
119105 onOpen : handleWebSocketOpen ,
@@ -126,7 +112,6 @@ export const EventStream = () => {
126112 reconnectInterval : 3000 ,
127113 } ) ;
128114
129- // Effects can now safely use the hook results or return early based on auth
130115 useEffect ( ( ) => {
131116 if ( asRole && data ) {
132117 setLocation ( data ) ;
@@ -139,5 +124,10 @@ export const EventStream = () => {
139124 }
140125 } , [ asRole , streamError , createEventStream ] ) ;
141126
127+ // Initialize session on client side only
128+ useEffect ( ( ) => {
129+ initializeSession ( ) ;
130+ } , [ initializeSession ] ) ;
131+
142132 return null ;
143133} ;
0 commit comments