Skip to content

Commit 63d7061

Browse files
committed
fail on any publish failure
1 parent f8eb5f0 commit 63d7061

File tree

1 file changed

+22
-0
lines changed

1 file changed

+22
-0
lines changed

internal/storage/kafka_publisher.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,21 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
161161
return fmt.Errorf("failed to begin transaction: %v", err)
162162
}
163163

164+
// Track if any produce errors occur
165+
var produceErrors []error
166+
var produceErrorsMu sync.Mutex
167+
var wg sync.WaitGroup
168+
164169
// Produce all messages in the transaction
165170
for _, msg := range messages {
171+
wg.Add(1)
166172
p.client.Produce(ctx, msg, func(r *kgo.Record, err error) {
173+
defer wg.Done()
167174
if err != nil {
168175
log.Error().Err(err).Any("headers", r.Headers).Msg(">>>>>>>>>>>>>>>>>>>>>>>BLOCK WATCH:: KAFKA PUBLISHER::publishMessages::err")
176+
produceErrorsMu.Lock()
177+
produceErrors = append(produceErrors, err)
178+
produceErrorsMu.Unlock()
169179
}
170180
})
171181
}
@@ -176,6 +186,18 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
176186
return fmt.Errorf("failed to flush messages: %v", err)
177187
}
178188

189+
// Wait for all callbacks to complete
190+
wg.Wait()
191+
192+
// Check if any produce errors occurred
193+
hasErrors := len(produceErrors) > 0
194+
195+
if hasErrors {
196+
// Abort the transaction if any produce errors occurred
197+
p.client.EndTransaction(ctx, kgo.TryAbort)
198+
return fmt.Errorf("transaction aborted due to produce errors: %v", produceErrors)
199+
}
200+
179201
// Commit the transaction
180202
if err := p.client.EndTransaction(ctx, kgo.TryCommit); err != nil {
181203
return fmt.Errorf("failed to commit transaction: %v", err)

0 commit comments

Comments
 (0)