Skip to content

Commit 1cd2f81

Browse files
committed
chore: streamManagerV2 remove getStats function - canary
1 parent dbecb19 commit 1cd2f81

File tree

6 files changed

+120
-194
lines changed

6 files changed

+120
-194
lines changed

packages/sdk-ts/src/client/indexer/grpc_stream/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ export * from './streamV2/IndexerGrpcExplorerStreamV2.js'
2727
export * from './streamV2/IndexerGrpcDerivativesStreamV2.js'
2828
export * from './streamV2/IndexerGrpcAccountPortfolioStreamV2.js'
2929

30-
export * from './streamV2/StreamManager.js'
30+
export * from './streamV2/StreamManagerV2.js'
3131
export * from './streamV2/streamHelpersV2.js'

packages/sdk-ts/src/client/indexer/grpc_stream/streamV2/StreamManager.spec.ts renamed to packages/sdk-ts/src/client/indexer/grpc_stream/streamV2/StreamManagerV2.spec.ts

Lines changed: 2 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Network, getNetworkEndpoints } from '@injectivelabs/networks'
22
import { it, vi, expect, describe, afterEach, beforeEach } from 'vitest'
3-
import { StreamManagerV2 } from './StreamManager.js'
3+
import { StreamManagerV2 } from './StreamManagerV2.js'
44
import { StreamState, StreamDisconnectReason } from '../../../../types/index.js'
55
import { IndexerGrpcDerivativesStreamV2 } from './IndexerGrpcDerivativesStreamV2.js'
66
import type { Mock } from 'vitest'
@@ -52,14 +52,6 @@ describe('StreamManagerV2', () => {
5252

5353
expect(manager.getId()).toBe('test-stream')
5454
expect(manager.getState()).toBe(StreamState.Idle)
55-
56-
const stats = manager.getStats()
57-
expect(stats.state).toBe(StreamState.Idle)
58-
expect(stats.connectCount).toBe(0)
59-
expect(stats.disconnectCount).toBe(0)
60-
expect(stats.retryCount).toBe(0)
61-
expect(stats.dataReceivedCount).toBe(0)
62-
expect(stats.errorCount).toBe(0)
6355
})
6456
})
6557

@@ -294,7 +286,7 @@ describe('StreamManagerV2', () => {
294286
expect(connectEvents).toHaveLength(1)
295287
expect(connectEvents[0]).toEqual({
296288
isReconnect: false,
297-
attempt: 1,
289+
attempt: 0,
298290
})
299291
})
300292

@@ -526,73 +518,6 @@ describe('StreamManagerV2', () => {
526518
})
527519
})
528520

529-
describe('statistics', () => {
530-
it('should track connection statistics', () => {
531-
const manager = new StreamManagerV2({
532-
id: 'test-stream',
533-
streamFactory,
534-
onData: onDataCallback,
535-
})
536-
537-
manager.start()
538-
manager.stop()
539-
540-
const stats = manager.getStats()
541-
expect(stats.connectCount).toBe(1)
542-
expect(stats.disconnectCount).toBe(1)
543-
expect(stats.connectedAt).toBeTruthy()
544-
expect(stats.disconnectedAt).toBeTruthy()
545-
})
546-
547-
it('should track data statistics', () => {
548-
const manager = new StreamManagerV2({
549-
id: 'test-stream',
550-
streamFactory,
551-
onData: onDataCallback,
552-
})
553-
554-
manager.start()
555-
556-
// Simulate multiple data events using proper emit pattern
557-
manager.emit('data', 'data1')
558-
manager.emit('data', 'data2')
559-
manager.emit('data', 'data3')
560-
561-
const stats = manager.getStats()
562-
expect(stats.dataReceivedCount).toBe(3)
563-
expect(stats.lastDataAt).toBeTruthy()
564-
expect(onDataCallback).toHaveBeenCalledTimes(3)
565-
})
566-
567-
it('should track retry statistics', () => {
568-
const manager = new StreamManagerV2({
569-
id: 'test-stream',
570-
streamFactory,
571-
onData: onDataCallback,
572-
retryConfig: {
573-
enabled: true,
574-
maxAttempts: 2,
575-
initialDelayMs: 100,
576-
maxDelayMs: 1000,
577-
backoffMultiplier: 2,
578-
persistent: false,
579-
},
580-
})
581-
582-
// Fail first attempt
583-
vi.mocked(streamFactory).mockImplementationOnce(() => {
584-
throw new Error('First failure')
585-
})
586-
587-
manager.start()
588-
vi.advanceTimersByTime(100) // Trigger retry
589-
590-
const stats = manager.getStats()
591-
expect(stats.retryCount).toBe(1)
592-
expect(stats.errorCount).toBe(1)
593-
})
594-
})
595-
596521
describe('edge cases', () => {
597522
it('should handle multiple start calls gracefully', () => {
598523
const manager = new StreamManagerV2({
@@ -721,15 +646,9 @@ describe('StreamManagerV2', () => {
721646
expect(manager.getState()).toBe(StreamState.Connected)
722647
expect(connectEvents).toHaveLength(1)
723648

724-
const stats = manager.getStats()
725-
expect(stats.connectCount).toBe(1)
726-
expect(stats.connectedAt).toBeTruthy()
727-
728649
// Data may or may not be received depending on market activity
729650
if (result === 'data') {
730651
expect(dataReceived.length).toBeGreaterThan(0)
731-
expect(stats.dataReceivedCount).toBeGreaterThan(0)
732-
expect(stats.lastDataAt).toBeTruthy()
733652
}
734653

735654
manager.stop()
@@ -794,63 +713,6 @@ describe('StreamManagerV2', () => {
794713
expect(disconnectEvents[0].willRetry).toBe(false)
795714
}, 10000)
796715

797-
it('should track statistics correctly with real stream', async () => {
798-
const endpoints = getNetworkEndpoints(Network.MainnetSentry)
799-
const derivativesStream = new IndexerGrpcDerivativesStreamV2(
800-
endpoints.indexer,
801-
)
802-
803-
let dataCount = 0
804-
805-
const manager = new StreamManagerV2({
806-
id: 'stats-test-stream',
807-
streamFactory: () =>
808-
derivativesStream.streamMarkets({
809-
callback: (response) => {
810-
manager.emit('data', response)
811-
},
812-
}),
813-
onData: () => {
814-
dataCount++
815-
},
816-
retryConfig: {
817-
enabled: false,
818-
},
819-
})
820-
821-
manager.start()
822-
823-
// Wait for connection
824-
await new Promise((resolve) => {
825-
const timeout = setTimeout(resolve, 3000)
826-
manager.on('connect', () => {
827-
clearTimeout(timeout)
828-
resolve('connected')
829-
})
830-
})
831-
832-
const stats = manager.getStats()
833-
834-
// Connection stats should always be present
835-
expect(stats.connectCount).toBe(1)
836-
expect(stats.disconnectCount).toBe(0)
837-
expect(stats.errorCount).toBe(0)
838-
expect(stats.retryCount).toBe(0)
839-
expect(stats.connectedAt).toBeTruthy()
840-
841-
// Data stats depend on whether market is active
842-
if (stats.dataReceivedCount > 0) {
843-
expect(dataCount).toBe(stats.dataReceivedCount)
844-
expect(stats.lastDataAt).toBeTruthy()
845-
}
846-
847-
manager.stop()
848-
849-
const finalStats = manager.getStats()
850-
expect(finalStats.disconnectCount).toBe(1)
851-
expect(finalStats.disconnectedAt).toBeTruthy()
852-
}, 8000)
853-
854716
it('should handle destroy() cleanup properly', async () => {
855717
const endpoints = getNetworkEndpoints(Network.MainnetSentry)
856718
const derivativesStream = new IndexerGrpcDerivativesStreamV2(

packages/sdk-ts/src/client/indexer/grpc_stream/streamV2/StreamManager.ts renamed to packages/sdk-ts/src/client/indexer/grpc_stream/streamV2/StreamManagerV2.ts

Lines changed: 6 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import {
55
StreamDisconnectReason,
66
} from '../../../../types/index.js'
77
import type {
8-
StreamStats,
98
StreamError,
109
StreamSubscription,
1110
StreamManagerConfig,
@@ -44,7 +43,6 @@ export class StreamManagerV2<TResponse> extends EventEmitter<
4443
private state: StreamState = StreamState.Idle
4544
private subscription: StreamSubscription | null = null
4645
private retryTimeoutId: NodeJS.Timeout | null = null
47-
private stats: StreamStats
4846
private retryAttempt: number = 0
4947

5048
constructor(config: StreamManagerConfig<TResponse>) {
@@ -57,20 +55,6 @@ export class StreamManagerV2<TResponse> extends EventEmitter<
5755
retryConfig: { ...DEFAULT_RETRY_CONFIG, ...config.retryConfig },
5856
}
5957

60-
// Initialize stream statistics
61-
this.stats = {
62-
state: StreamState.Idle,
63-
connectCount: 0,
64-
disconnectCount: 0,
65-
retryCount: 0,
66-
dataReceivedCount: 0,
67-
errorCount: 0,
68-
lastDataAt: null,
69-
createdAt: Date.now(),
70-
connectedAt: null,
71-
disconnectedAt: null,
72-
}
73-
7458
// Override emit to intercept 'data' events for stats tracking and user callback
7559
const originalEmit = this.emit.bind(this)
7660

@@ -104,14 +88,6 @@ export class StreamManagerV2<TResponse> extends EventEmitter<
10488
return this.config.id
10589
}
10690

107-
public getState(): StreamState {
108-
return this.state
109-
}
110-
111-
public getStats(): StreamStats {
112-
return { ...this.stats, state: this.state }
113-
}
114-
11591
/**
11692
* Destroy the stream manager and clean up all resources
11793
* Call this when the stream manager is no longer needed
@@ -121,11 +97,14 @@ export class StreamManagerV2<TResponse> extends EventEmitter<
12197
this.removeAllListeners()
12298
}
12399

100+
public getState(): StreamState {
101+
return this.state
102+
}
103+
124104
private updateState(newState: StreamState): void {
125105
const oldState = this.state
126106

127107
this.state = newState
128-
this.stats.state = newState
129108

130109
this.emit(StreamEvent.StateChange, { from: oldState, to: newState })
131110
}
@@ -180,7 +159,6 @@ export class StreamManagerV2<TResponse> extends EventEmitter<
180159

181160
this.retryTimeoutId = setTimeout(() => {
182161
this.retryAttempt++
183-
this.stats.retryCount++
184162

185163
this.emit(StreamEvent.Retry, {
186164
attempt: this.retryAttempt,
@@ -202,7 +180,6 @@ export class StreamManagerV2<TResponse> extends EventEmitter<
202180
private handleError(error: Error | StreamError | any): void {
203181
const errorInfo = this.extractErrorInfo(error)
204182

205-
this.stats.errorCount++
206183
this.emit(StreamEvent.Error, errorInfo)
207184

208185
// Map gRPC error code to appropriate disconnect reason
@@ -229,13 +206,10 @@ export class StreamManagerV2<TResponse> extends EventEmitter<
229206
}
230207

231208
/**
232-
* Handles incoming data - tracks stats and calls user callback
209+
* Handles incoming data - calls user callback
233210
* Called automatically when user emits 'data' event from streamFactory callback
234211
*/
235212
private handleData(response: TResponse): void {
236-
this.stats.dataReceivedCount++
237-
this.stats.lastDataAt = Date.now()
238-
239213
try {
240214
this.config.onData(response)
241215
} catch (error) {
@@ -246,22 +220,17 @@ export class StreamManagerV2<TResponse> extends EventEmitter<
246220
private handleConnected(isReconnect: boolean): void {
247221
this.updateState(StreamState.Connected)
248222
this.retryAttempt = 0
249-
this.stats.connectCount++
250-
this.stats.connectedAt = Date.now()
251223

252224
this.emit(StreamEvent.Connect, {
253225
isReconnect,
254-
attempt: this.stats.connectCount,
226+
attempt: 0,
255227
})
256228
}
257229

258230
private handleDisconnect(reason: StreamDisconnectReason): void {
259231
this.clearSubscription()
260232
this.clearRetryTimeout()
261233

262-
this.stats.disconnectCount++
263-
this.stats.disconnectedAt = Date.now()
264-
265234
// Determine if retry should be attempted based on disconnect reason
266235
const willRetry =
267236
reason !== StreamDisconnectReason.UserStopped &&

packages/sdk-ts/src/types/stream.ts

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -187,19 +187,3 @@ export interface StreamManagerEvents<TResponse> {
187187
message: string
188188
}
189189
}
190-
191-
/**
192-
* Stream statistics
193-
*/
194-
export interface StreamStats {
195-
state: StreamState
196-
connectCount: number
197-
disconnectCount: number
198-
retryCount: number
199-
dataReceivedCount: number
200-
errorCount: number
201-
lastDataAt: number | null
202-
createdAt: number
203-
connectedAt: number | null
204-
disconnectedAt: number | null
205-
}

packages/ts-types/src/common.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,28 @@
1+
/**
2+
* @deprecated Use StreamStatusResponse from @injectivelabs/sdk-ts/types instead
3+
*/
4+
export interface StreamStatusResponse {
5+
details: string
6+
code: number
7+
metadata: any
8+
}
9+
10+
/**
11+
* @deprecated Use StreamOperation from @injectivelabs/sdk-ts/types instead
12+
*/
13+
export const StreamOperation = {
14+
Insert: 'insert',
15+
Delete: 'delete',
16+
Replace: 'replace',
17+
Update: 'update',
18+
Invalidate: 'invalidate',
19+
} as const
20+
21+
/**
22+
* @deprecated Use StreamOperation from @injectivelabs/sdk-ts/types instead
23+
*/
24+
export type StreamOperation =
25+
(typeof StreamOperation)[keyof typeof StreamOperation]
126
export interface PaginationOption {
227
key: string
328
offset?: number

0 commit comments

Comments
 (0)