1- import { useEffect , useState } from "react" ;
1+ import { useCallback , useEffect , useState } from "react" ;
2+ import useWebSocket , { ReadyState } from "react-use-websocket" ;
23
34import {
45 useCreateEventStream ,
@@ -10,74 +11,126 @@ import { useSnackbar } from "notistack";
1011import { getMessageFromEvent , protoBlobToText } from "../protobuf/protobuf" ;
1112import { EventMessage } from "./eventMessages/EventMessage" ;
1213
14+ // Helper function to get readable state name
15+ const getConnectionStatus = ( readyState : ReadyState ) => {
16+ const stateMap : Record < ReadyState , string > = {
17+ [ ReadyState . CONNECTING ] : "Connecting" ,
18+ [ ReadyState . OPEN ] : "Open" ,
19+ [ ReadyState . CLOSING ] : "Closing" ,
20+ [ ReadyState . CLOSED ] : "Closed" ,
21+ [ ReadyState . UNINSTANTIATED ] : "Uninstantiated" ,
22+ } ;
23+ return stateMap [ readyState ] ;
24+ } ;
25+
1326export const EventStream = ( ) => {
1427 const [ location , setLocation ] = useState < string | null > ( null ) ;
1528 const { enqueueSnackbar } = useSnackbar ( ) ;
1629
17- const { data, error } = useGetEventStream ( ) ;
30+ const { data, error : streamError } = useGetEventStream ( {
31+ query : { select : ( data ) => data . location } ,
32+ } ) ;
1833
1934 useEffect ( ( ) => {
20- if ( data ) {
21- setLocation ( data . location ) ;
22- }
35+ data && setLocation ( data ) ;
2336 } , [ data ] ) ;
37+
2438 const { mutate : createEventStream } = useCreateEventStream ( {
2539 mutation : {
26- onSuccess : ( data ) => {
27- setLocation ( data . location ) ;
40+ onSuccess : ( eventStreamResponse ) => {
41+ setLocation ( eventStreamResponse . location ) ;
2842 } ,
2943 } ,
3044 } ) ;
3145
3246 useEffect ( ( ) => {
33- if ( error ?. response ?. status === 404 ) {
47+ if ( streamError ?. response ?. status === 404 ) {
48+ console . log ( "EventStream: No active stream found, creating one..." ) ;
3449 createEventStream ( { data : { format : "JSON_STRING" } } ) ;
3550 }
36- } , [ error , createEventStream ] ) ;
51+ } , [ streamError , createEventStream ] ) ;
3752
38- useEffect ( ( ) => {
39- if ( location ) {
40- // Create WebSocket connection
41- const ws = new WebSocket ( location ) ;
53+ const handleWebSocketOpen = useCallback ( ( ) => {
54+ enqueueSnackbar ( "Connected to event stream" , {
55+ variant : "success" ,
56+ anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
57+ } ) ;
58+ } , [ enqueueSnackbar ] ) ;
4259
43- ws . addEventListener ( "open" , ( ) => {
44- enqueueSnackbar ( "Connected to event stream" , {
45- variant : "success" ,
46- anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
47- } ) ;
48- } ) ;
49-
50- ws . addEventListener ( "error" , ( ) => {
51- enqueueSnackbar ( "Failed to connect to event stream" , {
52- variant : "error" ,
60+ const handleWebSocketClose = useCallback (
61+ ( event : CloseEvent ) => {
62+ if ( event . wasClean ) {
63+ enqueueSnackbar ( "Disconnected from event stream" , {
64+ variant : "info" ,
5365 anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
5466 } ) ;
55- } ) ;
56-
57- ws . addEventListener ( "close" , ( ) => {
58- enqueueSnackbar ( "Disconnected from event stream" , {
67+ } else {
68+ console . warn (
69+ "EventStream: WebSocket closed unexpectedly. Reconnection attempts are handled by react-use-websocket." ,
70+ ) ;
71+ enqueueSnackbar ( "Event stream disconnected unexpectedly. Attempting to reconnect..." , {
5972 variant : "warning" ,
6073 anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
6174 } ) ;
62- } ) ;
63-
64- ws . addEventListener ( "message" , ( event ) => {
65- void protoBlobToText ( event . data ) . then ( ( data ) => {
66- const message = getMessageFromEvent ( data ) ;
67- message &&
68- enqueueSnackbar ( < EventMessage message = { message } /> , {
69- variant : "default" ,
75+ }
76+ } ,
77+ [ enqueueSnackbar ] ,
78+ ) ;
79+
80+ const handleWebSocketError = useCallback ( ( ) => {
81+ enqueueSnackbar ( "Event stream connection error. Reconnection attempts may follow." , {
82+ variant : "error" ,
83+ anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
84+ } ) ;
85+ } , [ enqueueSnackbar ] ) ;
86+
87+ const handleWebSocketMessage = useCallback (
88+ ( event : MessageEvent ) => {
89+ if ( event . data instanceof Blob ) {
90+ protoBlobToText ( event . data )
91+ . then ( ( textData ) => {
92+ const message = getMessageFromEvent ( textData ) ;
93+ if ( message ) {
94+ enqueueSnackbar ( < EventMessage message = { message } /> , {
95+ variant : "default" ,
96+ anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
97+ autoHideDuration : 10_000 ,
98+ } ) ;
99+ } else {
100+ console . warn (
101+ "Received event data could not be parsed into a known message type:" ,
102+ textData ,
103+ ) ;
104+ }
105+ } )
106+ . catch ( ( error ) => {
107+ console . error ( "Error processing protobuf message:" , error ) ;
108+ enqueueSnackbar ( "Error processing incoming event" , {
109+ variant : "error" ,
70110 anchorOrigin : { horizontal : "right" , vertical : "bottom" } ,
71- autoHideDuration : 100_000 ,
72111 } ) ;
73- } ) ;
74- } ) ;
112+ } ) ;
113+ } else {
114+ console . warn ( "Received non-Blob WebSocket message:" , event . data ) ;
115+ }
116+ } ,
117+ [ enqueueSnackbar ] ,
118+ ) ;
75119
76- return ( ) => {
77- ws . close ( ) ;
78- } ;
79- }
80- } , [ location , enqueueSnackbar ] ) ;
120+ const { readyState } = useWebSocket ( location , {
121+ onOpen : handleWebSocketOpen ,
122+ onClose : handleWebSocketClose ,
123+ onError : handleWebSocketError ,
124+ onMessage : handleWebSocketMessage ,
125+ shouldReconnect : ( ) => true ,
126+ retryOnError : true ,
127+ reconnectAttempts : 5 ,
128+ reconnectInterval : 3000 ,
129+ } ) ;
130+
131+ useEffect ( ( ) => {
132+ console . log ( `WebSocket Status: ${ getConnectionStatus ( readyState ) } ` ) ;
133+ } , [ readyState ] ) ;
81134
82135 return null ;
83136} ;
0 commit comments