File tree Expand file tree Collapse file tree 2 files changed +4
-3
lines changed Expand file tree Collapse file tree 2 files changed +4
-3
lines changed Original file line number Diff line number Diff line change @@ -289,6 +289,7 @@ type Config struct {
289289 CommitterKafkaEnableTLS bool `env:"COMMITTER_KAFKA_ENABLE_TLS" envDefault:"true"`
290290 CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"`
291291 CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"`
292+ CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"`
292293 StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
293294 StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
294295 StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`
Original file line number Diff line number Diff line change @@ -19,7 +19,7 @@ func processBlocks() {
1919 indexerName := config .Cfg .ZeetProjectName
2020
2121 totalBytesInBatch := uint64 (0 )
22- blockBatch := make ([]* common.BlockData , 0 , 500 )
22+ blockBatch := make ([]* common.BlockData , 0 , config . Cfg . CommitterKafkaBatchSize )
2323 defer func () {
2424 releaseMemoryPermit (totalBytesInBatch )
2525 }()
@@ -37,7 +37,7 @@ func processBlocks() {
3737 if block .Acquired {
3838 totalBytesInBatch += block .ByteSize
3939 }
40- if len (blockBatch ) == 500 {
40+ if len (blockBatch ) == config . Cfg . CommitterKafkaBatchSize {
4141 if err := libs .KafkaPublisherV2 .PublishBlockData (blockBatch ); err != nil {
4242 log .Panic ().
4343 Err (err ).
@@ -56,7 +56,7 @@ func processBlocks() {
5656 Uint64 ("memory_released_bytes" , totalBytesInBatch ).
5757 Msg ("Successfully published batch to Kafka" )
5858
59- blockBatch = make ([]* common.BlockData , 0 , 500 )
59+ blockBatch = make ([]* common.BlockData , 0 , config . Cfg . CommitterKafkaBatchSize )
6060 }
6161
6262 nextBlockNumber ++
You can’t perform that action at this time.
0 commit comments