Skip to content

Commit 7cb6ff1

Browse files
committed
block buffer
1 parent 31d923f commit 7cb6ff1

File tree

2 files changed

+334
-116
lines changed

2 files changed

+334
-116
lines changed

internal/storage/block_buffer.go

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
package storage
2+
3+
import (
4+
"fmt"
5+
"math/big"
6+
"sync"
7+
8+
"github.com/rs/zerolog/log"
9+
"github.com/thirdweb-dev/indexer/internal/common"
10+
)
11+
12+
// BlockBuffer manages buffering of block data with size and count limits
13+
type BlockBuffer struct {
14+
mu sync.RWMutex
15+
data []common.BlockData
16+
sizeBytes int64
17+
maxSizeBytes int64
18+
maxBlocks int
19+
}
20+
21+
// NewBlockBuffer creates a new block buffer
22+
func NewBlockBuffer(maxSizeMB int64, maxBlocks int) *BlockBuffer {
23+
return &BlockBuffer{
24+
data: make([]common.BlockData, 0),
25+
maxSizeBytes: maxSizeMB * 1024 * 1024,
26+
maxBlocks: maxBlocks,
27+
}
28+
}
29+
30+
// Add adds blocks to the buffer and returns true if flush is needed
31+
func (b *BlockBuffer) Add(blocks []common.BlockData, actualSizeBytes int64) bool {
32+
if len(blocks) == 0 {
33+
return false
34+
}
35+
36+
b.mu.Lock()
37+
defer b.mu.Unlock()
38+
39+
// Add to buffer
40+
b.data = append(b.data, blocks...)
41+
b.sizeBytes += actualSizeBytes
42+
43+
log.Debug().
44+
Int("block_count", len(blocks)).
45+
Int64("size_bytes", actualSizeBytes).
46+
Int64("total_size_bytes", b.sizeBytes).
47+
Int("total_blocks", len(b.data)).
48+
Msg("Added blocks to buffer")
49+
50+
// Check if flush is needed
51+
return b.shouldFlushLocked()
52+
}
53+
54+
// Flush removes all data from the buffer and returns it
55+
func (b *BlockBuffer) Flush() []common.BlockData {
56+
b.mu.Lock()
57+
defer b.mu.Unlock()
58+
59+
if len(b.data) == 0 {
60+
return nil
61+
}
62+
63+
// Take ownership of data
64+
data := b.data
65+
b.data = make([]common.BlockData, 0)
66+
b.sizeBytes = 0
67+
68+
log.Info().
69+
Int("block_count", len(data)).
70+
Msg("Flushing buffer")
71+
72+
return data
73+
}
74+
75+
// ShouldFlush checks if the buffer should be flushed based on configured thresholds
76+
func (b *BlockBuffer) ShouldFlush() bool {
77+
b.mu.RLock()
78+
defer b.mu.RUnlock()
79+
return b.shouldFlushLocked()
80+
}
81+
82+
// Size returns the current buffer size in bytes and block count
83+
func (b *BlockBuffer) Size() (int64, int) {
84+
b.mu.RLock()
85+
defer b.mu.RUnlock()
86+
return b.sizeBytes, len(b.data)
87+
}
88+
89+
// IsEmpty returns true if the buffer is empty
90+
func (b *BlockBuffer) IsEmpty() bool {
91+
b.mu.RLock()
92+
defer b.mu.RUnlock()
93+
return len(b.data) == 0
94+
}
95+
96+
// GetData returns a copy of the current buffer data
97+
func (b *BlockBuffer) GetData() []common.BlockData {
98+
b.mu.RLock()
99+
defer b.mu.RUnlock()
100+
101+
result := make([]common.BlockData, len(b.data))
102+
copy(result, b.data)
103+
return result
104+
}
105+
106+
// GetBlocksInRange returns blocks from the buffer that fall within the given range
107+
func (b *BlockBuffer) GetBlocksInRange(chainId *big.Int, startBlock, endBlock *big.Int) []common.BlockData {
108+
b.mu.RLock()
109+
defer b.mu.RUnlock()
110+
111+
var result []common.BlockData
112+
for _, block := range b.data {
113+
if block.Block.ChainId.Cmp(chainId) == 0 {
114+
blockNum := block.Block.Number
115+
if blockNum.Cmp(startBlock) >= 0 && blockNum.Cmp(endBlock) <= 0 {
116+
result = append(result, block)
117+
}
118+
}
119+
}
120+
return result
121+
}
122+
123+
// GetBlockByNumber returns a specific block from the buffer if it exists
124+
func (b *BlockBuffer) GetBlockByNumber(chainId *big.Int, blockNumber *big.Int) *common.BlockData {
125+
b.mu.RLock()
126+
defer b.mu.RUnlock()
127+
128+
for _, block := range b.data {
129+
if block.Block.ChainId.Cmp(chainId) == 0 && block.Block.Number.Cmp(blockNumber) == 0 {
130+
blockCopy := block
131+
return &blockCopy
132+
}
133+
}
134+
return nil
135+
}
136+
137+
// GetMaxBlockNumber returns the maximum block number for a chain in the buffer
138+
func (b *BlockBuffer) GetMaxBlockNumber(chainId *big.Int) *big.Int {
139+
b.mu.RLock()
140+
defer b.mu.RUnlock()
141+
142+
var maxBlock *big.Int
143+
for _, block := range b.data {
144+
if block.Block.ChainId.Cmp(chainId) == 0 {
145+
if maxBlock == nil || block.Block.Number.Cmp(maxBlock) > 0 {
146+
maxBlock = new(big.Int).Set(block.Block.Number)
147+
}
148+
}
149+
}
150+
return maxBlock
151+
}
152+
153+
// Clear empties the buffer without returning data
154+
func (b *BlockBuffer) Clear() {
155+
b.mu.Lock()
156+
defer b.mu.Unlock()
157+
158+
b.data = make([]common.BlockData, 0)
159+
b.sizeBytes = 0
160+
}
161+
162+
// Stats returns statistics about the buffer
163+
func (b *BlockBuffer) Stats() BufferStats {
164+
b.mu.RLock()
165+
defer b.mu.RUnlock()
166+
167+
stats := BufferStats{
168+
BlockCount: len(b.data),
169+
SizeBytes: b.sizeBytes,
170+
ChainCount: 0,
171+
ChainStats: make(map[uint64]ChainStats),
172+
}
173+
174+
// Calculate per-chain statistics
175+
for _, block := range b.data {
176+
chainId := block.Block.ChainId.Uint64()
177+
chainStat := stats.ChainStats[chainId]
178+
179+
if chainStat.MinBlock == nil || block.Block.Number.Cmp(chainStat.MinBlock) < 0 {
180+
chainStat.MinBlock = new(big.Int).Set(block.Block.Number)
181+
}
182+
if chainStat.MaxBlock == nil || block.Block.Number.Cmp(chainStat.MaxBlock) > 0 {
183+
chainStat.MaxBlock = new(big.Int).Set(block.Block.Number)
184+
}
185+
chainStat.BlockCount++
186+
187+
stats.ChainStats[chainId] = chainStat
188+
}
189+
190+
stats.ChainCount = len(stats.ChainStats)
191+
return stats
192+
}
193+
194+
// Private methods
195+
196+
func (b *BlockBuffer) shouldFlushLocked() bool {
197+
// Check size limit
198+
if b.maxSizeBytes > 0 && b.sizeBytes >= b.maxSizeBytes {
199+
return true
200+
}
201+
202+
// Check block count limit
203+
if b.maxBlocks > 0 && len(b.data) >= b.maxBlocks {
204+
return true
205+
}
206+
207+
return false
208+
}
209+
210+
// BufferStats contains statistics about the buffer
211+
type BufferStats struct {
212+
BlockCount int
213+
SizeBytes int64
214+
ChainCount int
215+
ChainStats map[uint64]ChainStats
216+
}
217+
218+
// ChainStats contains per-chain statistics
219+
type ChainStats struct {
220+
BlockCount int
221+
MinBlock *big.Int
222+
MaxBlock *big.Int
223+
}
224+
225+
// String returns a string representation of buffer stats
226+
func (s BufferStats) String() string {
227+
return fmt.Sprintf("BufferStats{blocks=%d, size=%dMB, chains=%d}",
228+
s.BlockCount, s.SizeBytes/(1024*1024), s.ChainCount)
229+
}

0 commit comments

Comments
 (0)