Skip to content

Commit 965cb46

Browse files
committed
100mb max batch size
1 parent 9f84919 commit 965cb46

File tree

1 file changed

+2
-2
lines changed

1 file changed

+2
-2
lines changed

internal/storage/kafka_publisher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) {
6969
kgo.TransactionalID(fmt.Sprintf("insight-producer-%s", chainID)),
7070
kgo.MaxBufferedBytes(2 * 1024 * 1024 * 1024), // 2GB
7171
kgo.MaxBufferedRecords(1_000_000),
72-
kgo.ProducerBatchMaxBytes(10 * 1024 * 1024), // 100MB
72+
kgo.ProducerBatchMaxBytes(100 * 1024 * 1024), // 100MB
7373
kgo.RecordPartitioner(kgo.ManualPartitioner()),
7474
kgo.ProduceRequestTimeout(30 * time.Second),
7575
kgo.MetadataMaxAge(60 * time.Second),
@@ -172,7 +172,7 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
172172
p.client.Produce(ctx, msg, func(r *kgo.Record, err error) {
173173
defer wg.Done()
174174
if err != nil {
175-
log.Error().Err(err).Any("headers", r.Headers).Msg(">>>>>>>>>>>>>>>>>>>>>>>BLOCK WATCH:: KAFKA PUBLISHER::publishMessages::err")
175+
log.Error().Err(err).Any("headers", r.Headers).Msg("KAFKA PUBLISHER::publishMessages::err")
176176
produceErrorsMu.Lock()
177177
produceErrors = append(produceErrors, err)
178178
produceErrorsMu.Unlock()

0 commit comments

Comments
 (0)