Skip to content

Commit 5752940

Browse files
committed
ethmonitor: LogsStream
1 parent c335c85 commit 5752940

File tree

5 files changed

+351
-9
lines changed

5 files changed

+351
-9
lines changed

TODO.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
TODO
2+
====
3+
4+
ethtxn and ethwallet improvements .. naming etc.
5+
6+
------
7+
8+
1. Look at some of the viem and ox terminology.. "provider" ..?
9+
2. rename..? ethrpc.Client and ethrpc.Provider .. and .NewClient .. its a bit confusing
10+
.... lets ask the AI its opinion..
11+
12+
13+
------
14+
15+
16+
17+
1. ethreceipts using AsMessage .. dont need to, see note in receipt.go
18+
2. search "TODO" in ethkit..
19+
20+
---
21+
22+
23+
4. ethrpc, pass in a json marshaller, would be better..
24+
5. review ethmonitor for the logs and streaming stuff, to build blocks from
25+
subscriptions, etc.
26+
27+
6. any other cleanup in ethkit I should do...? lets make a list anyways..
28+
29+
30+
31+
32+
33+
--------
34+
35+
IMPROVEMENTS
36+
============
37+
38+
1. ethmonitor: newHeads and newLogs .. can we use these..?
39+
2. ethreceipts: review TODO's to make it more efficient -- not worth it right now..
40+
3. ethrpc: remove pkg-level methods and pass in optional jsonmarshaller/unmarshaller -- have an idea how to do it..
41+
4. ethrpc: rename interface.go -- yes, at the right time..
42+
43+
44+
---------
45+
46+
ethmonitor..
47+
48+
1. lets adapt chain-newheads to see how "newHeads" looks..?

cmd/chain-watch/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ func main() {
7474
chainID, _ := provider.ChainID(context.Background())
7575
fmt.Println("=> chain id:", chainID.String())
7676

77+
ls := ethmonitor.NewLogsStream(provider)
78+
ls.Do(context.Background())
79+
80+
return
81+
7782
// Monitor options
7883
cachestore.MaxKeyLength = 180
7984
monitorOptions := ethmonitor.DefaultOptions

ethmonitor/ethmonitor.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ func (m *Monitor) monitor() error {
524524
}
525525

526526
// fetch the next block, either via the stream or via a poll
527-
nextBlock, nextBlockPayload, miss, err := m.fetchNextBlock(ctx)
527+
nextBlock, _, miss, err := m.fetchNextBlock(ctx)
528528
if err != nil {
529529
if errors.Is(err, context.DeadlineExceeded) {
530530
m.log.Info(fmt.Sprintf("ethmonitor: fetchNextBlock timed out: '%v', for blockNum:%v, retrying..", err, m.nextBlockNumber))
@@ -546,7 +546,7 @@ func (m *Monitor) monitor() error {
546546
}
547547

548548
// build deterministic set of add/remove events which construct the canonical chain
549-
events, err = m.buildCanonicalChain(ctx, nextBlock, nextBlockPayload, events)
549+
events, err = m.buildCanonicalChain(ctx, nextBlock, events)
550550
if err != nil {
551551
m.log.Warn(fmt.Sprintf("ethmonitor: error reported '%v', failed to build chain for next blockNum:%d blockHash:%s, retrying..",
552552
err, nextBlock.NumberU64(), nextBlock.Hash().Hex()))
@@ -582,7 +582,7 @@ func (m *Monitor) monitor() error {
582582
}
583583
}
584584

585-
func (m *Monitor) buildCanonicalChain(ctx context.Context, nextBlock *types.Block, nextBlockPayload []byte, events Blocks) (Blocks, error) {
585+
func (m *Monitor) buildCanonicalChain(ctx context.Context, nextBlock *types.Block, events Blocks) (Blocks, error) {
586586
select {
587587
case <-ctx.Done():
588588
return nil, ctx.Err()
@@ -629,13 +629,13 @@ func (m *Monitor) buildCanonicalChain(ctx context.Context, nextBlock *types.Bloc
629629
time.Sleep(pause)
630630

631631
// Fetch/connect the broken chain backwards by traversing recursively via parent hashes
632-
nextParentBlock, nextParentBlockPayload, err := m.fetchBlockByHash(ctx, nextBlock.ParentHash())
632+
nextParentBlock, _, err := m.fetchBlockByHash(ctx, nextBlock.ParentHash())
633633
if err != nil {
634634
// NOTE: this is okay, it will auto-retry
635635
return events, err
636636
}
637637

638-
events, err = m.buildCanonicalChain(ctx, nextParentBlock, nextParentBlockPayload, events)
638+
events, err = m.buildCanonicalChain(ctx, nextParentBlock, events)
639639
if err != nil {
640640
// NOTE: this is okay, it will auto-retry
641641
return events, err
@@ -796,7 +796,10 @@ func (m *Monitor) fetchNextBlock(ctx context.Context) (*types.Block, []byte, boo
796796

797797
getter := func(ctx context.Context, _ string) ([]byte, error) {
798798
if m.options.DebugLogging {
799-
m.log.Debug(fmt.Sprintf("ethmonitor: fetchNextBlock is calling origin for number %s", m.nextBlockNumber))
799+
m.nextBlockNumberMu.Lock()
800+
nextNumForLog := m.nextBlockNumber
801+
m.nextBlockNumberMu.Unlock()
802+
m.log.Debug(fmt.Sprintf("ethmonitor: fetchNextBlock is calling origin for number %v", nextNumForLog))
800803
}
801804
for {
802805
select {
@@ -805,9 +808,15 @@ func (m *Monitor) fetchNextBlock(ctx context.Context) (*types.Block, []byte, boo
805808
default:
806809
}
807810

808-
nextBlockPayload, err := m.fetchRawBlockByNumber(ctx, m.nextBlockNumber)
811+
m.nextBlockNumberMu.Lock()
812+
nextNum := m.nextBlockNumber
813+
m.nextBlockNumberMu.Unlock()
814+
nextBlockPayload, err := m.fetchRawBlockByNumber(ctx, nextNum)
809815
if err != nil {
810-
m.log.Debug(fmt.Sprintf("ethmonitor: [retrying] failed to fetch next block # %d, due to: %v", m.nextBlockNumber, err))
816+
m.nextBlockNumberMu.Lock()
817+
logNum := m.nextBlockNumber
818+
m.nextBlockNumberMu.Unlock()
819+
m.log.Debug(fmt.Sprintf("ethmonitor: [retrying] failed to fetch next block # %v, due to: %v", logNum, err))
811820
miss = true
812821
if m.IsStreamingMode() {
813822
// in streaming mode, we'll use a shorter time to pause before we refetch
@@ -996,7 +1005,16 @@ func (m *Monitor) publish(ctx context.Context, events Blocks) error {
9961005
// Check for trail-behind-head mode and set maxBlockNum if applicable
9971006
maxBlockNum := uint64(0)
9981007
if m.options.TrailNumBlocksBehindHead > 0 {
999-
maxBlockNum = m.LatestBlock().NumberU64() - uint64(m.options.TrailNumBlocksBehindHead)
1008+
latest := m.LatestBlock()
1009+
if latest != nil {
1010+
lb := latest.NumberU64()
1011+
trail := uint64(m.options.TrailNumBlocksBehindHead)
1012+
if lb > trail {
1013+
maxBlockNum = lb - trail
1014+
} else {
1015+
maxBlockNum = 0
1016+
}
1017+
}
10001018
}
10011019

10021020
// Enqueue

ethmonitor/logs_stream.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package ethmonitor
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"time"
7+
8+
"github.com/0xsequence/ethkit/ethrpc"
9+
"github.com/0xsequence/ethkit/go-ethereum"
10+
"github.com/0xsequence/ethkit/go-ethereum/core/types"
11+
)
12+
13+
type LogsStream struct {
14+
provider ethrpc.Interface
15+
}
16+
17+
func NewLogsStream(provider ethrpc.Interface) *LogsStream {
18+
return &LogsStream{
19+
provider: provider,
20+
}
21+
}
22+
23+
// func (s *LogsStream) StreamLogs(ctx context.Context) (<-chan [][]types.Log, <-chan error) {
24+
// logsCh := make(chan [][]types.Log)
25+
// errCh := make(chan error, 1)
26+
//
27+
// go func() {
28+
// defer close(logsCh)
29+
// defer close(errCh)
30+
//
31+
// }()
32+
//
33+
// return logsCh, errCh
34+
// }
35+
36+
func (s *LogsStream) Do(ctx context.Context) {
37+
ch := make(chan types.Log)
38+
39+
sub, err := s.provider.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, ch)
40+
if err != nil {
41+
panic(err)
42+
}
43+
defer sub.Unsubscribe()
44+
45+
go func() {
46+
for {
47+
select {
48+
case err := <-sub.Err():
49+
panic(err)
50+
case log := <-ch:
51+
slog.Info("log received", "block", log.BlockNumber, "txIndex", log.TxIndex, "logIndex", log.Index)
52+
}
53+
54+
// TODO: can a block tell us how many txns there are...?
55+
// yes..
56+
}
57+
}()
58+
59+
time.Sleep(50 * time.Second)
60+
}

0 commit comments

Comments
 (0)