-
Notifications
You must be signed in to change notification settings - Fork 1
Fix serialize and add test. #64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other comments (1)
- flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java (47-47) The error message `"Request size: " + requestSize` doesn't provide enough context about what went wrong. Consider making it more descriptive, such as `"Invalid request size: " + requestSize + ", expected a positive value"`.
💡 To request another review, post a new comment with "/windsurf-review".
| serializer.serializeRequestToStream(clickHousePayload, dos); | ||
| DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); | ||
|
|
||
| ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's an issue with the size parameter in the deserialization call. DataOutputStream doesn't have a size() method, but ByteArrayOutputStream does. You should use baos.size() instead of dos.size().
| ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis); | |
| ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(baos.size(), dis); |
| serializer.serializeRequestToStream(clickHousePayload, dos); | ||
| DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); | ||
|
|
||
| ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's an issue with the deserialization step. DataOutputStream doesn't have a size() method. You should use baos.size() instead to get the correct byte count.
| ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis); | |
| ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(baos.size(), dis); |
| protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataInputStream dataInputStream) throws IOException { | ||
| if (requestSize > 0) { | ||
| int version = dataInputStream.readInt(); | ||
| if (version == V1) { | ||
| return deserializeV1(dataInputStream); | ||
| } else { | ||
| throw new IOException("Unsupported version " + version); | ||
| } | ||
| } else { | ||
| throw new IOException("Unsupported version: " + version); | ||
| throw new IOException("Request size: " + requestSize); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a mismatch between serialization and deserialization logic. In serializeRequestToStream(), you write version first, then length. But in deserializeRequestFromStream(), you're checking requestSize before reading the version. This could cause issues deserializing data that was serialized with the previous implementation.
Consider ensuring the serialization and deserialization logic are symmetric to maintain backward compatibility.
@mzitnik do we need to review the area and expand the test coverage? |
Summary
Due to the incorrect implementation of the serialization state
Closes #52 #63
Checklist
Delete items not relevant to your PR: