Skip to content

Commit 998d80b

Browse files
committed
Adding mote test & fixing edge case
1 parent 0e5ea14 commit 998d80b

File tree

5 files changed

+77
-15
lines changed

5 files changed

+77
-15
lines changed

flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ protected void serializeRequestToStream(ClickHousePayload clickHousePayload, Dat
2020
dataOutputStream.writeInt(bytes.length);
2121
dataOutputStream.write(bytes);
2222
} else {
23+
dataOutputStream.writeInt(V1);
2324
dataOutputStream.writeInt(-1);
2425
}
2526

@@ -41,10 +42,10 @@ protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataI
4142
if (version == V1) {
4243
return deserializeV1(dataInputStream);
4344
} else {
44-
throw new IOException("Unsupported version " + version);
45+
throw new IOException("Unsupported serialization version: " + version);
4546
}
4647
} else {
47-
throw new IOException("Request size: " + requestSize);
48+
throw new IOException("Invalid request size: " + requestSize);
4849
}
4950
}
5051

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,12 @@
44
import org.junit.jupiter.api.Assertions;
55
import org.junit.jupiter.api.Test;
66

7-
import java.io.ByteArrayInputStream;
8-
import java.io.ByteArrayOutputStream;
9-
import java.io.DataInputStream;
10-
import java.io.DataOutputStream;
7+
import java.io.*;
118

129
public class ClickHouseSinkStateTests {
1310

1411
@Test
15-
void SerializerTest() throws Exception {
12+
void testSerializeAndDeserializePayload() throws Exception {
1613
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
1714
ClickHousePayload clickHousePayload = new ClickHousePayload(data);
1815

@@ -26,4 +23,36 @@ void SerializerTest() throws Exception {
2623
Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
2724
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
2825
}
26+
27+
@Test
28+
void testSerializeAndDeserializeEmptyPayload() throws Exception {
29+
ClickHousePayload clickHousePayload = new ClickHousePayload(null);
30+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
31+
DataOutputStream dos = new DataOutputStream(baos);
32+
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
33+
serializer.serializeRequestToStream(clickHousePayload, dos);
34+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
35+
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
36+
Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
37+
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
38+
}
39+
40+
@Test
41+
void testDeserializePayloadWithUnsuportedVersion() throws IOException {
42+
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
43+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
44+
DataOutputStream dos = new DataOutputStream(baos);
45+
DataOutputStream dataOutputStream = new DataOutputStream(baos);
46+
int V2 = 2;
47+
dataOutputStream.writeInt(V2);
48+
dataOutputStream.writeInt(data.length);
49+
dataOutputStream.write(data);
50+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
51+
52+
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
53+
Exception exception = Assertions.assertThrows(IOException.class, () -> {
54+
serializer.deserializeRequestFromStream(dataOutputStream.size(), dis);
55+
});
56+
Assertions.assertEquals("Unsupported serialization version: 2", exception.getMessage());
57+
}
2958
}

flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ protected void serializeRequestToStream(ClickHousePayload clickHousePayload, Dat
2020
dataOutputStream.writeInt(bytes.length);
2121
dataOutputStream.write(bytes);
2222
} else {
23+
dataOutputStream.writeInt(V1);
2324
dataOutputStream.writeInt(-1);
2425
}
2526

@@ -41,10 +42,10 @@ protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataI
4142
if (version == V1) {
4243
return deserializeV1(dataInputStream);
4344
} else {
44-
throw new IOException("Unsupported version " + version);
45+
throw new IOException("Unsupported serialization version: " + version);
4546
}
4647
} else {
47-
throw new IOException("Request size: " + requestSize);
48+
throw new IOException("Invalid request size: " + requestSize);
4849
}
4950
}
5051

flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,12 @@
44
import org.junit.jupiter.api.Assertions;
55
import org.junit.jupiter.api.Test;
66

7-
import java.io.ByteArrayInputStream;
8-
import java.io.ByteArrayOutputStream;
9-
import java.io.DataInputStream;
10-
import java.io.DataOutputStream;
7+
import java.io.*;
118

129
public class ClickHouseSinkStateTests {
1310

1411
@Test
15-
void SerializerTest() throws Exception {
12+
void testSerializeAndDeserializePayload() throws Exception {
1613
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
1714
ClickHousePayload clickHousePayload = new ClickHousePayload(data);
1815

@@ -21,9 +18,40 @@ void SerializerTest() throws Exception {
2118
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
2219
serializer.serializeRequestToStream(clickHousePayload, dos);
2320
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
21+
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
22+
Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
23+
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
24+
}
2425

26+
@Test
27+
void testSerializeAndDeserializeEmptyPayload() throws Exception {
28+
ClickHousePayload clickHousePayload = new ClickHousePayload(null);
29+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
30+
DataOutputStream dos = new DataOutputStream(baos);
31+
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
32+
serializer.serializeRequestToStream(clickHousePayload, dos);
33+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
2534
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
2635
Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
2736
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
2837
}
38+
39+
@Test
40+
void testDeserializePayloadWithUnsuportedVersion() throws IOException {
41+
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
42+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
43+
DataOutputStream dos = new DataOutputStream(baos);
44+
DataOutputStream dataOutputStream = new DataOutputStream(baos);
45+
int V2 = 2;
46+
dataOutputStream.writeInt(V2);
47+
dataOutputStream.writeInt(data.length);
48+
dataOutputStream.write(data);
49+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
50+
51+
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
52+
Exception exception = Assertions.assertThrows(IOException.class, () -> {
53+
serializer.deserializeRequestFromStream(dataOutputStream.size(), dis);
54+
});
55+
Assertions.assertEquals("Unsupported serialization version: 2", exception.getMessage());
56+
}
2957
}

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@ public ClickHousePayload(byte[] payload) {
1515
this.payload = payload;
1616
}
1717
public byte[] getPayload() { return payload; }
18-
public int getPayloadLength() { return payload.length; }
18+
public int getPayloadLength() {
19+
if (payload == null) return -1;
20+
return payload.length;
21+
}
1922
public int getAttemptCount() { return attemptCount; }
2023
public void incrementAttempts() { attemptCount++; }
2124
}

0 commit comments

Comments
 (0)