Skip to content

Commit c94beb3

Browse files
committed
Fix serialize and add test.
1 parent 08e0220 commit c94beb3

File tree

5 files changed

+83
-8
lines changed

5 files changed

+83
-8
lines changed

flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class ClickHouseAsyncSinkSerializer extends AsyncSinkWriterStateSerialize
1616
protected void serializeRequestToStream(ClickHousePayload clickHousePayload, DataOutputStream dataOutputStream) throws IOException {
1717
byte[] bytes = clickHousePayload.getPayload();
1818
if (bytes != null) {
19+
dataOutputStream.writeInt(V1);
1920
dataOutputStream.writeInt(bytes.length);
2021
dataOutputStream.write(bytes);
2122
} else {
@@ -34,11 +35,16 @@ private ClickHousePayload deserializeV1(DataInputStream dataInputStream) throws
3435
}
3536

3637
@Override
37-
protected ClickHousePayload deserializeRequestFromStream(long version, DataInputStream dataInputStream) throws IOException {
38-
if (version == V1) {
39-
return deserializeV1(dataInputStream);
38+
protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataInputStream dataInputStream) throws IOException {
39+
if (requestSize > 0) {
40+
int version = dataInputStream.readInt();
41+
if (version == V1) {
42+
return deserializeV1(dataInputStream);
43+
} else {
44+
throw new IOException("Unsupported version " + version);
45+
}
4046
} else {
41-
throw new IOException("Unsupported version: " + version);
47+
throw new IOException("request size < 0");
4248
}
4349
}
4450

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.apache.flink.connector.clickhouse.sink;
2+
3+
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
4+
import org.junit.jupiter.api.Assertions;
5+
import org.junit.jupiter.api.Test;
6+
7+
import java.io.ByteArrayInputStream;
8+
import java.io.ByteArrayOutputStream;
9+
import java.io.DataInputStream;
10+
import java.io.DataOutputStream;
11+
12+
public class ClickHouseSinkStateTests {
13+
14+
@Test
15+
void SerializerTest() throws Exception {
16+
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
17+
ClickHousePayload clickHousePayload = new ClickHousePayload(data);
18+
19+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
20+
DataOutputStream dos = new DataOutputStream(baos);
21+
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
22+
serializer.serializeRequestToStream(clickHousePayload, dos);
23+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
24+
25+
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
26+
Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
27+
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
28+
}
29+
}

flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class ClickHouseAsyncSinkSerializer extends AsyncSinkWriterStateSerialize
1616
protected void serializeRequestToStream(ClickHousePayload clickHousePayload, DataOutputStream dataOutputStream) throws IOException {
1717
byte[] bytes = clickHousePayload.getPayload();
1818
if (bytes != null) {
19+
dataOutputStream.writeInt(V1);
1920
dataOutputStream.writeInt(bytes.length);
2021
dataOutputStream.write(bytes);
2122
} else {
@@ -34,11 +35,16 @@ private ClickHousePayload deserializeV1(DataInputStream dataInputStream) throws
3435
}
3536

3637
@Override
37-
protected ClickHousePayload deserializeRequestFromStream(long version, DataInputStream dataInputStream) throws IOException {
38-
if (version == V1) {
39-
return deserializeV1(dataInputStream);
38+
protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataInputStream dataInputStream) throws IOException {
39+
if (requestSize > 0) {
40+
int version = dataInputStream.readInt();
41+
if (version == V1) {
42+
return deserializeV1(dataInputStream);
43+
} else {
44+
throw new IOException("Unsupported version " + version);
45+
}
4046
} else {
41-
throw new IOException("Unsupported version: " + version);
47+
throw new IOException("request size < 0");
4248
}
4349
}
4450

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.apache.flink.connector.clickhouse.sink;
2+
3+
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
4+
import org.junit.jupiter.api.Assertions;
5+
import org.junit.jupiter.api.Test;
6+
7+
import java.io.ByteArrayInputStream;
8+
import java.io.ByteArrayOutputStream;
9+
import java.io.DataInputStream;
10+
import java.io.DataOutputStream;
11+
12+
public class ClickHouseSinkStateTests {
13+
14+
@Test
15+
void SerializerTest() throws Exception {
16+
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
17+
ClickHousePayload clickHousePayload = new ClickHousePayload(data);
18+
19+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
20+
DataOutputStream dos = new DataOutputStream(baos);
21+
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
22+
serializer.serializeRequestToStream(clickHousePayload, dos);
23+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
24+
25+
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
26+
Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
27+
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
28+
}
29+
}

flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/flink/EmbeddedFlinkClusterForTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ static int getFromEnvOrDefault(String key, int defaultValue) {
2121

2222
public static void setUp() throws Exception {
2323
Configuration config = new Configuration();
24+
setUp(config);
25+
}
26+
27+
public static void setUp(Configuration config) throws Exception {
2428
config.set(RestOptions.PORT, REST_PORT); // web UI port (optional)
2529
config.set(TaskManagerOptions.NUM_TASK_SLOTS, NUM_TASK_SLOTS);
2630
flinkCluster = new MiniClusterWithClientResource(
@@ -31,6 +35,7 @@ public static void setUp() throws Exception {
3135
.build());
3236
flinkCluster.before();
3337
}
38+
3439
public static void tearDown() {
3540
if (flinkCluster != null) {
3641
flinkCluster.after();

0 commit comments

Comments
 (0)