error on kafka produce failure #285
Conversation
Walkthrough

Kafka broker limits increased to 100MB via docker-compose. Producer batch max bytes raised to 100MB in publisher initialization. The Kafka publisher's transactional publish flow now uses per-record async callbacks, collects produce errors with synchronization, waits for callbacks post-flush, and aborts or commits the transaction accordingly.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor C as Caller
    participant KP as KafkaPublisher
    participant P as Producer (kgo)
    participant K as Kafka
    Note over KP: Batch size set to 100MB
    C->>KP: publishMessages(records)
    activate KP
    loop For each record
        KP->>P: Produce(record, callback)
        note right of P: Callback captures per-record error<br/>and appends to shared errors slice
        P-->>KP: async callback(err)
    end
    KP->>P: Flush()
    alt Flush fails
        KP->>P: EndTransaction(TryAbort)
        KP-->>C: error (flush failed)
    else Flush succeeds
        KP->>KP: Wait for all callbacks
        alt Any produce error captured
            KP->>P: EndTransaction(TryAbort)
            KP-->>C: aggregated produce error(s)
        else No produce errors
            KP->>P: EndTransaction(Commit)
            KP-->>C: success
        end
    end
    deactivate KP
```
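The callback-and-wait flow in the diagram can be sketched in plain Go. This is a minimal, self-contained illustration of the synchronization pattern only: `produceAsync` is a hypothetical stub standing in for kgo's `Client.Produce`, and the `Flush`/`EndTransaction` calls from the real publisher are marked as comments.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// produceAsync is a hypothetical stand-in for kgo's Client.Produce:
// it delivers the per-record outcome via an asynchronous callback.
func produceAsync(record string, cb func(err error)) {
	go func() {
		if record == "bad" {
			cb(fmt.Errorf("produce failed for %q", record))
			return
		}
		cb(nil)
	}()
}

// publishMessages mirrors the reviewed flow: register a callback per
// record, collect errors under a mutex, wait for every callback after
// the flush, then decide whether to commit or abort.
func publishMessages(records []string) error {
	var (
		produceErrors   []error
		produceErrorsMu sync.Mutex
		wg              sync.WaitGroup
	)
	for _, r := range records {
		wg.Add(1) // Add before Produce, as the review notes
		produceAsync(r, func(err error) {
			defer wg.Done()
			if err != nil {
				produceErrorsMu.Lock()
				produceErrors = append(produceErrors, err)
				produceErrorsMu.Unlock()
			}
		})
	}
	// The real publisher calls Flush() here, then waits for callbacks.
	wg.Wait()
	if len(produceErrors) > 0 {
		// Real code: p.client.EndTransaction(ctx, kgo.TryAbort)
		return errors.Join(produceErrors...)
	}
	// Real code: p.client.EndTransaction(ctx, kgo.TryCommit)
	return nil
}

func main() {
	fmt.Println(publishMessages([]string{"a", "b"}))
	fmt.Println(publishMessages([]string{"a", "bad"}))
}
```

Calling `wg.Add(1)` before handing the callback to the producer is what makes the post-flush `wg.Wait()` safe: it guarantees the counter can never hit zero while a produce is still outstanding.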
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/publisher/publisher.go (1)

68-72: Make ProducerBatchMaxBytes configurable via Publisher config

- Replace the hard-coded `100 * 1024 * 1024` in `kgo.ProducerBatchMaxBytes` (internal/publisher/publisher.go:68) with a new `Publisher.MaxBatchBytes` field wired through your config. Ensure brokers' `socket.request.max.bytes`, each topic's `max.message.bytes`, and consumers' `max.partition.fetch.bytes` (and `fetch.max.bytes`) are ≥ this value.

(Optional)
- Switch to `kgo.ZstdCompression()` for better wire compression.
- Add `kgo.ProduceRequestTimeout(30 * time.Second)` and `kgo.RequestRetries(5)` for increased resiliency.
🧹 Nitpick comments (3)

internal/storage/kafka_publisher.go (3)

72-79: 100MB batches: align with broker/topic/consumer limits and consider configurability.

The bump is fine, but ensure:
- Broker `socket.request.max.bytes` > 100MB and topic `max.message.bytes` ≥ 100MB.
- Consumers' `max.partition.fetch.bytes` ≥ 100MB.
- Optionally make this tunable via config to avoid per-env edits.
164-168: Good: per-record error collection with proper synchronization.

Pre-allocate and prefer `errors.Join` for aggregation to keep error type semantics.

```diff
-	// Track if any produce errors occur
-	var produceErrors []error
+	// Track if any produce errors occur
+	produceErrors := make([]error, 0, 8) // small cap to reduce reallocs
 	var produceErrorsMu sync.Mutex
 	var wg sync.WaitGroup
```
189-200: Join errors and log abort failures for better debuggability.

- Prefer `errors.Join(produceErrors...)` so callers can inspect individual causes.
- If `EndTransaction` (abort) fails, log it; currently ignored.

```diff
-	hasErrors := len(produceErrors) > 0
-	if hasErrors {
-		// Abort the transaction if any produce errors occurred
-		p.client.EndTransaction(ctx, kgo.TryAbort)
-		return fmt.Errorf("transaction aborted due to produce errors: %v", produceErrors)
-	}
+	if len(produceErrors) > 0 {
+		if err := p.client.EndTransaction(ctx, kgo.TryAbort); err != nil {
+			log.Error().Err(err).Msg("failed to abort transaction after produce errors")
+		}
+		return errors.Join(produceErrors...)
+	}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- docker-compose.yml (1 hunks)
- internal/publisher/publisher.go (1 hunks)
- internal/storage/kafka_publisher.go (3 hunks)
🔇 Additional comments (2)
docker-compose.yml (1)
90-91: Cannot verify topic configs in this environment; please run the provided `kafka-topics`/`kafka-configs` commands on a host with the Kafka CLI installed to confirm per-topic `max.message.bytes` settings and apply updates as needed.

internal/storage/kafka_publisher.go (1)

171-180: LGTM: callback-based error capture inside a transaction.

The WaitGroup usage is correct (Add before Produce) and thread-safe error capture is solid.