Skip to content

Commit acbcc17

Browse files
committed
starting point
1 parent ed6103e commit acbcc17

File tree

3 files changed

+658
-0
lines changed

3 files changed

+658
-0
lines changed

lib/logpoller/chain_log_poller.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package logpoller
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math/big"
7+
"sync"
8+
"time"
9+
10+
"github.com/ethereum/go-ethereum"
11+
"github.com/smartcontractkit/chainlink-testing-framework/lib/logging"
12+
"github.com/smartcontractkit/chainlink-testing-framework/seth"
13+
)
14+
15+
// ChainLogPollerConfig holds configuration for ChainLogPoller
16+
type ChainLogPollerConfig struct {
17+
Seth *seth.Client
18+
PollInterval time.Duration
19+
Logger logging.Logger
20+
}
21+
22+
// ChainLogPoller polls logs from a specific blockchain and manages subscriptions
23+
type ChainLogPoller struct {
24+
seth *seth.Client
25+
logger logging.Logger
26+
pollInterval time.Duration
27+
lastBlock *big.Int
28+
SubscriptionManager *SubscriptionManager
29+
started bool
30+
startMutex sync.Mutex
31+
wg sync.WaitGroup
32+
}
33+
34+
// NewChainLogPoller initializes a new ChainLogPoller
35+
func NewChainLogPoller(cfg ChainLogPollerConfig) (*ChainLogPoller, error) {
36+
// Initialize the lastBlock to the latest block
37+
latestBlock, err := cfg.Seth.Client.BlockNumber(context.Background())
38+
if err != nil {
39+
return nil, fmt.Errorf("failed to get latest block: %w", err)
40+
}
41+
latestBlockPlus := new(big.Int).SetUint64(latestBlock)
42+
// Subtract 1 to start polling from the latest block at initialization
43+
lastBlock := new(big.Int).Sub(latestBlockPlus, big.NewInt(1))
44+
45+
// Initialize SubscriptionManager
46+
subMgr := NewSubscriptionManager(cfg.Logger, int64(cfg.Seth.ChainID))
47+
48+
return &ChainLogPoller{
49+
seth: cfg.Seth,
50+
logger: cfg.Logger,
51+
pollInterval: cfg.PollInterval,
52+
lastBlock: lastBlock,
53+
SubscriptionManager: subMgr,
54+
}, nil
55+
}
56+
57+
// Start begins the polling process only if not already started
58+
func (clp *ChainLogPoller) Start(ctx context.Context) {
59+
clp.startMutex.Lock()
60+
defer clp.startMutex.Unlock()
61+
62+
if clp.started {
63+
clp.logger.Warn().Msg("Poller already started")
64+
}
65+
66+
clp.started = true
67+
clp.wg.Add(1)
68+
go clp.pollWithSubscriptions(ctx)
69+
70+
clp.logger.Info().
71+
Str("PollInterval", clp.pollInterval.String()).
72+
Msg("Poller started.")
73+
}
74+
75+
// pollWithSubscriptions handles the periodic polling
76+
func (clp *ChainLogPoller) pollWithSubscriptions(ctx context.Context) {
77+
defer clp.wg.Done()
78+
ticker := time.NewTicker(clp.pollInterval)
79+
defer ticker.Stop()
80+
81+
clp.logger.Info().Msg("Polling loop started.")
82+
83+
for {
84+
select {
85+
case <-ctx.Done():
86+
clp.logger.Info().Msg("Shutting down ChainLogPoller")
87+
return
88+
case <-ticker.C:
89+
clp.logger.Debug().Msg("Starting polling cycle.")
90+
clp.pollLogs(ctx)
91+
}
92+
}
93+
}
94+
95+
// Wait waits for the polling goroutine to finish
96+
func (clp *ChainLogPoller) Wait() {
97+
clp.wg.Wait()
98+
}
99+
100+
// pollLogs fetches logs from the blockchain and broadcasts them to subscribers
101+
func (clp *ChainLogPoller) pollLogs(ctx context.Context) {
102+
startTime := time.Now()
103+
latestBlock, err := clp.seth.Client.BlockNumber(ctx)
104+
if err != nil {
105+
clp.logger.Error().Err(err).Msg("Failed to get latest block")
106+
return
107+
}
108+
fromBlock := new(big.Int).Add(clp.lastBlock, big.NewInt(1))
109+
toBlock := new(big.Int).SetUint64(latestBlock)
110+
111+
addresses, topics := clp.SubscriptionManager.GetAddressesAndTopics()
112+
if len(addresses) == 0 || len(topics) == 0 {
113+
// No active subscriptions, skip polling
114+
clp.logger.Debug().
115+
Int("Addresses", len(addresses)).
116+
Int("Topics", len(topics)).
117+
Msg("No active subscriptions, skipping poll")
118+
return
119+
}
120+
121+
query := ethereum.FilterQuery{
122+
FromBlock: fromBlock,
123+
ToBlock: toBlock,
124+
Addresses: addresses,
125+
Topics: topics,
126+
}
127+
128+
logs, err := clp.seth.Client.FilterLogs(ctx, query)
129+
if err != nil {
130+
clp.logger.Error().Err(err).Msg("Failed to filter logs")
131+
return
132+
}
133+
clp.logger.Debug().
134+
Int64("ChainID", clp.SubscriptionManager.chainID).
135+
Str("FromBlock", fromBlock.String()).
136+
Str("ToBlock", toBlock.String()).
137+
Int("LogsFetched", len(logs)).
138+
Msg("Fetched logs from blockchain")
139+
140+
if len(logs) == 0 {
141+
clp.logger.Info().
142+
Int64("ChainID", clp.SubscriptionManager.chainID).
143+
Str("FromBlock", fromBlock.String()).
144+
Str("ToBlock", toBlock.String()).
145+
Msg("No new logs found in the current polling cycle")
146+
return
147+
}
148+
149+
for _, vLog := range logs {
150+
if len(vLog.Topics) == 0 {
151+
continue // Skip logs without topics
152+
}
153+
// Iterate over all topics in the log
154+
for _, topic := range vLog.Topics {
155+
eventKey := EventKey{
156+
Address: vLog.Address,
157+
Topic: topic,
158+
}
159+
160+
logEvent := LogEvent{
161+
BlockNumber: vLog.BlockNumber,
162+
TxHash: vLog.TxHash,
163+
Data: vLog.Data,
164+
}
165+
166+
clp.SubscriptionManager.BroadcastLog(eventKey, logEvent)
167+
}
168+
}
169+
clp.lastBlock = toBlock
170+
171+
duration := time.Since(startTime)
172+
173+
clp.logger.Debug().
174+
Int64("ChainID", clp.SubscriptionManager.chainID).
175+
Str("FromBlock", fromBlock.String()).
176+
Str("ToBlock", toBlock.String()).
177+
Str("LastBlock", clp.lastBlock.String()).
178+
Dur("PollingDuration", duration).
179+
Msg("Completed polling cycle")
180+
181+
}

0 commit comments

Comments
 (0)