Skip to content

Commit 824184d

Browse files
authored
Merge pull request #64 from ClickHouse/state-bug-fix
2 parents 08e0220 + 998d80b commit 824184d

File tree

8 files changed

+150
-11
lines changed

8 files changed

+150
-11
lines changed

flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ public class ClickHouseAsyncSinkSerializer extends AsyncSinkWriterStateSerialize
1616
protected void serializeRequestToStream(ClickHousePayload clickHousePayload, DataOutputStream dataOutputStream) throws IOException {
1717
byte[] bytes = clickHousePayload.getPayload();
1818
if (bytes != null) {
19+
dataOutputStream.writeInt(V1);
1920
dataOutputStream.writeInt(bytes.length);
2021
dataOutputStream.write(bytes);
2122
} else {
23+
dataOutputStream.writeInt(V1);
2224
dataOutputStream.writeInt(-1);
2325
}
2426

@@ -34,11 +36,16 @@ private ClickHousePayload deserializeV1(DataInputStream dataInputStream) throws
3436
}
3537

3638
@Override
37-
protected ClickHousePayload deserializeRequestFromStream(long version, DataInputStream dataInputStream) throws IOException {
38-
if (version == V1) {
39-
return deserializeV1(dataInputStream);
39+
protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataInputStream dataInputStream) throws IOException {
40+
if (requestSize > 0) {
41+
int version = dataInputStream.readInt();
42+
if (version == V1) {
43+
return deserializeV1(dataInputStream);
44+
} else {
45+
throw new IOException("Unsupported serialization version: " + version);
46+
}
4047
} else {
41-
throw new IOException("Unsupported version: " + version);
48+
throw new IOException("Invalid request size: " + requestSize);
4249
}
4350
}
4451

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package org.apache.flink.connector.clickhouse.sink;
2+
3+
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
4+
import org.junit.jupiter.api.Assertions;
5+
import org.junit.jupiter.api.Test;
6+
7+
import java.io.*;
8+
9+
public class ClickHouseSinkStateTests {
10+
11+
@Test
12+
void testSerializeAndDeserializePayload() throws Exception {
13+
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
14+
ClickHousePayload clickHousePayload = new ClickHousePayload(data);
15+
16+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
17+
DataOutputStream dos = new DataOutputStream(baos);
18+
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
19+
serializer.serializeRequestToStream(clickHousePayload, dos);
20+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
21+
22+
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
23+
Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
24+
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
25+
}
26+
27+
@Test
28+
void testSerializeAndDeserializeEmptyPayload() throws Exception {
29+
ClickHousePayload clickHousePayload = new ClickHousePayload(null);
30+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
31+
DataOutputStream dos = new DataOutputStream(baos);
32+
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
33+
serializer.serializeRequestToStream(clickHousePayload, dos);
34+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
35+
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
36+
Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
37+
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
38+
}
39+
40+
@Test
41+
void testDeserializePayloadWithUnsuportedVersion() throws IOException {
42+
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
43+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
44+
DataOutputStream dos = new DataOutputStream(baos);
45+
DataOutputStream dataOutputStream = new DataOutputStream(baos);
46+
int V2 = 2;
47+
dataOutputStream.writeInt(V2);
48+
dataOutputStream.writeInt(data.length);
49+
dataOutputStream.write(data);
50+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
51+
52+
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
53+
Exception exception = Assertions.assertThrows(IOException.class, () -> {
54+
serializer.deserializeRequestFromStream(dataOutputStream.size(), dis);
55+
});
56+
Assertions.assertEquals("Unsupported serialization version: 2", exception.getMessage());
57+
}
58+
}

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class ClickHouseSinkTests extends FlinkClusterTests {
4545
static final long MAX_RECORD_SIZE_IN_BYTES = 1000;
4646

4747
static final int STREAM_PARALLELISM = 5;
48+
static final int NUMBER_OF_RETRIES = 10;
4849

4950
private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) {
5051
String createTable = createSimplePOJOTableSQL(database, tableName);
@@ -511,7 +512,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
511512
env.setParallelism(STREAM_PARALLELISM);
512513

513514
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
514-
clickHouseClientConfig.setNumberOfRetries(10);
515+
clickHouseClientConfig.setNumberOfRetries(NUMBER_OF_RETRIES);
515516
clickHouseClientConfig.setSupportDefault(simpleTableSchema.hasDefaults());
516517

517518
ElementConverter<SimplePOJO, ClickHousePayload> convertorCovid = new ClickHouseConvertor<>(SimplePOJO.class, simplePOJOConvertor);

flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ public class ClickHouseAsyncSinkSerializer extends AsyncSinkWriterStateSerialize
1616
protected void serializeRequestToStream(ClickHousePayload clickHousePayload, DataOutputStream dataOutputStream) throws IOException {
1717
byte[] bytes = clickHousePayload.getPayload();
1818
if (bytes != null) {
19+
dataOutputStream.writeInt(V1);
1920
dataOutputStream.writeInt(bytes.length);
2021
dataOutputStream.write(bytes);
2122
} else {
23+
dataOutputStream.writeInt(V1);
2224
dataOutputStream.writeInt(-1);
2325
}
2426

@@ -34,11 +36,16 @@ private ClickHousePayload deserializeV1(DataInputStream dataInputStream) throws
3436
}
3537

3638
@Override
37-
protected ClickHousePayload deserializeRequestFromStream(long version, DataInputStream dataInputStream) throws IOException {
38-
if (version == V1) {
39-
return deserializeV1(dataInputStream);
39+
protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataInputStream dataInputStream) throws IOException {
40+
if (requestSize > 0) {
41+
int version = dataInputStream.readInt();
42+
if (version == V1) {
43+
return deserializeV1(dataInputStream);
44+
} else {
45+
throw new IOException("Unsupported serialization version: " + version);
46+
}
4047
} else {
41-
throw new IOException("Unsupported version: " + version);
48+
throw new IOException("Invalid request size: " + requestSize);
4249
}
4350
}
4451

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package org.apache.flink.connector.clickhouse.sink;
2+
3+
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
4+
import org.junit.jupiter.api.Assertions;
5+
import org.junit.jupiter.api.Test;
6+
7+
import java.io.*;
8+
9+
public class ClickHouseSinkStateTests {
10+
11+
@Test
12+
void testSerializeAndDeserializePayload() throws Exception {
13+
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
14+
ClickHousePayload clickHousePayload = new ClickHousePayload(data);
15+
16+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
17+
DataOutputStream dos = new DataOutputStream(baos);
18+
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
19+
serializer.serializeRequestToStream(clickHousePayload, dos);
20+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
21+
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
22+
Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
23+
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
24+
}
25+
26+
@Test
27+
void testSerializeAndDeserializeEmptyPayload() throws Exception {
28+
ClickHousePayload clickHousePayload = new ClickHousePayload(null);
29+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
30+
DataOutputStream dos = new DataOutputStream(baos);
31+
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
32+
serializer.serializeRequestToStream(clickHousePayload, dos);
33+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
34+
ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis);
35+
Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength());
36+
Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload());
37+
}
38+
39+
@Test
40+
void testDeserializePayloadWithUnsuportedVersion() throws IOException {
41+
byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'};
42+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
43+
DataOutputStream dos = new DataOutputStream(baos);
44+
DataOutputStream dataOutputStream = new DataOutputStream(baos);
45+
int V2 = 2;
46+
dataOutputStream.writeInt(V2);
47+
dataOutputStream.writeInt(data.length);
48+
dataOutputStream.write(data);
49+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
50+
51+
ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
52+
Exception exception = Assertions.assertThrows(IOException.class, () -> {
53+
serializer.deserializeRequestFromStream(dataOutputStream.size(), dis);
54+
});
55+
Assertions.assertEquals("Unsupported serialization version: 2", exception.getMessage());
56+
}
57+
}

flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class ClickHouseSinkTests extends FlinkClusterTests {
4545
static final long MAX_RECORD_SIZE_IN_BYTES = 1000;
4646

4747
static final int STREAM_PARALLELISM = 5;
48+
static final int NUMBER_OF_RETRIES = 10;
4849

4950
private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) {
5051
String createTable = createSimplePOJOTableSQL(database, tableName);
@@ -508,7 +509,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
508509
env.setParallelism(STREAM_PARALLELISM);
509510

510511
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
511-
clickHouseClientConfig.setNumberOfRetries(10);
512+
clickHouseClientConfig.setNumberOfRetries(NUMBER_OF_RETRIES);
512513
clickHouseClientConfig.setSupportDefault(simpleTableSchema.hasDefaults());
513514

514515
ElementConverter<SimplePOJO, ClickHousePayload> convertorCovid = new ClickHouseConvertor<>(SimplePOJO.class, simplePOJOConvertor);

flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/flink/EmbeddedFlinkClusterForTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ static int getFromEnvOrDefault(String key, int defaultValue) {
2121

2222
public static void setUp() throws Exception {
2323
Configuration config = new Configuration();
24+
setUp(config);
25+
}
26+
27+
public static void setUp(Configuration config) throws Exception {
2428
config.set(RestOptions.PORT, REST_PORT); // web UI port (optional)
2529
config.set(TaskManagerOptions.NUM_TASK_SLOTS, NUM_TASK_SLOTS);
2630
flinkCluster = new MiniClusterWithClientResource(
@@ -31,6 +35,7 @@ public static void setUp() throws Exception {
3135
.build());
3236
flinkCluster.before();
3337
}
38+
3439
public static void tearDown() {
3540
if (flinkCluster != null) {
3641
flinkCluster.after();

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@ public ClickHousePayload(byte[] payload) {
1515
this.payload = payload;
1616
}
1717
public byte[] getPayload() { return payload; }
18-
public int getPayloadLength() { return payload.length; }
18+
public int getPayloadLength() {
19+
if (payload == null) return -1;
20+
return payload.length;
21+
}
1922
public int getAttemptCount() { return attemptCount; }
2023
public void incrementAttempts() { attemptCount++; }
2124
}

0 commit comments

Comments
 (0)