Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ public class ClickHouseAsyncSinkSerializer extends AsyncSinkWriterStateSerialize
protected void serializeRequestToStream(ClickHousePayload clickHousePayload, DataOutputStream dataOutputStream) throws IOException {
byte[] bytes = clickHousePayload.getPayload();
if (bytes != null) {
dataOutputStream.writeInt(V1);
dataOutputStream.writeInt(bytes.length);
dataOutputStream.write(bytes);
} else {
dataOutputStream.writeInt(V1);
dataOutputStream.writeInt(-1);
}

Expand All @@ -34,11 +36,16 @@ private ClickHousePayload deserializeV1(DataInputStream dataInputStream) throws
}

@Override
protected ClickHousePayload deserializeRequestFromStream(long version, DataInputStream dataInputStream) throws IOException {
if (version == V1) {
return deserializeV1(dataInputStream);
protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataInputStream dataInputStream) throws IOException {
if (requestSize > 0) {
int version = dataInputStream.readInt();
if (version == V1) {
return deserializeV1(dataInputStream);
} else {
throw new IOException("Unsupported serialization version: " + version);
}
} else {
throw new IOException("Unsupported version: " + version);
throw new IOException("Invalid request size: " + requestSize);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.apache.flink.connector.clickhouse.sink;

import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.*;

public class ClickHouseSinkStateTests {

@Test
void testSerializeAndDeserializePayload() throws Exception {
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
ClickHousePayload clickHousePayload = new ClickHousePayload(data);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
serializer.serializeRequestToStream(clickHousePayload, dos);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));

ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an issue with the size parameter in the deserialization call. DataOutputStream doesn't have a size() method, but ByteArrayOutputStream does. You should use baos.size() instead of dos.size().

Suggested change
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(baos.size(), dis);

Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
}

@Test
void testSerializeAndDeserializeEmptyPayload() throws Exception {
ClickHousePayload clickHousePayload = new ClickHousePayload(null);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
serializer.serializeRequestToStream(clickHousePayload, dos);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
}

@Test
void testDeserializePayloadWithUnsuportedVersion() throws IOException {
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
DataOutputStream dataOutputStream = new DataOutputStream(baos);
int V2 = 2;
dataOutputStream.writeInt(V2);
dataOutputStream.writeInt(data.length);
dataOutputStream.write(data);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));

ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
Exception exception = Assertions.assertThrows(IOException.class, () -> {
serializer.deserializeRequestFromStream(dataOutputStream.size(), dis);
});
Assertions.assertEquals("Unsupported serialization version: 2", exception.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class ClickHouseSinkTests extends FlinkClusterTests {
static final long MAX_RECORD_SIZE_IN_BYTES = 1000;

static final int STREAM_PARALLELISM = 5;
static final int NUMBER_OF_RETRIES = 10;

private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) {
String createTable = createSimplePOJOTableSQL(database, tableName);
Expand Down Expand Up @@ -511,7 +512,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
env.setParallelism(STREAM_PARALLELISM);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
clickHouseClientConfig.setNumberOfRetries(10);
clickHouseClientConfig.setNumberOfRetries(NUMBER_OF_RETRIES);
clickHouseClientConfig.setSupportDefault(simpleTableSchema.hasDefaults());

ElementConverter<SimplePOJO, ClickHousePayload> convertorCovid = new ClickHouseConvertor<>(SimplePOJO.class, simplePOJOConvertor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ public class ClickHouseAsyncSinkSerializer extends AsyncSinkWriterStateSerialize
protected void serializeRequestToStream(ClickHousePayload clickHousePayload, DataOutputStream dataOutputStream) throws IOException {
byte[] bytes = clickHousePayload.getPayload();
if (bytes != null) {
dataOutputStream.writeInt(V1);
dataOutputStream.writeInt(bytes.length);
dataOutputStream.write(bytes);
} else {
dataOutputStream.writeInt(V1);
dataOutputStream.writeInt(-1);
}

Expand All @@ -34,11 +36,16 @@ private ClickHousePayload deserializeV1(DataInputStream dataInputStream) throws
}

@Override
protected ClickHousePayload deserializeRequestFromStream(long version, DataInputStream dataInputStream) throws IOException {
if (version == V1) {
return deserializeV1(dataInputStream);
protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataInputStream dataInputStream) throws IOException {
if (requestSize > 0) {
int version = dataInputStream.readInt();
if (version == V1) {
return deserializeV1(dataInputStream);
} else {
throw new IOException("Unsupported serialization version: " + version);
}
} else {
throw new IOException("Unsupported version: " + version);
throw new IOException("Invalid request size: " + requestSize);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.apache.flink.connector.clickhouse.sink;

import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.*;

public class ClickHouseSinkStateTests {

@Test
void testSerializeAndDeserializePayload() throws Exception {
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
ClickHousePayload clickHousePayload = new ClickHousePayload(data);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
serializer.serializeRequestToStream(clickHousePayload, dos);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
}

@Test
void testSerializeAndDeserializeEmptyPayload() throws Exception {
ClickHousePayload clickHousePayload = new ClickHousePayload(null);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
serializer.serializeRequestToStream(clickHousePayload, dos);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an issue with the deserialization step. DataOutputStream doesn't have a size() method. You should use baos.size() instead to get the correct byte count.

Suggested change
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(baos.size(), dis);

Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
}

@Test
void testDeserializePayloadWithUnsuportedVersion() throws IOException {
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
DataOutputStream dataOutputStream = new DataOutputStream(baos);
int V2 = 2;
dataOutputStream.writeInt(V2);
dataOutputStream.writeInt(data.length);
dataOutputStream.write(data);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));

ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
Exception exception = Assertions.assertThrows(IOException.class, () -> {
serializer.deserializeRequestFromStream(dataOutputStream.size(), dis);
});
Assertions.assertEquals("Unsupported serialization version: 2", exception.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class ClickHouseSinkTests extends FlinkClusterTests {
static final long MAX_RECORD_SIZE_IN_BYTES = 1000;

static final int STREAM_PARALLELISM = 5;
static final int NUMBER_OF_RETRIES = 10;

private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) {
String createTable = createSimplePOJOTableSQL(database, tableName);
Expand Down Expand Up @@ -508,7 +509,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
env.setParallelism(STREAM_PARALLELISM);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
clickHouseClientConfig.setNumberOfRetries(10);
clickHouseClientConfig.setNumberOfRetries(NUMBER_OF_RETRIES);
clickHouseClientConfig.setSupportDefault(simpleTableSchema.hasDefaults());

ElementConverter<SimplePOJO, ClickHousePayload> convertorCovid = new ClickHouseConvertor<>(SimplePOJO.class, simplePOJOConvertor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ static int getFromEnvOrDefault(String key, int defaultValue) {

public static void setUp() throws Exception {
Configuration config = new Configuration();
setUp(config);
}

public static void setUp(Configuration config) throws Exception {
config.set(RestOptions.PORT, REST_PORT); // web UI port (optional)
config.set(TaskManagerOptions.NUM_TASK_SLOTS, NUM_TASK_SLOTS);
flinkCluster = new MiniClusterWithClientResource(
Expand All @@ -31,6 +35,7 @@ public static void setUp() throws Exception {
.build());
flinkCluster.before();
}

public static void tearDown() {
if (flinkCluster != null) {
flinkCluster.after();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ public ClickHousePayload(byte[] payload) {
this.payload = payload;
}
public byte[] getPayload() { return payload; }
public int getPayloadLength() { return payload.length; }
public int getPayloadLength() {
if (payload == null) return -1;
return payload.length;
}
public int getAttemptCount() { return attemptCount; }
public void incrementAttempts() { attemptCount++; }
}
Loading