Skip to content

Commit 4595fa6

Browse files
committed
optimize s3 insertion
1 parent 86f3d68 commit 4595fa6

File tree

4 files changed

+71
-47
lines changed

4 files changed

+71
-47
lines changed

internal/storage/block_buffer.go

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package storage
22

33
import (
4+
"bytes"
5+
"encoding/gob"
46
"fmt"
57
"math/big"
68
"sync"
@@ -20,7 +22,7 @@ type BlockBuffer struct {
2022

2123
// IBlockBuffer defines the interface for block buffer implementations
2224
type IBlockBuffer interface {
23-
Add(blocks []common.BlockData, actualSizeBytes int64) bool
25+
Add(blocks []common.BlockData) bool
2426
Flush() []common.BlockData
2527
ShouldFlush() bool
2628
Size() (int64, int)
@@ -31,6 +33,7 @@ type IBlockBuffer interface {
3133
GetMaxBlockNumber(chainId *big.Int) *big.Int
3234
Clear()
3335
Stats() BufferStats
36+
Close() error
3437
}
3538

3639
// NewBlockBuffer creates a new in-memory block buffer
@@ -49,21 +52,35 @@ func NewBlockBufferWithBadger(maxSizeMB int64, maxBlocks int) (IBlockBuffer, err
4952
}
5053

5154
// Add adds blocks to the buffer and returns true if flush is needed
52-
func (b *BlockBuffer) Add(blocks []common.BlockData, actualSizeBytes int64) bool {
55+
func (b *BlockBuffer) Add(blocks []common.BlockData) bool {
5356
if len(blocks) == 0 {
5457
return false
5558
}
5659

5760
b.mu.Lock()
5861
defer b.mu.Unlock()
5962

63+
// Calculate actual size by marshaling the entire batch once
64+
// This gives us accurate size with minimal overhead since we marshal once per Add call
65+
var actualSize int64
66+
var buf bytes.Buffer
67+
enc := gob.NewEncoder(&buf)
68+
69+
// Marshal all blocks to get actual serialized size
70+
if err := enc.Encode(blocks); err != nil {
71+
// If encoding fails, use estimation as fallback
72+
log.Warn().Err(err).Msg("Failed to marshal blocks for size calculation, buffer size is not reported correctly")
73+
} else {
74+
actualSize = int64(buf.Len())
75+
}
76+
6077
// Add to buffer
6178
b.data = append(b.data, blocks...)
62-
b.sizeBytes += actualSizeBytes
79+
b.sizeBytes += actualSize
6380

6481
log.Debug().
6582
Int("block_count", len(blocks)).
66-
Int64("size_bytes", actualSizeBytes).
83+
Int64("actual_size_bytes", actualSize).
6784
Int64("total_size_bytes", b.sizeBytes).
6885
Int("total_blocks", len(b.data)).
6986
Msg("Added blocks to buffer")
@@ -248,3 +265,18 @@ func (s BufferStats) String() string {
248265
return fmt.Sprintf("BufferStats{blocks=%d, size=%dMB, chains=%d}",
249266
s.BlockCount, s.SizeBytes/(1024*1024), s.ChainCount)
250267
}
268+
269+
// Close closes the buffer (no-op for in-memory buffer)
270+
func (b *BlockBuffer) Close() error {
271+
b.mu.Lock()
272+
defer b.mu.Unlock()
273+
274+
// Clear the buffer to free memory
275+
b.data = nil
276+
b.sizeBytes = 0
277+
278+
return nil
279+
}
280+
281+
// Ensure BlockBuffer implements IBlockBuffer interface
282+
var _ IBlockBuffer = (*BlockBuffer)(nil)

internal/storage/block_buffer_badger.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ type BadgerBlockBuffer struct {
2020
mu sync.RWMutex
2121
db *badger.DB
2222
tempDir string
23-
sizeBytes int64
2423
maxSizeBytes int64
2524
maxBlocks int
2625
blockCount int
@@ -97,7 +96,7 @@ func NewBadgerBlockBuffer(maxSizeMB int64, maxBlocks int) (*BadgerBlockBuffer, e
9796
}
9897

9998
// Add adds blocks to the buffer and returns true if flush is needed
100-
func (b *BadgerBlockBuffer) Add(blocks []common.BlockData, actualSizeBytes int64) bool {
99+
func (b *BadgerBlockBuffer) Add(blocks []common.BlockData) bool {
101100
if len(blocks) == 0 {
102101
return false
103102
}
@@ -128,7 +127,6 @@ func (b *BadgerBlockBuffer) Add(blocks []common.BlockData, actualSizeBytes int64
128127

129128
// Update counters
130129
b.blockCount += len(blocks)
131-
b.sizeBytes += actualSizeBytes
132130

133131
// Update chain metadata for O(1) lookups
134132
for _, block := range blocks {
@@ -154,8 +152,6 @@ func (b *BadgerBlockBuffer) Add(blocks []common.BlockData, actualSizeBytes int64
154152

155153
log.Debug().
156154
Int("block_count", len(blocks)).
157-
Int64("size_bytes", actualSizeBytes).
158-
Int64("total_size_bytes", b.sizeBytes).
159155
Int("total_blocks", b.blockCount).
160156
Msg("Added blocks to badger buffer")
161157

@@ -212,7 +208,6 @@ func (b *BadgerBlockBuffer) Flush() []common.BlockData {
212208
// Reset counters and metadata
213209
oldCount := b.blockCount
214210
b.blockCount = 0
215-
b.sizeBytes = 0
216211
b.chainMetadata = make(map[uint64]*ChainMetadata)
217212

218213
log.Info().
@@ -233,7 +228,10 @@ func (b *BadgerBlockBuffer) ShouldFlush() bool {
233228
func (b *BadgerBlockBuffer) Size() (int64, int) {
234229
b.mu.RLock()
235230
defer b.mu.RUnlock()
236-
return b.sizeBytes, b.blockCount
231+
232+
// Get actual size from Badger's LSM tree
233+
lsm, _ := b.db.Size()
234+
return lsm, b.blockCount
237235
}
238236

239237
// IsEmpty returns true if the buffer is empty
@@ -382,7 +380,6 @@ func (b *BadgerBlockBuffer) Clear() {
382380
}
383381

384382
b.blockCount = 0
385-
b.sizeBytes = 0
386383
b.chainMetadata = make(map[uint64]*ChainMetadata)
387384
}
388385

@@ -391,9 +388,12 @@ func (b *BadgerBlockBuffer) Stats() BufferStats {
391388
b.mu.RLock()
392389
defer b.mu.RUnlock()
393390

391+
// Get actual size from Badger
392+
lsm, _ := b.db.Size()
393+
394394
stats := BufferStats{
395395
BlockCount: b.blockCount,
396-
SizeBytes: b.sizeBytes,
396+
SizeBytes: lsm,
397397
ChainCount: len(b.chainMetadata),
398398
ChainStats: make(map[uint64]ChainStats),
399399
}
@@ -439,9 +439,12 @@ func (b *BadgerBlockBuffer) Close() error {
439439
// Private methods
440440

441441
func (b *BadgerBlockBuffer) shouldFlushLocked() bool {
442-
// Check size limit
443-
if b.maxSizeBytes > 0 && b.sizeBytes >= b.maxSizeBytes {
444-
return true
442+
// Check size limit using Badger's actual size
443+
if b.maxSizeBytes > 0 {
444+
lsm, _ := b.db.Size()
445+
if lsm >= b.maxSizeBytes {
446+
return true
447+
}
445448
}
446449

447450
// Check block count limit
@@ -474,3 +477,6 @@ func (b *BadgerBlockBuffer) runGC() {
474477
}
475478
}
476479
}
480+
481+
// Ensure BadgerBlockBuffer implements IBlockBuffer interface
482+
var _ IBlockBuffer = (*BadgerBlockBuffer)(nil)

internal/storage/block_buffer_badger_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestBadgerBlockBufferMetadataOptimization(t *testing.T) {
4343
},
4444
}
4545

46-
buffer.Add(blocks, 1024)
46+
buffer.Add(blocks)
4747

4848
// Test O(1) GetMaxBlockNumber
4949
start := time.Now()
@@ -82,7 +82,7 @@ func TestBadgerBlockBufferMetadataOptimization(t *testing.T) {
8282
},
8383
},
8484
}
85-
buffer.Add(newBlocks, 512)
85+
buffer.Add(newBlocks)
8686

8787
maxBlock = buffer.GetMaxBlockNumber(chainId)
8888
assert.NotNil(t, maxBlock)
@@ -107,7 +107,7 @@ func BenchmarkBadgerBlockBufferGetMaxBlockNumber(b *testing.B) {
107107
},
108108
},
109109
}
110-
buffer.Add(blocks, 1024)
110+
buffer.Add(blocks)
111111
}
112112

113113
b.ResetTimer()
@@ -133,7 +133,7 @@ func BenchmarkBadgerBlockBufferStats(b *testing.B) {
133133
},
134134
},
135135
}
136-
buffer.Add(blocks, 1024)
136+
buffer.Add(blocks)
137137
}
138138
}
139139

internal/storage/s3.go

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,11 @@ func NewS3Connector(cfg *config.S3StorageConfig) (*S3Connector, error) {
115115
}
116116

117117
// Create buffer with configured settings
118-
buffer, err := NewBlockBufferWithBadger(cfg.BufferSize, cfg.MaxBlocksPerFile)
118+
var buffer IBlockBuffer
119+
buffer, err = NewBadgerBlockBuffer(cfg.BufferSize, cfg.MaxBlocksPerFile)
119120
if err != nil {
120-
// Fall back to in-memory buffer if Badger fails
121-
log.Warn().Err(err).Msg("Failed to create Badger buffer, falling back to in-memory buffer")
121+
// fallback
122+
log.Error().Err(err).Msg("Failed to create Badger buffer, falling back to in-memory buffer")
122123
buffer = NewBlockBuffer(cfg.BufferSize, cfg.MaxBlocksPerFile)
123124
}
124125

@@ -144,27 +145,14 @@ func (s *S3Connector) InsertBlockData(data []common.BlockData) error {
144145
return nil
145146
}
146147

147-
// Calculate actual serialized size for accurate memory tracking
148-
formattedData, err := s.formatter.FormatBlockData(data)
149-
if err != nil {
150-
return fmt.Errorf("failed to format block data for size calculation: %w", err)
151-
}
152-
153-
// Use actual serialized size for accurate memory tracking
154-
actualSize := int64(len(formattedData))
155-
log.Debug().
156-
Int("block_count", len(data)).
157-
Int64("size_bytes", actualSize).
158-
Int64("avg_bytes_per_block", actualSize/int64(len(data))).
159-
Msg("Calculated actual block data size")
160-
161148
// Add to buffer and check if flush is needed
162-
shouldFlush := s.buffer.Add(data, actualSize)
149+
shouldFlush := s.buffer.Add(data)
163150

164151
// Start or reset timer when first data is added
165152
s.timerMu.Lock()
166-
sizeBytes, blockCount := s.buffer.Size()
167-
if sizeBytes == actualSize && blockCount == len(data) && s.config.BufferTimeout > 0 {
153+
_, blockCount := s.buffer.Size()
154+
// Check if this is the first batch added (buffer was empty before)
155+
if blockCount == len(data) && s.config.BufferTimeout > 0 {
168156
// First data added to buffer, track time and start timer
169157
s.lastAddTime = time.Now()
170158
if s.flushTimer != nil {
@@ -357,13 +345,11 @@ func (s *S3Connector) Close() error {
357345
// Wait for worker to finish
358346
s.wg.Wait()
359347

360-
// Clean up buffer resources (especially important for BadgerBlockBuffer)
361-
if badgerBuffer, ok := s.buffer.(*BadgerBlockBuffer); ok {
362-
if err := badgerBuffer.Close(); err != nil {
363-
log.Error().Err(err).Msg("Error closing badger buffer")
364-
if closeErr == nil {
365-
closeErr = err
366-
}
348+
// Clean up buffer resources
349+
if err := s.buffer.Close(); err != nil {
350+
log.Error().Err(err).Msg("Error closing buffer")
351+
if closeErr == nil {
352+
closeErr = err
367353
}
368354
}
369355
})

0 commit comments

Comments
 (0)