Skip to content

Commit ab7eca3

Browse files
committed
feat(price-pusher): add market hours updates and metrics tracking
1 parent 25a3789 commit ab7eca3

File tree

4 files changed

+220
-4
lines changed

4 files changed

+220
-4
lines changed

apps/price_pusher/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
"@ton/ton": "^15.1.0",
7979
"@types/pino": "^7.0.5",
8080
"aptos": "^1.8.5",
81+
"axios": "^1.6.8",
8182
"express": "^4.18.2",
8283
"fuels": "^0.94.5",
8384
"jito-ts": "^3.0.1",

apps/price_pusher/src/evm/command.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { createPythContract } from "./pyth-contract";
1313
import { isWsEndpoint, filterInvalidPriceItems } from "../utils";
1414
import { PricePusherMetrics } from "../metrics";
1515
import { createEvmBalanceTracker } from "./balance-tracker";
16+
import { initMarketHoursUpdates } from "../market-hours";
1617

1718
export default {
1819
command: "evm",
@@ -139,10 +140,18 @@ export default {
139140

140141
// Initialize metrics if enabled
141142
let metrics: PricePusherMetrics | undefined;
143+
let marketHoursCleanup: (() => void) | undefined;
142144
if (enableMetrics) {
143145
metrics = new PricePusherMetrics(logger.child({ module: "Metrics" }));
144146
metrics.start(metricsPort);
145147
logger.info(`Metrics server started on port ${metricsPort}`);
148+
149+
// Initialize market hours updates
150+
marketHoursCleanup = initMarketHoursUpdates(
151+
metrics,
152+
logger.child({ module: "MarketHours" }),
153+
priceItems.map((item) => item.id),
154+
);
146155
}
147156

148157
const pythListener = new PythPriceListener(
@@ -218,6 +227,22 @@ export default {
218227
await balanceTracker.start();
219228
}
220229

230+
// Start the controller
221231
await controller.start();
232+
233+
// Cleanup function
234+
const cleanup = () => {
235+
if (metrics) {
236+
metrics.stop();
237+
}
238+
if (marketHoursCleanup) {
239+
marketHoursCleanup();
240+
}
241+
process.exit(0);
242+
};
243+
244+
// Handle cleanup on process termination
245+
process.on("SIGINT", cleanup);
246+
process.on("SIGTERM", cleanup);
222247
},
223248
};
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import axios from "axios";
2+
import { PricePusherMetrics } from "./metrics";
3+
import { Logger } from "pino";
4+
5+
// Types
6+
export interface MarketHours {
7+
is_open: boolean;
8+
next_open: number | null;
9+
next_close: number | null;
10+
}
11+
12+
export interface PriceAttributes {
13+
symbol: string;
14+
asset_type: string;
15+
base: string;
16+
description: string;
17+
display_symbol: string;
18+
generic_symbol: string;
19+
quote_currency: string;
20+
schedule: string;
21+
}
22+
23+
export interface PriceFeed {
24+
id: string;
25+
market_hours: MarketHours;
26+
attributes: PriceAttributes;
27+
}
28+
29+
export async function fetchPriceFeeds(): Promise<PriceFeed[]> {
30+
const response = await axios.get<PriceFeed[]>(
31+
"https://benchmarks.pyth.network/v1/price_feeds",
32+
);
33+
return response.data;
34+
}
35+
36+
export function isValidPriceFeed(feed: PriceFeed): boolean {
37+
return !!(feed.id && feed.market_hours && feed.attributes?.display_symbol);
38+
}
39+
40+
export function updateMetricsForFeed(
41+
metrics: PricePusherMetrics,
42+
feed: PriceFeed,
43+
logger: Logger,
44+
): void {
45+
const { id, market_hours, attributes } = feed;
46+
47+
logger.debug(
48+
{
49+
id,
50+
display_symbol: attributes.display_symbol,
51+
is_open: market_hours.is_open,
52+
next_open: market_hours.next_open,
53+
next_close: market_hours.next_close,
54+
},
55+
"Updating market hours metrics",
56+
);
57+
58+
metrics.updateMarketHours(
59+
id,
60+
attributes.display_symbol,
61+
market_hours.is_open,
62+
market_hours.next_open,
63+
market_hours.next_close,
64+
);
65+
}
66+
67+
export async function updateAllMarketHours(
68+
metrics: PricePusherMetrics,
69+
logger: Logger,
70+
configuredPriceIds: string[],
71+
): Promise<void> {
72+
try {
73+
const priceFeeds = await fetchPriceFeeds();
74+
logger.debug(
75+
`Fetched ${priceFeeds.length} price feeds for market hours update`,
76+
);
77+
78+
// Filter feeds to only those in the config
79+
const configuredFeeds = priceFeeds.filter((feed) =>
80+
configuredPriceIds.includes(feed.id),
81+
);
82+
logger.debug(
83+
`Found ${configuredFeeds.length} configured feeds out of ${priceFeeds.length} total feeds`,
84+
);
85+
86+
for (const feed of configuredFeeds) {
87+
try {
88+
if (!isValidPriceFeed(feed)) {
89+
logger.warn({ feed }, "Skipping feed due to missing required fields");
90+
continue;
91+
}
92+
updateMetricsForFeed(metrics, feed, logger);
93+
} catch (feedError) {
94+
logger.error({ feed, error: feedError }, "Error processing feed");
95+
continue;
96+
}
97+
}
98+
} catch (error) {
99+
logger.error({ error }, "Failed to fetch market hours");
100+
}
101+
}
102+
103+
// Initialization function
104+
export function initMarketHoursUpdates(
105+
metrics: PricePusherMetrics,
106+
logger: Logger,
107+
configuredPriceIds: string[],
108+
intervalMs: number = 60000,
109+
): () => void {
110+
logger.info("Starting market hours updates");
111+
112+
// Initial update
113+
updateAllMarketHours(metrics, logger, configuredPriceIds);
114+
115+
// Schedule regular updates
116+
const interval = setInterval(
117+
() => updateAllMarketHours(metrics, logger, configuredPriceIds),
118+
intervalMs,
119+
);
120+
121+
// Return cleanup function
122+
return () => clearInterval(interval);
123+
}

apps/price_pusher/src/metrics.ts

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ import express from "express";
33
import { PriceInfo } from "./interface";
44
import { Logger } from "pino";
55
import { UpdateCondition } from "./price-config";
6+
import { Server } from "http";
67

78
// Define the metrics we want to track
89
export class PricePusherMetrics {
910
private registry: Registry;
10-
private server: express.Express;
11+
private app: express.Express;
12+
private server?: Server;
1113
private logger: Logger;
1214

1315
// Metrics for price feed updates
@@ -16,11 +18,15 @@ export class PricePusherMetrics {
1618
public priceFeedsTotal: Gauge<string>;
1719
// Wallet metrics
1820
public walletBalance: Gauge<string>;
21+
// Market hours metrics
22+
public marketIsOpen: Gauge<string>;
23+
public nextMarketOpen: Gauge<string>;
24+
public nextMarketClose: Gauge<string>;
1925

2026
constructor(logger: Logger) {
2127
this.logger = logger;
2228
this.registry = new Registry();
23-
this.server = express();
29+
this.app = express();
2430

2531
// Register the default metrics (memory, CPU, etc.)
2632
this.registry.setDefaultLabels({ app: "price_pusher" });
@@ -54,20 +60,49 @@ export class PricePusherMetrics {
5460
registers: [this.registry],
5561
});
5662

63+
// Market hours metrics
64+
this.marketIsOpen = new Gauge({
65+
name: "pyth_market_is_open",
66+
help: "Whether the market is currently open (1) or closed (0)",
67+
labelNames: ["price_id", "alias"],
68+
registers: [this.registry],
69+
});
70+
71+
this.nextMarketOpen = new Gauge({
72+
name: "pyth_next_market_open",
73+
help: "Unix timestamp of next market open time, -1 for 24/7 markets",
74+
labelNames: ["price_id", "alias"],
75+
registers: [this.registry],
76+
});
77+
78+
this.nextMarketClose = new Gauge({
79+
name: "pyth_next_market_close",
80+
help: "Unix timestamp of next market close time, -1 for 24/7 markets",
81+
labelNames: ["price_id", "alias"],
82+
registers: [this.registry],
83+
});
84+
5785
// Setup the metrics endpoint
58-
this.server.get("/metrics", async (req, res) => {
86+
this.app.get("/metrics", async (req, res) => {
5987
res.set("Content-Type", this.registry.contentType);
6088
res.end(await this.registry.metrics());
6189
});
6290
}
6391

6492
// Start the metrics server
6593
public start(port: number): void {
66-
this.server.listen(port, () => {
94+
this.server = this.app.listen(port, () => {
6795
this.logger.info(`Metrics server started on port ${port}`);
6896
});
6997
}
7098

99+
// Stop metrics server
100+
public stop(): void {
101+
if (this.server) {
102+
this.server.close();
103+
}
104+
}
105+
71106
// Update the last published time for a price feed
72107
public updateLastPublishedTime(
73108
priceId: string,
@@ -150,4 +185,36 @@ export class PricePusherMetrics {
150185
`Updated wallet balance metric: ${walletAddress} = ${balanceNum}`,
151186
);
152187
}
188+
189+
// Update market hours metrics
190+
public updateMarketHours(
191+
priceId: string,
192+
alias: string,
193+
isOpen: boolean,
194+
nextOpen: number | null,
195+
nextClose: number | null,
196+
): void {
197+
const labels = {
198+
price_id: priceId,
199+
alias,
200+
};
201+
202+
const is24x7 = isOpen && nextOpen === null && nextClose === null;
203+
204+
this.marketIsOpen.set(labels, isOpen ? 1 : 0);
205+
206+
if (is24x7) {
207+
this.nextMarketOpen.set(labels, -1);
208+
this.nextMarketClose.set(labels, -1);
209+
} else {
210+
this.nextMarketOpen.set(
211+
labels,
212+
typeof nextOpen === "number" ? nextOpen : -1,
213+
);
214+
this.nextMarketClose.set(
215+
labels,
216+
typeof nextClose === "number" ? nextClose : -1,
217+
);
218+
}
219+
}
153220
}

0 commit comments

Comments
 (0)