Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func init() {
rootCmd.PersistentFlags().Int("poller-s3-maxConcurrentDownloads", 3, "Max concurrent downloads for poller archive source")
rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer")
rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval")
rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds")
rootCmd.PersistentFlags().Int("committer-from-block", 0, "From which block to start committing")
rootCmd.PersistentFlags().Int("committer-to-block", 0, "To which block to commit")
rootCmd.PersistentFlags().Bool("reorgHandler-enabled", true, "Toggle reorg handler")
Expand Down Expand Up @@ -191,8 +190,6 @@ func init() {
rootCmd.PersistentFlags().String("publisher-events-topicName", "", "Kafka topic name for events")
rootCmd.PersistentFlags().String("publisher-events-addressFilter", "", "Filter events by address")
rootCmd.PersistentFlags().String("publisher-events-topic0Filter", "", "Filter events by topic0")
rootCmd.PersistentFlags().Int("workMode-checkIntervalMinutes", 10, "How often to check work mode in minutes")
rootCmd.PersistentFlags().Int64("workMode-liveModeThreshold", 500, "How many blocks the indexer can be behind before switching to live mode")
rootCmd.PersistentFlags().String("validation-mode", "strict", "Validation mode. Strict will validate logsBloom and transactionsRoot. Minimal will validate transaction count and logs existence.")
rootCmd.PersistentFlags().String("migrator-destination-type", "auto", "Storage type for migrator destination (auto, clickhouse, postgres, kafka, badger, pebble, s3)")
rootCmd.PersistentFlags().String("migrator-destination-clickhouse-host", "", "Clickhouse host for migrator destination")
Expand Down Expand Up @@ -263,7 +260,6 @@ func init() {
viper.BindPFlag("poller.s3.maxConcurrentDownloads", rootCmd.PersistentFlags().Lookup("poller-s3-maxConcurrentDownloads"))
viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled"))
viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit"))
viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval"))
viper.BindPFlag("committer.fromBlock", rootCmd.PersistentFlags().Lookup("committer-from-block"))
viper.BindPFlag("committer.toBlock", rootCmd.PersistentFlags().Lookup("committer-to-block"))
viper.BindPFlag("reorgHandler.enabled", rootCmd.PersistentFlags().Lookup("reorgHandler-enabled"))
Expand Down Expand Up @@ -389,8 +385,6 @@ func init() {
viper.BindPFlag("publisher.events.topicName", rootCmd.PersistentFlags().Lookup("publisher-events-topicName"))
viper.BindPFlag("publisher.events.addressFilter", rootCmd.PersistentFlags().Lookup("publisher-events-addressFilter"))
viper.BindPFlag("publisher.events.topic0Filter", rootCmd.PersistentFlags().Lookup("publisher-events-topic0Filter"))
viper.BindPFlag("workMode.checkIntervalMinutes", rootCmd.PersistentFlags().Lookup("workMode-checkIntervalMinutes"))
viper.BindPFlag("workMode.liveModeThreshold", rootCmd.PersistentFlags().Lookup("workMode-liveModeThreshold"))
viper.BindPFlag("validation.mode", rootCmd.PersistentFlags().Lookup("validation-mode"))
// Migrator viper bindings
viper.BindPFlag("migrator.destination.type", rootCmd.PersistentFlags().Lookup("migrator-destination-type"))
Expand Down
34 changes: 10 additions & 24 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type PollerConfig struct {

type CommitterConfig struct {
Enabled bool `mapstructure:"enabled"`
Interval int `mapstructure:"interval"`
BlocksPerCommit int `mapstructure:"blocksPerCommit"`
FromBlock int `mapstructure:"fromBlock"`
ToBlock int `mapstructure:"toBlock"`
Expand All @@ -38,12 +37,6 @@ type ReorgHandlerConfig struct {
ForceFromBlock bool `mapstructure:"forceFromBlock"`
}

// FailureRecovererConfig configures the failure-recoverer component.
// NOTE(review): this type is deleted by the surrounding diff with no
// replacement shown; these are the removed lines as rendered.
type FailureRecovererConfig struct {
Enabled bool `mapstructure:"enabled"` // toggles the failure recoverer on or off
Interval int `mapstructure:"interval"` // run interval — units not shown here; presumably milliseconds like committer.interval — TODO confirm
BlocksPerRun int `mapstructure:"blocksPerRun"` // number of blocks processed per recovery run
}

type StorageConfig struct {
Orchestrator StorageOrchestratorConfig `mapstructure:"orchestrator"`
Staging StorageStagingConfig `mapstructure:"staging"`
Expand Down Expand Up @@ -254,11 +247,6 @@ type S3SourceConfig struct {
MaxConcurrentDownloads int `mapstructure:"maxConcurrentDownloads"`
}

// WorkModeConfig configures work-mode detection.
// NOTE(review): deleted by the surrounding diff together with its
// "workMode-*" flags and viper bindings; shown here as removed.
type WorkModeConfig struct {
CheckIntervalMinutes int `mapstructure:"checkIntervalMinutes"` // how often to check work mode, in minutes (per the removed flag help text)
LiveModeThreshold int64 `mapstructure:"liveModeThreshold"` // max blocks behind before switching to live mode (per the removed flag help text)
}

// ValidationConfig selects how strictly fetched block data is validated.
type ValidationConfig struct {
Mode string `mapstructure:"mode"` // one of "disabled", "minimal", "strict"; per the flag help text, "strict" validates logsBloom and transactionsRoot, "minimal" validates transaction count and logs existence
}
Expand All @@ -272,18 +260,16 @@ type MigratorConfig struct {
}

type Config struct {
RPC RPCConfig `mapstructure:"rpc"`
Log LogConfig `mapstructure:"log"`
Poller PollerConfig `mapstructure:"poller"`
Committer CommitterConfig `mapstructure:"committer"`
FailureRecoverer FailureRecovererConfig `mapstructure:"failureRecoverer"`
ReorgHandler ReorgHandlerConfig `mapstructure:"reorgHandler"`
Storage StorageConfig `mapstructure:"storage"`
API APIConfig `mapstructure:"api"`
Publisher PublisherConfig `mapstructure:"publisher"`
WorkMode WorkModeConfig `mapstructure:"workMode"`
Validation ValidationConfig `mapstructure:"validation"`
Migrator MigratorConfig `mapstructure:"migrator"`
RPC RPCConfig `mapstructure:"rpc"`
Log LogConfig `mapstructure:"log"`
Poller PollerConfig `mapstructure:"poller"`
Committer CommitterConfig `mapstructure:"committer"`
ReorgHandler ReorgHandlerConfig `mapstructure:"reorgHandler"`
Storage StorageConfig `mapstructure:"storage"`
API APIConfig `mapstructure:"api"`
Publisher PublisherConfig `mapstructure:"publisher"`
Validation ValidationConfig `mapstructure:"validation"`
Migrator MigratorConfig `mapstructure:"migrator"`
}

var Cfg Config
Expand Down
55 changes: 22 additions & 33 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
"github.com/thirdweb-dev/indexer/internal/worker"
)

const DEFAULT_COMMITTER_TRIGGER_INTERVAL = 2000
const DEFAULT_BLOCKS_PER_COMMIT = 1000

type Committer struct {
triggerIntervalMs int
blocksPerCommit int
storage storage.IStorage
commitFromBlock *big.Int
Expand All @@ -39,11 +37,6 @@ type Committer struct {
type CommitterOption func(*Committer)

func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller, opts ...CommitterOption) *Committer {
triggerInterval := config.Cfg.Committer.Interval
if triggerInterval == 0 {
triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL
}

blocksPerCommit := config.Cfg.Committer.BlocksPerCommit
if blocksPerCommit == 0 {
blocksPerCommit = DEFAULT_BLOCKS_PER_COMMIT
Expand All @@ -56,15 +49,14 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller,

commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock))
committer := &Committer{
triggerIntervalMs: triggerInterval,
blocksPerCommit: blocksPerCommit,
storage: storage,
commitFromBlock: commitFromBlock,
commitToBlock: big.NewInt(int64(commitToBlock)),
rpc: rpc,
publisher: publisher.GetInstance(),
poller: poller,
validator: NewValidator(rpc, storage, worker.NewWorker(rpc)), // validator uses worker without sources
blocksPerCommit: blocksPerCommit,
storage: storage,
commitFromBlock: commitFromBlock,
commitToBlock: big.NewInt(int64(commitToBlock)),
rpc: rpc,
publisher: publisher.GetInstance(),
poller: poller,
validator: NewValidator(rpc, storage, worker.NewWorker(rpc)), // validator uses worker without sources
}
cfb := commitFromBlock.Uint64()
committer.lastCommittedBlock.Store(cfb)
Expand All @@ -78,8 +70,6 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller,
}

func (c *Committer) Start(ctx context.Context) {
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond

log.Debug().Msgf("Committer running")
chainID := c.rpc.GetChainID()

Expand Down Expand Up @@ -175,40 +165,35 @@ func (c *Committer) Start(ctx context.Context) {

if config.Cfg.Publisher.Mode == "parallel" {
var wg sync.WaitGroup
publishInterval := interval / 2
if publishInterval <= 0 {
publishInterval = interval
}
wg.Add(2)

go func() {
defer wg.Done()
c.runPublishLoop(ctx, publishInterval)
c.runPublishLoop(ctx)
}()

// allow the publisher to start before the committer
time.Sleep(publishInterval)
go func() {
defer wg.Done()
c.runCommitLoop(ctx, interval)
c.runCommitLoop(ctx)
}()

<-ctx.Done()

wg.Wait()
} else {
c.runCommitLoop(ctx, interval)
c.runCommitLoop(ctx)
}

log.Info().Msg("Committer shutting down")
c.publisher.Close()
}

func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
func (c *Committer) runCommitLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
time.Sleep(interval)
if c.commitToBlock.Sign() > 0 && c.lastCommittedBlock.Load() >= c.commitToBlock.Uint64() {
// Completing the commit loop if we've committed more than commit to block
log.Info().Msgf("Committer reached configured toBlock %s, the last commit block is %d, stopping commits", c.commitToBlock.String(), c.lastCommittedBlock.Load())
Expand All @@ -231,13 +216,17 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
}
}

func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) {
func (c *Committer) runPublishLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
time.Sleep(interval)
if c.commitToBlock.Sign() > 0 && c.lastPublishedBlock.Load() >= c.commitToBlock.Uint64() {
// Completing the publish loop if we've published more than commit to block
log.Info().Msgf("Committer reached configured toBlock %s, the last publish block is %d, stopping publishes", c.commitToBlock.String(), c.lastPublishedBlock.Load())
return
}
if err := c.publish(ctx); err != nil {
log.Error().Err(err).Msg("Error publishing blocks")
}
Expand Down Expand Up @@ -397,8 +386,8 @@ func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBl
func (c *Committer) fetchBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
blocksData := c.poller.Request(ctx, blockNumbers)
if len(blocksData) == 0 {
// TODO: should wait a little bit, as it may take time to load
log.Warn().Msgf("Committer didn't find the following range: %v - %v", blockNumbers[0].Int64(), blockNumbers[len(blockNumbers)-1].Int64())
log.Warn().Msgf("Committer didn't find the following range: %v - %v. %v", blockNumbers[0].Int64(), blockNumbers[len(blockNumbers)-1].Int64(), c.poller.GetPollerStatus())
time.Sleep(500 * time.Millisecond) // TODO: wait for block time
return nil, nil
}
return blocksData, nil
Expand Down
91 changes: 0 additions & 91 deletions internal/orchestrator/committer_test.go
Original file line number Diff line number Diff line change
@@ -1,92 +1 @@
package orchestrator

import (
"context"
"math/big"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
mocks "github.com/thirdweb-dev/indexer/test/mocks"
)

// TestNewCommitter verifies that NewCommitter builds a committer with the
// default trigger interval and blocks-per-commit when the config is zero.
// NOTE(review): this test is deleted by the surrounding diff — it asserts
// triggerIntervalMs / DEFAULT_COMMITTER_TRIGGER_INTERVAL, which the diff
// removes from the Committer type.
func TestNewCommitter(t *testing.T) {
// Construct the mock dependencies required by NewCommitter.
mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)

mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
}

// Mock the GetBlocksPerRequest call that happens in NewWorker
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})

poller := &Poller{}
committer := NewCommitter(mockRPC, mockStorage, poller)

// With zero-valued config, the constructor falls back to the defaults.
assert.NotNil(t, committer)
assert.Equal(t, DEFAULT_COMMITTER_TRIGGER_INTERVAL, committer.triggerIntervalMs)
assert.Equal(t, DEFAULT_BLOCKS_PER_COMMIT, committer.blocksPerCommit)
}

// Removed - test needs to be updated for new implementation

// Removed - test needs to be updated for new implementation

// Removed - test needs to be updated for new implementation

// Removed - test needs to be updated for new implementation

// Removed - test needs to be updated for new implementation

// Removed - test needs to be updated for new implementation

// Removed - test needs to be updated for new implementation

// Removed - test needs to be updated for new implementation

// Removed - test needs to be updated for new implementation

// Removed - test needs to be updated for new implementation

// Removed - test needs to be updated for new implementation

// Removed - test needs to be updated for new implementation

// TestCleanupProcessedStagingBlocks verifies that staging data is only
// deleted once both the committed and published cursors have advanced:
// with lastPublishedBlock == 0 no deletion happens; once it is non-zero,
// DeleteStagingDataOlderThan is called with min(committed, published).
// NOTE(review): this test is deleted by the surrounding diff.
func TestCleanupProcessedStagingBlocks(t *testing.T) {
mockRPC := mocks.NewMockIRPCClient(t)
mockMainStorage := mocks.NewMockIMainStorage(t)
mockStagingStorage := mocks.NewMockIStagingStorage(t)
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
mockStorage := storage.IStorage{
MainStorage: mockMainStorage,
StagingStorage: mockStagingStorage,
OrchestratorStorage: mockOrchestratorStorage,
}

// Mock the GetBlocksPerRequest call that happens in NewWorker
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})

poller := &Poller{}
committer := NewCommitter(mockRPC, mockStorage, poller)

chainID := big.NewInt(1)
committer.lastCommittedBlock.Store(100)
committer.lastPublishedBlock.Store(0)

// Published cursor still at zero: cleanup must be a no-op.
ctx := context.Background()
committer.cleanupProcessedStagingBlocks(ctx)
mockStagingStorage.AssertNotCalled(t, "DeleteStagingDataOlderThan", mock.Anything, mock.Anything)

// Published cursor at 90 (< committed 100): delete everything older than 90.
committer.lastPublishedBlock.Store(90)
mockRPC.EXPECT().GetChainID().Return(chainID)
mockStagingStorage.EXPECT().DeleteStagingDataOlderThan(chainID, big.NewInt(90)).Return(nil)
committer.cleanupProcessedStagingBlocks(ctx)
}

// TestStartCommitter is an empty placeholder; Start is not covered here.
// NOTE(review): consider removing it or implementing a real test — an
// always-passing stub gives a false sense of coverage.
func TestStartCommitter(t *testing.T) {
}
Loading