@@ -145,6 +145,8 @@ type connectionManager struct {
145145 validator validator
146146 p2pDialer p2p.Dialer
147147 timeGetter TimeGetter
148+ priceCheckInterval time.Duration
149+ priceDropPercent float64
148150
149151 // These are populated by Connect at runtime.
150152 ctx context.Context
@@ -207,6 +209,8 @@ func NewManager(
207209 preReconnect : preReconnect ,
208210 postReconnect : postReconnect ,
209211 uuid : uuid .String (),
212+ priceDropPercent : 10 , // reconnect if price dropped 10% or more
213+ priceCheckInterval : 30 * time .Second ,
210214 }
211215
212216 m .eventBus .SubscribeAsync (connectionstate .AppTopicConnectionState , m .reconnectOnHold )
@@ -303,6 +307,7 @@ func (m *connectionManager) Connect(consumerID identity.Identity, hermesID commo
303307
304308 go m .consumeConnectionStates (m .activeConnection .State ())
305309 go m .checkSessionIP (m .channel , m .connectOptions .ConsumerID , m .connectOptions .SessionID , originalPublicIP )
310+ go m .monitorPrice (prc , proposalLookup )
306311
307312 return nil
308313}
@@ -384,7 +389,7 @@ func (m *connectionManager) initSession(tracer *trace.Tracer, prc market.Price)
384389 m .setStatus (func (status * connectionstate.Status ) {
385390 status .SessionID = sessionID
386391 })
387- m .publishSessionCreate (sessionID )
392+ m .publishSessionCreate ()
388393 paymentSession .SetSessionID (string (sessionID ))
389394 tracer .EndStage (traceStart )
390395
@@ -662,7 +667,7 @@ func (m *connectionManager) createP2PSession(c Connection, opts ConnectOptions,
662667 return & sessionResponse , nil
663668}
664669
665- func (m * connectionManager ) publishSessionCreate (sessionID session. ID ) {
670+ func (m * connectionManager ) publishSessionCreate () {
666671 sessionInfo := m .Status ()
667672 // avoid printing IP address in logs
668673 sessionInfo .ConsumerLocation .IP = ""
@@ -1031,3 +1036,30 @@ func logDisconnectError(err error) {
10311036 log .Error ().Err (err ).Msg ("Disconnect error" )
10321037 }
10331038}
1039+
1040+ func (m * connectionManager ) monitorPrice (currentPrice market.Price , proposalLookup ProposalLookup ) {
1041+ t := time .NewTicker (m .priceCheckInterval )
1042+ for {
1043+ select {
1044+ case <- m .currentCtx ().Done ():
1045+ return
1046+ case <- t .C :
1047+ proposal , err := proposalLookup ()
1048+ if err != nil {
1049+ log .Error ().Err (err ).Msg ("Failed to lookup proposal" )
1050+ continue
1051+ }
1052+ newPrice := m .priceFromProposal (* proposal )
1053+
1054+ // Check if both GiB and Hourly prices dropped by at least 10%
1055+ giBDrop := float64 (currentPrice .PricePerGiB .Int64 ()- newPrice .PricePerGiB .Int64 ()) / float64 (currentPrice .PricePerGiB .Int64 ())
1056+ hourDrop := float64 (currentPrice .PricePerHour .Int64 ()- newPrice .PricePerHour .Int64 ()) / float64 (currentPrice .PricePerHour .Int64 ())
1057+
1058+ if giBDrop * 100 >= m .priceDropPercent || hourDrop * 100 >= m .priceDropPercent {
1059+ log .Info ().Msgf ("Price dropped significantly from %q to %q, disconnecting" , currentPrice .String (), newPrice .String ())
1060+ m .Disconnect ()
1061+ return
1062+ }
1063+ }
1064+ }
1065+ }
0 commit comments