diff --git a/apps/price_pusher/package.json b/apps/price_pusher/package.json index fafdfa6ee7..4e144e2733 100644 --- a/apps/price_pusher/package.json +++ b/apps/price_pusher/package.json @@ -1,6 +1,6 @@ { "name": "@pythnetwork/price-pusher", - "version": "8.3.2", + "version": "8.3.3", "description": "Pyth Price Pusher", "homepage": "https://pyth.network", "main": "lib/index.js", @@ -24,6 +24,7 @@ "format": "prettier --write \"src/**/*.ts\"", "test:lint": "eslint src/", "start": "node lib/index.js", + "test": "jest", "dev": "ts-node src/index.ts", "prepublishOnly": "pnpm run build && pnpm run test:lint", "preversion": "pnpm run test:lint", diff --git a/apps/price_pusher/src/__tests__/pyth-price-listener.test.ts b/apps/price_pusher/src/__tests__/pyth-price-listener.test.ts new file mode 100644 index 0000000000..c0ece74181 --- /dev/null +++ b/apps/price_pusher/src/__tests__/pyth-price-listener.test.ts @@ -0,0 +1,101 @@ +import { PythPriceListener } from "../pyth-price-listener"; +import { PriceServiceConnection } from "@pythnetwork/price-service-client"; +import { Logger } from "pino"; + +describe("PythPriceListener", () => { + let logger: Logger; + let connection: PriceServiceConnection; + let listener: PythPriceListener; + let originalConsoleError: typeof console.error; + + beforeEach(() => { + // Save original console.error and mock it + originalConsoleError = console.error; + console.error = jest.fn(); + + logger = { + debug: jest.fn(), + error: jest.fn(), + info: jest.fn(), + } as unknown as Logger; + + // Use real Hermes beta endpoint for testing + connection = new PriceServiceConnection("https://hermes.pyth.network"); + }); + + afterEach(() => { + // Clean up websocket connection + connection.closeWebSocket(); + // Clean up health check interval + if (listener) { + listener.cleanup(); + } + // Restore original console.error + console.error = originalConsoleError; + }); + + it("should handle invalid price feeds gracefully", async () => { + const validFeedId = + "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"; // BTC/USD + const invalidFeedId = + "0000000000000000000000000000000000000000000000000000000000000000"; + + const priceItems = [ + { id: validFeedId, alias: "BTC/USD" }, + { id: invalidFeedId, alias: "INVALID/PRICE" }, + ]; + + listener = new PythPriceListener(connection, priceItems, logger); + + await listener.start(); + + // Wait for both error handlers to complete + await new Promise((resolve) => { + const checkInterval = setInterval(() => { + const errorCalls = (logger.error as jest.Mock).mock.calls; + + // Check for both HTTP and websocket error logs + const hasHttpError = errorCalls.some( + (call) => call[0] === "Failed to get latest price feeds:" + ); + const hasGetLatestError = errorCalls.some((call) => + call[0].includes("not found for getLatestPriceFeeds") + ); + const hasWsError = errorCalls.some((call) => + call[0].includes("not found for subscribePriceFeedUpdates") + ); + + if (hasHttpError && hasGetLatestError && hasWsError) { + clearInterval(checkInterval); + resolve(true); + } + }, 100); + }); + + // Verify HTTP error was logged + expect(logger.error).toHaveBeenCalledWith( + "Failed to get latest price feeds:", + expect.objectContaining({ + message: "Request failed with status code 404", + }) + ); + + // Verify invalid feed error was logged + expect(logger.error).toHaveBeenCalledWith( + `Price feed ${invalidFeedId} (INVALID/PRICE) not found for getLatestPriceFeeds` + ); + + // Verify invalid feed error was logged + expect(logger.error).toHaveBeenCalledWith( + `Price feed ${invalidFeedId} (INVALID/PRICE) not found for subscribePriceFeedUpdates` + ); + + // Verify resubscription message was logged + expect(logger.info).toHaveBeenCalledWith( + "Resubscribing with valid feeds only" + ); + + // Verify priceIds was updated to only include valid feeds + expect(listener["priceIds"]).toEqual([validFeedId]); + }); +}); diff --git a/apps/price_pusher/src/pyth-price-listener.ts b/apps/price_pusher/src/pyth-price-listener.ts index 2a01a36b5e..24a35a5d85 100644 --- a/apps/price_pusher/src/pyth-price-listener.ts +++ b/apps/price_pusher/src/pyth-price-listener.ts @@ -15,6 +15,7 @@ export class PythPriceListener implements IPriceListener { private latestPriceInfo: Map; private logger: Logger; private lastUpdated: TimestampInMs | undefined; + private healthCheckInterval?: NodeJS.Timeout; constructor( connection: PriceServiceConnection, @@ -33,26 +34,111 @@ export class PythPriceListener implements IPriceListener { // This method should be awaited on and once it finishes it has the latest value // for the given price feeds (if they exist). async start() { + // Set custom error handler for websocket errors + this.connection.onWsError = (error: Error) => { + if (error.message.includes("not found")) { + // Extract invalid feed IDs from error message + const match = error.message.match(/\[(.*?)\]/); + if (match) { + const invalidFeedIds = match[1].split(",").map((id) => { + // Remove '0x' prefix if present to match our stored IDs + return id.trim().replace(/^0x/, ""); + }); + + // Log invalid feeds with their aliases + invalidFeedIds.forEach((id) => { + this.logger.error( + `Price feed ${id} (${this.priceIdToAlias.get( + id + )}) not found for subscribePriceFeedUpdates` + ); + }); + + // Filter out invalid feeds and resubscribe with valid ones + const validFeeds = this.priceIds.filter( + (id) => !invalidFeedIds.includes(id) + ); + + this.priceIds = validFeeds; + + if (validFeeds.length > 0) { + this.logger.info("Resubscribing with valid feeds only"); + this.connection.subscribePriceFeedUpdates( + validFeeds, + this.onNewPriceFeed.bind(this) + ); + } + } + } else { + this.logger.error("Websocket error occurred:", error); + } + }; + this.connection.subscribePriceFeedUpdates( this.priceIds, this.onNewPriceFeed.bind(this) ); - const priceFeeds = await this.connection.getLatestPriceFeeds(this.priceIds); - priceFeeds?.forEach((priceFeed) => { - // Getting unchecked because although it might be old - // but might not be there on the target chain. - const latestAvailablePrice = priceFeed.getPriceUnchecked(); - this.latestPriceInfo.set(priceFeed.id, { - price: latestAvailablePrice.price, - conf: latestAvailablePrice.conf, - publishTime: latestAvailablePrice.publishTime, + try { + const priceFeeds = await this.connection.getLatestPriceFeeds( + this.priceIds + ); + priceFeeds?.forEach((priceFeed) => { + const latestAvailablePrice = priceFeed.getPriceUnchecked(); + this.latestPriceInfo.set(priceFeed.id, { + price: latestAvailablePrice.price, + conf: latestAvailablePrice.conf, + publishTime: latestAvailablePrice.publishTime, + }); }); - }); + } catch (error: any) { + // Always log the HTTP error first + this.logger.error("Failed to get latest price feeds:", error); + + if (error.response.data.includes("Price ids not found:")) { + // Extract invalid feed IDs from error message + const invalidFeedIds = error.response.data + .split("Price ids not found:")[1] + .split(",") + .map((id: string) => id.trim().replace(/^0x/, "")); + + // Log invalid feeds with their aliases + invalidFeedIds.forEach((id: string) => { + this.logger.error( + `Price feed ${id} (${this.priceIdToAlias.get( + id + )}) not found for getLatestPriceFeeds` + ); + }); - // Check health of the price feeds 5 second. If the price feeds are not updating - // for more than 30s, throw an error. - setInterval(() => { + // Filter out invalid feeds and retry + const validFeeds = this.priceIds.filter( + (id) => !invalidFeedIds.includes(id) + ); + + this.priceIds = validFeeds; + + if (validFeeds.length > 0) { + this.logger.info( + "Retrying getLatestPriceFeeds with valid feeds only" + ); + const validPriceFeeds = await this.connection.getLatestPriceFeeds( + validFeeds + ); + validPriceFeeds?.forEach((priceFeed) => { + const latestAvailablePrice = priceFeed.getPriceUnchecked(); + this.latestPriceInfo.set(priceFeed.id, { + price: latestAvailablePrice.price, + conf: latestAvailablePrice.conf, + publishTime: latestAvailablePrice.publishTime, + }); + }); + } + } + } + + // Store health check interval reference + this.healthCheckInterval = setInterval(() => { if ( this.lastUpdated === undefined || this.lastUpdated < Date.now() - 30 * 1000 @@ -88,4 +174,10 @@ export class PythPriceListener implements IPriceListener { getLatestPriceInfo(priceId: string): PriceInfo | undefined { return this.latestPriceInfo.get(priceId); } + + cleanup() { + if (this.healthCheckInterval) { + clearInterval(this.healthCheckInterval); + } + } }