Skip to content
Merged

v3.0.1 #1680

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "mars-v2-frontend",
"version": "3.0.0",
"version": "3.0.1",
"homepage": "./",
"private": false,
"license": "SEE LICENSE IN LICENSE FILE",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ function Content(props: Props & { account?: Account }) {
const items: Array<{ value: string; label: string }> = []
for (const item of menu) {
if (item.submenu) {
const filteredItems = item.submenu.filter((subItem) => !subItem.isSeparator)
const filteredItems = item.submenu.filter(
(subItem) => !subItem.isSeparator && !subItem.hideOnDesktop,
)
for (const subItem of filteredItems) {
const value = subItem.page || subItem.externalUrl || ''
items.push({ value, label: subItem.label })
Expand Down
132 changes: 123 additions & 9 deletions src/components/trade/TradeChart/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@ import {

const streamingUrl = `${pythEndpoints.candles}/streaming`
const channelToSubscription = new Map()
let isStreamingActive = false
let streamReader: ReadableStreamDefaultReader<Uint8Array> | null = null
let lastDataReceivedTime = Date.now()
let heartbeatInterval: NodeJS.Timeout | null = null

function handleStreamingData(data: StreamData) {
const { id, p, t } = data

// Update last data received time for heartbeat monitoring
lastDataReceivedTime = Date.now()

const tradePrice = p
const tradeTime = t * 1000 // Multiplying by 1000 to get milliseconds

Expand Down Expand Up @@ -49,18 +56,76 @@ function handleStreamingData(data: StreamData) {
channelToSubscription.set(channelString, subscriptionItem)
}

function startHeartbeatMonitor() {
// Clear existing interval if any
if (heartbeatInterval) {
clearInterval(heartbeatInterval)
}

// Check every 30 seconds if we've received data recently
heartbeatInterval = setInterval(() => {
const timeSinceLastData = Date.now() - lastDataReceivedTime
// If no data received for 60 seconds and we have active subscriptions, restart stream
if (timeSinceLastData > 60000 && channelToSubscription.size > 0 && isStreamingActive) {
// console.log('No data received for 60 seconds, restarting stream...')
isStreamingActive = false
if (streamReader) {
streamReader.cancel()
streamReader = null
}
startStreaming()
}
}, 30000)
}

function stopHeartbeatMonitor() {
if (heartbeatInterval) {
clearInterval(heartbeatInterval)
heartbeatInterval = null
}
}

function startStreaming(retries = 3, delay = 3000) {
// Prevent multiple concurrent streams
if (isStreamingActive) {
return
}

// Cancel existing reader if any
if (streamReader) {
streamReader.cancel()
streamReader = null
}

isStreamingActive = true
lastDataReceivedTime = Date.now()

// Start heartbeat monitoring
startHeartbeatMonitor()

fetch(streamingUrl)
.then((response) => {
if (response.body === null) return
if (response.body === null) {
isStreamingActive = false
stopHeartbeatMonitor()
return
}
const reader = response.body.getReader()
streamReader = reader

function streamData() {
reader
.read()
.then(({ value, done }) => {
if (done) {
// console.error('Streaming ended.')
isStreamingActive = false
streamReader = null
stopHeartbeatMonitor()
// Reconnect when stream ends naturally
if (channelToSubscription.size > 0) {
attemptReconnect(retries, delay)
}
return
}

Expand All @@ -83,6 +148,9 @@ function startStreaming(retries = 3, delay = 3000) {
})
.catch((error) => {
// console.error('Error reading from stream:', error)
isStreamingActive = false
streamReader = null
stopHeartbeatMonitor()
attemptReconnect(retries, delay)
})
}
Expand All @@ -91,14 +159,20 @@ function startStreaming(retries = 3, delay = 3000) {
})
.catch((error) => {
// console.error('Error fetching from the streaming endpoint:', error)
isStreamingActive = false
streamReader = null
stopHeartbeatMonitor()
attemptReconnect(retries, delay)
})

function attemptReconnect(retriesLeft: number, delay: number) {
if (retriesLeft > 0) {
if (retriesLeft > 0 && channelToSubscription.size > 0) {
setTimeout(() => {
startStreaming(retriesLeft - 1, delay)
}, delay)
} else {
// console.error('Maximum reconnection attempts reached.')
isStreamingActive = false
}
}
}
Expand All @@ -123,14 +197,27 @@ export function subscribeOnStream(
callback: onRealtimeCallback,
}
let subscriptionItem = channelToSubscription.get(channelString)
subscriptionItem = {
subscriberUID,
resolution,
lastDailyBar,
handlers: [handler],

if (subscriptionItem) {
// Add handler to existing subscription
subscriptionItem.handlers.push(handler)
subscriptionItem.lastDailyBar = lastDailyBar
} else {
// Create new subscription
subscriptionItem = {
subscriberUID,
resolution,
lastDailyBar,
handlers: [handler],
}
}

channelToSubscription.set(channelString, subscriptionItem)
startStreaming()

// Only start streaming if not already active
if (!isStreamingActive) {
startStreaming()
}
}

export function unsubscribeFromStream(subscriberUID: string) {
Expand All @@ -141,8 +228,35 @@ export function unsubscribeFromStream(subscriberUID: string) {
)

if (handlerIndex !== -1) {
channelToSubscription.delete(channelString)
// Remove the specific handler
subscriptionItem.handlers.splice(handlerIndex, 1)

// If no more handlers, delete the subscription
if (subscriptionItem.handlers.length === 0) {
channelToSubscription.delete(channelString)
}
break
}
}

// If no more subscriptions, stop streaming
if (channelToSubscription.size === 0) {
if (streamReader) {
streamReader.cancel()
streamReader = null
}
isStreamingActive = false
stopHeartbeatMonitor()
}
}

// Handle browser tab visibility changes to restart stream when tab becomes active
if (typeof document !== 'undefined') {
document.addEventListener('visibilitychange', () => {
if (!document.hidden && channelToSubscription.size > 0 && !isStreamingActive) {
// Tab became visible and we have subscriptions but streaming is not active
// Restart the stream
startStreaming()
}
})
}
Loading