Skip to content

Commit eb47a1d

Browse files
committed
clickhouse and poller settings
1 parent b0a4ef9 commit eb47a1d

File tree

4 files changed

+42
-23
lines changed

4 files changed

+42
-23
lines changed

cmd/root.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func init() {
5555
rootCmd.PersistentFlags().Int("poller-from-block", 0, "From which block to start polling")
5656
rootCmd.PersistentFlags().Bool("poller-force-from-block", false, "Force the poller to start from the block specified in `poller-from-block`")
5757
rootCmd.PersistentFlags().Int("poller-until-block", 0, "Until which block to poll")
58+
rootCmd.PersistentFlags().Int("poller-parallel-pollers", 5, "Maximum number of parallel pollers")
5859
rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer")
5960
rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval")
6061
rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds")
@@ -76,6 +77,7 @@ func init() {
7677
rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-host", "", "Clickhouse host for orchestrator storage")
7778
rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-username", "", "Clickhouse username for orchestrator storage")
7879
rootCmd.PersistentFlags().String("storage-orchestrator-clickhouse-password", "", "Clickhouse password for orchestrator storage")
80+
rootCmd.PersistentFlags().Bool("storage-orchestrator-clickhouse-asyncInsert", false, "Clickhouse async insert for orchestrator storage")
7981
rootCmd.PersistentFlags().Int("storage-orchestrator-memory-maxItems", 0, "Max items for orchestrator memory storage")
8082
rootCmd.PersistentFlags().Int("storage-orchestrator-redis-poolSize", 0, "Redis pool size for orchestrator storage")
8183
rootCmd.PersistentFlags().String("storage-orchestrator-redis-addr", "", "Redis address for orchestrator storage")
@@ -85,8 +87,10 @@ func init() {
8587
rootCmd.PersistentFlags().String("storage-main-clickhouse-host", "", "Clickhouse host for main storage")
8688
rootCmd.PersistentFlags().String("storage-main-clickhouse-username", "", "Clickhouse username for main storage")
8789
rootCmd.PersistentFlags().String("storage-main-clickhouse-password", "", "Clickhouse password for main storage")
90+
rootCmd.PersistentFlags().Bool("storage-main-clickhouse-asyncInsert", false, "Clickhouse async insert for main storage")
8891
rootCmd.PersistentFlags().String("storage-staging-clickhouse-username", "", "Clickhouse username for staging storage")
8992
rootCmd.PersistentFlags().String("storage-staging-clickhouse-password", "", "Clickhouse password for staging storage")
93+
rootCmd.PersistentFlags().Bool("storage-staging-clickhouse-asyncInsert", false, "Clickhouse async insert for staging storage")
9094
rootCmd.PersistentFlags().String("api-host", "localhost:3000", "API host")
9195
viper.BindPFlag("rpc.url", rootCmd.PersistentFlags().Lookup("rpc-url"))
9296
viper.BindPFlag("rpc.blocks.blocksPerRequest", rootCmd.PersistentFlags().Lookup("rpc-blocks-blocksPerRequest"))
@@ -107,6 +111,7 @@ func init() {
107111
viper.BindPFlag("poller.fromBlock", rootCmd.PersistentFlags().Lookup("poller-from-block"))
108112
viper.BindPFlag("poller.forceFromBlock", rootCmd.PersistentFlags().Lookup("poller-force-from-block"))
109113
viper.BindPFlag("poller.untilBlock", rootCmd.PersistentFlags().Lookup("poller-until-block"))
114+
viper.BindPFlag("poller.parallelPollers", rootCmd.PersistentFlags().Lookup("poller-parallel-pollers"))
110115
viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled"))
111116
viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit"))
112117
viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval"))
@@ -122,18 +127,21 @@ func init() {
122127
viper.BindPFlag("storage.staging.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-database"))
123128
viper.BindPFlag("storage.staging.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-host"))
124129
viper.BindPFlag("storage.staging.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-port"))
130+
viper.BindPFlag("storage.staging.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-username"))
131+
viper.BindPFlag("storage.staging.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-password"))
132+
viper.BindPFlag("storage.staging.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-asyncInsert"))
125133
viper.BindPFlag("storage.main.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-database"))
126134
viper.BindPFlag("storage.main.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-host"))
127135
viper.BindPFlag("storage.main.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-port"))
128136
viper.BindPFlag("storage.main.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-username"))
129137
viper.BindPFlag("storage.main.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-password"))
130-
viper.BindPFlag("storage.staging.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-username"))
131-
viper.BindPFlag("storage.staging.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-password"))
138+
viper.BindPFlag("storage.main.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-asyncInsert"))
132139
viper.BindPFlag("storage.orchestrator.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-database"))
133140
viper.BindPFlag("storage.orchestrator.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-host"))
134141
viper.BindPFlag("storage.orchestrator.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-port"))
135142
viper.BindPFlag("storage.orchestrator.clickhouse.username", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-username"))
136143
viper.BindPFlag("storage.orchestrator.clickhouse.password", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-password"))
144+
viper.BindPFlag("storage.orchestrator.clickhouse.asyncInsert", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-asyncInsert"))
137145
viper.BindPFlag("storage.orchestrator.memory.maxItems", rootCmd.PersistentFlags().Lookup("storage-orchestrator-memory-maxItems"))
138146
viper.BindPFlag("storage.orchestrator.redis.poolSize", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-poolSize"))
139147
viper.BindPFlag("storage.orchestrator.redis.addr", rootCmd.PersistentFlags().Lookup("storage-orchestrator-redis-addr"))

configs/config.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,13 @@ type LogConfig struct {
1414
}
1515

1616
type PollerConfig struct {
17-
Enabled bool `mapstructure:"enabled"`
18-
Interval int `mapstructure:"interval"`
19-
BlocksPerPoll int `mapstructure:"blocksPerPoll"`
20-
FromBlock int `mapstructure:"fromBlock"`
21-
ForceFromBlock bool `mapstructure:"forceFromBlock"`
22-
UntilBlock int `mapstructure:"untilBlock"`
17+
Enabled bool `mapstructure:"enabled"`
18+
Interval int `mapstructure:"interval"`
19+
BlocksPerPoll int `mapstructure:"blocksPerPoll"`
20+
FromBlock int `mapstructure:"fromBlock"`
21+
ForceFromBlock bool `mapstructure:"forceFromBlock"`
22+
UntilBlock int `mapstructure:"untilBlock"`
23+
ParallelPollers int `mapstructure:"parallelPollers"`
2324
}
2425

2526
type CommitterConfig struct {
@@ -63,12 +64,13 @@ type StorageConnectionConfig struct {
6364
}
6465

6566
type ClickhouseConfig struct {
66-
Host string `mapstructure:"host"`
67-
Port int `mapstructure:"port"`
68-
Username string `mapstructure:"username"`
69-
Password string `mapstructure:"password"`
70-
Database string `mapstructure:"database"`
71-
DisableTLS bool `mapstructure:"disableTLS"`
67+
Host string `mapstructure:"host"`
68+
Port int `mapstructure:"port"`
69+
Username string `mapstructure:"username"`
70+
Password string `mapstructure:"password"`
71+
Database string `mapstructure:"database"`
72+
DisableTLS bool `mapstructure:"disableTLS"`
73+
AsyncInsert bool `mapstructure:"asyncInsert"`
7274
}
7375

7476
type MemoryConfig struct {

internal/orchestrator/poller.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type Poller struct {
2525
storage storage.IStorage
2626
lastPolledBlock *big.Int
2727
pollUntilBlock *big.Int
28+
parallelPollers int
2829
}
2930

3031
type BlockNumberWithError struct {
@@ -62,19 +63,18 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
6263
storage: storage,
6364
lastPolledBlock: lastPolledBlock,
6465
pollUntilBlock: untilBlock,
66+
parallelPollers: config.Cfg.Poller.ParallelPollers,
6567
}
6668
}
6769

6870
func (p *Poller) Start() {
6971
interval := time.Duration(p.triggerIntervalMs) * time.Millisecond
7072
ticker := time.NewTicker(interval)
7173

72-
// TODO: make this configurable?
73-
const numWorkers = 5
74-
tasks := make(chan struct{}, numWorkers)
74+
tasks := make(chan struct{}, p.parallelPollers)
7575
var blockRangeMutex sync.Mutex
7676

77-
for i := 0; i < numWorkers; i++ {
77+
for i := 0; i < p.parallelPollers; i++ {
7878
go func() {
7979
for range tasks {
8080
blockRangeMutex.Lock()

internal/storage/clickhouse.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
5555
Username: cfg.Username,
5656
Password: cfg.Password,
5757
},
58+
Settings: func() clickhouse.Settings {
59+
if cfg.AsyncInsert {
60+
return clickhouse.Settings{
61+
"async_insert": "1",
62+
"wait_for_async_insert": "1",
63+
}
64+
}
65+
return clickhouse.Settings{}
66+
}(),
5867
})
5968
if err != nil {
6069
return nil, err
@@ -631,7 +640,7 @@ func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error {
631640
}
632641

633642
func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (*[]common.BlockData, error) {
634-
query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0",
643+
query := fmt.Sprintf("SELECT data FROM %s.block_data WHERE block_number IN (%s) AND is_deleted = 0",
635644
c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers))
636645

637646
if qf.ChainId.Sign() != 0 {
@@ -919,7 +928,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
919928
defer wg.Done()
920929
if err := c.insertBlocks(&blocks); err != nil {
921930
saveErrMutex.Lock()
922-
saveErr = fmt.Errorf("error deleting blocks: %v", err)
931+
saveErr = fmt.Errorf("error inserting blocks: %v", err)
923932
saveErrMutex.Unlock()
924933
}
925934
}()
@@ -931,7 +940,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
931940
defer wg.Done()
932941
if err := c.insertLogs(&logs); err != nil {
933942
saveErrMutex.Lock()
934-
saveErr = fmt.Errorf("error deleting logs: %v", err)
943+
saveErr = fmt.Errorf("error inserting logs: %v", err)
935944
saveErrMutex.Unlock()
936945
}
937946
}()
@@ -943,7 +952,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
943952
defer wg.Done()
944953
if err := c.insertTransactions(&transactions); err != nil {
945954
saveErrMutex.Lock()
946-
saveErr = fmt.Errorf("error deleting transactions: %v", err)
955+
saveErr = fmt.Errorf("error inserting transactions: %v", err)
947956
saveErrMutex.Unlock()
948957
}
949958
}()
@@ -955,7 +964,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
955964
defer wg.Done()
956965
if err := c.insertTraces(&traces); err != nil {
957966
saveErrMutex.Lock()
958-
saveErr = fmt.Errorf("error deleting traces: %v", err)
967+
saveErr = fmt.Errorf("error inserting traces: %v", err)
959968
saveErrMutex.Unlock()
960969
}
961970
}()

0 commit comments

Comments
 (0)