Skip to content

Commit 9ac6174

Browse files
authored
add binary option to subscribe to binary vaas (#444)
* add binary option to subscrube to binary vaas * change property name * combine verbosity and binary to a single config * add tests
1 parent 6b07ae6 commit 9ac6174

File tree

2 files changed

+126
-25
lines changed

2 files changed

+126
-25
lines changed

third_party/pyth/price-service/src/__tests__/ws.test.ts

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { HexString, PriceFeed } from "@pythnetwork/pyth-sdk-js";
22
import { Server } from "http";
3-
import { number } from "joi";
43
import { WebSocket, WebSocketServer } from "ws";
54
import { sleep } from "../helpers";
65
import { PriceInfo, PriceStore } from "../listen";
@@ -250,6 +249,89 @@ describe("Client receives data", () => {
250249
await waitForSocketState(client, client.CLOSED);
251250
});
252251

252+
test("When subscribes with valid ids and binary flag set to true, returns correct price feed with vaa", async () => {
253+
const [client, serverMessages] = await createSocketClient();
254+
255+
const message: ClientMessage = {
256+
ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id],
257+
type: "subscribe",
258+
binary: true,
259+
};
260+
261+
client.send(JSON.stringify(message));
262+
263+
await waitForMessages(serverMessages, 1);
264+
265+
expect(serverMessages[0]).toStrictEqual({
266+
type: "response",
267+
status: "success",
268+
});
269+
270+
api.dispatchPriceFeedUpdate(priceInfos[0]);
271+
272+
await waitForMessages(serverMessages, 2);
273+
274+
expect(serverMessages[1]).toEqual({
275+
type: "price_update",
276+
price_feed: priceInfos[0].priceFeed.toJson(),
277+
});
278+
279+
api.dispatchPriceFeedUpdate(priceInfos[1]);
280+
281+
await waitForMessages(serverMessages, 3);
282+
283+
expect(serverMessages[2]).toEqual({
284+
type: "price_update",
285+
price_feed: {
286+
...priceInfos[1].priceFeed.toJson(),
287+
vaa: priceInfos[1].vaa.toString("base64"),
288+
},
289+
});
290+
291+
client.close();
292+
await waitForSocketState(client, client.CLOSED);
293+
});
294+
295+
test("When subscribes with valid ids and binary flag set to false, returns correct price feed without vaa", async () => {
296+
const [client, serverMessages] = await createSocketClient();
297+
298+
const message: ClientMessage = {
299+
ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id],
300+
type: "subscribe",
301+
binary: false,
302+
};
303+
304+
client.send(JSON.stringify(message));
305+
306+
await waitForMessages(serverMessages, 1);
307+
308+
expect(serverMessages[0]).toStrictEqual({
309+
type: "response",
310+
status: "success",
311+
});
312+
313+
api.dispatchPriceFeedUpdate(priceInfos[0]);
314+
315+
await waitForMessages(serverMessages, 2);
316+
317+
expect(serverMessages[1]).toEqual({
318+
type: "price_update",
319+
price_feed: priceInfos[0].priceFeed.toJson(),
320+
});
321+
322+
api.dispatchPriceFeedUpdate(priceInfos[1]);
323+
324+
await waitForMessages(serverMessages, 3);
325+
326+
expect(serverMessages[2]).toEqual({
327+
type: "price_update",
328+
price_feed: priceInfos[1].priceFeed.toJson(),
329+
});
330+
331+
client.close();
332+
await waitForSocketState(client, client.CLOSED);
333+
});
334+
253335
test("When subscribes with invalid ids, returns error", async () => {
254336
const [client, serverMessages] = await createSocketClient();
255337

third_party/pyth/price-service/src/ws.ts

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ const ClientMessageSchema: Joi.Schema = Joi.object({
1212
.items(Joi.string().regex(/^(0x)?[a-f0-9]{64}$/))
1313
.required(),
1414
verbose: Joi.boolean(),
15+
binary: Joi.boolean(),
1516
}).required();
1617

1718
export type ClientMessage = {
1819
type: "subscribe" | "unsubscribe";
1920
ids: HexString[];
2021
verbose?: boolean;
22+
binary?: boolean;
2123
};
2224

2325
export type ServerResponse = {
@@ -31,12 +33,20 @@ export type ServerPriceUpdate = {
3133
price_feed: any;
3234
};
3335

36+
export type PriceFeedConfig = {
37+
verbose: boolean;
38+
binary: boolean;
39+
};
40+
3441
export type ServerMessage = ServerResponse | ServerPriceUpdate;
3542

3643
export class WebSocketAPI {
3744
private wsCounter: number;
3845
private priceFeedClients: Map<HexString, Set<WebSocket>>;
39-
private priceFeedClientsVerbosity: Map<HexString, Map<WebSocket, boolean>>;
46+
private priceFeedClientsConfig: Map<
47+
HexString,
48+
Map<WebSocket, PriceFeedConfig>
49+
>;
4050
private aliveClients: Set<WebSocket>;
4151
private wsId: Map<WebSocket, number>;
4252
private priceFeedVaaInfo: PriceStore;
@@ -45,7 +55,7 @@ export class WebSocketAPI {
4555
constructor(priceFeedVaaInfo: PriceStore, promClient?: PromClient) {
4656
this.priceFeedVaaInfo = priceFeedVaaInfo;
4757
this.priceFeedClients = new Map();
48-
this.priceFeedClientsVerbosity = new Map();
58+
this.priceFeedClientsConfig = new Map();
4959
this.aliveClients = new Set();
5060
this.wsCounter = 0;
5161
this.wsId = new Map();
@@ -55,13 +65,14 @@ export class WebSocketAPI {
5565
private addPriceFeedClient(
5666
ws: WebSocket,
5767
id: HexString,
58-
verbose: boolean = false
68+
verbose: boolean = false,
69+
binary: boolean = false
5970
) {
6071
if (!this.priceFeedClients.has(id)) {
6172
this.priceFeedClients.set(id, new Set());
62-
this.priceFeedClientsVerbosity.set(id, new Map([[ws, verbose]]));
73+
this.priceFeedClientsConfig.set(id, new Map([[ws, { verbose, binary }]]));
6374
} else {
64-
this.priceFeedClientsVerbosity.get(id)!.set(ws, verbose);
75+
this.priceFeedClientsConfig.get(id)!.set(ws, { verbose, binary });
6576
}
6677
this.priceFeedClients.get(id)!.add(ws);
6778
}
@@ -71,7 +82,7 @@ export class WebSocketAPI {
7182
return;
7283
}
7384
this.priceFeedClients.get(id)!.delete(ws);
74-
this.priceFeedClientsVerbosity.get(id)!.delete(ws);
85+
this.priceFeedClientsConfig.get(id)!.delete(ws);
7586
}
7687

7788
dispatchPriceFeedUpdate(priceInfo: PriceInfo) {
@@ -96,27 +107,30 @@ export class WebSocketAPI {
96107
for (const client of clients.values()) {
97108
this.promClient?.addWebSocketInteraction("server_update", "ok");
98109

99-
const verbose = this.priceFeedClientsVerbosity
110+
const config = this.priceFeedClientsConfig
100111
.get(priceInfo.priceFeed.id)!
101112
.get(client);
102113

103-
const priceUpdate: ServerPriceUpdate = verbose
104-
? {
105-
type: "price_update",
106-
price_feed: {
107-
...priceInfo.priceFeed.toJson(),
108-
metadata: {
109-
emitter_chain: priceInfo.emitterChainId,
110-
attestation_time: priceInfo.attestationTime,
111-
sequence_number: priceInfo.seqNum,
112-
price_service_receive_time: priceInfo.priceServiceReceiveTime,
113-
},
114+
const verbose = config?.verbose;
115+
const binary = config?.binary;
116+
117+
const priceUpdate: ServerPriceUpdate = {
118+
type: "price_update",
119+
price_feed: {
120+
...priceInfo.priceFeed.toJson(),
121+
...(verbose && {
122+
metadata: {
123+
emitter_chain: priceInfo.emitterChainId,
124+
attestation_time: priceInfo.attestationTime,
125+
sequence_number: priceInfo.seqNum,
126+
price_service_receive_time: priceInfo.priceServiceReceiveTime,
114127
},
115-
}
116-
: {
117-
type: "price_update",
118-
price_feed: priceInfo.priceFeed.toJson(),
119-
};
128+
}),
129+
...(binary && {
130+
vaa: priceInfo.vaa.toString("base64"),
131+
}),
132+
},
133+
};
120134

121135
client.send(JSON.stringify(priceUpdate));
122136
}
@@ -161,7 +175,12 @@ export class WebSocketAPI {
161175

162176
if (message.type === "subscribe") {
163177
message.ids.forEach((id) =>
164-
this.addPriceFeedClient(ws, id, message.verbose === true)
178+
this.addPriceFeedClient(
179+
ws,
180+
id,
181+
message.verbose === true,
182+
message.binary === true
183+
)
165184
);
166185
} else {
167186
message.ids.forEach((id) => this.delPriceFeedClient(ws, id));

0 commit comments

Comments
 (0)