@@ -58,10 +58,7 @@ type oracleSvc struct {
5858}
5959
6060const (
61- maxRespTime = 15 * time .Second
62- maxRespHeadersTime = 15 * time .Second
63- maxRespBytes = 10 * 1024 * 1024
64- maxTxStatusRetries = 3
61+ maxRespTime = 3 * time .Second
6562 maxRetriesPerInterval = 3
6663 MaxRetriesReConnectWebSocket = 5
6764)
@@ -195,7 +192,7 @@ func (s *oracleSvc) Start(ctx context.Context) (err error) {
195192 for ticker , pricePuller := range s .pricePullers {
196193 switch pricePuller .Provider () {
197194 case FeedProviderBinance , FeedProviderStork , FeedProviderDynamic :
198- go s .processSetPriceFeed (ticker , pricePuller , dataC )
195+ go s .processSetPriceFeed (ctx , ticker , pricePuller , dataC )
199196 default :
200197 s .logger .WithField ("provider" , pricePuller .Provider ()).Warningln ("unsupported price feed provider" )
201198 }
@@ -207,7 +204,7 @@ func (s *oracleSvc) Start(ctx context.Context) (err error) {
207204 return
208205}
209206
210- func (s * oracleSvc ) processSetPriceFeed (ticker string , pricePuller PricePuller , dataC chan <- * PriceData ) {
207+ func (s * oracleSvc ) processSetPriceFeed (ctx context. Context , ticker string , pricePuller PricePuller , dataC chan <- * PriceData ) {
211208 feedLogger := s .logger .WithFields (log.Fields {
212209 "ticker" : ticker ,
213210 "provider" : pricePuller .ProviderName (),
@@ -216,36 +213,38 @@ func (s *oracleSvc) processSetPriceFeed(ticker string, pricePuller PricePuller,
216213 symbol := pricePuller .Symbol ()
217214
218215 t := time .NewTimer (5 * time .Second )
216+ defer t .Stop ()
217+
219218 for {
220219 select {
220+ case <- ctx .Done ():
221+ feedLogger .Infoln ("context cancelled, stopping price feed" )
222+ return
221223 case <- t .C :
222- ctx , cancelFn := context . WithTimeout ( context . Background (), maxRespTime )
223- defer cancelFn ()
224+ var result * PriceData
225+ var err error
224226
225- result , err := pricePuller .PullPrice (ctx )
227+ for i := 0 ; i < maxRetriesPerInterval ; i ++ {
228+ requestCtx , cancelFn := context .WithTimeout (ctx , maxRespTime )
229+ result , err = pricePuller .PullPrice (requestCtx )
230+ cancelFn ()
226231
227- if err != nil {
228- metrics .ReportFuncError (s .svcTags )
229- feedLogger .WithError (err ).Warningln ("retrying PullPrice after error" )
230-
231- for i := 0 ; i < maxRetriesPerInterval ; i ++ {
232- if result , err = pricePuller .PullPrice (ctx ); err != nil {
233- time .Sleep (time .Second )
234- continue
235- }
232+ if err == nil {
236233 break
237234 }
238235
239- if err != nil {
240- metrics .ReportFuncCallAndTimingWithErr (s .svcTags )(& err )
241- feedLogger .WithFields (log.Fields {
242- "symbol" : symbol ,
243- "retries" : maxRetriesPerInterval ,
244- }).WithError (err ).Errorln ("failed to fetch price" )
236+ time .Sleep (100 * time .Millisecond )
237+ }
245238
246- t .Reset (pricePuller .Interval ())
247- continue
248- }
239+ if err != nil {
240+ metrics .ReportFuncCallAndTimingWithErr (s .svcTags )(& err )
241+ feedLogger .WithFields (log.Fields {
242+ "symbol" : symbol ,
243+ "retries" : maxRetriesPerInterval ,
244+ }).WithError (err ).Errorln ("failed to fetch price" )
245+
246+ t .Reset (pricePuller .Interval ())
247+ continue
249248 }
250249
251250 if result != nil {
@@ -259,9 +258,9 @@ func (s *oracleSvc) processSetPriceFeed(ticker string, pricePuller PricePuller,
259258
260259const (
261260 commitPriceBatchTimeLimit = 5 * time .Second
261+ chainMaxTimeLimit = 3 * time .Second
262262 commitPriceBatchSizeLimit = 100
263- maxRetries = 6
264- chainMaxTimeLimit = 5 * time .Second
263+ maxRetries = 3
265264)
266265
267266var pullIntervalChain = 500 * time .Millisecond
@@ -362,6 +361,8 @@ func (s *oracleSvc) commitSetPrices(ctx context.Context, dataC <-chan *PriceData
362361 defer doneFn ()
363362
364363 expirationTimer := time .NewTimer (commitPriceBatchTimeLimit )
364+ defer expirationTimer .Stop ()
365+
365366 pricesBatch := make (map [string ]* PriceData )
366367 pricesMeta := make (map [string ]int )
367368
@@ -409,6 +410,11 @@ func (s *oracleSvc) commitSetPrices(ctx context.Context, dataC <-chan *PriceData
409410
410411 for {
411412 select {
413+ case <- ctx .Done ():
414+ s .logger .Infoln ("context cancelled, stopping commitSetPrices" )
415+ prevBatch , prevMeta := resetBatch ()
416+ submitBatch (prevBatch , prevMeta , false )
417+ return
412418 case priceData , ok := <- dataC :
413419 if ! ok {
414420 s .logger .Infoln ("stopping committing prices" )
@@ -485,12 +491,16 @@ func (s *oracleSvc) broadcastToClient(
485491 }, s .svcTags )
486492 }
487493
494+ diff := time .Since (ts )
495+
488496 batchLog .WithFields (log.Fields {
489497 "cosmosClient" : cosmosClient .ClientContext ().From ,
490498 "height" : txResp .TxResponse .Height ,
491499 "hash" : txResp .TxResponse .TxHash ,
492- "duration" : time .Since (ts ),
493- }).Infoln ("sent Tx successfully in " , time .Since (ts ))
500+ "duration" : diff ,
501+ }).Infoln ("sent Tx successfully in " , diff )
502+
503+ metrics .Timer ("price_oracle.execution_time" , diff , s .svcTags )
494504 return true
495505 }
496506
0 commit comments