Skip to content

Commit 6a46dfc

Browse files
authored
Use single server for both ws and http (#230)
* Use single server for both ws and http * Format code
1 parent da1f19b commit 6a46dfc

File tree

6 files changed

+70
-72
lines changed

6 files changed

+70
-72
lines changed

devnet/pyth-price-service.yaml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@ spec:
1313
- port: 4200
1414
name: rest-api
1515
protocol: TCP
16-
- port: 6200
17-
name: wss-api
18-
protocol: TCP
1916
clusterIP: None
2017
selector:
2118
app: pyth-price-service
@@ -71,8 +68,6 @@ spec:
7168
value: '[{"chain_id":1,"emitter_address":"71f8dcb863d176e2c420ad6610cf687359612b6fb392e0642b0ca6b1f186aa3b"}]'
7269
- name: REST_PORT
7370
value: '4200'
74-
- name: WS_PORT
75-
value: '6200'
7671
- name: PROM_PORT
7772
value: '8081'
7873
- name: READINESS_SPY_SYNC_TIME_SECONDS

third_party/pyth/price-service/src/__tests__/ws.test.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ async function waitForMessages(messages: any[], cnt: number): Promise<void> {
5252
}
5353

5454
async function createSocketClient(): Promise<[WebSocket, any[]]> {
55-
const client = new WebSocket(`ws://localhost:${port}`);
55+
const client = new WebSocket(`ws://localhost:${port}/ws`);
5656

5757
await waitForSocketState(client, client.OPEN);
5858

@@ -79,9 +79,12 @@ beforeAll(async () => {
7979
getPriceIds: () => new Set(priceFeeds.map((priceFeed) => priceFeed.id)),
8080
};
8181

82-
api = new WebSocketAPI({ port }, priceInfo);
82+
api = new WebSocketAPI(priceInfo);
8383

84-
[wss, server] = api.run();
84+
server = new Server();
85+
server.listen(port);
86+
87+
wss = api.run(server);
8588
});
8689

8790
afterAll(async () => {

third_party/pyth/price-service/src/index.ts

Lines changed: 39 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,43 +20,43 @@ setDefaultWasm("node");
2020
// Set up the logger.
2121
initLogger({ logLevel: process.env.LOG_LEVEL });
2222

23-
const promClient = new PromClient({
24-
name: "price_service",
25-
port: parseInt(envOrErr("PROM_PORT")),
26-
});
27-
28-
const listener = new Listener(
29-
{
30-
spyServiceHost: envOrErr("SPY_SERVICE_HOST"),
31-
filtersRaw: process.env.SPY_SERVICE_FILTERS,
32-
readiness: {
33-
spySyncTimeSeconds: parseInt(envOrErr("READINESS_SPY_SYNC_TIME_SECONDS")),
34-
numLoadedSymbols: parseInt(envOrErr("READINESS_NUM_LOADED_SYMBOLS")),
23+
async function run() {
24+
const promClient = new PromClient({
25+
name: "price_service",
26+
port: parseInt(envOrErr("PROM_PORT")),
27+
});
28+
29+
const listener = new Listener(
30+
{
31+
spyServiceHost: envOrErr("SPY_SERVICE_HOST"),
32+
filtersRaw: process.env.SPY_SERVICE_FILTERS,
33+
readiness: {
34+
spySyncTimeSeconds: parseInt(
35+
envOrErr("READINESS_SPY_SYNC_TIME_SECONDS")
36+
),
37+
numLoadedSymbols: parseInt(envOrErr("READINESS_NUM_LOADED_SYMBOLS")),
38+
},
3539
},
36-
},
37-
promClient
38-
);
39-
40-
// In future if we have more components we will modify it to include them all
41-
const isReady = () => listener.isReady();
42-
43-
const restAPI = new RestAPI(
44-
{
45-
port: parseInt(envOrErr("REST_PORT")),
46-
},
47-
listener,
48-
isReady,
49-
promClient
50-
);
51-
52-
const wsAPI = new WebSocketAPI(
53-
{
54-
port: parseInt(envOrErr("WS_PORT")),
55-
},
56-
listener,
57-
promClient
58-
);
59-
60-
listener.run();
61-
restAPI.run();
62-
wsAPI.run();
40+
promClient
41+
);
42+
43+
// In future if we have more components we will modify it to include them all
44+
const isReady = () => listener.isReady();
45+
46+
const restAPI = new RestAPI(
47+
{
48+
port: parseInt(envOrErr("REST_PORT")),
49+
},
50+
listener,
51+
isReady,
52+
promClient
53+
);
54+
55+
const wsAPI = new WebSocketAPI(listener, promClient);
56+
57+
listener.run();
58+
const server = await restAPI.run();
59+
wsAPI.run(server);
60+
}
61+
62+
run();

third_party/pyth/price-service/src/listen.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,12 @@ export class Listener implements PriceStore {
168168
let isAnyPriceNew = batchAttestation.priceAttestations.some(
169169
(priceAttestation) => {
170170
const key = priceAttestation.priceId;
171-
let lastAttestationTime = this.priceFeedVaaMap.get(key)?.attestationTime;
172-
return lastAttestationTime === undefined || lastAttestationTime < priceAttestation.attestationTime;
171+
let lastAttestationTime =
172+
this.priceFeedVaaMap.get(key)?.attestationTime;
173+
return (
174+
lastAttestationTime === undefined ||
175+
lastAttestationTime < priceAttestation.attestationTime
176+
);
173177
}
174178
);
175179

@@ -182,7 +186,10 @@ export class Listener implements PriceStore {
182186

183187
let lastAttestationTime = this.priceFeedVaaMap.get(key)?.attestationTime;
184188

185-
if (lastAttestationTime === undefined || lastAttestationTime < priceAttestation.attestationTime) {
189+
if (
190+
lastAttestationTime === undefined ||
191+
lastAttestationTime < priceAttestation.attestationTime
192+
) {
186193
const priceFeed = priceAttestationToPriceFeed(priceAttestation);
187194
this.priceFeedVaaMap.set(key, {
188195
seqNum: parsedVAA.sequence,

third_party/pyth/price-service/src/rest.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { PromClient } from "./promClient";
99
import { DurationInMs, DurationInSec } from "./helpers";
1010
import { StatusCodes } from "http-status-codes";
1111
import { validate, ValidationError, Joi, schema } from "express-validation";
12+
import { Server } from "http";
1213

1314
const MORGAN_LOG_FORMAT =
1415
':remote-addr - :remote-user ":method :url HTTP/:http-version"' +
@@ -80,7 +81,7 @@ export class RestAPI {
8081
}).required(),
8182
};
8283
app.get(
83-
"/latest_vaas",
84+
"/api/latest_vaas",
8485
validate(latestVaasInputSchema),
8586
(req: Request, res: Response) => {
8687
let priceIds = req.query.ids as string[];
@@ -126,7 +127,7 @@ export class RestAPI {
126127
}
127128
);
128129
endpoints.push(
129-
"latest_vaas?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&.."
130+
"api/latest_vaas?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&.."
130131
);
131132

132133
const latestPriceFeedsInputSchema: schema = {
@@ -137,7 +138,7 @@ export class RestAPI {
137138
}).required(),
138139
};
139140
app.get(
140-
"/latest_price_feeds",
141+
"/api/latest_price_feeds",
141142
validate(latestPriceFeedsInputSchema),
142143
(req: Request, res: Response) => {
143144
let priceIds = req.query.ids as string[];
@@ -177,7 +178,7 @@ export class RestAPI {
177178
}
178179
);
179180
endpoints.push(
180-
"latest_price_feeds?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&.."
181+
"api/latest_price_feeds?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&.."
181182
);
182183

183184
app.get("/ready", (_, res: Response) => {
@@ -194,6 +195,9 @@ export class RestAPI {
194195
});
195196
endpoints.push("live");
196197

198+
// Websocket endpoint
199+
endpoints.push("ws");
200+
197201
app.get("/", (_, res: Response) => res.json(endpoints));
198202

199203
app.use(function (err: any, _: Request, res: Response, next: NextFunction) {
@@ -211,9 +215,9 @@ export class RestAPI {
211215
return app;
212216
}
213217

214-
async run() {
218+
async run(): Promise<Server> {
215219
let app = await this.createApp();
216-
app.listen(this.port, () =>
220+
return app.listen(this.port, () =>
217221
logger.debug("listening on REST port " + this.port)
218222
);
219223
}

third_party/pyth/price-service/src/ws.ts

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,13 @@ export type ServerMessage = ServerResponse | ServerPriceUpdate;
3434

3535
export class WebSocketAPI {
3636
private wsCounter: number;
37-
private port: number;
3837
private priceFeedClients: Map<HexString, Set<WebSocket>>;
3938
private aliveClients: Set<WebSocket>;
4039
private wsId: Map<WebSocket, number>;
4140
private priceFeedVaaInfo: PriceStore;
4241
private promClient: PromClient | undefined;
4342

44-
constructor(
45-
config: { port: number },
46-
priceFeedVaaInfo: PriceStore,
47-
promClient?: PromClient
48-
) {
49-
this.port = config.port;
43+
constructor(priceFeedVaaInfo: PriceStore, promClient?: PromClient) {
5044
this.priceFeedVaaInfo = priceFeedVaaInfo;
5145
this.priceFeedClients = new Map();
5246
this.aliveClients = new Set();
@@ -167,11 +161,8 @@ export class WebSocketAPI {
167161
ws.send(JSON.stringify(response));
168162
}
169163

170-
run(): [WebSocketServer, http.Server] {
171-
const app = express();
172-
const server = http.createServer(app);
173-
174-
const wss = new WebSocketServer({ server });
164+
run(server: http.Server): WebSocketServer {
165+
const wss = new WebSocketServer({ server, path: "/ws" });
175166

176167
wss.on("connection", (ws: WebSocket, request: http.IncomingMessage) => {
177168
logger.info(
@@ -220,12 +211,10 @@ export class WebSocketAPI {
220211
clearInterval(pingInterval);
221212
});
222213

223-
server.listen(this.port, () =>
224-
logger.debug("listening on WS port " + this.port)
225-
);
226214
this.priceFeedVaaInfo.addUpdateListener(
227215
this.dispatchPriceFeedUpdate.bind(this)
228216
);
229-
return [wss, server];
217+
218+
return wss;
230219
}
231220
}

0 commit comments

Comments
 (0)