Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 42 additions & 22 deletions tools/preconf-rpc/blocktracker/blocktracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package blocktracker

import (
"context"
"errors"
"log/slog"
"math/big"
"sync"
"sync/atomic"
"time"

Expand All @@ -22,7 +24,7 @@ type blockTracker struct {
blocks *lru.Cache[uint64, *types.Block]
client EthClient
log *slog.Logger
checkTrigger chan struct{}
checkCond *sync.Cond
}

func NewBlockTracker(client EthClient, log *slog.Logger) (*blockTracker, error) {
Expand All @@ -36,7 +38,7 @@ func NewBlockTracker(client EthClient, log *slog.Logger) (*blockTracker, error)
blocks: cache,
client: client,
log: log,
checkTrigger: make(chan struct{}, 1),
checkCond: sync.NewCond(&sync.Mutex{}),
}, nil
}

Expand All @@ -63,8 +65,8 @@ func (b *blockTracker) Start(ctx context.Context) <-chan struct{} {
}
_ = b.blocks.Add(blockNo, block)
b.latestBlockNo.Store(block.NumberU64())
b.log.Debug("New block detected", "number", block.NumberU64(), "hash", block.Hash().Hex())
b.triggerCheck()
b.log.Debug("New block detected", "number", block.NumberU64(), "hash", block.Hash().Hex())
}
}
}
Expand All @@ -73,48 +75,66 @@ func (b *blockTracker) Start(ctx context.Context) <-chan struct{} {
}

func (b *blockTracker) triggerCheck() {
select {
case b.checkTrigger <- struct{}{}:
default:
// Non-blocking send, if channel is full, we skip
}
b.checkCond.L.Lock()
b.checkCond.Broadcast()
b.checkCond.L.Unlock()
}

func (b *blockTracker) LatestBlockNumber() uint64 {
return b.latestBlockNo.Load()
}

func (b *blockTracker) NextBlockNumber() (uint64, time.Duration, error) {
block, found := b.blocks.Get(b.latestBlockNo.Load())
if !found {
return 0, 0, errors.New("latest block not found in cache")
}
blockTime := time.Unix(int64(block.Time()), 0)
return b.latestBlockNo.Load() + 1, time.Until(blockTime.Add(12 * time.Second)), nil
}

func (b *blockTracker) CheckTxnInclusion(
ctx context.Context,
txHash common.Hash,
blockNumber uint64,
) (bool, error) {
WaitForBlock:
for {
select {
case <-ctx.Done():
return false, ctx.Err()
case <-b.checkTrigger:
if blockNumber <= b.latestBlockNo.Load() {
break WaitForBlock
}
if blockNumber <= b.latestBlockNo.Load() {
return b.checkTxnInclusion(ctx, txHash, blockNumber)
}

waitCh := make(chan struct{})
go func() {
b.checkCond.L.Lock()
defer b.checkCond.L.Unlock()
for blockNumber > b.latestBlockNo.Load() {
b.checkCond.Wait()
}
close(waitCh)
}()

select {
case <-ctx.Done():
return false, ctx.Err()
case <-waitCh:
return b.checkTxnInclusion(ctx, txHash, blockNumber)
}
}

func (b *blockTracker) checkTxnInclusion(ctx context.Context, txHash common.Hash, blockNumber uint64) (bool, error) {
var err error
block, ok := b.blocks.Get(blockNumber)
if !ok {
block, err := b.client.BlockByNumber(ctx, big.NewInt(int64(blockNumber)))
block, err = b.client.BlockByNumber(ctx, big.NewInt(int64(blockNumber)))
if err != nil {
b.log.Error("Failed to get block by number", "error", err, "blockNumber", blockNumber)
return false, err
}
_ = b.blocks.Add(blockNumber, block)
}

for _, tx := range block.Transactions() {
if tx.Hash().Cmp(txHash) == 0 {
return true, nil
}
if txn := block.Transaction(txHash); txn != nil {
return true, nil
}

return false, nil
}
46 changes: 42 additions & 4 deletions tools/preconf-rpc/blocktracker/blocktracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package blocktracker_test
import (
"context"
"hash"
"log/slog"
"math/big"
"os"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/primev/mev-commit/tools/preconf-rpc/blocktracker"
"github.com/primev/mev-commit/x/util"
"golang.org/x/crypto/sha3"
)

Expand Down Expand Up @@ -74,7 +76,7 @@ func TestBlockTracker(t *testing.T) {
blk1 := types.NewBlock(
&types.Header{
Number: big.NewInt(100),
Time: 1622547800,
Time: uint64(time.Now().Unix()),
},
&types.Body{Transactions: []*types.Transaction{tx1, tx2}},
nil, // No receipts
Expand All @@ -84,7 +86,7 @@ func TestBlockTracker(t *testing.T) {
blk2 := types.NewBlock(
&types.Header{
Number: big.NewInt(101),
Time: 1622547900,
Time: uint64(time.Now().Add(12 * time.Second).Unix()),
},
&types.Body{Transactions: []*types.Transaction{tx3}},
nil, // No receipts
Expand All @@ -99,7 +101,7 @@ func TestBlockTracker(t *testing.T) {
},
}

tracker, err := blocktracker.NewBlockTracker(client, slog.Default())
tracker, err := blocktracker.NewBlockTracker(client, util.NewTestLogger(os.Stdout))
if err != nil {
t.Fatalf("Failed to create block tracker: %v", err)
}
Expand All @@ -112,6 +114,26 @@ func TestBlockTracker(t *testing.T) {

client.blockNumber <- 100

start := time.Now()
for {
bidBlockNo, duration, err := tracker.NextBlockNumber()
if err == nil {
if bidBlockNo != 101 {
t.Fatalf("Expected next block number to be 101, got %d", bidBlockNo)
}
if duration <= 0 {
t.Fatalf("Expected positive duration, got %v", duration)
}
break
} else {
t.Logf("Waiting for next block number: %v", err)
}
if time.Since(start) > 5*time.Second {
t.Fatalf("Timeout waiting for next block number")
}
time.Sleep(100 * time.Millisecond)
}

included, err := tracker.CheckTxnInclusion(ctx, tx1.Hash(), 100)
if err != nil {
t.Fatalf("Error checking transaction inclusion: %v", err)
Expand All @@ -128,6 +150,22 @@ func TestBlockTracker(t *testing.T) {

client.blockNumber <- 101

start = time.Now()
for {
bidBlockNo, duration, err := tracker.NextBlockNumber()
if err == nil {
if bidBlockNo == 102 && duration > 0 {
break
}
} else {
t.Logf("Waiting for next block number: %v", err)
}
if time.Since(start) > 5*time.Second {
t.Fatalf("Timeout waiting for next block number")
}
time.Sleep(100 * time.Millisecond)
}

included, err = tracker.CheckTxnInclusion(ctx, tx4.Hash(), 101)
if err != nil {
t.Fatalf("Error checking transaction inclusion: %v", err)
Expand Down
35 changes: 8 additions & 27 deletions tools/preconf-rpc/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
bidderapiv1 "github.com/primev/mev-commit/p2p/gen/go/bidderapi/v1"
"github.com/primev/mev-commit/tools/preconf-rpc/pricer"
"github.com/primev/mev-commit/tools/preconf-rpc/rpcserver"
"github.com/primev/mev-commit/tools/preconf-rpc/sender"
)
Expand All @@ -22,7 +21,7 @@ type Bidder interface {
}

type Pricer interface {
EstimatePrice(ctx context.Context) (*pricer.BlockPrices, error)
EstimatePrice(ctx context.Context) map[int64]float64
}

type Store interface {
Expand Down Expand Up @@ -135,14 +134,7 @@ func (h *rpcMethodHandler) RegisterMethods(server *rpcserver.JSONRPCServer) {
return json.RawMessage(fmt.Sprintf(`{"timeInSecs": "%d"}`, timeToOptIn)), false, nil
})
server.RegisterHandler("mevcommit_estimateDeposit", func(ctx context.Context, params ...any) (json.RawMessage, bool, error) {
blockPrices, err := h.pricer.EstimatePrice(ctx)
if err != nil {
h.logger.Error("Failed to estimate deposit price", "error", err)
return nil, false, rpcserver.NewJSONErr(
rpcserver.CodeCustomError,
"failed to estimate deposit price",
)
}
blockPrices := h.pricer.EstimatePrice(ctx)
cost := getNextBlockPrice(blockPrices)
result := map[string]interface{}{
"bidAmount": cost.String(),
Expand All @@ -161,14 +153,7 @@ func (h *rpcMethodHandler) RegisterMethods(server *rpcserver.JSONRPCServer) {
return resultJSON, false, nil
})
server.RegisterHandler("mevcommit_estimateBridge", func(ctx context.Context, params ...any) (json.RawMessage, bool, error) {
blockPrices, err := h.pricer.EstimatePrice(ctx)
if err != nil {
h.logger.Error("Failed to estimate bridge price", "error", err)
return nil, false, rpcserver.NewJSONErr(
rpcserver.CodeCustomError,
"failed to estimate bridge price",
)
}
blockPrices := h.pricer.EstimatePrice(ctx)
cost := getNextBlockPrice(blockPrices)
bridgeCost := new(big.Int).Mul(cost, big.NewInt(2))
result := map[string]interface{}{
Expand All @@ -192,15 +177,11 @@ func (h *rpcMethodHandler) RegisterMethods(server *rpcserver.JSONRPCServer) {
server.RegisterHandler("mevcommit_getBalance", h.handleMevCommitGetBalance)
}

func getNextBlockPrice(blockPrices *pricer.BlockPrices) *big.Int {
for _, price := range blockPrices.Prices {
if price.BlockNumber == blockPrices.CurrentBlockNumber+1 {
for _, estimate := range price.EstimatedPrices {
if estimate.Confidence == 99 {
priceInWei := estimate.PriorityFeePerGasGwei * 1e9
return new(big.Int).Mul(new(big.Int).SetUint64(uint64(priceInWei)), big.NewInt(21000))
}
}
func getNextBlockPrice(blockPrices map[int64]float64) *big.Int {
for confidence, price := range blockPrices {
if confidence == 99 {
priceInWei := price * 1e9 // Convert Gwei to Wei
return new(big.Int).Mul(new(big.Int).SetUint64(uint64(priceInWei)), big.NewInt(21000))
}
}

Expand Down
Loading
Loading