-
Notifications
You must be signed in to change notification settings - Fork 1
Retury and failare #25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
# Conflicts: # flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java
|
/windsurf-review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other comments (6)
- flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java (180-183) The retry logic only increments the attempt count for the first element in the batch (`firstElement.incrementAttempts()`), but the retry decision is applied to the entire batch. This could lead to inconsistent retry tracking if batches are split or merged during retries. Consider incrementing the attempt count for all elements in the batch.
- flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java (27-27) The `client` field is marked as transient but there's no handling for when the object is deserialized. Since this class implements Serializable, the client will be null after deserialization. Consider adding proper handling for this case or documenting this behavior.
- flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java (41-55) The `createClient` method now caches the client instance, but there's no method to close or reset this connection. This could lead to resource leaks or stale connections if the configuration changes. Consider adding a `closeClient()` method to properly manage the client lifecycle.
- flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java (41-55) The `createClient(String database)` method doesn't check if the provided database is different from the cached client's database. If called with different database parameters, it will still return the same cached client which might be configured for a different database.
- flink-connector-clickhouse-base/build.gradle.kts (95-95) Upgrading the Java language version from 11 to 17 is a significant change that should be explicitly mentioned in the PR description and potentially in the CHANGELOG. This change may affect users who are still on Java 11 environments.
- flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java (51-51) This debug print statement should be removed as the information is already being properly logged with LOG.warn above.
💡 To request another review, post a new comment with "/windsurf-review".
| firstElement.incrementAttempts(); | ||
| if (firstElement.getAttemptCount() <= this.numberOfRetries) { | ||
| totalBatchRetriesCounter.inc(); | ||
| LOG.warn("Retriable exception occurred while processing request. Left attempts {}.", this.numberOfRetries - (firstElement.getAttemptCount() - 1) ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The calculation for remaining retry attempts is incorrect. Since getAttemptCount() is called after incrementing the counter, the correct calculation should be this.numberOfRetries - firstElement.getAttemptCount().
| LOG.warn("Retriable exception occurred while processing request. Left attempts {}.", this.numberOfRetries - (firstElement.getAttemptCount() - 1) ); | |
| LOG.warn("Retriable exception occurred while processing request. Left attempts {}.", this.numberOfRetries - firstElement.getAttemptCount()); |
Summary
Closes #15 #14
Checklist
Delete items not relevant to your PR: