@@ -2,6 +2,7 @@ package feeder
22
33import (
44 "encoding/json"
5+ "fmt"
56 "sync"
67 "time"
78
@@ -55,31 +56,12 @@ func NewPriceProvider(
5556 symbols .Add (s )
5657 }
5758
58- switch sourceName {
59- case sources .SourceBitfinex :
60- source = sources .NewTickSource (symbols , sources .BitfinexPriceUpdate , logger )
61- case sources .SourceBinance :
62- source = sources .NewTickSource (symbols , sources .BinancePriceUpdate , logger )
63- case sources .SourceCoingecko :
64- source = sources .NewTickSource (symbols , sources .CoingeckoPriceUpdate (config ), logger )
65- case sources .SourceOkex :
66- source = sources .NewTickSource (symbols , sources .OkexPriceUpdate , logger )
67- case sources .SourceGateIo :
68- source = sources .NewTickSource (symbols , sources .GateIoPriceUpdate , logger )
69- case sources .SourceCoinMarketCap :
70- source = sources .NewTickSource (symbols , sources .CoinmarketcapPriceUpdate (config ), logger )
71- case sources .SourceBybit :
72- source = sources .NewTickSource (symbols , sources .BybitPriceUpdate , logger )
73- case sources .SourceErisProtocol :
74- source = sources .NewTickSource (symbols , sources .ErisProtocolPriceUpdate , logger )
75- case sources .SourceUniswapV3 :
76- source = sources .NewTickSource (symbols , sources .UniswapV3PriceUpdate , logger )
77- case sources .SourceAvalon :
78- source = sources .NewTickSource (symbols , sources .AvalonPriceUpdate , logger )
79- case sources .SourceChainLink :
80- source = sources .NewTickSource (symbols , sources .ChainlinkPriceUpdate , logger )
81- default :
82- panic ("unknown price provider: " + sourceName )
59+ source , err := sources .GetRegisteredSource (sourceName , symbols , config , logger )
60+ if err != nil {
61+ logger .
62+ Warn ().
63+ Msg (err .Error ())
64+ return types.NullPriceProvider {}
8365 }
8466
8567 return newPriceProvider (source , sourceName , pairToSymbolMap , logger )
@@ -105,6 +87,10 @@ func newPriceProvider(source types.Source, sourceName string, pairToSymbolsMap m
10587 return pp
10688}
10789
90+ // loop runs in a background goroutine and continuously listens for price updates
91+ // from the source. It updates the lastPrices map with new data and handles
92+ // shutdown signals. The loop exits when stopSignal is closed, ensuring proper
93+ // cleanup of the source and done channel.
10894func (p * PriceProvider ) loop () {
10995 defer close (p .done )
11096 defer p .source .Close ()
@@ -135,7 +121,7 @@ func (p *PriceProvider) GetPrice(pair asset.Pair) types.Price {
135121 p .logger .Debug ().Str ("pair" , pair .String ()).Msg ("pair not configured for this pricefeeder" )
136122 return types.Price {
137123 Pair : pair ,
138- Price : - 1 , // abstain
124+ Price : types . PriceAbstain ,
139125 SourceName : p .sourceName ,
140126 Valid : false ,
141127 }
@@ -158,8 +144,9 @@ func (p *PriceProvider) Close() {
158144 <- p .done
159145}
160146
161- // isValid is a helper function which asserts if a price is valid given
162- // if it was found and the time at which it was last updated.
147+ // isValid determines whether a price is valid based on whether it was found and
148+ // whether it was updated within the [types.PriceTimeout] window. Prices that are
149+ // missing or older than [types.PriceTimeout] are considered invalid.
163150func isValid (price types.RawPrice , found bool ) bool {
164151 return found && time .Since (price .UpdateTime ) < types .PriceTimeout
165152}
@@ -171,8 +158,11 @@ func isValid(price types.RawPrice, found bool) bool {
171158// AggregatePriceProvider aggregates multiple price providers
172159// and queries them for prices.
173160type AggregatePriceProvider struct {
161+ // providers: A set of providers implemented using a map to zero size
162+ // empty struct. Using a map gives us random order of iteration, the
163+ // intended behavior (since golang's map range is unordered)
164+ providers map [types.PriceProvider ]struct {}
174165 logger zerolog.Logger
175- providers map [int ]types.PriceProvider // we use a map here to provide random ranging (since golang's map range is unordered)
176166}
177167
178168// NewAggregatePriceProvider instantiates a new AggregatePriceProvider instance
@@ -182,11 +172,24 @@ func NewAggregatePriceProvider(
182172 sourceConfigMap map [string ]json.RawMessage ,
183173 logger zerolog.Logger ,
184174) types.PriceProvider {
185- providers := make (map [int ] types.PriceProvider , len ( sourcesToPairSymbolMap ) )
186- i := 0
175+ providers := make (map [types.PriceProvider ] struct {} )
176+ invalidSources := [] string {}
187177 for sourceName , pairToSymbolMap := range sourcesToPairSymbolMap {
188- providers [i ] = NewPriceProvider (sourceName , pairToSymbolMap , sourceConfigMap [sourceName ], logger )
189- i ++
178+ pp := NewPriceProvider (sourceName , pairToSymbolMap , sourceConfigMap [sourceName ], logger )
179+ if _ , isNull := pp .(types.NullPriceProvider ); isNull {
180+ invalidSources = append (invalidSources , sourceName )
181+ continue
182+ }
183+ providers [pp ] = struct {}{}
184+ }
185+
186+ if len (providers ) != len (sourcesToPairSymbolMap ) {
187+ logger .Warn ().
188+ Msg (fmt .Sprintf ("invalid source names given as key in configuration: { invalidSources: %#v }" , invalidSources ))
189+ }
190+ if len (providers ) == 0 {
191+ logger .Error ().
192+ Msg (fmt .Sprintf ("no price providers available: { invalidSources: %#v }" , invalidSources ))
190193 }
191194
192195 return AggregatePriceProvider {
@@ -210,8 +213,8 @@ func (a AggregatePriceProvider) GetPrice(pair asset.Pair) types.Price {
210213 case "ustnibi:uusd" :
211214 // fetch unibi:uusd first to calculate the ustnibi:unibi price
212215
213- unibiUusdPrice := - 1.0 // default to -1 to indicate we haven't found a valid price yet
214- for _ , p := range a .providers {
216+ unibiUusdPrice := types . PriceAbstain // default to -1 to indicate we haven't found a valid price yet
217+ for p := range a .providers {
215218 price := p .GetPrice ("unibi:uusd" )
216219 if ! price .Valid {
217220 continue
@@ -228,13 +231,13 @@ func (a AggregatePriceProvider) GetPrice(pair asset.Pair) types.Price {
228231 return types.Price {
229232 SourceName : "missing" ,
230233 Pair : pair ,
231- Price : 0 ,
234+ Price : types . PriceAbstain ,
232235 Valid : false ,
233236 }
234237 }
235238
236239 // now we can calculate the ustnibi:unibi price
237- for _ , p := range a .providers {
240+ for p := range a .providers {
238241 price := p .GetPrice ("ustnibi:unibi" )
239242 if ! price .Valid {
240243 continue
@@ -257,7 +260,7 @@ func (a AggregatePriceProvider) GetPrice(pair asset.Pair) types.Price {
257260 return types.Price {
258261 SourceName : "missing" ,
259262 Pair : pair ,
260- Price : 0 ,
263+ Price : types . PriceAbstain ,
261264 Valid : false ,
262265 }
263266 }
@@ -278,14 +281,14 @@ func (a AggregatePriceProvider) GetPrice(pair asset.Pair) types.Price {
278281 return types.Price {
279282 SourceName : "missing" ,
280283 Pair : pair ,
281- Price : 0 ,
284+ Price : types . PriceAbstain ,
282285 Valid : false ,
283286 }
284287
285288 default :
286289 // for all other price pairs, iterate randomly, if we find a valid price, we return it
287290 // otherwise we go onto the next PriceProvider to ask for prices.
288- for _ , p := range a .providers {
291+ for p := range a .providers {
289292 price := p .GetPrice (pair )
290293 if price .Valid {
291294 aggregatePriceProvider .WithLabelValues (pair .String (), price .SourceName , "true" ).Inc ()
@@ -300,13 +303,13 @@ func (a AggregatePriceProvider) GetPrice(pair asset.Pair) types.Price {
300303 return types.Price {
301304 SourceName : "missing" ,
302305 Pair : pair ,
303- Price : 0 ,
306+ Price : types . PriceAbstain ,
304307 Valid : false ,
305308 }
306309}
307310
308311func (a AggregatePriceProvider ) Close () {
309- for _ , p := range a .providers {
312+ for p := range a .providers {
310313 p .Close ()
311314 }
312315}
0 commit comments