Skip to content

Commit bd3eca1

Browse files
authored
error on kafka produce failure (#285)
- Log publish failures
- Fail on any publish failure
- Raise the maximum producer batch size to 100 MB
1 parent 63e4887 commit bd3eca1

File tree

3 files changed

+31
-3
lines changed

3 files changed

+31
-3
lines changed

docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ services:
8787
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
8888
KAFKA_LOG_CLEANUP_POLICY: compact
8989
KAFKA_LOG_SEGMENT_MS: 10000
90+
KAFKA_MESSAGE_MAX_BYTES: 104857600
91+
KAFKA_REPLICA_FETCH_MAX_BYTES: 104857600
9092
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
9193
profiles:
9294
- streaming

internal/publisher/publisher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (p *Publisher) initialize() error {
6565
kgo.ProducerBatchCompression(kgo.SnappyCompression()),
6666
kgo.ClientID(fmt.Sprintf("insight-indexer-%s", config.Cfg.RPC.ChainID)),
6767
kgo.MaxBufferedRecords(1_000_000),
68-
kgo.ProducerBatchMaxBytes(16_000_000),
68+
kgo.ProducerBatchMaxBytes(100 * 1024 * 1024), // 100MB
6969
kgo.RecordPartitioner(kgo.UniformBytesPartitioner(1_000_000, false, false, nil)),
7070
kgo.MetadataMaxAge(60 * time.Second),
7171
kgo.DialTimeout(10 * time.Second),

internal/storage/kafka_publisher.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) {
6969
kgo.TransactionalID(fmt.Sprintf("insight-producer-%s", chainID)),
7070
kgo.MaxBufferedBytes(2 * 1024 * 1024 * 1024), // 2GB
7171
kgo.MaxBufferedRecords(1_000_000),
72-
kgo.ProducerBatchMaxBytes(16_000_000),
72+
kgo.ProducerBatchMaxBytes(100 * 1024 * 1024), // 100MB
7373
kgo.RecordPartitioner(kgo.ManualPartitioner()),
7474
kgo.ProduceRequestTimeout(30 * time.Second),
7575
kgo.MetadataMaxAge(60 * time.Second),
@@ -161,9 +161,23 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
161161
return fmt.Errorf("failed to begin transaction: %v", err)
162162
}
163163

164+
// Track if any produce errors occur
165+
var produceErrors []error
166+
var produceErrorsMu sync.Mutex
167+
var wg sync.WaitGroup
168+
164169
// Produce all messages in the transaction
165170
for _, msg := range messages {
166-
p.client.Produce(ctx, msg, nil)
171+
wg.Add(1)
172+
p.client.Produce(ctx, msg, func(r *kgo.Record, err error) {
173+
defer wg.Done()
174+
if err != nil {
175+
log.Error().Err(err).Any("headers", r.Headers).Msg("KAFKA PUBLISHER::publishMessages::err")
176+
produceErrorsMu.Lock()
177+
produceErrors = append(produceErrors, err)
178+
produceErrorsMu.Unlock()
179+
}
180+
})
167181
}
168182

169183
// Flush all messages
@@ -172,6 +186,18 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
172186
return fmt.Errorf("failed to flush messages: %v", err)
173187
}
174188

189+
// Wait for all callbacks to complete
190+
wg.Wait()
191+
192+
// Check if any produce errors occurred
193+
hasErrors := len(produceErrors) > 0
194+
195+
if hasErrors {
196+
// Abort the transaction if any produce errors occurred
197+
p.client.EndTransaction(ctx, kgo.TryAbort)
198+
return fmt.Errorf("transaction aborted due to produce errors: %v", produceErrors)
199+
}
200+
175201
// Commit the transaction
176202
if err := p.client.EndTransaction(ctx, kgo.TryCommit); err != nil {
177203
return fmt.Errorf("failed to commit transaction: %v", err)

0 commit comments

Comments (0)