Skip to content

Commit a84ca7f

Browse files
authored
Aleno: resubscribe after reconnect (#3819)
1 parent 0612124 commit a84ca7f

File tree

3 files changed

+55
-4
lines changed

3 files changed

+55
-4
lines changed

.changeset/pretty-goats-add.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@chainlink/aleno-adapter': patch
3+
---
4+
5+
Bug fix: Recreate subscriptions after reconnecting

packages/sources/aleno/src/transport/price-socketio.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
import { io, Socket } from 'socket.io-client'
1+
import { EndpointContext } from '@chainlink/external-adapter-framework/adapter'
22
import { TransportDependencies } from '@chainlink/external-adapter-framework/transports'
3-
import { makeLogger, sleep } from '@chainlink/external-adapter-framework/util'
43
import {
54
StreamingTransport,
65
SubscriptionDeltas,
76
} from '@chainlink/external-adapter-framework/transports/abstract/streaming'
7+
import { makeLogger, sleep } from '@chainlink/external-adapter-framework/util'
88
import { TypeFromDefinition } from '@chainlink/external-adapter-framework/validation/input-params'
9-
import { EndpointContext } from '@chainlink/external-adapter-framework/adapter'
10-
import { BaseEndpointTypes } from '../endpoint/price'
9+
import { io, Socket } from 'socket.io-client'
1110
import { config } from '../config'
11+
import { BaseEndpointTypes } from '../endpoint/price'
1212

1313
const logger = makeLogger('SocketIOTransport')
1414

@@ -220,6 +220,7 @@ export class SocketIOTransport extends StreamingTransport<SocketIOTransportTypes
220220

221221
this.socket.on('connect', () => {
222222
logger.info({ msg: 'Connection open' })
223+
this.confirmedSubscriptions = new Set<string>()
223224
})
224225

225226
this.socket.on('disconnect', (reason, details) => {

packages/sources/aleno/test/unit/adapter-socket.test.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,51 @@ describe('SocketIOTransport', () => {
135135
expect(subscribeSpy).toBeCalledTimes(1)
136136
})
137137

138+
it('should resubscribe after reconnect', async () => {
139+
const subscribeCalls: SubscribeCall[] = []
140+
const subscribeSpy = jest
141+
.fn()
142+
.mockImplementation((subscriptions: string[], callback: SubscribeCallback) => {
143+
subscribeCalls.push({ subscriptions, callback })
144+
})
145+
mockSocket.clientMock.on('subscribe', subscribeSpy)
146+
147+
const transport = new SocketIOTransport()
148+
149+
transport.streamHandler(context, emptySubscriptions)
150+
await jest.advanceTimersByTimeAsync(BACKGROUND_EXECUTE_MS_SSE)
151+
152+
mockSocket.clientMock.emit('connect')
153+
154+
const subscriptions = [{ base: 'FRAC', quote: 'USD' }]
155+
const streamHandlerPromise = transport.streamHandler(context, {
156+
desired: subscriptions,
157+
new: subscriptions,
158+
stale: [],
159+
})
160+
161+
expect(subscribeSpy).toBeCalledTimes(1)
162+
expect(subscribeSpy).toBeCalledWith(['FRAC/USD'], expect.any(Function))
163+
164+
subscribeCalls[0].callback({
165+
status: 'ok',
166+
involvedSubscriptions: ['frac/usd'],
167+
subscriptionsAfterUpdate: ['frac/usd'],
168+
})
169+
await jest.advanceTimersByTimeAsync(BACKGROUND_EXECUTE_MS_SSE)
170+
await streamHandlerPromise
171+
172+
// Simulate a reconnect
173+
mockSocket.clientMock.emit('connect')
174+
175+
transport.streamHandler(context, { desired: subscriptions, new: [], stale: [] })
176+
await jest.advanceTimersByTimeAsync(BACKGROUND_EXECUTE_MS_SSE)
177+
178+
// Resubscribed
179+
expect(subscribeSpy).toBeCalledTimes(2)
180+
expect(subscribeSpy).toHaveBeenNthCalledWith(2, ['FRAC/USD'], expect.any(Function))
181+
})
182+
138183
it('should unsubscribe', async () => {
139184
const subscribeCalls: SubscribeCall[] = []
140185
const subscribeSpy = jest.fn().mockImplementation((subscriptions, callback) => {

0 commit comments

Comments
 (0)