I think @chernser is a better person to review this, as I don't have experience with this repo, only the Kafka table engine.
@mzitnik would you please take a look? @gaurav7261 thank you for the contribution!
Yes, I'm working on it.
@mzitnik can you please review now?
@gaurav7261 as @chernser asked below
@mzitnik our |
@mzitnik please review again; I have resolved the comments.

@mzitnik can you please review it?
Pull request overview
This PR adds internal record buffering capability to the ClickHouse Kafka Connect sink connector. The implementation allows records from multiple poll() calls to be accumulated in memory and flushed to ClickHouse as a single large batch, reducing the number of insert operations and improving throughput.
Changes:
- Added two new configuration options: `bufferCount` (record count threshold) and `bufferFlushTime` (time-based threshold in milliseconds), both defaulting to 0 (disabled) for backward compatibility
- Implemented buffering logic in `ClickHouseSinkTask` with proper offset management to ensure at-least-once delivery semantics
- Added comprehensive test coverage for buffering behavior, offset management, partition rebalancing, and error handling scenarios
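
The count and time thresholds described above can be illustrated with a minimal, self-contained sketch. It is not the PR's code: the class `BufferSketch`, its `put` signature, and the use of `String` records are hypothetical stand-ins, and the clock is passed in explicitly so the behavior is easy to follow. It assumes, as the summary states, that a `bufferCount` of 0 disables buffering and that the time threshold only applies when buffering is enabled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of count/time-based buffering; not the PR's ClickHouseSinkTask. */
public class BufferSketch {
    private final int bufferCount;        // 0 disables buffering (assumed, per the PR summary)
    private final long bufferFlushTimeMs; // 0 disables the time threshold
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushTime;

    public BufferSketch(int bufferCount, long bufferFlushTimeMs, long nowMs) {
        this.bufferCount = bufferCount;
        this.bufferFlushTimeMs = bufferFlushTimeMs;
        this.lastFlushTime = nowMs;
    }

    /** Accepts the records of one poll(); returns the batch to insert, or an empty list. */
    public List<String> put(List<String> records, long nowMs) {
        if (bufferCount <= 0) {
            // Buffering disabled: every poll() is passed straight through.
            return new ArrayList<>(records);
        }
        buffer.addAll(records);
        boolean countReached = buffer.size() >= bufferCount;
        boolean timeReached = bufferFlushTimeMs > 0
                && !buffer.isEmpty()
                && nowMs - lastFlushTime >= bufferFlushTimeMs;
        if (countReached || timeReached) {
            // Hand back everything accumulated so far as one batch.
            List<String> batch = new ArrayList<>(buffer);
            buffer.clear();
            lastFlushTime = nowMs;
            return batch;
        }
        return Collections.emptyList();
    }

    public int buffered() {
        return buffer.size();
    }
}
```

With `bufferCount = 3`, two polls of two records and one record would yield a single three-record batch on the second call; a later call past the time threshold would flush whatever remains, even if it is below the count.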
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java | Core buffering implementation with put(), flushBuffer(), preCommit(), close(), and stop() logic |
| src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java | Added BUFFER_COUNT and BUFFER_FLUSH_TIME configuration options with defaults and validation |
| src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskBufferTest.java | Comprehensive test suite covering buffering, offset management, partition handling, and error scenarios |
| CHANGELOG.md | Added entry for version 1.3.6 documenting the new buffering feature |
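
To make the offset-management point concrete, here is a rough sketch of how a buffering sink can keep at-least-once semantics: only offsets of records that have already been flushed are reported at commit time. The class `FlushedOffsetsSketch`, its methods, and the `"topic-partition"` string keys are hypothetical; the PR's own `preCommit()` works with Kafka Connect types and is not reproduced in this conversation. The one Kafka fact relied on is that a committed offset is the offset of the next record to read, that is, last processed offset + 1.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of flushed-offset tracking; not the PR's implementation. */
public class FlushedOffsetsSketch {
    // "topic-partition" -> next offset that is safe to commit (last flushed offset + 1)
    private final Map<String, Long> flushedOffsets = new HashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    /** Called for each record after the batch containing it has been written. */
    public void recordFlushed(String topic, int partition, long offset) {
        flushedOffsets.merge(key(topic, partition), offset + 1, Long::max);
    }

    /** Returns only offsets for partitions with flushed data; buffered records are never acknowledged. */
    public Map<String, Long> preCommit(Map<String, Long> currentOffsets) {
        Map<String, Long> safe = new HashMap<>();
        for (String k : currentOffsets.keySet()) {
            Long flushed = flushedOffsets.get(k);
            if (flushed != null) {
                safe.put(k, flushed);
            }
        }
        return safe;
    }

    /** Drops state for a partition that was revoked during a rebalance. */
    public void onPartitionClosed(String topic, int partition) {
        flushedOffsets.remove(key(topic, partition));
    }
}
```

If the task stops or fails before a flush, the unflushed records stay uncommitted and are redelivered, which can produce duplicates but not loss.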
    this.bufferFlushTime = clickHouseSinkConfig.getBufferFlushTime();
    this.bufferingEnabled = this.bufferCount > 0;
    this.buffer = this.bufferingEnabled ? new ArrayList<>(this.bufferCount) : new ArrayList<>();
    this.lastFlushTime = System.currentTimeMillis();
    this.flushedOffsets = new HashMap<>();
Consider adding validation or a warning when `bufferFlushTime > 0` but `bufferCount = 0`. While the config documentation states that `bufferFlushTime` is "Only effective when bufferCount > 0", users might miss this and wonder why time-based flushing isn't working. Adding a log warning in the `start()` method (after line 68) would help users catch this misconfiguration: `if (this.bufferFlushTime > 0 && this.bufferCount == 0) { LOGGER.warn("bufferFlushTime is set but will be ignored because bufferCount is 0"); }`
    if (bufferingEnabled) {
        LOGGER.info("Internal buffering enabled: bufferCount={}, bufferFlushTime={}ms",
                bufferCount, bufferFlushTime);
    }
The same log message, "Internal buffering enabled: bufferCount={}, bufferFlushTime={}ms", appears in both the `ClickHouseSinkConfig` constructor (line 285) and the `ClickHouseSinkTask.start()` method (line 75). This will result in duplicate log entries when buffering is enabled. Consider removing one of these log statements. The one in `ClickHouseSinkConfig` seems more appropriate to keep since it logs during configuration parsing, while the one in `ClickHouseSinkTask` is redundant.
Summary
• Buffer implementation: Adds internal record buffering to ClickHouseSinkTask so that records from multiple poll() calls are accumulated and flushed as a single large batch, reducing the number of inserts to ClickHouse.
• Two new config options: bufferCount (number of records to accumulate before flushing) and bufferFlushTime (max time in ms before flushing regardless of count). Both default to 0 (disabled), preserving backward compatibility.