SinkWriter implementation of String data type #22
Pull Request Overview
This PR implements a String-based SinkWriter for ClickHouse, adds necessary client and test utilities, and covers them with integration tests.
- Introduces accessors and a URL builder in `ClickHouseServerForTests`
- Exposes setup/teardown and getters in `FlinkClusterTests` for reuse
- Adds `ClickHouseAsyncSink`, writer, serializer, payload, and converter for string data
- Provides a CSV integration test (`ClickHouseSinkTests`) and updates CI workflows
Reviewed Changes
Copilot reviewed 13 out of 15 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java | Added new getters (getHost, getPort, etc.) and renamed getDatabase. |
| flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java | Made setup/teardown methods public and added proxy getters. |
| flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/DummyFlinkClusterTest.java | Updated table creation to use getDatabase(). |
| flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java | New CSV sink integration test covering 10k rows. |
| flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java | New client config holding URL, creds, database, and table. |
| flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java | Implements async submission logic for ClickHouse. |
| flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java | Serializer for buffered request state. |
| flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSink.java | AsyncSink base integration for ClickHouse. |
| flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java | Wrapper for payload byte array. |
| flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/convertor/ClickHouseConvertor.java | Converts String or POJO to ClickHousePayload. |
| .github/workflows/tests.yaml | Renamed workflow title to distinguish Java tests. |
| .github/workflows/tests-scala.yaml | Added Scala CI workflow with multiple ClickHouse versions and cloud support. |
Files not reviewed (2)
- flink-connector-clickhouse-base/build.gradle.kts: Language not supported
- flink-connector-clickhouse-base/src/test/scala/org/apache/flink/connector/clickhouse/test/scala/ClickHouseSinkTests.scala: Language not supported
Comments suppressed due to low confidence (5)
flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java:13
- [nitpick] The logger is initialized with the wrong class; update to LoggerFactory.getLogger(ClickHouseAsyncSinkSerializer.class) to reflect the actual class.
private static final Logger LOG = LoggerFactory.getLogger(AsyncSinkWriterStateSerializer.class);
flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/convertor/ClickHouseConvertor.java:43
- Constructing a ClickHousePayload with a null byte array will cause a NullPointerException downstream; consider using an empty byte array or skipping payload creation for empty strings.
if (payload.isEmpty()) { return new ClickHousePayload(null); }
flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSink.java:33
- [nitpick] The parameter name maxRecordSizeInByte is misleadingly singular; rename to maxRecordSizeInBytes for consistency with other size parameters.
public ClickHouseAsyncSink(..., long maxRecordSizeInByte, ...)
flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/DummyFlinkClusterTest.java:55
- [nitpick] Variable name createTableSQl has a typo in casing; rename to createTableSql for clarity.
String createTableSQl = String.format(...);
.github/workflows/tests.yaml:1
- The YAML key has incorrect syntax/indentation (`-name`); it should be `name:` at the document root to define the workflow name correctly.
-name: Apache Flink ClickHouse Connector Tests CI (Java)
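The null-payload comment on `ClickHouseConvertor.java` above can be illustrated with a minimal sketch. It assumes the payload is a thin wrapper around a byte array; `PayloadSketch` and `StringConvertorSketch` are hypothetical stand-ins, not the PR's actual `ClickHousePayload` or `ClickHouseConvertor` classes, and the real fix in the PR may differ.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for ClickHousePayload; the real class may look different.
final class PayloadSketch {
    private final byte[] payload;

    PayloadSketch(byte[] payload) {
        // Normalize null to an empty array so callers never dereference null.
        this.payload = payload == null ? new byte[0] : payload;
    }

    byte[] getPayload() {
        return payload;
    }

    int getPayloadLength() {
        return payload.length;
    }
}

// Hypothetical stand-in for the String branch of the convertor.
final class StringConvertorSketch {
    // An empty or null record yields an empty payload instead of a null one.
    static PayloadSketch convert(String record) {
        if (record == null || record.isEmpty()) {
            return new PayloadSketch(new byte[0]);
        }
        return new PayloadSketch(record.getBytes(StandardCharsets.UTF_8));
    }
}
```

Whether empty records should be written as empty payloads or skipped entirely is a design choice the review comment leaves open; this sketch only covers the first option.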
Pull Request Overview
This PR adds a full async ClickHouse sink implementation—including payload handling, writers, serializers, and POJO convertors—along with expanded test support and CI updates.
- Introduces ClickHouseAsyncSink, ClickHouseAsyncWriter, client config, payload, and serializer classes
- Adds POJO support with SimplePOJO/CovidPOJO, their convertors, and end-to-end tests
- Extends test harness (ClickHouseServerForTests, FlinkClusterTests), build scripts, and workflows for Scala and Java testing
Reviewed Changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/test/java/org/apache/flink/connector/test/embedded/flink/EmbeddedFlinkClusterForTests.java | Removed unused JUnit imports |
| src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java | Renamed getDataBase → getDatabase and added getters/getURL/getTableSchema |
| src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java | Made setUp/tearDown public and added server credential accessors |
| src/test/java/org/apache/flink/connector/test/DummyFlinkClusterTest.java | Switched to specific JUnit imports and updated table creation utility call |
| src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java | New POJO for basic types |
| src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/CovidPOJO.java | New POJO parsing CSV payload |
| src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java | Convertor for SimplePOJO |
| src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/CovidPOJOConvertor.java | Convertor for CovidPOJO |
| src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java | End-to-end Java tests for CSV, POJO, and simple POJO sinks |
| src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java | ClickHouse client builder |
| src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java | Async sink writer implementation |
| src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java | State serializer for buffered requests |
| src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSink.java | Async sink entry point |
| src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java | Serializable payload holder |
| src/main/java/org/apache/flink/connector/clickhouse/convertor/POJOConvertor.java | Base POJO convertor |
| src/main/java/org/apache/flink/connector/clickhouse/convertor/ClickHouseConvertor.java | ElementConverter for String and POJO |
| src/main/java/com/clickhouse/utils/Serialize.java | Low-level ClickHouse serialization utilities |
| build.gradle.kts | Added Scala plugin, bumped versions, configured Scala source sets and test tasks |
| .github/workflows/tests.yaml | Updated job name for Java CI |
| .github/workflows/tests-scala.yaml | New workflow for Scala tests |
Comments suppressed due to low confidence (3)
flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java:16
- The `@AfterAll` annotation is used but not imported; add `import org.junit.jupiter.api.AfterAll;` to avoid compilation errors.
@AfterAll
flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/DummyFlinkClusterTest.java:57
- [nitpick] Variable name `createTableSQl` uses mixed case; consider renaming to `createTableSql` or `createTableSQL` for consistency.
String createTableSQl = String.format("CREATE TABLE ...");
flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java:200
- Trailing comma before closing parenthesis in the CREATE TABLE statement will cause a SQL syntax error; remove the comma after `doubleObject Double`.
"doubleObject Double," +
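One way to avoid the trailing-comma problem flagged above is to build the column list with `String.join` rather than concatenating hand-written fragments. The sketch below is an illustration only: `CreateTableSketch`, the `longPrimitive` column, and the `ENGINE = MergeTree ORDER BY tuple()` clause are assumptions and do not come from the test in the PR.

```java
import java.util.List;

// Hypothetical helper; not part of the PR.
final class CreateTableSketch {
    static String createTableSql(String database, String table, List<String> columns) {
        // String.join puts ", " only between elements, so nothing follows the last column.
        return "CREATE TABLE `" + database + "`.`" + table + "` ("
                + String.join(", ", columns)
                + ") ENGINE = MergeTree ORDER BY tuple()";
    }
}
```

Called with the columns `longPrimitive Int64` and `doubleObject Double`, the generated statement ends the column list with `doubleObject Double)` and no comma before the parenthesis.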
this.cumulative_recovered = 0;
this.cumulative_tested = 0;
Copilot AI, Jun 2, 2025
The constructor sets cumulative_recovered and cumulative_tested to 0 instead of parsing values[8] and values[9]; update to use valueOf(values[8]) and valueOf(values[9]).
Suggested change:
- this.cumulative_recovered = 0;
- this.cumulative_tested = 0;
+ this.cumulative_recovered = valueOf(values[8].trim());
+ this.cumulative_tested = valueOf(values[9].trim());
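As a rough, self-contained illustration of the parsing this comment asks for, the sketch below reads fields 8 and 9 from a CSV line and trims them before conversion. `CovidRowSketch` is a hypothetical stand-in for `CovidPOJO`; the `int` field type, the ten-column minimum, and treating blank fields as 0 are all assumptions, since the PR's `valueOf` helper and column layout are not shown here.

```java
// Hypothetical stand-in for CovidPOJO; only the two fields from the comment are modeled.
final class CovidRowSketch {
    final int cumulativeRecovered;
    final int cumulativeTested;

    CovidRowSketch(String csvLine) {
        // A limit of -1 keeps trailing empty fields, so indexes 8 and 9 exist even when blank.
        String[] values = csvLine.split(",", -1);
        if (values.length < 10) {
            throw new IllegalArgumentException("expected at least 10 columns, got " + values.length);
        }
        this.cumulativeRecovered = parseOrZero(values[8]);
        this.cumulativeTested = parseOrZero(values[9]);
    }

    // Assumption: a blank field means "no data" and maps to 0.
    private static int parseOrZero(String field) {
        String trimmed = field.trim();
        return trimmed.isEmpty() ? 0 : Integer.parseInt(trimmed);
    }
}
```

Trimming matters because `Integer.parseInt` rejects surrounding whitespace, which is common in hand-edited CSV files.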
…/connector/clickhouse/sink/convertor/CovidPOJOConvertor.java Co-authored-by: Copilot <[email protected]>
…nk-connector-clickhouse into implement-sink-writer-api
Summary
Closes #13 #16