Skip to content

Commit 2914e3f

Browse files
authored
Fix worker stall (#282)
* Fix worker stall * Fix CI errors * Clean up unused config
1 parent 7325d08 commit 2914e3f

File tree

7 files changed

+97
-195
lines changed

7 files changed

+97
-195
lines changed

cmd/root.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ func init() {
6565
rootCmd.PersistentFlags().Int("poller-s3-maxConcurrentDownloads", 3, "Max concurrent downloads for poller archive source")
6666
rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer")
6767
rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval")
68-
rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds")
6968
rootCmd.PersistentFlags().Int("committer-from-block", 0, "From which block to start committing")
7069
rootCmd.PersistentFlags().Int("committer-to-block", 0, "To which block to commit")
7170
rootCmd.PersistentFlags().Bool("reorgHandler-enabled", true, "Toggle reorg handler")
@@ -191,8 +190,6 @@ func init() {
191190
rootCmd.PersistentFlags().String("publisher-events-topicName", "", "Kafka topic name for events")
192191
rootCmd.PersistentFlags().String("publisher-events-addressFilter", "", "Filter events by address")
193192
rootCmd.PersistentFlags().String("publisher-events-topic0Filter", "", "Filter events by topic0")
194-
rootCmd.PersistentFlags().Int("workMode-checkIntervalMinutes", 10, "How often to check work mode in minutes")
195-
rootCmd.PersistentFlags().Int64("workMode-liveModeThreshold", 500, "How many blocks the indexer can be behind before switching to live mode")
196193
rootCmd.PersistentFlags().String("validation-mode", "strict", "Validation mode. Strict will validate logsBloom and transactionsRoot. Minimal will validate transaction count and logs existence.")
197194
rootCmd.PersistentFlags().String("migrator-destination-type", "auto", "Storage type for migrator destination (auto, clickhouse, postgres, kafka, badger, pebble, s3)")
198195
rootCmd.PersistentFlags().String("migrator-destination-clickhouse-host", "", "Clickhouse host for migrator destination")
@@ -263,7 +260,6 @@ func init() {
263260
viper.BindPFlag("poller.s3.maxConcurrentDownloads", rootCmd.PersistentFlags().Lookup("poller-s3-maxConcurrentDownloads"))
264261
viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled"))
265262
viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit"))
266-
viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval"))
267263
viper.BindPFlag("committer.fromBlock", rootCmd.PersistentFlags().Lookup("committer-from-block"))
268264
viper.BindPFlag("committer.toBlock", rootCmd.PersistentFlags().Lookup("committer-to-block"))
269265
viper.BindPFlag("reorgHandler.enabled", rootCmd.PersistentFlags().Lookup("reorgHandler-enabled"))
@@ -389,8 +385,6 @@ func init() {
389385
viper.BindPFlag("publisher.events.topicName", rootCmd.PersistentFlags().Lookup("publisher-events-topicName"))
390386
viper.BindPFlag("publisher.events.addressFilter", rootCmd.PersistentFlags().Lookup("publisher-events-addressFilter"))
391387
viper.BindPFlag("publisher.events.topic0Filter", rootCmd.PersistentFlags().Lookup("publisher-events-topic0Filter"))
392-
viper.BindPFlag("workMode.checkIntervalMinutes", rootCmd.PersistentFlags().Lookup("workMode-checkIntervalMinutes"))
393-
viper.BindPFlag("workMode.liveModeThreshold", rootCmd.PersistentFlags().Lookup("workMode-liveModeThreshold"))
394388
viper.BindPFlag("validation.mode", rootCmd.PersistentFlags().Lookup("validation-mode"))
395389
// Migrator viper bindings
396390
viper.BindPFlag("migrator.destination.type", rootCmd.PersistentFlags().Lookup("migrator-destination-type"))

configs/config.go

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ type PollerConfig struct {
2424

2525
type CommitterConfig struct {
2626
Enabled bool `mapstructure:"enabled"`
27-
Interval int `mapstructure:"interval"`
2827
BlocksPerCommit int `mapstructure:"blocksPerCommit"`
2928
FromBlock int `mapstructure:"fromBlock"`
3029
ToBlock int `mapstructure:"toBlock"`
@@ -38,12 +37,6 @@ type ReorgHandlerConfig struct {
3837
ForceFromBlock bool `mapstructure:"forceFromBlock"`
3938
}
4039

41-
type FailureRecovererConfig struct {
42-
Enabled bool `mapstructure:"enabled"`
43-
Interval int `mapstructure:"interval"`
44-
BlocksPerRun int `mapstructure:"blocksPerRun"`
45-
}
46-
4740
type StorageConfig struct {
4841
Orchestrator StorageOrchestratorConfig `mapstructure:"orchestrator"`
4942
Staging StorageStagingConfig `mapstructure:"staging"`
@@ -254,11 +247,6 @@ type S3SourceConfig struct {
254247
MaxConcurrentDownloads int `mapstructure:"maxConcurrentDownloads"`
255248
}
256249

257-
type WorkModeConfig struct {
258-
CheckIntervalMinutes int `mapstructure:"checkIntervalMinutes"`
259-
LiveModeThreshold int64 `mapstructure:"liveModeThreshold"`
260-
}
261-
262250
type ValidationConfig struct {
263251
Mode string `mapstructure:"mode"` // "disabled", "minimal", "strict"
264252
}
@@ -272,18 +260,16 @@ type MigratorConfig struct {
272260
}
273261

274262
type Config struct {
275-
RPC RPCConfig `mapstructure:"rpc"`
276-
Log LogConfig `mapstructure:"log"`
277-
Poller PollerConfig `mapstructure:"poller"`
278-
Committer CommitterConfig `mapstructure:"committer"`
279-
FailureRecoverer FailureRecovererConfig `mapstructure:"failureRecoverer"`
280-
ReorgHandler ReorgHandlerConfig `mapstructure:"reorgHandler"`
281-
Storage StorageConfig `mapstructure:"storage"`
282-
API APIConfig `mapstructure:"api"`
283-
Publisher PublisherConfig `mapstructure:"publisher"`
284-
WorkMode WorkModeConfig `mapstructure:"workMode"`
285-
Validation ValidationConfig `mapstructure:"validation"`
286-
Migrator MigratorConfig `mapstructure:"migrator"`
263+
RPC RPCConfig `mapstructure:"rpc"`
264+
Log LogConfig `mapstructure:"log"`
265+
Poller PollerConfig `mapstructure:"poller"`
266+
Committer CommitterConfig `mapstructure:"committer"`
267+
ReorgHandler ReorgHandlerConfig `mapstructure:"reorgHandler"`
268+
Storage StorageConfig `mapstructure:"storage"`
269+
API APIConfig `mapstructure:"api"`
270+
Publisher PublisherConfig `mapstructure:"publisher"`
271+
Validation ValidationConfig `mapstructure:"validation"`
272+
Migrator MigratorConfig `mapstructure:"migrator"`
287273
}
288274

289275
var Cfg Config

internal/orchestrator/committer.go

Lines changed: 22 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@ import (
1919
"github.com/thirdweb-dev/indexer/internal/worker"
2020
)
2121

22-
const DEFAULT_COMMITTER_TRIGGER_INTERVAL = 2000
2322
const DEFAULT_BLOCKS_PER_COMMIT = 1000
2423

2524
type Committer struct {
26-
triggerIntervalMs int
2725
blocksPerCommit int
2826
storage storage.IStorage
2927
commitFromBlock *big.Int
@@ -39,11 +37,6 @@ type Committer struct {
3937
type CommitterOption func(*Committer)
4038

4139
func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller, opts ...CommitterOption) *Committer {
42-
triggerInterval := config.Cfg.Committer.Interval
43-
if triggerInterval == 0 {
44-
triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL
45-
}
46-
4740
blocksPerCommit := config.Cfg.Committer.BlocksPerCommit
4841
if blocksPerCommit == 0 {
4942
blocksPerCommit = DEFAULT_BLOCKS_PER_COMMIT
@@ -56,15 +49,14 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller,
5649

5750
commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock))
5851
committer := &Committer{
59-
triggerIntervalMs: triggerInterval,
60-
blocksPerCommit: blocksPerCommit,
61-
storage: storage,
62-
commitFromBlock: commitFromBlock,
63-
commitToBlock: big.NewInt(int64(commitToBlock)),
64-
rpc: rpc,
65-
publisher: publisher.GetInstance(),
66-
poller: poller,
67-
validator: NewValidator(rpc, storage, worker.NewWorker(rpc)), // validator uses worker without sources
52+
blocksPerCommit: blocksPerCommit,
53+
storage: storage,
54+
commitFromBlock: commitFromBlock,
55+
commitToBlock: big.NewInt(int64(commitToBlock)),
56+
rpc: rpc,
57+
publisher: publisher.GetInstance(),
58+
poller: poller,
59+
validator: NewValidator(rpc, storage, worker.NewWorker(rpc)), // validator uses worker without sources
6860
}
6961
cfb := commitFromBlock.Uint64()
7062
committer.lastCommittedBlock.Store(cfb)
@@ -78,8 +70,6 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, poller *Poller,
7870
}
7971

8072
func (c *Committer) Start(ctx context.Context) {
81-
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
82-
8373
log.Debug().Msgf("Committer running")
8474
chainID := c.rpc.GetChainID()
8575

@@ -175,40 +165,35 @@ func (c *Committer) Start(ctx context.Context) {
175165

176166
if config.Cfg.Publisher.Mode == "parallel" {
177167
var wg sync.WaitGroup
178-
publishInterval := interval / 2
179-
if publishInterval <= 0 {
180-
publishInterval = interval
181-
}
182168
wg.Add(2)
169+
183170
go func() {
184171
defer wg.Done()
185-
c.runPublishLoop(ctx, publishInterval)
172+
c.runPublishLoop(ctx)
186173
}()
187174

188-
// allow the publisher to start before the committer
189-
time.Sleep(publishInterval)
190175
go func() {
191176
defer wg.Done()
192-
c.runCommitLoop(ctx, interval)
177+
c.runCommitLoop(ctx)
193178
}()
194179

195180
<-ctx.Done()
181+
196182
wg.Wait()
197183
} else {
198-
c.runCommitLoop(ctx, interval)
184+
c.runCommitLoop(ctx)
199185
}
200186

201187
log.Info().Msg("Committer shutting down")
202188
c.publisher.Close()
203189
}
204190

205-
func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
191+
func (c *Committer) runCommitLoop(ctx context.Context) {
206192
for {
207193
select {
208194
case <-ctx.Done():
209195
return
210196
default:
211-
time.Sleep(interval)
212197
if c.commitToBlock.Sign() > 0 && c.lastCommittedBlock.Load() >= c.commitToBlock.Uint64() {
213198
// Completing the commit loop if we've committed more than commit to block
214199
log.Info().Msgf("Committer reached configured toBlock %s, the last commit block is %d, stopping commits", c.commitToBlock.String(), c.lastCommittedBlock.Load())
@@ -231,13 +216,17 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
231216
}
232217
}
233218

234-
func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) {
219+
func (c *Committer) runPublishLoop(ctx context.Context) {
235220
for {
236221
select {
237222
case <-ctx.Done():
238223
return
239224
default:
240-
time.Sleep(interval)
225+
if c.commitToBlock.Sign() > 0 && c.lastPublishedBlock.Load() >= c.commitToBlock.Uint64() {
226+
// Completing the publish loop if we've published more than commit to block
227+
log.Info().Msgf("Committer reached configured toBlock %s, the last publish block is %d, stopping publishes", c.commitToBlock.String(), c.lastPublishedBlock.Load())
228+
return
229+
}
241230
if err := c.publish(ctx); err != nil {
242231
log.Error().Err(err).Msg("Error publishing blocks")
243232
}
@@ -397,8 +386,8 @@ func (c *Committer) getBlockToCommitUntil(ctx context.Context, latestCommittedBl
397386
func (c *Committer) fetchBlockData(ctx context.Context, blockNumbers []*big.Int) ([]common.BlockData, error) {
398387
blocksData := c.poller.Request(ctx, blockNumbers)
399388
if len(blocksData) == 0 {
400-
// TODO: should wait a little bit, as it may take time to load
401-
log.Warn().Msgf("Committer didn't find the following range: %v - %v", blockNumbers[0].Int64(), blockNumbers[len(blockNumbers)-1].Int64())
389+
log.Warn().Msgf("Committer didn't find the following range: %v - %v. %v", blockNumbers[0].Int64(), blockNumbers[len(blockNumbers)-1].Int64(), c.poller.GetPollerStatus())
390+
time.Sleep(500 * time.Millisecond) // TODO: wait for block time
402391
return nil, nil
403392
}
404393
return blocksData, nil
Lines changed: 0 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,92 +1 @@
11
package orchestrator
2-
3-
import (
4-
"context"
5-
"math/big"
6-
"testing"
7-
8-
"github.com/stretchr/testify/assert"
9-
"github.com/stretchr/testify/mock"
10-
"github.com/thirdweb-dev/indexer/internal/rpc"
11-
"github.com/thirdweb-dev/indexer/internal/storage"
12-
mocks "github.com/thirdweb-dev/indexer/test/mocks"
13-
)
14-
15-
func TestNewCommitter(t *testing.T) {
16-
mockRPC := mocks.NewMockIRPCClient(t)
17-
mockMainStorage := mocks.NewMockIMainStorage(t)
18-
mockStagingStorage := mocks.NewMockIStagingStorage(t)
19-
20-
mockStorage := storage.IStorage{
21-
MainStorage: mockMainStorage,
22-
StagingStorage: mockStagingStorage,
23-
}
24-
25-
// Mock the GetBlocksPerRequest call that happens in NewWorker
26-
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
27-
28-
poller := &Poller{}
29-
committer := NewCommitter(mockRPC, mockStorage, poller)
30-
31-
assert.NotNil(t, committer)
32-
assert.Equal(t, DEFAULT_COMMITTER_TRIGGER_INTERVAL, committer.triggerIntervalMs)
33-
assert.Equal(t, DEFAULT_BLOCKS_PER_COMMIT, committer.blocksPerCommit)
34-
}
35-
36-
// Removed - test needs to be updated for new implementation
37-
38-
// Removed - test needs to be updated for new implementation
39-
40-
// Removed - test needs to be updated for new implementation
41-
42-
// Removed - test needs to be updated for new implementation
43-
44-
// Removed - test needs to be updated for new implementation
45-
46-
// Removed - test needs to be updated for new implementation
47-
48-
// Removed - test needs to be updated for new implementation
49-
50-
// Removed - test needs to be updated for new implementation
51-
52-
// Removed - test needs to be updated for new implementation
53-
54-
// Removed - test needs to be updated for new implementation
55-
56-
// Removed - test needs to be updated for new implementation
57-
58-
// Removed - test needs to be updated for new implementation
59-
60-
func TestCleanupProcessedStagingBlocks(t *testing.T) {
61-
mockRPC := mocks.NewMockIRPCClient(t)
62-
mockMainStorage := mocks.NewMockIMainStorage(t)
63-
mockStagingStorage := mocks.NewMockIStagingStorage(t)
64-
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
65-
mockStorage := storage.IStorage{
66-
MainStorage: mockMainStorage,
67-
StagingStorage: mockStagingStorage,
68-
OrchestratorStorage: mockOrchestratorStorage,
69-
}
70-
71-
// Mock the GetBlocksPerRequest call that happens in NewWorker
72-
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{Blocks: 100})
73-
74-
poller := &Poller{}
75-
committer := NewCommitter(mockRPC, mockStorage, poller)
76-
77-
chainID := big.NewInt(1)
78-
committer.lastCommittedBlock.Store(100)
79-
committer.lastPublishedBlock.Store(0)
80-
81-
ctx := context.Background()
82-
committer.cleanupProcessedStagingBlocks(ctx)
83-
mockStagingStorage.AssertNotCalled(t, "DeleteStagingDataOlderThan", mock.Anything, mock.Anything)
84-
85-
committer.lastPublishedBlock.Store(90)
86-
mockRPC.EXPECT().GetChainID().Return(chainID)
87-
mockStagingStorage.EXPECT().DeleteStagingDataOlderThan(chainID, big.NewInt(90)).Return(nil)
88-
committer.cleanupProcessedStagingBlocks(ctx)
89-
}
90-
91-
func TestStartCommitter(t *testing.T) {
92-
}

0 commit comments

Comments
 (0)