Skip to content

Commit 80d1371

Browse files
mxiao-cllmmcallister-cllapp-token-issuer-data-feeds[bot]
authored
dxFeed handle multi messages (#4398)
* dxFeed handle multi messages * Update .changeset/witty-experts-exercise.md --------- Co-authored-by: mmcallister-cll <[email protected]> Co-authored-by: app-token-issuer-data-feeds[bot] <134377064+app-token-issuer-data-feeds[bot]@users.noreply.github.com>
1 parent c51cd1d commit 80d1371

File tree

7 files changed

+117
-59
lines changed

7 files changed

+117
-59
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@chainlink/dxfeed-adapter': patch
3+
---
4+
5+
Handle multiple tickers in one message

packages/sources/dxfeed/src/transport/price-ws.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ const eventSymbolIndex = 0
55
const priceIndex = 6
66

77
export const transport = buildWsTransport<BaseEndpointTypes>(
8-
(params) => ({ Trade: [params.base.toUpperCase()] }),
8+
(params) => [{ Trade: [params.base.toUpperCase()] }],
99
(message) => {
1010
if (message[0].data[0] != 'Trade' && message[0].data[0][0] != 'Trade') {
1111
return []
1212
}
1313

14-
const base = message[0].data[1][eventSymbolIndex]
15-
const price = message[0].data[1][priceIndex]
14+
const base = message[0].data[1][eventSymbolIndex].toString()
15+
const price = Number(message[0].data[1][priceIndex])
1616

1717
return [
1818
{

packages/sources/dxfeed/src/transport/stock-quotes.ts

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,52 +14,54 @@ const askSizeIndex = 11
1414
const dataLength = 12
1515

1616
export const transport = buildWsTransport<BaseEndpointTypes>(
17-
(params) => ({ Quote: [params.base.toUpperCase()] }),
17+
(params) => [{ Quote: [params.base.toUpperCase()] }],
1818
(message) => {
1919
if (message[0].data[0] != 'Quote' && message[0].data[0][0] != 'Quote') {
2020
return []
2121
}
2222

2323
const data = message[0].data[1]
2424

25-
if (data.length != dataLength) {
26-
logger.warn(`${JSON.stringify(data)} is invalid since it doesn't have ${dataLength} fields.`)
25+
if (data.length % dataLength != 0) {
26+
logger.warn(
27+
`${JSON.stringify(data)} is invalid since length is not multiple of ${dataLength}.`,
28+
)
2729
return []
2830
}
2931

30-
const bidPrice = Number(data[bidPriceIndex])
31-
const askPrice = Number(data[askPriceIndex])
32+
return Array.from({ length: data.length / dataLength }, (_, i) => i * dataLength).map((i) => {
33+
const bidPrice = Number(data[i + bidPriceIndex])
34+
const askPrice = Number(data[i + askPriceIndex])
3235

33-
let midPrice: number
36+
let midPrice: number
3437

35-
if (bidPrice == 0) {
36-
midPrice = askPrice
37-
} else if (askPrice == 0) {
38-
midPrice = bidPrice
39-
} else {
40-
midPrice = (askPrice + bidPrice) / 2
41-
}
38+
if (bidPrice == 0) {
39+
midPrice = askPrice
40+
} else if (askPrice == 0) {
41+
midPrice = bidPrice
42+
} else {
43+
midPrice = (askPrice + bidPrice) / 2
44+
}
4245

43-
return [
44-
{
45-
params: { base: data[eventSymbolIndex] },
46+
return {
47+
params: { base: data[i + eventSymbolIndex].toString() },
4648
response: {
4749
result: null,
4850
data: {
4951
mid_price: midPrice,
5052
bid_price: bidPrice,
51-
bid_volume: Number(data[bidSizeIndex]),
53+
bid_volume: Number(data[i + bidSizeIndex]),
5254
ask_price: askPrice,
53-
ask_volume: Number(data[askSizeIndex]),
55+
ask_volume: Number(data[i + askSizeIndex]),
5456
},
5557
timestamps: {
5658
providerIndicatedTimeUnixMs: Math.max(
57-
Number(data[bidTimeIndex]),
58-
Number(data[askTimeIndex]),
59+
Number(data[i + bidTimeIndex]),
60+
Number(data[i + askTimeIndex]),
5961
),
6062
},
6163
},
62-
},
63-
]
64+
}
65+
})
6466
},
6567
)

packages/sources/dxfeed/src/transport/ws.ts

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,7 @@ type DXFeedMessage = {
1010
channel: string
1111
clientId?: string
1212
id: string
13-
data: [
14-
string,
15-
[
16-
string,
17-
number,
18-
number,
19-
number,
20-
number,
21-
string,
22-
number,
23-
string,
24-
number,
25-
number,
26-
number,
27-
number,
28-
],
29-
]
13+
data: [string, (string | number)[]]
3014
successful?: boolean
3115
advice?: {
3216
interval: number
@@ -111,7 +95,7 @@ class DxFeedWebsocketTransport<T extends BaseTransportTypes> extends WebSocketTr
11195
export function buildWsTransport<T extends BaseTransportTypes>(
11296
formatTicker: (
11397
base: TypeFromDefinition<(T & ProviderTypes)['Parameters']>,
114-
) => Record<string, string[]>,
98+
) => Record<string, string[]>[],
11599
processMessage: (message: DXFeedMessage) => ProviderResult<T & ProviderTypes>[],
116100
): WebSocketTransport<T & ProviderTypes> {
117101
const wsTransport: DxFeedWebsocketTransport<T> = new DxFeedWebsocketTransport({
@@ -153,22 +137,18 @@ export function buildWsTransport<T extends BaseTransportTypes>(
153137

154138
builders: {
155139
subscribeMessage: (params) => {
156-
return [
157-
{
158-
channel: SERVICE_SUB,
159-
data: { add: formatTicker(params) },
160-
clientId: wsTransport.connectionClientId,
161-
},
162-
]
140+
return formatTicker(params).map((ticker) => ({
141+
channel: SERVICE_SUB,
142+
data: { add: ticker },
143+
clientId: wsTransport.connectionClientId,
144+
}))
163145
},
164146
unsubscribeMessage: (params) => {
165-
return [
166-
{
167-
channel: SERVICE_SUB,
168-
data: { remove: formatTicker(params) },
169-
clientId: wsTransport.connectionClientId,
170-
},
171-
]
147+
return formatTicker(params).map((ticker) => ({
148+
channel: SERVICE_SUB,
149+
data: { remove: ticker },
150+
clientId: wsTransport.connectionClientId,
151+
}))
172152
},
173153
},
174154
})

packages/sources/dxfeed/test/integration/__snapshots__/adapter-ws.test.ts.snap

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,44 @@ exports[`websocket quote endpoint should return success 1`] = `
6868
}
6969
`;
7070

71+
exports[`websocket quote endpoint should return success for 1st item in multi message 1`] = `
72+
{
73+
"data": {
74+
"ask_price": 8,
75+
"ask_volume": 9,
76+
"bid_price": 5,
77+
"bid_volume": 6,
78+
"mid_price": 6.5,
79+
},
80+
"result": null,
81+
"statusCode": 200,
82+
"timestamps": {
83+
"providerDataReceivedUnixMs": 1018,
84+
"providerDataStreamEstablishedUnixMs": 1010,
85+
"providerIndicatedTimeUnixMs": 7,
86+
},
87+
}
88+
`;
89+
90+
exports[`websocket quote endpoint should return success for 2nd item in multi message 1`] = `
91+
{
92+
"data": {
93+
"ask_price": 17,
94+
"ask_volume": 18,
95+
"bid_price": 14,
96+
"bid_volume": 15,
97+
"mid_price": 15.5,
98+
},
99+
"result": null,
100+
"statusCode": 200,
101+
"timestamps": {
102+
"providerDataReceivedUnixMs": 1018,
103+
"providerDataStreamEstablishedUnixMs": 1010,
104+
"providerIndicatedTimeUnixMs": 16,
105+
},
106+
}
107+
`;
108+
71109
exports[`websocket stock endpoint should return success 1`] = `
72110
{
73111
"data": {

packages/sources/dxfeed/test/integration/adapter-ws.test.ts

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,18 @@ describe('websocket', () => {
1919
base: 'TSLA',
2020
transport: 'ws',
2121
}
22-
2322
const quoteData = {
2423
base: 'TSLA',
2524
endpoint: 'stock_quotes',
2625
}
26+
const quoteMulti1Data = {
27+
base: 'MULTI_1',
28+
endpoint: 'stock_quotes',
29+
}
30+
const quoteMulti2Data = {
31+
base: 'MULTI_2',
32+
endpoint: 'stock_quotes',
33+
}
2734

2835
beforeAll(async () => {
2936
oldEnv = JSON.parse(JSON.stringify(process.env))
@@ -44,8 +51,10 @@ describe('websocket', () => {
4451

4552
// Send initial request to start background execute and wait for cache to be filled with result
4653
await testAdapter.request(quoteData)
54+
await testAdapter.request(quoteMulti1Data)
55+
await testAdapter.request(quoteMulti2Data)
4756
await testAdapter.request(stockData)
48-
await testAdapter.waitForCache(4)
57+
await testAdapter.waitForCache(6)
4958
})
5059

5160
afterAll(async () => {
@@ -68,6 +77,16 @@ describe('websocket', () => {
6877
expect(response.json()).toMatchSnapshot()
6978
})
7079

80+
it('should return success for 1st item in multi message', async () => {
81+
const response = await testAdapter.request(quoteMulti1Data)
82+
expect(response.json()).toMatchSnapshot()
83+
})
84+
85+
it('should return success for 2nd item in multi message', async () => {
86+
const response = await testAdapter.request(quoteMulti2Data)
87+
expect(response.json()).toMatchSnapshot()
88+
})
89+
7190
it('should return bid when ask is 0', async () => {
7291
const response = await testAdapter.request({
7392
base: 'NO_ASK',

packages/sources/dxfeed/test/integration/fixtures.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,19 @@ export const mockWebSocketServer = (URL: string): MockWebsocketServer => {
6363
},
6464
]
6565

66+
const multiQuoteReponse = [
67+
{
68+
data: [
69+
'Quote',
70+
[
71+
...['MULTI_1', 1, 2, 3, 4, 'V', 5, 6, 7, 'V', 8, 9],
72+
...['MULTI_2', 10, 11, 12, 13, 'V', 14, 15, 16, 'V', 17, 18],
73+
],
74+
],
75+
channel: '/service/data',
76+
},
77+
]
78+
6679
const noBidQuoteReponse = [
6780
{
6881
data: [
@@ -125,6 +138,7 @@ export const mockWebSocketServer = (URL: string): MockWebsocketServer => {
125138
)
126139
socket.on('message', () => {
127140
socket.send(JSON.stringify(quoteReponse))
141+
socket.send(JSON.stringify(multiQuoteReponse))
128142
socket.send(JSON.stringify(noBidQuoteReponse))
129143
socket.send(JSON.stringify(noAskQuoteReponse))
130144
socket.send(JSON.stringify(invalidQuoteReponse))

0 commit comments

Comments
 (0)