Skip to content

Commit abc9850

Browse files
update gateway payment balances to track by pipeline
1 parent df78778 commit abc9850

File tree

4 files changed

+91
-7
lines changed

4 files changed

+91
-7
lines changed

core/accounting.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package core
33
import (
44
"context"
55
"math/big"
6+
"strings"
67
"sync"
78
"time"
89

910
ethcommon "github.com/ethereum/go-ethereum/common"
11+
"github.com/golang/glog"
1012
"github.com/livepeer/go-livepeer/clog"
1113
)
1214

@@ -35,7 +37,7 @@ func (b *Balance) Credit(amount *big.Rat) {
3537
// to send with a payment, the new credit represented by the payment and the existing credit (i.e reserved balance)
3638
func (b *Balance) StageUpdate(minCredit, ev *big.Rat) (int, *big.Rat, *big.Rat) {
3739
existingCredit := b.balances.Reserve(b.addr, b.manifestID)
38-
40+
glog.Infof("existing credit for manifest id %v: %v", string(b.manifestID), existingCredit.FloatString(3))
3941
// If the existing credit exceeds the minimum credit then no tickets are required
4042
// and the total payment value is 0
4143
if existingCredit.Cmp(minCredit) >= 0 {
@@ -211,11 +213,16 @@ func (b *Balances) SetFixedPrice(id ManifestID, fixedPrice *big.Rat) {
211213

212214
func (b *Balances) cleanup() {
213215
for id, balance := range b.balances {
214-
b.mtx.Lock()
215-
if int64(time.Since(balance.lastUpdate)) > int64(b.ttl) {
216-
delete(b.balances, id)
216+
//only cleanup Balance if not a pipeline manifestID
217+
glog.Infof("checking to clear balance for: %v", id)
218+
if len(strings.Split(string(id), `_`)) == 1 {
219+
b.mtx.Lock()
220+
if int64(time.Since(balance.lastUpdate)) > int64(b.ttl) {
221+
glog.Infof("clearing balances session %v", id, balance.amount.FloatString(3))
222+
delete(b.balances, id)
223+
}
224+
b.mtx.Unlock()
217225
}
218-
b.mtx.Unlock()
219226
}
220227
}
221228

pm/sender.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ type Sender interface {
2121
// StartSession creates a session for a given set of ticket params which tracks information
2222
// for creating new tickets
2323
StartSession(ticketParams TicketParams) string
24+
StartSessionByID(ticketParams TicketParams, sessionID string) string
25+
UpdateSessionByID(TicketParams TicketParams, sessionID string)
2426

2527
// CleanupSession deletes session from the internal map
2628
CleanupSession(sessionID string)
@@ -75,6 +77,24 @@ func (s *sender) StartSession(ticketParams TicketParams) string {
7577
return sessionID
7678
}
7779

80+
func (s *sender) StartSessionByID(ticketParams TicketParams, sessionID string) string {
81+
s.sessions.Store(sessionID, &session{
82+
ticketParams: ticketParams,
83+
senderNonce: 0,
84+
})
85+
86+
return sessionID
87+
}
88+
89+
func (s *sender) UpdateSessionByID(ticketParams TicketParams, sessionID string) {
90+
_, err := s.loadSession(sessionID)
91+
if err != nil {
92+
s.StartSessionByID(ticketParams, sessionID)
93+
}
94+
session, _ := s.loadSession(sessionID)
95+
session.ticketParams = ticketParams
96+
}
97+
7898
// EV returns the ticket EV for a session
7999
func (s *sender) EV(sessionID string) (*big.Rat, error) {
80100
session, err := s.loadSession(sessionID)

server/ai_session.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import (
99
"sync"
1010
"time"
1111

12+
ethcommon "github.com/ethereum/go-ethereum/common"
1213
"github.com/livepeer/go-livepeer/clog"
1314
"github.com/livepeer/go-livepeer/common"
1415
"github.com/livepeer/go-livepeer/core"
16+
"github.com/livepeer/go-livepeer/pm"
1517
"github.com/livepeer/go-tools/drivers"
1618
"github.com/livepeer/lpms/stream"
1719
)
@@ -273,6 +275,8 @@ func (sel *AISessionSelector) Remove(sess *AISession) {
273275
}
274276

275277
func (sel *AISessionSelector) Refresh(ctx context.Context) error {
278+
oldBalances, oldSenderSessions := sel.getBalances()
279+
276280
sessions, err := sel.getSessions(ctx)
277281
if err != nil {
278282
return err
@@ -293,6 +297,9 @@ func (sel *AISessionSelector) Refresh(ctx context.Context) error {
293297
continue
294298
}
295299

300+
// update session to persist payment balances
301+
updateSessionForAI(sess, sel.cap, sel.modelID, sel.node.Balances, oldBalances, oldSenderSessions)
302+
296303
if modelConstraint.Warm {
297304
warmSessions = append(warmSessions, sess)
298305
} else {
@@ -308,6 +315,24 @@ func (sel *AISessionSelector) Refresh(ctx context.Context) error {
308315
return nil
309316
}
310317

318+
func (sel *AISessionSelector) getBalances() (map[string]Balance, map[string]pm.Sender) {
319+
balances := make(map[string]Balance)
320+
senders := make(map[string]pm.Sender)
321+
for _, sess := range sel.warmPool.sessMap {
322+
balances[sess.Transcoder()] = sess.Balance
323+
senders[sess.Transcoder()] = sess.Sender
324+
}
325+
326+
for _, sess := range sel.coldPool.sessMap {
327+
if _, ok := balances[sess.Transcoder()]; !ok {
328+
balances[sess.Transcoder()] = sess.Balance
329+
senders[sess.Transcoder()] = sess.Sender
330+
}
331+
}
332+
333+
return balances, senders
334+
}
335+
311336
func (sel *AISessionSelector) getSessions(ctx context.Context) ([]*BroadcastSession, error) {
312337
// No warm constraints applied here because we don't want to filter out orchs based on warm criteria at discovery time
313338
// Instead, we want all orchs that support the model and then will filter for orchs that have a warm model separately
@@ -384,9 +409,19 @@ func (c *AISessionManager) Select(ctx context.Context, cap core.Capability, mode
384409
return nil, nil
385410
}
386411

387-
if err := refreshSessionIfNeeded(ctx, sess.BroadcastSession); err != nil {
412+
//send a temp session to be refreshed
413+
// updateSession in broadcast.go updates the orchestrator OS and ticket params.
414+
// it also updates the pm.Sender session using ticket params and the Balance using the auth token.
415+
// we want to persist these to new
416+
newSess := *sess.BroadcastSession
417+
newSess.PMSessionID = strconv.Itoa(int(cap)) + "_" + modelID + "_" + "temp"
418+
newSess.Sender.StartSessionByID(*pmTicketParams(newSess.OrchestratorInfo.TicketParams), newSess.PMSessionID)
419+
if err := refreshSessionIfNeeded(ctx, &newSess); err != nil {
388420
return nil, err
389421
}
422+
sess.BroadcastSession.OrchestratorInfo = newSess.OrchestratorInfo
423+
sess.Sender.UpdateSessionByID(*pmTicketParams(sess.OrchestratorInfo.TicketParams), sess.PMSessionID)
424+
//updateSessionForAI(sess.BroadcastSession, cap, modelID, c.node.Balances)
390425

391426
return sess, nil
392427
}
@@ -432,3 +467,25 @@ func (c *AISessionManager) getSelector(ctx context.Context, cap core.Capability,
432467

433468
return sel, nil
434469
}
470+
471+
func updateSessionForAI(sess *BroadcastSession, cap core.Capability, modelID string, balances *core.AddressBalances, oldBalances map[string]Balance, oldSenderSessions map[string]pm.Sender) {
472+
//clean up other session
473+
sess.CleanupSession(sess.PMSessionID)
474+
// override PMSessionID to track tickets per pipeline/model
475+
transcoderUrl := sess.Transcoder()
476+
sess.lock.Lock()
477+
defer sess.lock.Unlock()
478+
sess.PMSessionID = strconv.Itoa(int(cap)) + "_" + modelID
479+
// save balance between refreshes
480+
if oldBalance, ok := oldBalances[transcoderUrl]; ok {
481+
sess.Balance = oldBalance
482+
} else {
483+
sess.Balance = core.NewBalance(ethcommon.BytesToAddress(sess.OrchestratorInfo.TicketParams.Recipient), core.ManifestID(strconv.Itoa(int(cap))+"_"+modelID), balances)
484+
}
485+
// save sender sessions between refreshes
486+
if oldSenderSession, ok := oldSenderSessions[transcoderUrl]; ok {
487+
sess.Sender = oldSenderSession
488+
} else {
489+
sess.Sender.StartSessionByID(*pmTicketParams(sess.OrchestratorInfo.TicketParams), sess.PMSessionID)
490+
}
491+
}

server/segment_rpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -781,7 +781,7 @@ func newBalanceUpdate(sess *BroadcastSession, minCredit *big.Rat) (*BalanceUpdat
781781
}
782782

783783
update.NumTickets, update.NewCredit, update.ExistingCredit = sess.Balance.StageUpdate(safeMinCredit, ev)
784-
784+
glog.Infof("Staged balance update - numTickets=%v newCredit=%v existingCredit=%v", update.NumTickets, update.NewCredit.FloatString(3), update.ExistingCredit.FloatString(3))
785785
return update, nil
786786
}
787787

0 commit comments

Comments
 (0)