Skip to content

Commit faacca6

Browse files
author
kant
committed
feat: indexer indexes blocks and txs of mev-commit chain to any pluggable storage
1 parent 11e33f9 commit faacca6

File tree

13 files changed

+1372
-0
lines changed

13 files changed

+1372
-0
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ jobs:
8787
8888
ADDITIONAL_MODULES=(
8989
"${GITHUB_WORKSPACE}/external/geth"
90+
"${GITHUB_WORKSPACE}/indexer"
9091
)
9192
9293
ALL_MODULES=$(printf "%s\n" "${WORKSPACE_MODULES}" "${ADDITIONAL_MODULES[@]}")

indexer/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# indexer

indexer/cmd/indexer.go

Lines changed: 387 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,387 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/hex"
6+
"fmt"
7+
"log/slog"
8+
"math/big"
9+
"time"
10+
11+
"github.com/ethereum/go-ethereum/common"
12+
"github.com/ethereum/go-ethereum/core/types"
13+
"github.com/primev/mev-commit/indexer/pkg/ethclient"
14+
"github.com/primev/mev-commit/indexer/pkg/store"
15+
)
16+
17+
const TimeLayOut = "2006-01-02T15:04:05.000Z"
18+
19+
type Config struct {
20+
EthClient ethclient.EthereumClient
21+
Storage store.Storage
22+
IndexInterval time.Duration
23+
AccountAddresses []string
24+
MinBlocksToFetchAccountAddresses uint
25+
TimeoutToFetchAccountAddresses time.Duration
26+
}
27+
28+
type BlockchainIndexer struct {
29+
ethClient ethclient.EthereumClient
30+
storage store.Storage
31+
forwardBlockChan chan *types.Block
32+
backwardBlockChan chan *types.Block
33+
txChan chan *types.Transaction
34+
indexInterval time.Duration
35+
lastForwardIndexedBlock *big.Int
36+
lastBackwardIndexedBlock *big.Int
37+
logger *slog.Logger
38+
accountAddresses []string
39+
blockCounter uint
40+
minBlocksToFetchAccountAddresses uint
41+
timeoutToFetchAccountAddresses time.Duration
42+
}
43+
44+
func NewBlockchainIndexer(config Config) *BlockchainIndexer {
45+
return &BlockchainIndexer{
46+
ethClient: config.EthClient,
47+
storage: config.Storage,
48+
forwardBlockChan: make(chan *types.Block, 100),
49+
backwardBlockChan: make(chan *types.Block, 100),
50+
txChan: make(chan *types.Transaction, 100),
51+
indexInterval: config.IndexInterval,
52+
logger: slog.Default(),
53+
accountAddresses: config.AccountAddresses,
54+
blockCounter: 0,
55+
minBlocksToFetchAccountAddresses: config.MinBlocksToFetchAccountAddresses,
56+
timeoutToFetchAccountAddresses: config.TimeoutToFetchAccountAddresses,
57+
}
58+
}
59+
60+
func (bi *BlockchainIndexer) Start(ctx context.Context) error {
61+
if err := bi.storage.CreateIndices(ctx); err != nil {
62+
return fmt.Errorf("failed to create indices: %w", err)
63+
}
64+
65+
latestBlockNumber, err := bi.ethClient.BlockNumber(ctx)
66+
bi.logger.Info("latest block number", "block number", latestBlockNumber)
67+
if err != nil {
68+
return fmt.Errorf("failed to get latest block number: %w", err)
69+
}
70+
71+
if err = bi.initializeForwardIndex(ctx, latestBlockNumber.Uint64()); err != nil {
72+
return err
73+
}
74+
75+
if err = bi.initializeBackwardIndex(ctx, latestBlockNumber.Uint64()); err != nil {
76+
return err
77+
}
78+
79+
go bi.fetchForwardBlocks(ctx)
80+
go bi.processForwardBlocks(ctx)
81+
go bi.fetchBackwardBlocks(ctx)
82+
go bi.processBackwardBlocks(ctx)
83+
go bi.IndexAccountBalances(ctx)
84+
85+
<-ctx.Done()
86+
return ctx.Err()
87+
}
88+
89+
func (bi *BlockchainIndexer) initializeForwardIndex(ctx context.Context, latestBlockNumber uint64) error {
90+
lastForwardIndexedBlock, err := bi.storage.GetLastIndexedBlock(ctx, "forward")
91+
if err != nil {
92+
return fmt.Errorf("failed to get last forward indexed block: %w", err)
93+
}
94+
95+
bi.logger.Info("last indexed block", "blockNumber", lastForwardIndexedBlock, "direction", "forward")
96+
97+
if lastForwardIndexedBlock == nil || lastForwardIndexedBlock.Sign() == 0 {
98+
bi.lastForwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber - 1)
99+
} else {
100+
bi.lastForwardIndexedBlock = lastForwardIndexedBlock
101+
}
102+
103+
return nil
104+
}
105+
106+
func (bi *BlockchainIndexer) initializeBackwardIndex(ctx context.Context, latestBlockNumber uint64) error {
107+
lastBackwardIndexedBlock, err := bi.storage.GetLastIndexedBlock(ctx, "backward")
108+
if err != nil {
109+
return fmt.Errorf("failed to get last backward indexed block: %w", err)
110+
}
111+
112+
bi.logger.Info("last indexed block", "blockNumber", lastBackwardIndexedBlock, "direction", "backward")
113+
114+
if lastBackwardIndexedBlock == nil || lastBackwardIndexedBlock.Sign() == 0 {
115+
bi.lastBackwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber)
116+
} else {
117+
bi.lastBackwardIndexedBlock = lastBackwardIndexedBlock
118+
}
119+
120+
return nil
121+
}
122+
123+
func (bi *BlockchainIndexer) fetchForwardBlocks(ctx context.Context) {
124+
ticker := time.NewTicker(bi.indexInterval)
125+
defer ticker.Stop()
126+
127+
for {
128+
select {
129+
case <-ctx.Done():
130+
return
131+
case <-ticker.C:
132+
latestBlockNumber, err := bi.ethClient.BlockNumber(ctx)
133+
if err != nil {
134+
bi.logger.Error("failed to get latest block number", "error", err)
135+
continue
136+
}
137+
138+
for blockNum := new(big.Int).Add(bi.lastForwardIndexedBlock, big.NewInt(1)); blockNum.Cmp(latestBlockNumber) <= 0; blockNum.Add(blockNum, big.NewInt(5)) {
139+
endBlockNum := new(big.Int).Add(blockNum, big.NewInt(4))
140+
if endBlockNum.Cmp(latestBlockNumber) > 0 {
141+
endBlockNum.Set(latestBlockNumber)
142+
}
143+
144+
var blockNums []*big.Int
145+
for bn := new(big.Int).Set(blockNum); bn.Cmp(endBlockNum) <= 0; bn.Add(bn, big.NewInt(1)) {
146+
blockNums = append(blockNums, new(big.Int).Set(bn))
147+
}
148+
149+
blocks, err := bi.fetchBlocks(ctx, blockNums)
150+
if err != nil {
151+
bi.logger.Error("failed to fetch blocks", "start", blockNum, "end", endBlockNum, "error", err)
152+
continue
153+
}
154+
155+
for _, block := range blocks {
156+
bi.forwardBlockChan <- block
157+
bi.lastForwardIndexedBlock.Set(block.Number())
158+
bi.blockCounter++
159+
}
160+
}
161+
}
162+
}
163+
}
164+
165+
func (bi *BlockchainIndexer) fetchBackwardBlocks(ctx context.Context) {
166+
ticker := time.NewTicker(bi.indexInterval)
167+
defer ticker.Stop()
168+
169+
for {
170+
select {
171+
case <-ctx.Done():
172+
return
173+
case <-ticker.C:
174+
if bi.lastBackwardIndexedBlock.Sign() <= 0 {
175+
return
176+
}
177+
zeroBigNum := big.NewInt(0)
178+
blockNum := new(big.Int).Sub(bi.lastBackwardIndexedBlock, big.NewInt(1))
179+
180+
for i := 0; blockNum.Cmp(zeroBigNum) >= 0; i++ {
181+
endBlockNum := new(big.Int).Sub(blockNum, big.NewInt(4))
182+
if endBlockNum.Cmp(zeroBigNum) < 0 {
183+
endBlockNum.Set(zeroBigNum)
184+
}
185+
186+
var blockNums []*big.Int
187+
for bn := new(big.Int).Set(blockNum); bn.Cmp(endBlockNum) >= 0; bn.Sub(bn, big.NewInt(1)) {
188+
blockNums = append(blockNums, new(big.Int).Set(bn))
189+
}
190+
191+
blocks, err := bi.fetchBlocks(ctx, blockNums)
192+
if err != nil {
193+
bi.logger.Error("failed to fetch blocks", "start", blockNum, "end", endBlockNum, "error", err)
194+
break
195+
}
196+
197+
for _, block := range blocks {
198+
bi.backwardBlockChan <- block
199+
bi.lastBackwardIndexedBlock.Set(block.Number())
200+
if block.Number().Cmp(zeroBigNum) == 0 {
201+
bi.logger.Info("done fetching backward blocks...")
202+
return
203+
}
204+
}
205+
blockNum.Sub(endBlockNum, big.NewInt(1))
206+
}
207+
}
208+
}
209+
}
210+
211+
func (bi *BlockchainIndexer) processForwardBlocks(ctx context.Context) {
212+
for {
213+
select {
214+
case <-ctx.Done():
215+
return
216+
case block := <-bi.forwardBlockChan:
217+
if err := bi.indexBlock(ctx, block); err != nil {
218+
bi.logger.Error("failed to index block", "error", err)
219+
}
220+
if err := bi.indexTransactions(ctx, block); err != nil {
221+
bi.logger.Error("failed to index transactions", "error", err)
222+
}
223+
}
224+
}
225+
}
226+
227+
func (bi *BlockchainIndexer) processBackwardBlocks(ctx context.Context) {
228+
for {
229+
select {
230+
case <-ctx.Done():
231+
return
232+
case block := <-bi.backwardBlockChan:
233+
if err := bi.indexBlock(ctx, block); err != nil {
234+
bi.logger.Error("failed to index block", "error", err)
235+
}
236+
if err := bi.indexTransactions(ctx, block); err != nil {
237+
bi.logger.Error("failed to index transactions", "error", err)
238+
}
239+
if block.Number().Cmp(big.NewInt(0)) == 0 {
240+
bi.logger.Info("done processing backward blocks...")
241+
return
242+
}
243+
}
244+
}
245+
}
246+
247+
func (bi *BlockchainIndexer) IndexAccountBalances(ctx context.Context) {
248+
timer := time.NewTimer(bi.timeoutToFetchAccountAddresses)
249+
defer timer.Stop()
250+
251+
for {
252+
select {
253+
case <-ctx.Done():
254+
return
255+
case <-timer.C:
256+
if err := bi.indexBalances(ctx, 0); err != nil {
257+
return
258+
}
259+
bi.blockCounter = 0
260+
timer.Reset(bi.timeoutToFetchAccountAddresses)
261+
default:
262+
if bi.blockCounter >= bi.minBlocksToFetchAccountAddresses {
263+
if err := bi.indexBalances(ctx, bi.lastForwardIndexedBlock.Uint64()); err != nil {
264+
return
265+
}
266+
bi.blockCounter = 0
267+
timer.Reset(bi.timeoutToFetchAccountAddresses)
268+
}
269+
}
270+
}
271+
}
272+
273+
func (bi *BlockchainIndexer) indexBalances(ctx context.Context, blockNumber uint64) error {
274+
addresses, err := bi.storage.GetAddresses(ctx)
275+
if err != nil {
276+
return err
277+
}
278+
279+
addresses = append(addresses, bi.accountAddresses...)
280+
281+
addrs := make([]common.Address, len(addresses))
282+
for i, address := range addresses {
283+
addrs[i] = common.HexToAddress(address)
284+
}
285+
286+
accBalances, err := bi.ethClient.AccountBalances(ctx, addrs, blockNumber)
287+
if err != nil {
288+
return err
289+
}
290+
291+
return bi.storage.IndexAccountBalances(ctx, accBalances)
292+
}
293+
294+
func (bi *BlockchainIndexer) indexBlock(ctx context.Context, block *types.Block) error {
295+
timestamp := time.UnixMilli(int64(block.Time())).UTC().Format(TimeLayOut)
296+
indexBlock := &store.IndexBlock{
297+
Number: block.NumberU64(),
298+
Hash: block.Hash().Hex(),
299+
ParentHash: block.ParentHash().Hex(),
300+
Root: block.Root().Hex(),
301+
Nonce: block.Nonce(),
302+
Timestamp: timestamp,
303+
Transactions: len(block.Transactions()),
304+
BaseFee: block.BaseFee().Uint64(),
305+
GasLimit: block.GasLimit(),
306+
GasUsed: block.GasUsed(),
307+
Difficulty: block.Difficulty().Uint64(),
308+
ExtraData: hex.EncodeToString(block.Extra()),
309+
}
310+
311+
return bi.storage.IndexBlock(ctx, indexBlock)
312+
}
313+
314+
func (bi *BlockchainIndexer) indexTransactions(ctx context.Context, block *types.Block) error {
315+
var transactions []*store.IndexTransaction
316+
var txHashes []string
317+
318+
for _, tx := range block.Transactions() {
319+
from, err := types.Sender(types.NewCancunSigner(tx.ChainId()), tx)
320+
if err != nil {
321+
return fmt.Errorf("failed to derive sender: %w", err)
322+
}
323+
324+
v, r, s := tx.RawSignatureValues()
325+
timestamp := tx.Time().UTC().Format(TimeLayOut)
326+
transaction := &store.IndexTransaction{
327+
Hash: tx.Hash().Hex(),
328+
From: from.Hex(),
329+
Gas: tx.Gas(),
330+
Nonce: tx.Nonce(),
331+
BlockHash: block.Hash().Hex(),
332+
BlockNumber: block.NumberU64(),
333+
ChainId: tx.ChainId().String(),
334+
V: v.String(),
335+
R: r.String(),
336+
S: s.String(),
337+
Input: hex.EncodeToString(tx.Data()),
338+
Timestamp: timestamp,
339+
}
340+
341+
if tx.To() != nil {
342+
transaction.To = tx.To().Hex()
343+
}
344+
if tx.GasPrice() != nil {
345+
transaction.GasPrice = tx.GasPrice().Uint64()
346+
}
347+
if tx.GasTipCap() != nil {
348+
transaction.GasTipCap = tx.GasTipCap().Uint64()
349+
}
350+
if tx.GasFeeCap() != nil {
351+
transaction.GasFeeCap = tx.GasFeeCap().Uint64()
352+
}
353+
if tx.Value() != nil {
354+
transaction.Value = tx.Value().String()
355+
}
356+
357+
transactions = append(transactions, transaction)
358+
txHashes = append(txHashes, tx.Hash().Hex())
359+
}
360+
361+
receipts, err := bi.fetchReceipts(ctx, txHashes)
362+
if err != nil {
363+
return fmt.Errorf("failed to fetch transaction receipts: %w", err)
364+
}
365+
366+
for _, tx := range transactions {
367+
if receipt, ok := receipts[tx.Hash]; ok {
368+
tx.Status = receipt.Status
369+
tx.GasUsed = receipt.GasUsed
370+
tx.CumulativeGasUsed = receipt.CumulativeGasUsed
371+
tx.ContractAddress = receipt.ContractAddress.Hex()
372+
tx.TransactionIndex = receipt.TransactionIndex
373+
tx.ReceiptBlockHash = receipt.BlockHash.Hex()
374+
tx.ReceiptBlockNumber = receipt.BlockNumber.Uint64()
375+
}
376+
}
377+
378+
return bi.storage.IndexTransactions(ctx, transactions)
379+
}
380+
381+
func (bi *BlockchainIndexer) fetchReceipts(ctx context.Context, txHashes []string) (map[string]*types.Receipt, error) {
382+
return bi.ethClient.TxReceipts(ctx, txHashes)
383+
}
384+
385+
func (bi *BlockchainIndexer) fetchBlocks(ctx context.Context, blockNums []*big.Int) ([]*types.Block, error) {
386+
return bi.ethClient.GetBlocks(ctx, blockNums)
387+
}

0 commit comments

Comments
 (0)