diff --git a/cmd/livepeer/starter/flags.go b/cmd/livepeer/starter/flags.go index ff3395623d..3edb3fefe0 100644 --- a/cmd/livepeer/starter/flags.go +++ b/cmd/livepeer/starter/flags.go @@ -100,6 +100,7 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig { cfg.MaxTotalEV = fs.String("maxTotalEV", *cfg.MaxTotalEV, "The maximum acceptable expected value for one PM payment") // Broadcaster deposit multiplier to determine max acceptable ticket faceValue cfg.DepositMultiplier = fs.Int("depositMultiplier", *cfg.DepositMultiplier, "The deposit multiplier used to determine max acceptable faceValue for PM tickets") + cfg.IgnoreSenderReserve = fs.Bool("ignoreSenderReserve", *cfg.IgnoreSenderReserve, "Ignore sender reserve; lowers gateway reserve needs but allows double-spending risk (unsafe)") // Orchestrator base pricing info cfg.PricePerUnit = fs.String("pricePerUnit", "0", "The price per 'pixelsPerUnit' amount pixels. Can be specified in wei or a custom currency in the format (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr") // Unit of pixels for both O's pricePerUnit and B's maxPricePerUnit diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 57abfd4356..ed75cbc41f 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -138,6 +138,7 @@ type LivepeerConfig struct { MaxTicketEV *string MaxTotalEV *string DepositMultiplier *int + IgnoreSenderReserve *bool PricePerUnit *string PixelsPerUnit *string PriceFeedAddr *string @@ -263,6 +264,7 @@ func DefaultLivepeerConfig() LivepeerConfig { defaultMaxPricePerUnit := "0" defaultMaxPricePerCapability := "" defaultIgnoreMaxPriceIfNeeded := false + defaultIgnoreSenderReserve := false defaultPixelsPerUnit := "1" defaultPriceFeedAddr := "0x639Fe6ab55C921f74e7fac1ee960C0B6293ba612" // ETH / USD price feed address on Arbitrum Mainnet defaultAutoAdjustPrice := true @@ -378,6 +380,7 @@ func DefaultLivepeerConfig() LivepeerConfig { MaxTicketEV: &defaultMaxTicketEV, MaxTotalEV: &defaultMaxTotalEV, DepositMultiplier: &defaultDepositMultiplier, + IgnoreSenderReserve: &defaultIgnoreSenderReserve, MaxPricePerUnit: &defaultMaxPricePerUnit, MaxPricePerCapability: &defaultMaxPricePerCapability, IgnoreMaxPriceIfNeeded: &defaultIgnoreMaxPriceIfNeeded, @@ -1015,9 +1018,10 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { defer sm.Stop() tcfg := pm.TicketParamsConfig{ - EV: ev, - RedeemGas: redeemGas, - TxCostMultiplier: txCostMultiplier, + EV: ev, + RedeemGas: redeemGas, + TxCostMultiplier: txCostMultiplier, + IgnoreSenderReserve: *cfg.IgnoreSenderReserve, } n.Recipient, err = pm.NewRecipient( recipientAddr, @@ -1032,6 +1036,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { glog.Errorf("Error setting up PM recipient: %v", err) return } + if *cfg.IgnoreSenderReserve { + glog.Warning("Sender reserve requirements disabled; relying on broadcaster deposit to cover ticket face value. Double-spend protection is reduced.") + } mfv, _ := new(big.Int).SetString(*cfg.MaxFaceValue, 10) if mfv == nil { panic(fmt.Errorf("-maxFaceValue must be a valid integer, but %v provided. Restart the node with a different valid value for -maxFaceValue", *cfg.MaxFaceValue)) diff --git a/pm/recipient.go b/pm/recipient.go index 2199842489..028c89411c 100644 --- a/pm/recipient.go +++ b/pm/recipient.go @@ -69,6 +69,9 @@ type TicketParamsConfig struct { // TxCostMultiplier is the desired multiplier of the transaction // cost for redemption TxCostMultiplier int + + // IgnoreSenderReserve instructs the recipient to skip sender reserve checks and accept tickets as long as the computed face value meets EV requirements + IgnoreSenderReserve bool } // GasPriceMonitor defines methods for monitoring gas prices @@ -271,16 +274,28 @@ func (r *recipient) faceValue(sender ethcommon.Address) (*big.Int, error) { faceValue = new(big.Int).Mul(r.cfg.EV, evMultiplier) } - // Fetch current max float for sender - maxFloat, err := r.sm.MaxFloat(sender) - if err != nil { - return nil, err - } + var maxFloat *big.Int + if !r.cfg.IgnoreSenderReserve { + // Fetch current max float for sender + var err error + maxFloat, err = r.sm.MaxFloat(sender) + if err != nil { + return nil, err + } - if faceValue.Cmp(maxFloat) > 0 { - // If faceValue > maxFloat - // Set faceValue = maxFloat - faceValue = maxFloat + if faceValue.Cmp(maxFloat) > 0 { + // If faceValue > maxFloat + // Set faceValue = maxFloat + faceValue = maxFloat + } + } else { + available, err := r.sm.SenderFunds(sender) + if err != nil { + return nil, err + } + if available.Cmp(faceValue) < 0 { + return nil, errInsufficientSenderReserve + } } if r.maxfacevalue.Cmp(big.NewInt(0)) > 0 { @@ -290,7 +305,9 @@ func (r *recipient) faceValue(sender ethcommon.Address) (*big.Int, error) { } if monitor.Enabled { monitor.TicketFaceValue(sender.Hex(), faceValue) - monitor.MaxFloat(sender.Hex(), maxFloat) + if !r.cfg.IgnoreSenderReserve && maxFloat != nil { + monitor.MaxFloat(sender.Hex(), maxFloat) + } } if faceValue.Cmp(r.cfg.EV) < 0 { return nil, errInsufficientSenderReserve diff --git a/pm/recipient_test.go b/pm/recipient_test.go index d84f5fab39..4371e6fb72 100644 --- a/pm/recipient_test.go +++ b/pm/recipient_test.go @@ -530,6 +530,21 @@ func TestTicketParams(t *testing.T) { _, err = r.TicketParams(sender, big.NewRat(1, 1)) assert.EqualError(err, errInsufficientSenderReserve.Error()) + // Test ignoring sender reserve requirements bypasses maxFloat enforcement + cfg.IgnoreSenderReserve = true + sm.maxFloat = big.NewInt(100) + sm.deposit = new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil) + rIgnore := NewRecipientWithSecret(recipient, b, v, gm, sm, tm, secret, cfg) + paramsIgnore, err := rIgnore.TicketParams(sender, big.NewRat(1, 1)) + require.Nil(err) + assert.True(paramsIgnore.FaceValue.Cmp(sm.maxFloat) > 0) + + // Test ignoring sender reserve requirements errors if deposit < faceValue + sm.deposit = new(big.Int).Sub(paramsIgnore.FaceValue, big.NewInt(1)) + _, err = rIgnore.TicketParams(sender, big.NewRat(1, 1)) + assert.EqualError(err, errInsufficientSenderReserve.Error()) + cfg.IgnoreSenderReserve = false + // Test faceValue < txCostWithGasPrice(current gasPrice) and faceValue > txCostWithGasPrice(avg gasPrice) // Set current gasPrice higher than avg gasPrice gm.gasPrice = new(big.Int).Add(avgGasPrice, big.NewInt(1)) diff --git a/pm/sendermonitor.go b/pm/sendermonitor.go index 5ef5dcfedd..ebb82ff3c2 100644 --- a/pm/sendermonitor.go +++ b/pm/sendermonitor.go @@ -41,6 +41,9 @@ type SenderMonitor interface { // MaxFloat returns a remote sender's max float MaxFloat(addr ethcommon.Address) (*big.Int, error) + // SenderFunds returns the broadcaster's spendable balance (deposit + total reserve - pending tickets) + SenderFunds(addr ethcommon.Address) (*big.Int, error) + // ValidateSender checks whether a sender's unlock period ends the round after the next round ValidateSender(addr ethcommon.Address) error } @@ -162,6 +165,35 @@ func (sm *LocalSenderMonitor) MaxFloat(addr ethcommon.Address) (*big.Int, error) return sm.maxFloat(addr) } +// SenderFunds returns the sender's deposit plus total reserve minus pending tickets +func (sm *LocalSenderMonitor) SenderFunds(addr ethcommon.Address) (*big.Int, error) { + sm.mu.Lock() + defer sm.mu.Unlock() + + sm.ensureCache(addr) + + info, err := sm.smgr.GetSenderInfo(addr) + if err != nil { + return nil, err + } + + totalReserve := new(big.Int).Set(info.Reserve.FundsRemaining) + if info.Reserve.ClaimedInCurrentRound != nil { + totalReserve.Add(totalReserve, info.Reserve.ClaimedInCurrentRound) + } + + available := new(big.Int).Set(totalReserve) + if info.Deposit != nil { + available.Add(available, info.Deposit) + } + available.Sub(available, sm.senders[addr].pendingAmount) + if available.Sign() < 0 { + available = big.NewInt(0) + } + + return available, nil +} + // QueueTicket adds a ticket to the queue for a remote sender func (sm *LocalSenderMonitor) QueueTicket(ticket *SignedTicket) error { sm.mu.Lock() diff --git a/pm/stub.go b/pm/stub.go index 33ba0352d1..1c2c53bc3a 100644 --- a/pm/stub.go +++ b/pm/stub.go @@ -381,6 +381,7 @@ func (s *stubGasPriceMonitor) GasPrice() *big.Int { type stubSenderMonitor struct { maxFloat *big.Int + funds *big.Int redeemable chan *redemption queued []*SignedTicket acceptable bool @@ -388,12 +389,14 @@ type stubSenderMonitor struct { maxFloatErr error validateSenderErr error shouldFail error + deposit *big.Int } func newStubSenderMonitor() *stubSenderMonitor { return &stubSenderMonitor{ maxFloat: big.NewInt(0), redeemable: make(chan *redemption), + deposit: big.NewInt(0), } } @@ -433,6 +436,13 @@ func (s *stubSenderMonitor) MaxFloat(_ ethcommon.Address) (*big.Int, error) { return s.maxFloat, nil } +func (s *stubSenderMonitor) SenderFunds(_ ethcommon.Address) (*big.Int, error) { + if s.funds != nil { + return new(big.Int).Set(s.funds), nil + } + return s.deposit, nil +} + func (s *stubSenderMonitor) ValidateSender(_ ethcommon.Address) error { return s.validateSenderErr } // MockRecipient is useful for testing components that depend on pm.Recipient diff --git a/server/redeemer.go b/server/redeemer.go index ccec858b43..c24c2d4e27 100644 --- a/server/redeemer.go +++ b/server/redeemer.go @@ -272,6 +272,29 @@ func (r *RedeemerClient) MaxFloat(sender ethcommon.Address) (*big.Int, error) { return mf, nil } +// SenderFunds retrieves the sender's spendable balance directly from the local sender manager cache +func (r *RedeemerClient) SenderFunds(sender ethcommon.Address) (*big.Int, error) { + info, err := r.sm.GetSenderInfo(sender) + if err != nil { + return nil, err + } + + totalReserve := new(big.Int).Set(info.Reserve.FundsRemaining) + if info.Reserve.ClaimedInCurrentRound != nil { + totalReserve.Add(totalReserve, info.Reserve.ClaimedInCurrentRound) + } + + available := new(big.Int).Set(totalReserve) + if info.Deposit != nil { + available.Add(available, info.Deposit) + } + + if available.Sign() < 0 { + available = big.NewInt(0) + } + return available, nil +} + // ValidateSender checks whether a sender has not recently unlocked its deposit and reserve func (r *RedeemerClient) ValidateSender(sender ethcommon.Address) error { info, err := r.sm.GetSenderInfo(sender)