Skip to content

Commit 2e17514

Browse files
authored
ethmonitor: trail N number of blocks behind head (#22)
1 parent dd91a40 commit 2e17514

File tree

2 files changed

+90
-30
lines changed

2 files changed

+90
-30
lines changed

ethmonitor/chain.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,17 @@ func (c *Chain) GetBlock(hash common.Hash) *Block {
9696
return nil
9797
}
9898

99+
func (c *Chain) GetBlockByNumber(blockNum uint64, event Event) *Block {
100+
c.mu.Lock()
101+
defer c.mu.Unlock()
102+
for i := len(c.blocks) - 1; i >= 0; i-- {
103+
if c.blocks[i].NumberU64() == blockNum && c.blocks[i].Event == event {
104+
return c.blocks[i]
105+
}
106+
}
107+
return nil
108+
}
109+
99110
func (c *Chain) GetTransaction(hash common.Hash) *types.Transaction {
100111
c.mu.Lock()
101112
defer c.mu.Unlock()

ethmonitor/ethmonitor.go

Lines changed: 79 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,27 @@ import (
1717
)
1818

1919
var DefaultOptions = Options{
20-
Logger: log.New(os.Stdout, "", 0),
21-
PollingInterval: 1 * time.Second,
22-
PollingTimeout: 120 * time.Second,
23-
StartBlockNumber: nil, // latest
24-
BlockRetentionLimit: 100,
25-
WithLogs: true,
26-
LogTopics: []common.Hash{}, // all logs
27-
DebugLogging: false,
20+
Logger: log.New(os.Stdout, "", 0),
21+
PollingInterval: 1 * time.Second,
22+
PollingTimeout: 120 * time.Second,
23+
StartBlockNumber: nil, // latest
24+
TrailNumBlocksBehindHead: 0, // latest
25+
BlockRetentionLimit: 100,
26+
WithLogs: true,
27+
LogTopics: []common.Hash{}, // all logs
28+
DebugLogging: false,
2829
}
2930

3031
type Options struct {
31-
Logger util.Logger
32-
PollingInterval time.Duration
33-
PollingTimeout time.Duration
34-
StartBlockNumber *big.Int
35-
BlockRetentionLimit int
36-
WithLogs bool
37-
LogTopics []common.Hash
38-
DebugLogging bool
32+
Logger util.Logger
33+
PollingInterval time.Duration
34+
PollingTimeout time.Duration
35+
StartBlockNumber *big.Int
36+
TrailNumBlocksBehindHead int
37+
BlockRetentionLimit int
38+
WithLogs bool
39+
LogTopics []common.Hash
40+
DebugLogging bool
3941
}
4042

4143
type Monitor struct {
@@ -44,11 +46,13 @@ type Monitor struct {
4446
ctx context.Context
4547
ctxStop context.CancelFunc
4648

47-
log util.Logger
48-
provider *ethrpc.Provider
49+
log util.Logger
50+
provider *ethrpc.Provider
51+
4952
chain *Chain
50-
subscribers []*subscriber
5153
nextBlockNumber *big.Int
54+
sentBlockNumber *big.Int
55+
subscribers []*subscriber
5256

5357
started bool
5458
running sync.WaitGroup
@@ -65,6 +69,8 @@ func NewMonitor(provider *ethrpc.Provider, opts ...Options) (*Monitor, error) {
6569
return nil, fmt.Errorf("ethmonitor: logger is nil")
6670
}
6771

72+
options.BlockRetentionLimit += options.TrailNumBlocksBehindHead
73+
6874
return &Monitor{
6975
options: options,
7076
log: options.Logger,
@@ -103,7 +109,7 @@ func (m *Monitor) Start(ctx context.Context) error {
103109
go func() {
104110
m.running.Add(1)
105111
defer m.running.Done()
106-
m.poll(m.ctx)
112+
m.poller(m.ctx)
107113
}()
108114

109115
return nil
@@ -139,7 +145,7 @@ func (m *Monitor) Provider() *ethrpc.Provider {
139145
return m.provider
140146
}
141147

142-
func (m *Monitor) poll(ctx context.Context) error {
148+
func (m *Monitor) poller(ctx context.Context) error {
143149
events := Blocks{}
144150

145151
// TODO: all fine and well, but what happens if we get re-org then another re-org
@@ -197,19 +203,13 @@ func (m *Monitor) poll(ctx context.Context) error {
197203
}
198204
}
199205

200-
// notify all subscribers..
201-
for _, sub := range m.subscribers {
202-
// non-blocking send
203-
select {
204-
case sub.ch <- events:
205-
default:
206-
}
207-
}
206+
// broadcast events
207+
m.broadcast(events)
208208

209209
// TODO: if we hit a reorg, we may want to wait 1-2 blocks
210210
// after the reorg so its safe, merge the event groups, and publish
211211

212-
// clear events sink upon publishing it to subscribers
212+
// clear events sink upon broadcasting it to subscribers
213213
events = events[:0]
214214
}
215215
}
@@ -357,6 +357,55 @@ func (m *Monitor) debugLogf(format string, v ...interface{}) {
357357
m.log.Printf(format, v...)
358358
}
359359

360+
func (m *Monitor) broadcast(events Blocks) {
361+
numBlocksBehindHead := m.options.TrailNumBlocksBehindHead
362+
363+
// Broadcast immediately
364+
if numBlocksBehindHead == 0 {
365+
for _, sub := range m.subscribers {
366+
// non-blocking send
367+
select {
368+
case sub.ch <- events:
369+
default:
370+
}
371+
}
372+
return
373+
}
374+
375+
// Trail behind the head
376+
if len(m.chain.blocks) <= numBlocksBehindHead {
377+
// skip the broadcast as we don't have enough data yet
378+
return
379+
}
380+
381+
if m.sentBlockNumber == nil {
382+
m.sentBlockNumber = big.NewInt(0).SetUint64(m.chain.Head().NumberU64() - uint64(numBlocksBehindHead))
383+
} else {
384+
m.sentBlockNumber = big.NewInt(0).Add(m.sentBlockNumber, big.NewInt(1))
385+
}
386+
387+
// TODO: improve, we can also include re-org data here as well, even though
388+
// the general use of trail is to avoid reorgs
389+
b := m.chain.GetBlockByNumber(m.sentBlockNumber.Uint64(), Added)
390+
if b == nil {
391+
b = m.chain.GetBlockByNumber(m.sentBlockNumber.Uint64(), Updated)
392+
if b == nil {
393+
m.sentBlockNumber = big.NewInt(0).Sub(m.sentBlockNumber, big.NewInt(1))
394+
return
395+
}
396+
}
397+
398+
blocks := Blocks{b}
399+
400+
for _, sub := range m.subscribers {
401+
// non-blocking send
402+
select {
403+
case sub.ch <- blocks:
404+
default:
405+
}
406+
}
407+
}
408+
360409
func (m *Monitor) Subscribe() Subscription {
361410
m.mu.Lock()
362411
defer m.mu.Unlock()

0 commit comments

Comments
 (0)