Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/price_pusher/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pythnetwork/price-pusher",
"version": "8.3.2",
"version": "8.3.3",
"description": "Pyth Price Pusher",
"homepage": "https://pyth.network",
"main": "lib/index.js",
Expand All @@ -23,6 +23,7 @@
"build": "tsc",
"format": "prettier --write \"src/**/*.ts\"",
"test:lint": "eslint src/",
"test": "jest",
"start": "node lib/index.js",
"dev": "ts-node src/index.ts",
"prepublishOnly": "pnpm run build && pnpm run test:lint",
Expand Down
162 changes: 162 additions & 0 deletions apps/price_pusher/src/__tests__/pyth-price-listener.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import { PythPriceListener } from "../pyth-price-listener";
import { PriceServiceConnection } from "@pythnetwork/price-service-client";
import pino from "pino";

jest.mock("@pythnetwork/price-service-client");

describe("PythPriceListener", () => {
// Constants
const TEST_FEEDS = {
BTC: { id: "btc_feed", alias: "BTC/USD", price: "20000", conf: "100" },
ETH: { id: "eth_feed", alias: "ETH/USD", price: "1500", conf: "50" },
};

// Test helpers
const createMockSubscriptionFeed = (
id: string,
price: string,
conf: string,
publishTime: number
) => ({
id,
getPriceNoOlderThan: () => ({
price,
conf,
publishTime,
}),
});

let mockPriceServiceConnection: jest.Mocked<PriceServiceConnection>;

beforeEach(() => {
mockPriceServiceConnection = new PriceServiceConnection(
""
) as jest.Mocked<PriceServiceConnection>;
});

it("should log warning when price feeds are stale", async () => {
jest.useFakeTimers();
const currentTime = Math.floor(Date.now() / 1000);

const priceItems = [TEST_FEEDS.BTC, TEST_FEEDS.ETH];
const logger = pino({ level: "silent" });
const warnSpy = jest.spyOn(logger, "warn");

const pythListener = new PythPriceListener(
mockPriceServiceConnection,
priceItems,
logger
);

// Mock subscription updates
mockPriceServiceConnection.subscribePriceFeedUpdates.mockImplementation(
(_, callback) => {
// Initial updates for both feeds
callback(
createMockSubscriptionFeed(
TEST_FEEDS.BTC.id,
TEST_FEEDS.BTC.price,
TEST_FEEDS.BTC.conf,
currentTime
) as any
);

callback(
createMockSubscriptionFeed(
TEST_FEEDS.ETH.id,
TEST_FEEDS.ETH.price,
TEST_FEEDS.ETH.conf,
currentTime
) as any
);

// Continue updating only BTC price
setInterval(() => {
callback(
createMockSubscriptionFeed(
TEST_FEEDS.BTC.id,
TEST_FEEDS.BTC.price,
TEST_FEEDS.BTC.conf,
currentTime + 30
) as any
);
}, 5000); // Update every 5 seconds to simulate the actual code

return Promise.resolve();
}
);

await pythListener.start();

// Verify initial state
const btcPrice = pythListener.getLatestPriceInfo("btc_feed");
const ethPrice = pythListener.getLatestPriceInfo("eth_feed");
expect(btcPrice).toBeDefined();
expect(ethPrice).toBeDefined();

// Advance time and run one interval check
jest.advanceTimersByTime(31 * 1000);
jest.runOnlyPendingTimers();

// Verify warning was logged only about ETH being stale
expect(warnSpy).toHaveBeenCalledWith(
expect.objectContaining({
staleFeeds: expect.arrayContaining([
expect.objectContaining({
id: "eth_feed",
alias: "ETH/USD",
lastPublishTime: currentTime,
}),
]),
}),
expect.stringContaining("price feeds haven't updated")
);

jest.useRealTimers();
});

it("should log warning when no price feed updates received", async () => {
jest.useFakeTimers();

const priceItems = [
{ id: "btc_feed", alias: "BTC/USD" },
{ id: "eth_feed", alias: "ETH/USD" },
];

const logger = pino({ level: "silent" });
const warnSpy = jest.spyOn(logger, "warn");

const pythListener = new PythPriceListener(
mockPriceServiceConnection,
priceItems,
logger
);

// Mock subscription with no updates
mockPriceServiceConnection.subscribePriceFeedUpdates.mockImplementation(
() => Promise.resolve()
);

await pythListener.start();

// Verify initial state - no prices
const btcPrice = pythListener.getLatestPriceInfo("btc_feed");
const ethPrice = pythListener.getLatestPriceInfo("eth_feed");
expect(btcPrice).toBeUndefined();
expect(ethPrice).toBeUndefined();

// Advance time and run one interval check
jest.advanceTimersByTime(31 * 1000);
jest.runOnlyPendingTimers();

// Verify warning about no updates
expect(warnSpy).toHaveBeenCalledWith(
expect.objectContaining({
currentTime: expect.any(Number),
}),
"No price feed updates have been received yet"
);

jest.useRealTimers();
});
});
36 changes: 31 additions & 5 deletions apps/price_pusher/src/pyth-price-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,37 @@ export class PythPriceListener implements IPriceListener {
// Check health of the price feeds 5 second. If the price feeds are not updating
// for more than 30s, throw an error.
setInterval(() => {
if (
this.lastUpdated === undefined ||
this.lastUpdated < Date.now() - 30 * 1000
) {
throw new Error("Hermes Price feeds are not updating.");
const now = Date.now();
const staleThreshold = now - 30 * 1000;

// Check if we've never received any updates
if (this.lastUpdated === undefined) {
this.logger.warn(
{
currentTime: now,
},
"No price feed updates have been received yet"
);
return;
}

// Find stale price feeds
const staleFeeds = Array.from(this.latestPriceInfo.entries())
.filter(([, info]) => info.publishTime * 1000 < staleThreshold)
.map(([id, info]) => ({
id,
alias: this.priceIdToAlias.get(id),
lastPublishTime: info.publishTime,
}));

if (staleFeeds.length > 0) {
this.logger.warn(
{
staleFeeds,
currentTime: now,
},
`${staleFeeds.length} price feeds haven't updated in the last 30 seconds`
);
}
}, 5000);
}
Expand Down
Loading