diff --git a/docker-compose.yml b/docker-compose.yml
index f5112ce..67adc07 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -87,6 +87,8 @@ services:
       KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
       KAFKA_LOG_CLEANUP_POLICY: compact
       KAFKA_LOG_SEGMENT_MS: 10000
+      KAFKA_MESSAGE_MAX_BYTES: 104857600
+      KAFKA_REPLICA_FETCH_MAX_BYTES: 104857600
       CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
     profiles:
       - streaming
diff --git a/internal/publisher/publisher.go b/internal/publisher/publisher.go
index 68b97ce..0df01c1 100644
--- a/internal/publisher/publisher.go
+++ b/internal/publisher/publisher.go
@@ -65,7 +65,7 @@ func (p *Publisher) initialize() error {
 		kgo.ProducerBatchCompression(kgo.SnappyCompression()),
 		kgo.ClientID(fmt.Sprintf("insight-indexer-%s", config.Cfg.RPC.ChainID)),
 		kgo.MaxBufferedRecords(1_000_000),
-		kgo.ProducerBatchMaxBytes(16_000_000),
+		kgo.ProducerBatchMaxBytes(100 * 1024 * 1024), // 100MB
 		kgo.RecordPartitioner(kgo.UniformBytesPartitioner(1_000_000, false, false, nil)),
 		kgo.MetadataMaxAge(60 * time.Second),
 		kgo.DialTimeout(10 * time.Second),
diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go
index 72dc96f..9a05366 100644
--- a/internal/storage/kafka_publisher.go
+++ b/internal/storage/kafka_publisher.go
@@ -69,7 +69,7 @@ func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) {
 		kgo.TransactionalID(fmt.Sprintf("insight-producer-%s", chainID)),
 		kgo.MaxBufferedBytes(2 * 1024 * 1024 * 1024), // 2GB
 		kgo.MaxBufferedRecords(1_000_000),
-		kgo.ProducerBatchMaxBytes(16_000_000),
+		kgo.ProducerBatchMaxBytes(100 * 1024 * 1024), // 100MB
 		kgo.RecordPartitioner(kgo.ManualPartitioner()),
 		kgo.ProduceRequestTimeout(30 * time.Second),
 		kgo.MetadataMaxAge(60 * time.Second),
@@ -161,9 +161,23 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
 		return fmt.Errorf("failed to begin transaction: %v", err)
 	}
 
+	// Track if any produce errors occur
+	var produceErrors []error
+	var produceErrorsMu sync.Mutex
+	var wg sync.WaitGroup
+
 	// Produce all messages in the transaction
 	for _, msg := range messages {
-		p.client.Produce(ctx, msg, nil)
+		wg.Add(1)
+		p.client.Produce(ctx, msg, func(r *kgo.Record, err error) {
+			defer wg.Done()
+			if err != nil {
+				log.Error().Err(err).Any("headers", r.Headers).Msg("KAFKA PUBLISHER::publishMessages::err")
+				produceErrorsMu.Lock()
+				produceErrors = append(produceErrors, err)
+				produceErrorsMu.Unlock()
+			}
+		})
 	}
 
 	// Flush all messages
@@ -172,6 +186,18 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
 		return fmt.Errorf("failed to flush messages: %v", err)
 	}
 
+	// Wait for all callbacks to complete
+	wg.Wait()
+
+	// Check if any produce errors occurred
+	hasErrors := len(produceErrors) > 0
+
+	if hasErrors {
+		// Abort the transaction if any produce errors occurred
+		p.client.EndTransaction(ctx, kgo.TryAbort)
+		return fmt.Errorf("transaction aborted due to produce errors: %v", produceErrors)
+	}
+
 	// Commit the transaction
 	if err := p.client.EndTransaction(ctx, kgo.TryCommit); err != nil {
 		return fmt.Errorf("failed to commit transaction: %v", err)
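
Note: below is a minimal, self-contained sketch of the produce-and-abort pattern the kafka_publisher.go hunks introduce, using the same franz-go (kgo) API. The broker address, topic, transactional ID, and function names are placeholder assumptions, not values from this repo; unlike the patch, the sketch also checks the error returned by the abort call.

package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/twmb/franz-go/pkg/kgo"
)

// publishInTransaction mirrors the patched publishMessages flow: produce every
// record with a callback, collect callback errors under a mutex, flush, then
// commit only if no produce error was recorded, aborting otherwise.
func publishInTransaction(ctx context.Context, client *kgo.Client, records []*kgo.Record) error {
	if err := client.BeginTransaction(); err != nil {
		return fmt.Errorf("failed to begin transaction: %v", err)
	}

	var (
		mu          sync.Mutex
		produceErrs []error
		wg          sync.WaitGroup
	)

	for _, rec := range records {
		wg.Add(1)
		client.Produce(ctx, rec, func(_ *kgo.Record, err error) {
			defer wg.Done()
			if err != nil {
				mu.Lock()
				produceErrs = append(produceErrs, err)
				mu.Unlock()
			}
		})
	}

	// Flush blocks until every buffered record has been attempted, so the
	// callbacks have all fired by the time it returns; Wait makes that explicit.
	if err := client.Flush(ctx); err != nil {
		return fmt.Errorf("failed to flush messages: %v", err)
	}
	wg.Wait()

	if len(produceErrs) > 0 {
		// Abort so none of this transaction's records become visible to
		// read_committed consumers. Unlike the patch, check the abort error too.
		if abortErr := client.EndTransaction(ctx, kgo.TryAbort); abortErr != nil {
			return fmt.Errorf("failed to abort transaction: %v (produce errors: %v)", abortErr, produceErrs)
		}
		return fmt.Errorf("transaction aborted due to produce errors: %v", produceErrs)
	}

	return client.EndTransaction(ctx, kgo.TryCommit)
}

func main() {
	// Placeholder wiring: broker, topic, and transactional ID are assumptions.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("example-producer"),
		kgo.DefaultProduceTopic("example-topic"),
		kgo.ProducerBatchMaxBytes(100*1024*1024), // matches the patched 100MB batch cap
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	records := []*kgo.Record{{Value: []byte("hello")}}
	if err := publishInTransaction(context.Background(), client, records); err != nil {
		fmt.Println("publish failed:", err)
	}
}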