@@ -2,8 +2,10 @@ package blocktracker
22
33import (
44 "context"
5+ "errors"
56 "log/slog"
67 "math/big"
8+ "sync"
79 "sync/atomic"
810 "time"
911
@@ -22,7 +24,7 @@ type blockTracker struct {
2224 blocks * lru.Cache [uint64 , * types.Block ]
2325 client EthClient
2426 log * slog.Logger
25- checkTrigger chan struct {}
27+ checkCond * sync. Cond
2628}
2729
2830func NewBlockTracker (client EthClient , log * slog.Logger ) (* blockTracker , error ) {
@@ -36,7 +38,7 @@ func NewBlockTracker(client EthClient, log *slog.Logger) (*blockTracker, error)
3638 blocks : cache ,
3739 client : client ,
3840 log : log ,
39- checkTrigger : make ( chan struct {}, 1 ),
41+ checkCond : sync . NewCond ( & sync. Mutex {} ),
4042 }, nil
4143}
4244
@@ -63,8 +65,8 @@ func (b *blockTracker) Start(ctx context.Context) <-chan struct{} {
6365 }
6466 _ = b .blocks .Add (blockNo , block )
6567 b .latestBlockNo .Store (block .NumberU64 ())
66- b .log .Debug ("New block detected" , "number" , block .NumberU64 (), "hash" , block .Hash ().Hex ())
6768 b .triggerCheck ()
69+ b .log .Debug ("New block detected" , "number" , block .NumberU64 (), "hash" , block .Hash ().Hex ())
6870 }
6971 }
7072 }
@@ -73,48 +75,66 @@ func (b *blockTracker) Start(ctx context.Context) <-chan struct{} {
7375}
7476
7577func (b * blockTracker ) triggerCheck () {
76- select {
77- case b .checkTrigger <- struct {}{}:
78- default :
79- // Non-blocking send, if channel is full, we skip
80- }
78+ b .checkCond .L .Lock ()
79+ b .checkCond .Broadcast ()
80+ b .checkCond .L .Unlock ()
8181}
8282
8383func (b * blockTracker ) LatestBlockNumber () uint64 {
8484 return b .latestBlockNo .Load ()
8585}
8686
87+ func (b * blockTracker ) NextBlockNumber () (uint64 , time.Duration , error ) {
88+ block , found := b .blocks .Get (b .latestBlockNo .Load ())
89+ if ! found {
90+ return 0 , 0 , errors .New ("latest block not found in cache" )
91+ }
92+ blockTime := time .Unix (int64 (block .Time ()), 0 )
93+ return b .latestBlockNo .Load () + 1 , time .Until (blockTime .Add (12 * time .Second )), nil
94+ }
95+
8796func (b * blockTracker ) CheckTxnInclusion (
8897 ctx context.Context ,
8998 txHash common.Hash ,
9099 blockNumber uint64 ,
91100) (bool , error ) {
92- WaitForBlock:
93- for {
94- select {
95- case <- ctx .Done ():
96- return false , ctx .Err ()
97- case <- b .checkTrigger :
98- if blockNumber <= b .latestBlockNo .Load () {
99- break WaitForBlock
100- }
101+ if blockNumber <= b .latestBlockNo .Load () {
102+ return b .checkTxnInclusion (ctx , txHash , blockNumber )
103+ }
104+
105+ waitCh := make (chan struct {})
106+ go func () {
107+ b .checkCond .L .Lock ()
108+ defer b .checkCond .L .Unlock ()
109+ for blockNumber > b .latestBlockNo .Load () {
110+ b .checkCond .Wait ()
101111 }
112+ close (waitCh )
113+ }()
114+
115+ select {
116+ case <- ctx .Done ():
117+ return false , ctx .Err ()
118+ case <- waitCh :
119+ return b .checkTxnInclusion (ctx , txHash , blockNumber )
102120 }
121+ }
103122
123+ func (b * blockTracker ) checkTxnInclusion (ctx context.Context , txHash common.Hash , blockNumber uint64 ) (bool , error ) {
124+ var err error
104125 block , ok := b .blocks .Get (blockNumber )
105126 if ! ok {
106- block , err : = b .client .BlockByNumber (ctx , big .NewInt (int64 (blockNumber )))
127+ block , err = b .client .BlockByNumber (ctx , big .NewInt (int64 (blockNumber )))
107128 if err != nil {
108129 b .log .Error ("Failed to get block by number" , "error" , err , "blockNumber" , blockNumber )
109130 return false , err
110131 }
111132 _ = b .blocks .Add (blockNumber , block )
112133 }
113134
114- for _ , tx := range block .Transactions () {
115- if tx .Hash ().Cmp (txHash ) == 0 {
116- return true , nil
117- }
135+ if txn := block .Transaction (txHash ); txn != nil {
136+ return true , nil
118137 }
138+
119139 return false , nil
120140}
0 commit comments