diff --git a/package.json b/package.json index ff1d9038b..53f108157 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mars-v2-frontend", - "version": "3.0.0", + "version": "3.0.1", "homepage": "./", "private": false, "license": "SEE LICENSE IN LICENSE FILE", diff --git a/src/components/header/navigation/mobile/MobileNavigation.tsx b/src/components/header/navigation/mobile/MobileNavigation.tsx index 90bfe23a1..ebf147ea6 100644 --- a/src/components/header/navigation/mobile/MobileNavigation.tsx +++ b/src/components/header/navigation/mobile/MobileNavigation.tsx @@ -65,7 +65,9 @@ function Content(props: Props & { account?: Account }) { const items: Array<{ value: string; label: string }> = [] for (const item of menu) { if (item.submenu) { - const filteredItems = item.submenu.filter((subItem) => !subItem.isSeparator) + const filteredItems = item.submenu.filter( + (subItem) => !subItem.isSeparator && !subItem.hideOnDesktop, + ) for (const subItem of filteredItems) { const value = subItem.page || subItem.externalUrl || '' items.push({ value, label: subItem.label }) diff --git a/src/components/trade/TradeChart/streaming.ts b/src/components/trade/TradeChart/streaming.ts index 9f1a4caa6..693cf7380 100644 --- a/src/components/trade/TradeChart/streaming.ts +++ b/src/components/trade/TradeChart/streaming.ts @@ -7,10 +7,17 @@ import { const streamingUrl = `${pythEndpoints.candles}/streaming` const channelToSubscription = new Map() +let isStreamingActive = false +let streamReader: ReadableStreamDefaultReader | null = null +let lastDataReceivedTime = Date.now() +let heartbeatInterval: NodeJS.Timeout | null = null function handleStreamingData(data: StreamData) { const { id, p, t } = data + // Update last data received time for heartbeat monitoring + lastDataReceivedTime = Date.now() + const tradePrice = p const tradeTime = t * 1000 // Multiplying by 1000 to get milliseconds @@ -49,11 +56,62 @@ function handleStreamingData(data: StreamData) { channelToSubscription.set(channelString, subscriptionItem) } +function startHeartbeatMonitor() { + // Clear existing interval if any + if (heartbeatInterval) { + clearInterval(heartbeatInterval) + } + + // Check every 30 seconds if we've received data recently + heartbeatInterval = setInterval(() => { + const timeSinceLastData = Date.now() - lastDataReceivedTime + // If no data received for 60 seconds and we have active subscriptions, restart stream + if (timeSinceLastData > 60000 && channelToSubscription.size > 0 && isStreamingActive) { + // console.log('No data received for 60 seconds, restarting stream...') + isStreamingActive = false + if (streamReader) { + streamReader.cancel() + streamReader = null + } + startStreaming() + } + }, 30000) +} + +function stopHeartbeatMonitor() { + if (heartbeatInterval) { + clearInterval(heartbeatInterval) + heartbeatInterval = null + } +} + function startStreaming(retries = 3, delay = 3000) { + // Prevent multiple concurrent streams + if (isStreamingActive) { + return + } + + // Cancel existing reader if any + if (streamReader) { + streamReader.cancel() + streamReader = null + } + + isStreamingActive = true + lastDataReceivedTime = Date.now() + + // Start heartbeat monitoring + startHeartbeatMonitor() + fetch(streamingUrl) .then((response) => { - if (response.body === null) return + if (response.body === null) { + isStreamingActive = false + stopHeartbeatMonitor() + return + } const reader = response.body.getReader() + streamReader = reader function streamData() { reader @@ -61,6 +119,13 @@ function startStreaming(retries = 3, delay = 3000) { .then(({ value, done }) => { if (done) { // console.error('Streaming ended.') + isStreamingActive = false + streamReader = null + stopHeartbeatMonitor() + // Reconnect when stream ends naturally + if (channelToSubscription.size > 0) { + attemptReconnect(retries, delay) + } return } @@ -83,6 +148,9 @@ function startStreaming(retries = 3, delay = 3000) { }) .catch((error) => { // console.error('Error reading from stream:', error) + isStreamingActive = false + streamReader = null + stopHeartbeatMonitor() attemptReconnect(retries, delay) }) } @@ -91,14 +159,20 @@ function startStreaming(retries = 3, delay = 3000) { }) .catch((error) => { // console.error('Error fetching from the streaming endpoint:', error) + isStreamingActive = false + streamReader = null + stopHeartbeatMonitor() + attemptReconnect(retries, delay) }) + function attemptReconnect(retriesLeft: number, delay: number) { - if (retriesLeft > 0) { + if (retriesLeft > 0 && channelToSubscription.size > 0) { setTimeout(() => { startStreaming(retriesLeft - 1, delay) }, delay) } else { // console.error('Maximum reconnection attempts reached.') + isStreamingActive = false } } } @@ -123,14 +197,27 @@ export function subscribeOnStream( callback: onRealtimeCallback, } let subscriptionItem = channelToSubscription.get(channelString) - subscriptionItem = { - subscriberUID, - resolution, - lastDailyBar, - handlers: [handler], + + if (subscriptionItem) { + // Add handler to existing subscription + subscriptionItem.handlers.push(handler) + subscriptionItem.lastDailyBar = lastDailyBar + } else { + // Create new subscription + subscriptionItem = { + subscriberUID, + resolution, + lastDailyBar, + handlers: [handler], + } } + channelToSubscription.set(channelString, subscriptionItem) - startStreaming() + + // Only start streaming if not already active + if (!isStreamingActive) { + startStreaming() + } } export function unsubscribeFromStream(subscriberUID: string) { @@ -141,8 +228,35 @@ export function unsubscribeFromStream(subscriberUID: string) { ) if (handlerIndex !== -1) { - channelToSubscription.delete(channelString) + // Remove the specific handler + subscriptionItem.handlers.splice(handlerIndex, 1) + + // If no more handlers, delete the subscription + if (subscriptionItem.handlers.length === 0) { + channelToSubscription.delete(channelString) + } break } } + + // If no more subscriptions, stop streaming + if (channelToSubscription.size === 0) { + if (streamReader) { + streamReader.cancel() + streamReader = null + } + isStreamingActive = false + stopHeartbeatMonitor() + } +} + +// Handle browser tab visibility changes to restart stream when tab becomes active +if (typeof document !== 'undefined') { + document.addEventListener('visibilitychange', () => { + if (!document.hidden && channelToSubscription.size > 0 && !isStreamingActive) { + // Tab became visible and we have subscriptions but streaming is not active + // Restart the stream + startStreaming() + } + }) }