Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/price_pusher/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pythnetwork/price-pusher",
"version": "9.3.3",
"version": "9.3.4",
"description": "Pyth Price Pusher",
"homepage": "https://pyth.network",
"main": "lib/index.js",
Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/aptos/aptos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ export class AptosPricePusher implements IPricePusher {
async getPriceFeedsUpdateData(priceIds: string[]): Promise<number[][]> {
const response = await this.hermesClient.getLatestPriceUpdates(priceIds, {
encoding: "base64",
ignoreInvalidPriceIds: true,
});
return response.binary.data.map((data) =>
Array.from(Buffer.from(data, "base64")),
Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/fuel/fuel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ export class FuelPricePusher implements IPricePusher {
try {
const response = await this.hermesClient.getLatestPriceUpdates(priceIds, {
encoding: "base64",
ignoreInvalidPriceIds: true,
});
priceFeedUpdateData = response.binary.data;
} catch (err: any) {
Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/injective/injective.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ export class InjectivePricePusher implements IPricePusher {
try {
const response = await this.hermesClient.getLatestPriceUpdates(priceIds, {
encoding: "base64",
ignoreInvalidPriceIds: true,
});
const vaas = response.binary.data;

Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/near/near.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ export class NearPricePusher implements IPricePusher {
): Promise<string[]> {
const response = await this.hermesClient.getLatestPriceUpdates(priceIds, {
encoding: "base64",
ignoreInvalidPriceIds: true,
});
return response.binary.data;
}
Expand Down
33 changes: 22 additions & 11 deletions apps/price_pusher/src/pyth-price-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
} from "@pythnetwork/hermes-client";
import { PriceInfo, IPriceListener, PriceItem } from "./interface";
import { Logger } from "pino";
import { sleep } from "./utils";

type TimestampInMs = number & { readonly _: unique symbol };

Expand Down Expand Up @@ -34,6 +35,24 @@ export class PythPriceListener implements IPriceListener {
// This method should be awaited on and once it finishes it has the latest value
// for the given price feeds (if they exist).
async start() {
this.startListening();

// Store health check interval reference
this.healthCheckInterval = setInterval(() => {
if (
this.lastUpdated === undefined ||
this.lastUpdated < Date.now() - 30 * 1000
) {
throw new Error("Hermes Price feeds are not updating.");
}
}, 5000);
}

async startListening() {
this.logger.info(
`Starting to listen for price updates from Hermes for ${this.priceIds.length} price feeds.`,
);

const eventSource = await this.hermesClient.getPriceUpdatesStream(
this.priceIds,
{
Expand Down Expand Up @@ -71,20 +90,12 @@ export class PythPriceListener implements IPriceListener {
});
};

eventSource.onerror = (error: Event) => {
eventSource.onerror = async (error: Event) => {
console.error("Error receiving updates from Hermes:", error);
eventSource.close();
await sleep(5000); // Wait a bit before trying to reconnect
this.startListening(); // Attempt to restart the listener
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please note that we have another loop checking for latest price update that crashes if there's no update within last 30 seconds.

};

// Store health check interval reference
this.healthCheckInterval = setInterval(() => {
if (
this.lastUpdated === undefined ||
this.lastUpdated < Date.now() - 30 * 1000
) {
throw new Error("Hermes Price feeds are not updating.");
}
}, 5000);
}

getLatestPriceInfo(priceId: HexString): PriceInfo | undefined {
Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/solana/solana.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export class SolanaPricePusher implements IPricePusher {
shuffledPriceIds,
{
encoding: "base64",
ignoreInvalidPriceIds: true,
},
);
priceFeedUpdateData = response.binary.data;
Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/sui/sui.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ export class SuiPricePusher implements IPricePusher {
priceIdChunk,
{
encoding: "base64",
ignoreInvalidPriceIds: true,
},
);
if (response.binary.data.length !== 1) {
Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/ton/ton.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ export class TonPricePusher implements IPricePusher {
try {
const response = await this.hermesClient.getLatestPriceUpdates(priceIds, {
encoding: "base64",
ignoreInvalidPriceIds: true,
});
priceFeedUpdateData = response.binary.data;
} catch (err: any) {
Expand Down
Loading