Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/price_pusher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
"@ton/ton": "^15.1.0",
"@types/pino": "^7.0.5",
"aptos": "^1.8.5",
"axios": "^1.6.8",
"express": "^4.18.2",
"fuels": "^0.94.5",
"jito-ts": "^3.0.1",
Expand Down
25 changes: 25 additions & 0 deletions apps/price_pusher/src/evm/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { createPythContract } from "./pyth-contract";
import { isWsEndpoint, filterInvalidPriceItems } from "../utils";
import { PricePusherMetrics } from "../metrics";
import { createEvmBalanceTracker } from "./balance-tracker";
import { initMarketHoursUpdates } from "../market-hours";

export default {
command: "evm",
Expand Down Expand Up @@ -139,10 +140,18 @@ export default {

// Initialize metrics if enabled
let metrics: PricePusherMetrics | undefined;
let marketHoursCleanup: (() => void) | undefined;
if (enableMetrics) {
metrics = new PricePusherMetrics(logger.child({ module: "Metrics" }));
metrics.start(metricsPort);
logger.info(`Metrics server started on port ${metricsPort}`);

// Initialize market hours updates
marketHoursCleanup = initMarketHoursUpdates(
metrics,
logger.child({ module: "MarketHours" }),
priceItems.map((item) => item.id),
);
}

const pythListener = new PythPriceListener(
Expand Down Expand Up @@ -218,6 +227,22 @@ export default {
await balanceTracker.start();
}

// Start the controller
await controller.start();

// Cleanup function
const cleanup = () => {
if (metrics) {
metrics.stop();
}
if (marketHoursCleanup) {
marketHoursCleanup();
}
process.exit(0);
};

// Handle cleanup on process termination
process.on("SIGINT", cleanup);
process.on("SIGTERM", cleanup);
},
};
123 changes: 123 additions & 0 deletions apps/price_pusher/src/market-hours.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import axios from "axios";
import { PricePusherMetrics } from "./metrics";
import { Logger } from "pino";

// Types
export interface MarketHours {
is_open: boolean;
next_open: number | null;
next_close: number | null;
}

export interface PriceAttributes {
symbol: string;
asset_type: string;
base: string;
description: string;
display_symbol: string;
generic_symbol: string;
quote_currency: string;
schedule: string;
}

export interface PriceFeed {
id: string;
market_hours: MarketHours;
attributes: PriceAttributes;
}

export async function fetchPriceFeeds(): Promise<PriceFeed[]> {
const response = await axios.get<PriceFeed[]>(
"https://benchmarks.pyth.network/v1/price_feeds",
);
return response.data;
}

export function isValidPriceFeed(feed: PriceFeed): boolean {
return !!(feed.id && feed.market_hours && feed.attributes?.display_symbol);
}

export function updateMetricsForFeed(
metrics: PricePusherMetrics,
feed: PriceFeed,
logger: Logger,
): void {
const { id, market_hours, attributes } = feed;

logger.debug(
{
id,
display_symbol: attributes.display_symbol,
is_open: market_hours.is_open,
next_open: market_hours.next_open,
next_close: market_hours.next_close,
},
"Updating market hours metrics",
);

metrics.updateMarketHours(
id,
attributes.display_symbol,
market_hours.is_open,
market_hours.next_open,
market_hours.next_close,
);
}

export async function updateAllMarketHours(
metrics: PricePusherMetrics,
logger: Logger,
configuredPriceIds: string[],
): Promise<void> {
try {
const priceFeeds = await fetchPriceFeeds();
logger.debug(
`Fetched ${priceFeeds.length} price feeds for market hours update`,
);

// Filter feeds to only those in the config
const configuredFeeds = priceFeeds.filter((feed) =>
configuredPriceIds.includes(feed.id),
);
logger.debug(
`Found ${configuredFeeds.length} configured feeds out of ${priceFeeds.length} total feeds`,
);

for (const feed of configuredFeeds) {
try {
if (!isValidPriceFeed(feed)) {
logger.warn({ feed }, "Skipping feed due to missing required fields");
continue;
}
updateMetricsForFeed(metrics, feed, logger);
} catch (feedError) {
logger.error({ feed, error: feedError }, "Error processing feed");
continue;
}
}
} catch (error) {
logger.error({ error }, "Failed to fetch market hours");
}
}

// Initialization function
export function initMarketHoursUpdates(
metrics: PricePusherMetrics,
logger: Logger,
configuredPriceIds: string[],
intervalMs: number = 60000,
): () => void {
logger.info("Starting market hours updates");

// Initial update
updateAllMarketHours(metrics, logger, configuredPriceIds);

// Schedule regular updates
const interval = setInterval(
() => updateAllMarketHours(metrics, logger, configuredPriceIds),
intervalMs,
);

// Return cleanup function
return () => clearInterval(interval);
}
75 changes: 71 additions & 4 deletions apps/price_pusher/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import express from "express";
import { PriceInfo } from "./interface";
import { Logger } from "pino";
import { UpdateCondition } from "./price-config";
import { Server } from "http";

// Define the metrics we want to track
export class PricePusherMetrics {
private registry: Registry;
private server: express.Express;
private app: express.Express;
private server?: Server;
private logger: Logger;

// Metrics for price feed updates
Expand All @@ -16,11 +18,15 @@ export class PricePusherMetrics {
public priceFeedsTotal: Gauge<string>;
// Wallet metrics
public walletBalance: Gauge<string>;
// Market hours metrics
public marketIsOpen: Gauge<string>;
public nextMarketOpen: Gauge<string>;
public nextMarketClose: Gauge<string>;

constructor(logger: Logger) {
this.logger = logger;
this.registry = new Registry();
this.server = express();
this.app = express();

// Register the default metrics (memory, CPU, etc.)
this.registry.setDefaultLabels({ app: "price_pusher" });
Expand Down Expand Up @@ -54,20 +60,49 @@ export class PricePusherMetrics {
registers: [this.registry],
});

// Market hours metrics
this.marketIsOpen = new Gauge({
name: "pyth_market_is_open",
help: "Whether the market is currently open (1) or closed (0)",
labelNames: ["price_id", "alias"],
registers: [this.registry],
});

this.nextMarketOpen = new Gauge({
name: "pyth_next_market_open",
help: "Unix timestamp of next market open time, -1 for 24/7 markets",
labelNames: ["price_id", "alias"],
registers: [this.registry],
});

this.nextMarketClose = new Gauge({
name: "pyth_next_market_close",
help: "Unix timestamp of next market close time, -1 for 24/7 markets",
labelNames: ["price_id", "alias"],
registers: [this.registry],
});

// Setup the metrics endpoint
this.server.get("/metrics", async (req, res) => {
this.app.get("/metrics", async (req, res) => {
res.set("Content-Type", this.registry.contentType);
res.end(await this.registry.metrics());
});
}

// Start the metrics server
public start(port: number): void {
this.server.listen(port, () => {
this.server = this.app.listen(port, () => {
this.logger.info(`Metrics server started on port ${port}`);
});
}

// Stop metrics server
public stop(): void {
if (this.server) {
this.server.close();
}
}

// Update the last published time for a price feed
public updateLastPublishedTime(
priceId: string,
Expand Down Expand Up @@ -150,4 +185,36 @@ export class PricePusherMetrics {
`Updated wallet balance metric: ${walletAddress} = ${balanceNum}`,
);
}

// Update market hours metrics
public updateMarketHours(
priceId: string,
alias: string,
isOpen: boolean,
nextOpen: number | null,
nextClose: number | null,
): void {
const labels = {
price_id: priceId,
alias,
};

const is24x7 = isOpen && nextOpen === null && nextClose === null;

this.marketIsOpen.set(labels, isOpen ? 1 : 0);

if (is24x7) {
this.nextMarketOpen.set(labels, -1);
this.nextMarketClose.set(labels, -1);
} else {
this.nextMarketOpen.set(
labels,
typeof nextOpen === "number" ? nextOpen : -1,
);
this.nextMarketClose.set(
labels,
typeof nextClose === "number" ? nextClose : -1,
);
}
}
}
Loading
Loading