Skip to content
158 changes: 129 additions & 29 deletions cmd/ulxly/ulxly.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum"
Expand Down Expand Up @@ -646,16 +647,19 @@ func claimEverything(cmd *cobra.Command) error {
RPCURL := *inputUlxlyArgs.rpcURL
limit := *inputUlxlyArgs.bridgeLimit
offset := *inputUlxlyArgs.bridgeOffset
concurrency := *inputUlxlyArgs.concurrency
urls, err := getBridgeServiceURLs()
if err != nil {
return err
}

depositMap := make(map[DepositID]*BridgeDeposit)

for _, bridgeServiceUrl := range urls {
deposits, bErr := getDepositsForAddress(fmt.Sprintf("%s/bridges/%s?offset=%d&limit=%d", bridgeServiceUrl, destinationAddress, offset, limit))
for bridgeServiceId, bridgeServiceUrl := range urls {
url := fmt.Sprintf("%s/bridges/%s?offset=%d&limit=%d", bridgeServiceUrl, destinationAddress, offset, limit)
deposits, bErr := getDepositsForAddress(url)
if bErr != nil {
log.Err(bErr).Uint32("id", bridgeServiceId).Str("url", url).Msgf("Error getting deposits for bridge: %s", bErr.Error())
return bErr
}
for idx, deposit := range deposits {
Expand All @@ -670,7 +674,7 @@ func claimEverything(cmd *cobra.Command) error {
continue
}

// if this new deposit is ready for claim OR it has already been claimed we should over ride the exisitng value
// if this new deposit is ready for claim OR it has already been claimed we should override the existing value
if deposit.ReadyForClaim || deposit.ClaimTxHash != "" {
depositMap[depId] = &deposits[idx]
}
Expand All @@ -692,36 +696,128 @@ func claimEverything(cmd *cobra.Command) error {
if err != nil {
return err
}
log.Info().Uint32("networkID", currentNetworkID).Msg("detected current networkid")
for _, deposit := range depositMap {
if deposit.DestNet != currentNetworkID {
log.Debug().Uint32("destination_network", deposit.DestNet).Msg("discarding deposit for different network")
continue
}
if deposit.ClaimTxHash != "" {
log.Info().Str("txhash", deposit.ClaimTxHash).Msg("It looks like this tx was already claimed")
continue
}
claimTx, dErr := claimSingleDeposit(cmd, client, bridgeContract, opts, *deposit, urls, currentNetworkID)
if dErr != nil {
log.Warn().Err(dErr).Uint32("DepositCnt", deposit.DepositCnt).
Uint32("OrigNet", deposit.OrigNet).
Uint32("DestNet", deposit.DestNet).
Uint32("NetworkID", deposit.NetworkID).
Str("OrigAddr", deposit.OrigAddr).
Str("DestAddr", deposit.DestAddr).
Msg("There was an error claiming")
continue
}
dErr = WaitMineTransaction(cmd.Context(), client, claimTx, timeoutTxnReceipt)
if dErr != nil {
log.Error().Err(dErr).Msg("error while waiting for tx to main")
}
log.Info().Uint32("networkID", currentNetworkID).Msg("current network")

workPool := make(chan *BridgeDeposit, concurrency) // bounded chan for controlled concurrency

nonceCounter, err := currentNonce(cmd.Context(), client, destinationAddress)
if err != nil {
return err
}
log.Info().Int64("nonce", nonceCounter.Int64()).Msg("starting nonce")
nonceMutex := sync.Mutex{}
nonceIncrement := big.NewInt(1)
retryNonces := make(chan *big.Int, concurrency) // bounded same as workPool

wg := sync.WaitGroup{} // wg so the last ones can get processed

for _, d := range depositMap {
wg.Add(1)
workPool <- d // block until a slot is available
go func(deposit *BridgeDeposit) {
defer func() {
<-workPool // release work slot
}()
defer wg.Done()

if deposit.DestNet != currentNetworkID {
log.Debug().Uint32("destination_network", deposit.DestNet).Msg("discarding deposit for different network")
return
}
if deposit.ClaimTxHash != "" {
log.Info().Str("txhash", deposit.ClaimTxHash).Msg("It looks like this tx was already claimed")
return
}
// Either use the next retry nonce, or set and increment the next one
var nextNonce *big.Int
select {
case n := <-retryNonces:
nextNonce = n
default:
nonceMutex.Lock()
nextNonce = big.NewInt(nonceCounter.Int64())
nonceCounter = nonceCounter.Add(nonceCounter, nonceIncrement)
nonceMutex.Unlock()
}
log.Info().Int64("nonce", nextNonce.Int64()).Msg("Next nonce")

claimTx, dErr := claimSingleDeposit(cmd, client, bridgeContract, withNonce(opts, nextNonce), *deposit, urls, currentNetworkID)
if dErr != nil {
log.Warn().Err(dErr).Uint32("DepositCnt", deposit.DepositCnt).
Uint32("OrigNet", deposit.OrigNet).
Uint32("DestNet", deposit.DestNet).
Uint32("NetworkID", deposit.NetworkID).
Str("OrigAddr", deposit.OrigAddr).
Str("DestAddr", deposit.DestAddr).
Int64("nonce", nextNonce.Int64()).
Msg("There was an error claiming")

// Some nonces should not be reused
if strings.Contains(dErr.Error(), "could not replace existing") {
return
}
if strings.Contains(dErr.Error(), "already known") {
return
}
if strings.Contains(dErr.Error(), "nonce is too low") {
return
}
// are there other cases?
retryNonces <- nextNonce
return
}
dErr = WaitMineTransaction(cmd.Context(), client, claimTx, timeoutTxnReceipt)
if dErr != nil {
log.Error().Err(dErr).Msg("error while waiting for tx to mine")
}
}(d)
}

wg.Wait()
return nil
}

func currentNonce(ctx context.Context, client *ethclient.Client, address string) (*big.Int, error) {
addr := common.HexToAddress(address)
nonce, err := client.NonceAt(ctx, addr, nil)
if err != nil {
log.Error().Err(err).Str("address", addr.Hex()).Msg("Failed to get nonce")
return nil, err
}
n := int64(nonce)
return big.NewInt(n), nil
}

// todo: implement for other fields in library, or find a library that does this
func withNonce(opts *bind.TransactOpts, newNonce *big.Int) *bind.TransactOpts {
if opts == nil {
return nil
}
clone := &bind.TransactOpts{
From: opts.From,
Signer: opts.Signer,
GasLimit: opts.GasLimit,
Context: opts.Context, // Usually OK to share, unless you need a separate context
NoSend: opts.NoSend,
}
// Deep-copy big.Int fields
if opts.Value != nil {
clone.Value = new(big.Int).Set(opts.Value)
}
if opts.GasFeeCap != nil {
clone.GasFeeCap = new(big.Int).Set(opts.GasFeeCap)
}
if opts.GasTipCap != nil {
clone.GasTipCap = new(big.Int).Set(opts.GasTipCap)
}
// Set the new nonce
if newNonce != nil {
clone.Nonce = new(big.Int).Set(newNonce)
}

return clone
}

func claimSingleDeposit(cmd *cobra.Command, client *ethclient.Client, bridgeContract *ulxly.Ulxly, opts *bind.TransactOpts, deposit BridgeDeposit, bridgeURLs map[uint32]string, currentNetworkID uint32) (*types.Transaction, error) {
networkIDForBridgeService := deposit.NetworkID
if deposit.NetworkID == 0 {
Expand Down Expand Up @@ -807,7 +903,7 @@ func WaitMineTransaction(ctx context.Context, client *ethclient.Client, tx *type
continue
}
if r.Status != 0 {
log.Info().Interface("txHash", r.TxHash).Msg("Deposit transaction successful")
log.Info().Interface("txHash", r.TxHash).Msg("transaction successful")
return nil
} else if r.Status == 0 {
log.Error().Interface("txHash", r.TxHash).Msg("Deposit transaction failed")
Expand Down Expand Up @@ -1486,6 +1582,7 @@ type ulxlyArgs struct {
bridgeLimit *int
bridgeOffset *int
wait *time.Duration
concurrency *uint
}

var inputUlxlyArgs = ulxlyArgs{}
Expand Down Expand Up @@ -1543,6 +1640,7 @@ const (
ArgBridgeLimit = "bridge-limit"
ArgBridgeOffset = "bridge-offset"
ArgWait = "wait"
ArgConcurrency = "concurrency"
)

func prepInputs(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -1821,6 +1919,8 @@ or if it's actually an intermediate hash.`,
inputUlxlyArgs.bridgeServiceURLs = claimEverythingCommand.Flags().StringSlice(ArgBridgeMappings, nil, "Mappings between network ids and bridge service urls. E.g. '1=http://network-1-bridgeurl,7=http://network-2-bridgeurl'")
inputUlxlyArgs.bridgeLimit = claimEverythingCommand.Flags().Int(ArgBridgeLimit, 25, "Limit the number or responses returned by the bridge service when claiming")
inputUlxlyArgs.bridgeOffset = claimEverythingCommand.Flags().Int(ArgBridgeOffset, 0, "The offset to specify for pagination of the underlying bridge service deposits")
inputUlxlyArgs.concurrency = claimEverythingCommand.Flags().Uint(ArgConcurrency, 1, "The worker pool size for claims")

fatalIfError(claimEverythingCommand.MarkFlagRequired(ArgBridgeMappings))

// Top Level
Expand Down
1 change: 1 addition & 0 deletions doc/polycli_ulxly__claim-everything.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ polycli ulxly claim-everything [flags]
--bridge-limit int Limit the number or responses returned by the bridge service when claiming (default 25)
--bridge-offset int The offset to specify for pagination of the underlying bridge service deposits
--bridge-service-map strings Mappings between network ids and bridge service urls. E.g. '1=http://network-1-bridgeurl,7=http://network-2-bridgeurl'
--concurrency uint The worker pool size for claims (default 1)
-h, --help help for claim-everything
```

Expand Down
Loading