Skip to content

Commit 2eb1f54

Browse files
author
Alok
committed
fix: temp
1 parent dea01d0 commit 2eb1f54

File tree

2 files changed

+74
-27
lines changed

2 files changed

+74
-27
lines changed

tools/preconf-rpc/blocktracker/blocktracker.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"log/slog"
66
"math/big"
7+
"sync"
78
"sync/atomic"
89
"time"
910

@@ -22,7 +23,7 @@ type blockTracker struct {
2223
blocks *lru.Cache[uint64, *types.Block]
2324
client EthClient
2425
log *slog.Logger
25-
checkTrigger chan struct{}
26+
checkCond *sync.Cond
2627
}
2728

2829
func NewBlockTracker(client EthClient, log *slog.Logger) (*blockTracker, error) {
@@ -36,7 +37,7 @@ func NewBlockTracker(client EthClient, log *slog.Logger) (*blockTracker, error)
3637
blocks: cache,
3738
client: client,
3839
log: log,
39-
checkTrigger: make(chan struct{}, 1),
40+
checkCond: sync.NewCond(&sync.Mutex{}),
4041
}, nil
4142
}
4243

@@ -73,11 +74,9 @@ func (b *blockTracker) Start(ctx context.Context) <-chan struct{} {
7374
}
7475

7576
func (b *blockTracker) triggerCheck() {
76-
select {
77-
case b.checkTrigger <- struct{}{}:
78-
default:
79-
// Non-blocking send, if channel is full, we skip
80-
}
77+
b.checkCond.L.Lock()
78+
b.checkCond.Broadcast()
79+
b.checkCond.L.Unlock()
8180
}
8281

8382
func (b *blockTracker) LatestBlockNumber() uint64 {
@@ -89,32 +88,43 @@ func (b *blockTracker) CheckTxnInclusion(
8988
txHash common.Hash,
9089
blockNumber uint64,
9190
) (bool, error) {
92-
WaitForBlock:
93-
for {
94-
select {
95-
case <-ctx.Done():
96-
return false, ctx.Err()
97-
case <-b.checkTrigger:
98-
if blockNumber <= b.latestBlockNo.Load() {
99-
break WaitForBlock
100-
}
91+
if blockNumber <= b.latestBlockNo.Load() {
92+
return b.checkTxnInclusion(ctx, txHash, blockNumber)
93+
}
94+
95+
waitCh := make(chan struct{})
96+
go func() {
97+
b.checkCond.L.Lock()
98+
defer b.checkCond.L.Unlock()
99+
for blockNumber > b.latestBlockNo.Load() {
100+
b.checkCond.Wait()
101101
}
102+
close(waitCh)
103+
}()
104+
105+
select {
106+
case <-ctx.Done():
107+
return false, ctx.Err()
108+
case <-waitCh:
109+
return b.checkTxnInclusion(ctx, txHash, blockNumber)
102110
}
111+
}
103112

113+
func (b *blockTracker) checkTxnInclusion(ctx context.Context, txHash common.Hash, blockNumber uint64) (bool, error) {
114+
var err error
104115
block, ok := b.blocks.Get(blockNumber)
105116
if !ok {
106-
block, err := b.client.BlockByNumber(ctx, big.NewInt(int64(blockNumber)))
117+
block, err = b.client.BlockByNumber(ctx, big.NewInt(int64(blockNumber)))
107118
if err != nil {
108119
b.log.Error("Failed to get block by number", "error", err, "blockNumber", blockNumber)
109120
return false, err
110121
}
111122
_ = b.blocks.Add(blockNumber, block)
112123
}
113124

114-
for _, tx := range block.Transactions() {
115-
if tx.Hash().Cmp(txHash) == 0 {
116-
return true, nil
117-
}
125+
if txn := block.Transaction(txHash); txn != nil {
126+
return true, nil
118127
}
128+
119129
return false, nil
120130
}

tools/preconf-rpc/pricer/pricer.go

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"encoding/json"
66
"errors"
77
"io"
8+
"log/slog"
89
"net/http"
10+
"sync"
911
"time"
1012
)
1113

@@ -21,23 +23,58 @@ type BlockPrice struct {
2123
EstimatedPrices []EstimatedPrice `json:"estimatedPrices"`
2224
}
2325

24-
type BlockPrices struct {
25-
MsSinceLastBlock int64 `json:"msSinceLastBlock"`
26+
type blockPrices struct {
2627
CurrentBlockNumber int64 `json:"currentBlockNumber"`
2728
Prices []BlockPrice `json:"blockPrices"`
2829
}
2930

3031
type BidPricer struct {
31-
apiKey string
32+
apiKey string
33+
log *slog.Logger
34+
mu sync.RWMutex // Protects currentEstimates
35+
currentEstimates map[int64]float64
3236
}
3337

34-
func NewPricer(apiKey string) *BidPricer {
38+
func NewPricer(apiKey string, logger *slog.Logger) *BidPricer {
3539
return &BidPricer{
3640
apiKey: apiKey,
41+
log: logger,
3742
}
3843
}
3944

40-
func (b *BidPricer) EstimatePrice(ctx context.Context) (*BlockPrices, error) {
45+
func (b *BidPricer) Start(ctx context.Context) <-chan struct{} {
46+
done := make(chan struct{})
47+
go func() {
48+
defer close(done)
49+
ticker := time.NewTicker(2 * time.Second) // Adjust the ticker interval as needed
50+
defer ticker.Stop()
51+
52+
for {
53+
select {
54+
case <-ctx.Done():
55+
return
56+
case <-ticker.C:
57+
if _, err := b.syncEstimate(ctx); err != nil {
58+
b.log.Error("Failed to estimate price", "error", err)
59+
}
60+
}
61+
}
62+
}()
63+
return done
64+
}
65+
66+
func (b *BidPricer) EstimatePrice(ctx context.Context) map[int64]float64 {
67+
b.mu.RLock()
68+
defer b.mu.RUnlock()
69+
70+
estimates := make(map[int64]float64)
71+
for blockNumber, price := range b.currentEstimates {
72+
estimates[blockNumber] = price
73+
}
74+
return estimates
75+
}
76+
77+
func (b *BidPricer) SyncEstimate(ctx context.Context) error {
4178
client := &http.Client{
4279
Timeout: 10 * time.Second,
4380
}
@@ -69,7 +106,7 @@ func (b *BidPricer) EstimatePrice(ctx context.Context) (*BlockPrices, error) {
69106
return nil, err
70107
}
71108

72-
bp := new(BlockPrices)
109+
bp := new(blockPrices)
73110
if err := json.Unmarshal(respBuf, bp); err != nil {
74111
return nil, err
75112
}

0 commit comments

Comments
 (0)