2 changes: 2 additions & 0 deletions docker-compose.yml
@@ -87,6 +87,8 @@ services:
       KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
       KAFKA_LOG_CLEANUP_POLICY: compact
       KAFKA_LOG_SEGMENT_MS: 10000
+      KAFKA_MESSAGE_MAX_BYTES: 104857600
+      KAFKA_REPLICA_FETCH_MAX_BYTES: 104857600
       CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
     profiles:
       - streaming
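Both KAFKA_MESSAGE_MAX_BYTES (message.max.bytes) and KAFKA_REPLICA_FETCH_MAX_BYTES (replica.fetch.max.bytes) are raised to 104857600 so the broker accepts 100MB batches and can still replicate them between brokers. Consumers are not touched by this PR, so any reader of these topics would need its fetch ceilings raised to match; a minimal franz-go sketch, assuming a local broker address and a hypothetical topic name (neither is from the repo):

	package main

	import (
		"context"

		"github.com/twmb/franz-go/pkg/kgo"
	)

	func main() {
		client, err := kgo.NewClient(
			kgo.SeedBrokers("localhost:9092"),         // assumption: the compose-file listener
			kgo.ConsumeTopics("insight.blocks"),       // hypothetical topic name
			kgo.FetchMaxBytes(100*1024*1024),          // whole-fetch ceiling, matches message.max.bytes
			kgo.FetchMaxPartitionBytes(100*1024*1024), // per-partition ceiling
		)
		if err != nil {
			panic(err)
		}
		defer client.Close()

		// A 100MB record only arrives if both fetch ceilings above allow it.
		fetches := client.PollFetches(context.Background())
		fetches.EachRecord(func(r *kgo.Record) {
			_ = r // handle the (possibly very large) record
		})
	}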
2 changes: 1 addition & 1 deletion internal/publisher/publisher.go
@@ -65,7 +65,7 @@ func (p *Publisher) initialize() error {
 		kgo.ProducerBatchCompression(kgo.SnappyCompression()),
 		kgo.ClientID(fmt.Sprintf("insight-indexer-%s", config.Cfg.RPC.ChainID)),
 		kgo.MaxBufferedRecords(1_000_000),
-		kgo.ProducerBatchMaxBytes(16_000_000),
+		kgo.ProducerBatchMaxBytes(100 * 1024 * 1024), // 100MB
 		kgo.RecordPartitioner(kgo.UniformBytesPartitioner(1_000_000, false, false, nil)),
 		kgo.MetadataMaxAge(60 * time.Second),
 		kgo.DialTimeout(10 * time.Second),
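The producer batch cap now matches the new broker ceiling: a batch larger than message.max.bytes is rejected with MESSAGE_TOO_LARGE, so the two limits have to move together. One way to keep them from drifting is to derive the client options from a single constant; a hedged sketch (the constant name and helper are illustrative, not from the repo):

	package publisher

	import "github.com/twmb/franz-go/pkg/kgo"

	// maxBatchBytes mirrors KAFKA_MESSAGE_MAX_BYTES in docker-compose.yml;
	// the two values must change together or produces fail with MESSAGE_TOO_LARGE.
	const maxBatchBytes = 100 * 1024 * 1024 // 104857600

	func producerOpts() []kgo.Opt {
		return []kgo.Opt{
			kgo.ProducerBatchMaxBytes(maxBatchBytes),
			// BrokerMaxWriteBytes caps the entire produce request; franz-go
			// defaults it to 100MiB, so leave headroom above the batch cap
			// when batches are allowed to reach the full 100MB.
			kgo.BrokerMaxWriteBytes(maxBatchBytes + 1024*1024),
		}
	}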
30 changes: 28 additions & 2 deletions internal/storage/kafka_publisher.go
@@ -69,7 +69,7 @@ func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) {
 		kgo.TransactionalID(fmt.Sprintf("insight-producer-%s", chainID)),
 		kgo.MaxBufferedBytes(2 * 1024 * 1024 * 1024), // 2GB
 		kgo.MaxBufferedRecords(1_000_000),
-		kgo.ProducerBatchMaxBytes(16_000_000),
+		kgo.ProducerBatchMaxBytes(100 * 1024 * 1024), // 100MB
 		kgo.RecordPartitioner(kgo.ManualPartitioner()),
 		kgo.ProduceRequestTimeout(30 * time.Second),
 		kgo.MetadataMaxAge(60 * time.Second),
@@ -161,9 +161,23 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Record) error {
 		return fmt.Errorf("failed to begin transaction: %v", err)
 	}

+	// Track if any produce errors occur
+	var produceErrors []error
+	var produceErrorsMu sync.Mutex
+	var wg sync.WaitGroup
+
 	// Produce all messages in the transaction
 	for _, msg := range messages {
-		p.client.Produce(ctx, msg, nil)
+		wg.Add(1)
+		p.client.Produce(ctx, msg, func(r *kgo.Record, err error) {
+			defer wg.Done()
+			if err != nil {
+				log.Error().Err(err).Any("headers", r.Headers).Msg("KAFKA PUBLISHER::publishMessages::err")
+				produceErrorsMu.Lock()
+				produceErrors = append(produceErrors, err)
+				produceErrorsMu.Unlock()
+			}
+		})
 	}

 	// Flush all messages
@@ -172,6 +186,18 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Record) error {
 		return fmt.Errorf("failed to flush messages: %v", err)
 	}

+	// Wait for all callbacks to complete
+	wg.Wait()
+
+	// Check if any produce errors occurred
+	hasErrors := len(produceErrors) > 0
+
+	if hasErrors {
+		// Abort the transaction if any produce errors occurred
+		p.client.EndTransaction(ctx, kgo.TryAbort)
+		return fmt.Errorf("transaction aborted due to produce errors: %v", produceErrors)
+	}
+
 	// Commit the transaction
 	if err := p.client.EndTransaction(ctx, kgo.TryCommit); err != nil {
 		return fmt.Errorf("failed to commit transaction: %v", err)
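Pulled out of the diff, the whole transactional flow reads as the sketch below. It is illustrative rather than the repo's exact code: names differ, the header logging is dropped, and it also takes the abort path when Flush itself fails instead of returning with the transaction still open.

	package main

	import (
		"context"
		"fmt"
		"sync"

		"github.com/twmb/franz-go/pkg/kgo"
	)

	// publishInTx produces every record inside one transaction, collects
	// per-record errors from the produce callbacks, and commits only if
	// everything succeeded; otherwise it aborts so a retry can reproduce
	// the whole batch atomically.
	func publishInTx(ctx context.Context, client *kgo.Client, records []*kgo.Record) error {
		if err := client.BeginTransaction(); err != nil {
			return fmt.Errorf("failed to begin transaction: %w", err)
		}

		var (
			mu   sync.Mutex
			errs []error
			wg   sync.WaitGroup
		)
		for _, rec := range records {
			wg.Add(1)
			client.Produce(ctx, rec, func(_ *kgo.Record, err error) {
				defer wg.Done()
				if err != nil {
					mu.Lock()
					errs = append(errs, err)
					mu.Unlock()
				}
			})
		}

		// Flush pushes everything out; the callbacks have all fired once it
		// returns, and wg.Wait() makes that explicit.
		flushErr := client.Flush(ctx)
		wg.Wait()

		if flushErr != nil || len(errs) > 0 {
			if abortErr := client.EndTransaction(ctx, kgo.TryAbort); abortErr != nil {
				return fmt.Errorf("abort failed: %v (produce errors: %v, flush error: %v)", abortErr, errs, flushErr)
			}
			return fmt.Errorf("transaction aborted: produce errors %v, flush error %v", errs, flushErr)
		}
		return client.EndTransaction(ctx, kgo.TryCommit)
	}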