1- import { HexString , HermesClient } from "@pythnetwork/hermes-client" ;
1+ import {
2+ HexString ,
3+ HermesClient ,
4+ PriceUpdate ,
5+ } from "@pythnetwork/hermes-client" ;
26import { PriceInfo , IPriceListener , PriceItem } from "./interface" ;
37import { Logger } from "pino" ;
48
59type TimestampInMs = number & { readonly _ : unique symbol } ;
610
711export class PythPriceListener implements IPriceListener {
8- private connection : HermesClient ;
12+ private hermesClient : HermesClient ;
913 private priceIds : HexString [ ] ;
1014 private priceIdToAlias : Map < HexString , string > ;
1115 private latestPriceInfo : Map < HexString , PriceInfo > ;
@@ -14,11 +18,11 @@ export class PythPriceListener implements IPriceListener {
1418 private healthCheckInterval ?: NodeJS . Timeout ;
1519
1620 constructor (
17- connection : HermesClient ,
21+ hermesClient : HermesClient ,
1822 priceItems : PriceItem [ ] ,
1923 logger : Logger
2024 ) {
21- this . connection = connection ;
25+ this . hermesClient = hermesClient ;
2226 this . priceIds = priceItems . map ( ( priceItem ) => priceItem . id ) ;
2327 this . priceIdToAlias = new Map (
2428 priceItems . map ( ( priceItem ) => [ priceItem . id , priceItem . alias ] )
@@ -30,28 +34,101 @@ export class PythPriceListener implements IPriceListener {
3034 // This method should be awaited on and once it finishes it has the latest value
3135 // for the given price feeds (if they exist).
3236 async start ( ) {
33- this . connection . getPriceUpdatesStream ( this . priceIds ) ;
34-
35- try {
36- const priceUpdates = await this . connection . getLatestPriceUpdates (
37- this . priceIds ,
38- {
39- encoding : "hex" ,
40- parsed : true ,
41- ignoreInvalidPriceIds : true ,
37+ const priceMetadata = await this . hermesClient . getPriceFeeds ( ) ;
38+ const allPriceIds = priceMetadata . map ( ( priceMetadata ) => priceMetadata . id ) ;
39+
40+ // Filter out invalid price ids
41+ const { existingPriceIds, invalidPriceIds } = this . priceIds . reduce < {
42+ existingPriceIds : string [ ] ;
43+ invalidPriceIds : string [ ] ;
44+ } > (
45+ ( acc , id ) => {
46+ if ( allPriceIds . includes ( id ) ) {
47+ acc . existingPriceIds . push ( id ) ;
48+ } else {
49+ acc . invalidPriceIds . push ( id ) ;
4250 }
43- ) ;
51+ return acc ;
52+ } ,
53+ { existingPriceIds : [ ] , invalidPriceIds : [ ] }
54+ ) ;
55+
56+ const invalidPriceIdsWithAlias = invalidPriceIds . map ( ( id ) =>
57+ this . priceIdToAlias . get ( id )
58+ ) ;
59+ this . logger . error (
60+ `Invalid price id submitted for: ${ invalidPriceIdsWithAlias . join ( ", " ) } `
61+ ) ;
62+
63+ this . priceIds = existingPriceIds ;
64+ // TODO: We can just remove the invalid price ids from the map
65+ this . priceIdToAlias = new Map (
66+ existingPriceIds . map (
67+ ( id ) => [ id , this . priceIdToAlias . get ( id ) ] as [ HexString , string ]
68+ )
69+ ) ;
70+
71+ const eventSource = await this . hermesClient . getPriceUpdatesStream (
72+ this . priceIds ,
73+ {
74+ parsed : true ,
75+ ignoreInvalidPriceIds : true ,
76+ }
77+ ) ;
78+ eventSource . onmessage = ( event : MessageEvent < string > ) => {
79+ const priceUpdates = JSON . parse ( event . data ) as PriceUpdate ;
4480 priceUpdates . parsed ?. forEach ( ( priceUpdate ) => {
45- this . latestPriceInfo . set ( priceUpdate . id , {
46- price : priceUpdate . price . price ,
47- conf : priceUpdate . price . conf ,
48- publishTime : priceUpdate . price . publish_time ,
49- } ) ;
81+ this . logger . debug (
82+ `Received new price feed update from Pyth price service: ${ this . priceIdToAlias . get (
83+ priceUpdate . id
84+ ) } ${ priceUpdate . id } `
85+ ) ;
86+
87+ // Consider price to be currently available if it is not older than 60s
88+ const currentPrice =
89+ Date . now ( ) / 1000 - priceUpdate . price . publish_time > 60
90+ ? undefined
91+ : priceUpdate . price ;
92+ if ( currentPrice === undefined ) {
93+ return ;
94+ }
95+
96+ const priceInfo : PriceInfo = {
97+ conf : currentPrice . conf ,
98+ price : currentPrice . price ,
99+ publishTime : currentPrice . publish_time ,
100+ } ;
101+
102+ this . latestPriceInfo . set ( priceUpdate . id , priceInfo ) ;
103+ this . lastUpdated = Date . now ( ) as TimestampInMs ;
50104 } ) ;
51- } catch ( error : any ) {
52- // Always log the HTTP error first
53- this . logger . error ( "Failed to get latest price feeds:" , error ) ;
54- }
105+ } ;
106+
107+ eventSource . onerror = ( error : Event ) => {
108+ console . error ( "Error receiving updates from Hermes:" , error ) ;
109+ eventSource . close ( ) ;
110+ } ;
111+
112+ // try {
113+ // const priceUpdates = await this.hermesClient.getLatestPriceUpdates(
114+ // this.priceIds,
115+ // {
116+ // encoding: "hex",
117+ // parsed: true,
118+ // ignoreInvalidPriceIds: true,
119+ // }
120+ // );
121+ // priceUpdates.parsed?.forEach((priceUpdate) => {
122+ // this.latestPriceInfo.set(priceUpdate.id, {
123+ // price: priceUpdate.price.price,
124+ // conf: priceUpdate.price.conf,
125+ // publishTime: priceUpdate.price.publish_time,
126+ // });
127+ // });
128+ // } catch (error: any) {
129+ // // Always log the HTTP error first
130+ // this.logger.error("Failed to get latest price feeds:", error);
131+ // }
55132
56133 // Store health check interval reference
57134 this . healthCheckInterval = setInterval ( ( ) => {
0 commit comments