Skip to content

Commit c6844ad

Browse files
author
kant
committed
feat: indexer indexes blocks and txs of mev-commit chain to any pluggable storage
1 parent bf20b62 commit c6844ad

File tree

12 files changed

+1138
-0
lines changed

12 files changed

+1138
-0
lines changed

indexer/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# indexer

indexer/cmd/indexer.go

Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/hex"
6+
"fmt"
7+
"log/slog"
8+
"math/big"
9+
"time"
10+
11+
"github.com/ethereum/go-ethereum/core/types"
12+
13+
"github.com/primev/mev-commit/indexer/pkg/ethclient"
14+
"github.com/primev/mev-commit/indexer/pkg/store"
15+
)
16+
17+
type BlockchainIndexer struct {
18+
ethClient ethclient.EthereumClient
19+
storage store.Storage
20+
forwardBlockChan chan *types.Block
21+
backwardBlockChan chan *types.Block
22+
txChan chan *types.Transaction
23+
indexInterval time.Duration
24+
lastForwardIndexedBlock *big.Int
25+
lastBackwardIndexedBlock *big.Int
26+
logger *slog.Logger
27+
}
28+
29+
func NewBlockchainIndexer(ethClient ethclient.EthereumClient, storage store.Storage, indexInterval time.Duration) *BlockchainIndexer {
30+
return &BlockchainIndexer{
31+
ethClient: ethClient,
32+
storage: storage,
33+
forwardBlockChan: make(chan *types.Block, 100),
34+
backwardBlockChan: make(chan *types.Block, 100),
35+
txChan: make(chan *types.Transaction, 100),
36+
indexInterval: indexInterval,
37+
logger: slog.Default(),
38+
}
39+
}
40+
41+
func (bi *BlockchainIndexer) Start(ctx context.Context) error {
42+
if err := bi.storage.CreateIndices(ctx); err != nil {
43+
return fmt.Errorf("failed to create indices: %w", err)
44+
}
45+
46+
latestBlockNumber, err := bi.ethClient.BlockNumber(ctx)
47+
bi.logger.Info("latest block number", "block number", latestBlockNumber)
48+
if err != nil {
49+
return fmt.Errorf("failed to get latest block number: %w", err)
50+
}
51+
52+
if err = bi.initializeForwardIndex(ctx, latestBlockNumber.Uint64()); err != nil {
53+
return err
54+
}
55+
56+
if err = bi.initializeBackwardIndex(ctx, latestBlockNumber.Uint64()); err != nil {
57+
return err
58+
}
59+
60+
go bi.fetchForwardBlocks(ctx)
61+
go bi.processForwardBlocks(ctx)
62+
go bi.fetchBackwardBlocks(ctx)
63+
go bi.processBackwardBlocks(ctx)
64+
65+
// Block the main function indefinitely
66+
select {}
67+
}
68+
69+
func (bi *BlockchainIndexer) initializeForwardIndex(ctx context.Context, latestBlockNumber uint64) error {
70+
lastForwardIndexedBlock, err := bi.storage.GetLastIndexedBlock(ctx, "forward")
71+
if err != nil {
72+
return fmt.Errorf("failed to get last forward indexed block: %w", err)
73+
}
74+
75+
bi.logger.Info("last indexed block", "blockNumber", lastForwardIndexedBlock, "direction", "forward")
76+
77+
if lastForwardIndexedBlock == nil || lastForwardIndexedBlock.Cmp(big.NewInt(0)) == 0 {
78+
bi.lastForwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber - 1)
79+
} else {
80+
bi.lastForwardIndexedBlock = lastForwardIndexedBlock
81+
}
82+
83+
return nil
84+
}
85+
86+
func (bi *BlockchainIndexer) initializeBackwardIndex(ctx context.Context, latestBlockNumber uint64) error {
87+
lastBackwardIndexedBlock, err := bi.storage.GetLastIndexedBlock(ctx, "backward")
88+
if err != nil {
89+
return fmt.Errorf("failed to get last backward indexed block: %w", err)
90+
}
91+
92+
bi.logger.Info("last indexed block", "blockNumber", lastBackwardIndexedBlock, "direction", "backward")
93+
94+
if lastBackwardIndexedBlock == nil || lastBackwardIndexedBlock.Cmp(big.NewInt(0)) == 0 {
95+
bi.lastBackwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber)
96+
} else {
97+
bi.lastBackwardIndexedBlock = lastBackwardIndexedBlock
98+
}
99+
100+
return nil
101+
}
102+
103+
func (bi *BlockchainIndexer) fetchForwardBlocks(ctx context.Context) {
104+
ticker := time.NewTicker(bi.indexInterval)
105+
defer ticker.Stop()
106+
107+
for {
108+
select {
109+
case <-ctx.Done():
110+
return
111+
case <-ticker.C:
112+
latestBlockNumber, err := bi.ethClient.BlockNumber(ctx)
113+
if err != nil {
114+
bi.logger.Error("failed to get latest block number", "error", err)
115+
continue
116+
}
117+
118+
for blockNum := new(big.Int).Add(bi.lastForwardIndexedBlock, big.NewInt(1)); blockNum.Cmp(latestBlockNumber) <= 0; blockNum.Add(blockNum, big.NewInt(5)) {
119+
endBlockNum := new(big.Int).Add(blockNum, big.NewInt(4))
120+
if endBlockNum.Cmp(latestBlockNumber) > 0 {
121+
endBlockNum.Set(latestBlockNumber)
122+
}
123+
124+
blockNums := []*big.Int{}
125+
for bn := new(big.Int).Set(blockNum); bn.Cmp(endBlockNum) <= 0; bn.Add(bn, big.NewInt(1)) {
126+
blockNums = append(blockNums, new(big.Int).Set(bn))
127+
}
128+
129+
blocks, err := bi.fetchBlocks(ctx, blockNums)
130+
if err != nil {
131+
bi.logger.Error("failed to fetch blocks", "start", blockNum, "end", endBlockNum, "error", err)
132+
continue
133+
}
134+
135+
for _, block := range blocks {
136+
bi.forwardBlockChan <- block
137+
bi.lastForwardIndexedBlock.Set(block.Number())
138+
}
139+
}
140+
}
141+
}
142+
}
143+
144+
func (bi *BlockchainIndexer) fetchBackwardBlocks(ctx context.Context) {
145+
ticker := time.NewTicker(bi.indexInterval)
146+
defer ticker.Stop()
147+
148+
for {
149+
select {
150+
case <-ctx.Done():
151+
return
152+
case <-ticker.C:
153+
if bi.lastBackwardIndexedBlock.Sign() <= 0 {
154+
return
155+
}
156+
zeroBigNum := big.NewInt(0)
157+
blockNum := new(big.Int).Sub(bi.lastBackwardIndexedBlock, big.NewInt(1))
158+
159+
for i := 0; blockNum.Cmp(zeroBigNum) >= 0; i++ {
160+
endBlockNum := new(big.Int).Sub(blockNum, big.NewInt(4))
161+
if endBlockNum.Cmp(zeroBigNum) < 0 {
162+
endBlockNum.Set(zeroBigNum)
163+
}
164+
165+
blockNums := []*big.Int{}
166+
for bn := new(big.Int).Set(blockNum); bn.Cmp(endBlockNum) >= 0; bn.Sub(bn, big.NewInt(1)) {
167+
blockNums = append(blockNums, new(big.Int).Set(bn))
168+
}
169+
170+
blocks, err := bi.fetchBlocks(ctx, blockNums)
171+
if err != nil {
172+
bi.logger.Error("failed to fetch blocks", "start", blockNum, "end", endBlockNum, "error", err)
173+
break
174+
}
175+
176+
for _, block := range blocks {
177+
bi.backwardBlockChan <- block
178+
bi.lastBackwardIndexedBlock.Set(block.Number())
179+
if block.Number().Cmp(zeroBigNum) == 0 {
180+
bi.logger.Info("done fetching backward blocks...")
181+
return
182+
}
183+
}
184+
blockNum.Sub(endBlockNum, big.NewInt(1))
185+
}
186+
}
187+
}
188+
}
189+
190+
func (bi *BlockchainIndexer) processForwardBlocks(ctx context.Context) {
191+
for {
192+
select {
193+
case <-ctx.Done():
194+
return
195+
case block := <-bi.forwardBlockChan:
196+
if err := bi.indexBlock(ctx, block); err != nil {
197+
bi.logger.Error("failed to index block", "error", err)
198+
}
199+
if err := bi.indexTransactions(ctx, block); err != nil {
200+
bi.logger.Error("failed to index transactions", "error", err)
201+
}
202+
}
203+
}
204+
}
205+
206+
func (bi *BlockchainIndexer) processBackwardBlocks(ctx context.Context) {
207+
for {
208+
select {
209+
case <-ctx.Done():
210+
return
211+
case block := <-bi.backwardBlockChan:
212+
if err := bi.indexBlock(ctx, block); err != nil {
213+
bi.logger.Error("failed to index block", "error", err)
214+
}
215+
if err := bi.indexTransactions(ctx, block); err != nil {
216+
bi.logger.Error("failed to index transactions", "error", err)
217+
}
218+
if block.Number().Cmp(big.NewInt(0)) == 0 {
219+
bi.logger.Info("done processing backward blocks...")
220+
return
221+
}
222+
}
223+
}
224+
}
225+
226+
func (bi *BlockchainIndexer) indexBlock(ctx context.Context, block *types.Block) error {
227+
timestamp := time.UnixMilli(int64(block.Time())).UTC().Format("2006-01-02T15:04:05.000Z")
228+
indexBlock := &store.IndexBlock{
229+
Number: block.NumberU64(),
230+
Hash: block.Hash().Hex(),
231+
ParentHash: block.ParentHash().Hex(),
232+
Root: block.Root().Hex(),
233+
Nonce: block.Nonce(),
234+
Timestamp: timestamp,
235+
Transactions: len(block.Transactions()),
236+
BaseFee: block.BaseFee().Uint64(),
237+
GasLimit: block.GasLimit(),
238+
GasUsed: block.GasUsed(),
239+
Difficulty: block.Difficulty().Uint64(),
240+
ExtraData: hex.EncodeToString(block.Extra()),
241+
}
242+
243+
return bi.storage.IndexBlock(ctx, indexBlock)
244+
}
245+
246+
func (bi *BlockchainIndexer) indexTransactions(ctx context.Context, block *types.Block) error {
247+
var transactions []*store.IndexTransaction
248+
var txHashes []string
249+
250+
for _, tx := range block.Transactions() {
251+
from, err := types.Sender(types.NewCancunSigner(tx.ChainId()), tx)
252+
if err != nil {
253+
return fmt.Errorf("failed to derive sender: %w", err)
254+
}
255+
256+
v, r, s := tx.RawSignatureValues()
257+
timestamp := tx.Time().UTC().Format("2006-01-02T15:04:05.000Z")
258+
transaction := &store.IndexTransaction{
259+
Hash: tx.Hash().Hex(),
260+
From: from.Hex(),
261+
Gas: tx.Gas(),
262+
Nonce: tx.Nonce(),
263+
BlockHash: block.Hash().Hex(),
264+
BlockNumber: block.NumberU64(),
265+
ChainId: tx.ChainId().String(),
266+
V: v.String(),
267+
R: r.String(),
268+
S: s.String(),
269+
Input: hex.EncodeToString(tx.Data()),
270+
Timestamp: timestamp,
271+
}
272+
273+
if tx.To() != nil {
274+
transaction.To = tx.To().Hex()
275+
}
276+
if tx.GasPrice() != nil {
277+
transaction.GasPrice = tx.GasPrice().Uint64()
278+
}
279+
if tx.GasTipCap() != nil {
280+
transaction.GasTipCap = tx.GasTipCap().Uint64()
281+
}
282+
if tx.GasFeeCap() != nil {
283+
transaction.GasFeeCap = tx.GasFeeCap().Uint64()
284+
}
285+
if tx.Value() != nil {
286+
transaction.Value = tx.Value().String()
287+
}
288+
289+
transactions = append(transactions, transaction)
290+
txHashes = append(txHashes, tx.Hash().Hex())
291+
}
292+
293+
receipts, err := bi.fetchReceipts(ctx, txHashes)
294+
if err != nil {
295+
return fmt.Errorf("failed to fetch transaction receipts: %w", err)
296+
}
297+
298+
for _, tx := range transactions {
299+
if receipt, ok := receipts[tx.Hash]; ok {
300+
tx.Status = receipt.Status
301+
tx.GasUsed = receipt.GasUsed
302+
tx.CumulativeGasUsed = receipt.CumulativeGasUsed
303+
tx.ContractAddress = receipt.ContractAddress.Hex()
304+
tx.TransactionIndex = receipt.TransactionIndex
305+
tx.ReceiptBlockHash = receipt.BlockHash.Hex()
306+
tx.ReceiptBlockNumber = receipt.BlockNumber.Uint64()
307+
}
308+
}
309+
310+
return bi.storage.IndexTransactions(ctx, transactions)
311+
}
312+
313+
func (bi *BlockchainIndexer) fetchReceipts(ctx context.Context, txHashes []string) (map[string]*types.Receipt, error) {
314+
return bi.ethClient.TxReceipts(ctx, txHashes)
315+
}
316+
317+
func (bi *BlockchainIndexer) fetchBlocks(ctx context.Context, blockNums []*big.Int) ([]*types.Block, error) {
318+
return bi.ethClient.GetBlocks(ctx, blockNums)
319+
}

0 commit comments

Comments
 (0)