Skip to content

Commit 895fd8f

Browse files
committed
parallelize staging cleanup
1 parent a5ed360 commit 895fd8f

File tree

2 files changed

+31
-7
lines changed

2 files changed

+31
-7
lines changed

internal/orchestrator/committer.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -259,12 +259,16 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
259259
}
260260
}()
261261

262-
stagingDeleteStart := time.Now()
263-
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
264-
return fmt.Errorf("error deleting data from staging storage: %v", err)
262+
if c.workMode == WorkModeBackfill {
263+
go func() {
264+
stagingDeleteStart := time.Now()
265+
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
266+
log.Error().Err(err).Msg("Failed to delete staging data")
267+
}
268+
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds())
269+
metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
270+
}()
265271
}
266-
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds())
267-
metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
268272

269273
// Find highest block number from committed blocks
270274
highestBlock := blockData[0].Block

internal/orchestrator/committer_test.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package orchestrator
33
import (
44
"context"
55
"math/big"
6+
"sync"
67
"testing"
78
"time"
89

@@ -329,12 +330,31 @@ func TestCommit(t *testing.T) {
329330
{Block: common.Block{Number: big.NewInt(102)}},
330331
}
331332

333+
// Create a channel to signal when DeleteStagingData is called
334+
deleteDone := make(chan struct{})
335+
var wg sync.WaitGroup
336+
wg.Add(1)
337+
332338
mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil)
333-
mockStagingStorage.EXPECT().DeleteStagingData(blockData).Return(nil)
339+
mockStagingStorage.EXPECT().DeleteStagingData(blockData).RunAndReturn(func(data []common.BlockData) error {
340+
defer wg.Done()
341+
close(deleteDone)
342+
return nil
343+
})
334344

335345
err := committer.commit(context.Background(), blockData)
336-
337346
assert.NoError(t, err)
347+
348+
// Wait for DeleteStagingData to be called with a timeout
349+
select {
350+
case <-deleteDone:
351+
// Success - DeleteStagingData was called
352+
case <-time.After(2 * time.Second):
353+
t.Fatal("DeleteStagingData was not called within timeout period")
354+
}
355+
356+
// Wait for the goroutine to complete
357+
wg.Wait()
338358
}
339359

340360
func TestHandleGap(t *testing.T) {

0 commit comments

Comments
 (0)