Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class ClickHouseAsyncSinkSerializer extends AsyncSinkWriterStateSerialize
protected void serializeRequestToStream(ClickHousePayload clickHousePayload, DataOutputStream dataOutputStream) throws IOException {
byte[] bytes = clickHousePayload.getPayload();
if (bytes != null) {
dataOutputStream.writeInt(V1);
dataOutputStream.writeInt(bytes.length);
dataOutputStream.write(bytes);
} else {
Expand All @@ -34,11 +35,16 @@ private ClickHousePayload deserializeV1(DataInputStream dataInputStream) throws
}

@Override
protected ClickHousePayload deserializeRequestFromStream(long version, DataInputStream dataInputStream) throws IOException {
if (version == V1) {
return deserializeV1(dataInputStream);
protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataInputStream dataInputStream) throws IOException {
if (requestSize > 0) {
int version = dataInputStream.readInt();
if (version == V1) {
return deserializeV1(dataInputStream);
} else {
throw new IOException("Unsupported version " + version);
}
} else {
throw new IOException("Unsupported version: " + version);
throw new IOException("Request size: " + requestSize);
}
Comment on lines 39 to 49
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a mismatch between serialization and deserialization logic. In serializeRequestToStream(), you write version first, then length. But in deserializeRequestFromStream(), you're checking requestSize before reading the version. This could cause issues deserializing data that was serialized with the previous implementation.

Consider ensuring the serialization and deserialization logic are symmetric to maintain backward compatibility.

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.apache.flink.connector.clickhouse.sink;

import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class ClickHouseSinkStateTests {

@Test
void SerializerTest() throws Exception {
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
ClickHousePayload clickHousePayload = new ClickHousePayload(data);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
serializer.serializeRequestToStream(clickHousePayload, dos);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));

ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an issue with the size parameter in the deserialization call. DataOutputStream doesn't have a size() method, but ByteArrayOutputStream does. You should use baos.size() instead of dos.size().

Suggested change
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(baos.size(), dis);

Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class ClickHouseSinkTests extends FlinkClusterTests {
static final long MAX_RECORD_SIZE_IN_BYTES = 1000;

static final int STREAM_PARALLELISM = 5;
static final int NUMBER_OF_RETRIES = 10;

private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) {
String createTable = createSimplePOJOTableSQL(database, tableName);
Expand Down Expand Up @@ -511,7 +512,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
env.setParallelism(STREAM_PARALLELISM);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
clickHouseClientConfig.setNumberOfRetries(10);
clickHouseClientConfig.setNumberOfRetries(NUMBER_OF_RETRIES);
clickHouseClientConfig.setSupportDefault(simpleTableSchema.hasDefaults());

ElementConverter<SimplePOJO, ClickHousePayload> convertorCovid = new ClickHouseConvertor<>(SimplePOJO.class, simplePOJOConvertor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class ClickHouseAsyncSinkSerializer extends AsyncSinkWriterStateSerialize
protected void serializeRequestToStream(ClickHousePayload clickHousePayload, DataOutputStream dataOutputStream) throws IOException {
byte[] bytes = clickHousePayload.getPayload();
if (bytes != null) {
dataOutputStream.writeInt(V1);
dataOutputStream.writeInt(bytes.length);
dataOutputStream.write(bytes);
} else {
Expand All @@ -34,11 +35,16 @@ private ClickHousePayload deserializeV1(DataInputStream dataInputStream) throws
}

@Override
protected ClickHousePayload deserializeRequestFromStream(long version, DataInputStream dataInputStream) throws IOException {
if (version == V1) {
return deserializeV1(dataInputStream);
protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataInputStream dataInputStream) throws IOException {
if (requestSize > 0) {
int version = dataInputStream.readInt();
if (version == V1) {
return deserializeV1(dataInputStream);
} else {
throw new IOException("Unsupported version " + version);
}
} else {
throw new IOException("Unsupported version: " + version);
throw new IOException("Request size: " + requestSize);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.apache.flink.connector.clickhouse.sink;

import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class ClickHouseSinkStateTests {

@Test
void SerializerTest() throws Exception {
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
ClickHousePayload clickHousePayload = new ClickHousePayload(data);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
serializer.serializeRequestToStream(clickHousePayload, dos);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));

ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an issue with the deserialization step. DataOutputStream doesn't have a size() method. You should use baos.size() instead to get the correct byte count.

Suggested change
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(baos.size(), dis);

Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class ClickHouseSinkTests extends FlinkClusterTests {
static final long MAX_RECORD_SIZE_IN_BYTES = 1000;

static final int STREAM_PARALLELISM = 5;
static final int NUMBER_OF_RETRIES = 10;

private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) {
String createTable = createSimplePOJOTableSQL(database, tableName);
Expand Down Expand Up @@ -508,7 +509,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
env.setParallelism(STREAM_PARALLELISM);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
clickHouseClientConfig.setNumberOfRetries(10);
clickHouseClientConfig.setNumberOfRetries(NUMBER_OF_RETRIES);
clickHouseClientConfig.setSupportDefault(simpleTableSchema.hasDefaults());

ElementConverter<SimplePOJO, ClickHousePayload> convertorCovid = new ClickHouseConvertor<>(SimplePOJO.class, simplePOJOConvertor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ static int getFromEnvOrDefault(String key, int defaultValue) {

public static void setUp() throws Exception {
    // Delegate to the configurable overload with a default (empty) configuration.
    setUp(new Configuration());
}

public static void setUp(Configuration config) throws Exception {
config.set(RestOptions.PORT, REST_PORT); // web UI port (optional)
config.set(TaskManagerOptions.NUM_TASK_SLOTS, NUM_TASK_SLOTS);
flinkCluster = new MiniClusterWithClientResource(
Expand All @@ -31,6 +35,7 @@ public static void setUp() throws Exception {
.build());
flinkCluster.before();
}

public static void tearDown() {
if (flinkCluster != null) {
flinkCluster.after();
Expand Down
Loading