
Commit 608b071

SinkWriter implementation of String data type
1 parent cbb1944 commit 608b071

File tree: 11 files changed (+480 additions, -2 deletions)
ClickHouseConvertor.java (60 additions & 0 deletions)
@@ -0,0 +1,60 @@
package org.apache.flink.connector.clickhouse.convertor;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

/** Converts incoming elements into {@link ClickHousePayload} entries for the async sink. */
public class ClickHouseConvertor<InputT> implements ElementConverter<InputT, ClickHousePayload> {
    private static final Logger LOG = LoggerFactory.getLogger(ClickHouseConvertor.class);

    enum Types {
        STRING,
        POJO,
    }

    private final Types type;

    public ClickHouseConvertor(Class<?> clazz) {
        if (clazz == null) {
            throw new IllegalArgumentException("clazz must not be null");
        }
        if (clazz == String.class) {
            type = Types.STRING;
        } else {
            type = Types.POJO;
            // TODO: register the POJO class for conversion
        }
    }

    @Override
    public ClickHousePayload apply(InputT o, SinkWriter.Context context) {
        if (o == null) {
            // Null elements are skipped.
            return null;
        }
        if (o instanceof String && type == Types.STRING) {
            String payload = o.toString();
            if (payload.isEmpty()) {
                // Empty strings carry no data; skip them as well.
                return null;
            }
            // Each record must be newline-terminated so ClickHouse can split rows.
            if (payload.endsWith("\n")) {
                return new ClickHousePayload(payload.getBytes(StandardCharsets.UTF_8));
            }
            return new ClickHousePayload((payload + "\n").getBytes(StandardCharsets.UTF_8));
        }
        if (type == Types.POJO) {
            // TODO: convert the POJO to a byte stream (e.g. RowBinary).
            return null;
        }
        throw new IllegalArgumentException("unable to convert " + o + " to " + type);
    }

    @Override
    public void open(WriterInitContext context) {
        ElementConverter.super.open(context);
    }
}
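A minimal usage sketch of the convertor's String path (the input value is illustrative; passing a null context is only acceptable here because the String branch never reads it):

    ClickHouseConvertor<String> convertor = new ClickHouseConvertor<>(String.class);
    // The String branch ignores the SinkWriter.Context, so null suffices for a quick local check.
    ClickHousePayload payload = convertor.apply("{\"id\": 1, \"name\": \"test\"}", null);
    // payload now holds the UTF-8 bytes of the row terminated with "\n",
    // ready to be appended to a row-per-line insert batch.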
ClickHousePayload.java (18 additions & 0 deletions)
@@ -0,0 +1,18 @@
package org.apache.flink.connector.clickhouse.data;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;

/** Immutable wrapper around the raw bytes of a single ClickHouse request entry. */
public class ClickHousePayload implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(ClickHousePayload.class);

    private final byte[] payload;

    public ClickHousePayload(byte[] payload) {
        this.payload = payload;
    }

    public byte[] getPayload() {
        return payload;
    }

    public int getPayloadLength() {
        // Guard against a null payload so size accounting does not throw.
        return payload == null ? 0 : payload.length;
    }
}
ClickHouseAsyncSink.java (80 additions & 0 deletions)
@@ -0,0 +1,80 @@
package org.apache.flink.connector.clickhouse.sink;

import com.clickhouse.data.ClickHouseFormat;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

/** Async sink that buffers {@link ClickHousePayload} entries and flushes them to ClickHouse. */
public class ClickHouseAsyncSink<InputT> extends AsyncSinkBase<InputT, ClickHousePayload> {
    private static final Logger LOG = LoggerFactory.getLogger(ClickHouseAsyncSink.class);

    protected ClickHouseClientConfig clickHouseClientConfig;
    protected ClickHouseFormat clickHouseFormat = null;

    protected ClickHouseAsyncSink(
            ElementConverter<InputT, ClickHousePayload> converter,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            ClickHouseClientConfig clickHouseClientConfig) {
        super(
                converter,
                maxBatchSize,
                maxInFlightRequests,
                maxBufferedRequests,
                maxBatchSizeInBytes,
                maxTimeInBufferMS,
                maxRecordSizeInBytes);
        this.clickHouseClientConfig = clickHouseClientConfig;
    }

    public void setClickHouseFormat(ClickHouseFormat clickHouseFormat) {
        this.clickHouseFormat = clickHouseFormat;
    }

    public ClickHouseFormat getClickHouseFormat() {
        return this.clickHouseFormat;
    }

    @Override
    public SinkWriter<InputT> createWriter(WriterInitContext writerInitContext) throws IOException {
        // A fresh writer is simply a restored writer with no buffered state.
        return restoreWriter(writerInitContext, Collections.emptyList());
    }

    @Override
    public StatefulSinkWriter<InputT, BufferedRequestState<ClickHousePayload>> restoreWriter(
            WriterInitContext writerInitContext,
            Collection<BufferedRequestState<ClickHousePayload>> collection)
            throws IOException {
        return new ClickHouseAsyncWriter<>(
                getElementConverter(),
                writerInitContext,
                getMaxBatchSize(),
                getMaxInFlightRequests(),
                getMaxBufferedRequests(),
                getMaxBatchSizeInBytes(),
                getMaxTimeInBufferMS(),
                getMaxRecordSizeInBytes(),
                clickHouseClientConfig,
                clickHouseFormat,
                collection);
    }

    @Override
    public SimpleVersionedSerializer<BufferedRequestState<ClickHousePayload>> getWriterStateSerializer() {
        return new ClickHouseAsyncSinkSerializer();
    }
}
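A rough wiring sketch, assuming the protected constructor is reached via a subclass, a same-package factory, or a builder added later; endpoint, credentials, table name, and sizing values below are placeholders:

    // Placeholder connection settings.
    ClickHouseClientConfig config = new ClickHouseClientConfig(
            "http://localhost:8123", "default", "", "default", "events");

    ClickHouseAsyncSink<String> sink = new ClickHouseAsyncSink<>(
            new ClickHouseConvertor<>(String.class),
            500,              // maxBatchSize
            4,                // maxInFlightRequests
            10_000,           // maxBufferedRequests
            5 * 1024 * 1024,  // maxBatchSizeInBytes
            5_000,            // maxTimeInBufferMS
            1024 * 1024,      // maxRecordSizeInBytes
            config);
    sink.setClickHouseFormat(ClickHouseFormat.JSONEachRow);

    // stream is a DataStream<String> of newline-friendly rows (e.g. JSON lines).
    stream.sinkTo(sink);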
ClickHouseAsyncSinkSerializer.java (41 additions & 0 deletions)
@@ -0,0 +1,41 @@
package org.apache.flink.connector.clickhouse.sink;

import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Versioned serializer for buffered {@link ClickHousePayload} entries in writer state. */
public class ClickHouseAsyncSinkSerializer extends AsyncSinkWriterStateSerializer<ClickHousePayload> {
    private static final Logger LOG = LoggerFactory.getLogger(ClickHouseAsyncSinkSerializer.class);

    @Override
    protected void serializeRequestToStream(
            ClickHousePayload clickHousePayload, DataOutputStream dataOutputStream) throws IOException {
        // Length-prefixed encoding: payload size followed by the raw bytes.
        byte[] bytes = clickHousePayload.getPayload();
        dataOutputStream.writeInt(bytes.length);
        dataOutputStream.write(bytes);
    }

    private ClickHousePayload deserializeV1(DataInputStream dataInputStream) throws IOException {
        int len = dataInputStream.readInt();
        byte[] payload = dataInputStream.readNBytes(len);
        return new ClickHousePayload(payload);
    }

    @Override
    protected ClickHousePayload deserializeRequestFromStream(
            long version, DataInputStream dataInputStream) throws IOException {
        if (version == 1) {
            return deserializeV1(dataInputStream);
        } else {
            throw new IOException("Unsupported version: " + version);
        }
    }

    @Override
    public int getVersion() {
        return 1;
    }
}
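A hedged round-trip sketch for the state serializer. It relies on the public serialize/deserialize methods inherited from AsyncSinkWriterStateSerializer and assumes the BufferedRequestState and RequestEntryWrapper constructors from flink-connector-base; it would live in a test method that declares IOException and imports StandardCharsets and Collections:

    ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer();
    ClickHousePayload entry =
            new ClickHousePayload("row-1\n".getBytes(StandardCharsets.UTF_8));
    BufferedRequestState<ClickHousePayload> state = new BufferedRequestState<>(
            Collections.singletonList(
                    new RequestEntryWrapper<>(entry, entry.getPayloadLength())));

    byte[] bytes = serializer.serialize(state);
    BufferedRequestState<ClickHousePayload> restored =
            serializer.deserialize(serializer.getVersion(), bytes);
    // restored should contain a single entry whose payload equals "row-1\n".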
ClickHouseAsyncWriter.java (105 additions & 0 deletions)
@@ -0,0 +1,105 @@
package org.apache.flink.connector.clickhouse.sink;

import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.ClientConfigProperties;
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.data.ClickHouseFormat;

import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Async writer that batches {@link ClickHousePayload} entries and inserts them via the ClickHouse client. */
public class ClickHouseAsyncWriter<InputT> extends AsyncSinkWriter<InputT, ClickHousePayload> {
    private static final Logger LOG = LoggerFactory.getLogger(ClickHouseAsyncWriter.class);

    private final ClickHouseClientConfig clickHouseClientConfig;
    private ClickHouseFormat clickHouseFormat;

    public ClickHouseAsyncWriter(
            ElementConverter<InputT, ClickHousePayload> elementConverter,
            WriterInitContext context,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            ClickHouseClientConfig clickHouseClientConfig,
            ClickHouseFormat clickHouseFormat,
            Collection<BufferedRequestState<ClickHousePayload>> state) {
        super(
                elementConverter,
                context,
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(maxBatchSize)
                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
                        .setMaxInFlightRequests(maxInFlightRequests)
                        .setMaxBufferedRequests(maxBufferedRequests)
                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
                        .build(),
                state);
        this.clickHouseClientConfig = clickHouseClientConfig;
        this.clickHouseFormat = clickHouseFormat;
    }

    @Override
    protected long getSizeInBytes(ClickHousePayload clickHousePayload) {
        return clickHousePayload.getPayloadLength();
    }

    @Override
    protected void submitRequestEntries(
            List<ClickHousePayload> requestEntries, ResultHandler<ClickHousePayload> resultHandler) {
        LOG.info("Submitting request entries...");
        AtomicInteger totalSizeSent = new AtomicInteger();
        Client chClient = this.clickHouseClientConfig.createClient();
        String tableName = clickHouseClientConfig.getTableName();
        // TODO: decide whether the format should come from the constructor or from ClickHousePayload.
        ClickHouseFormat format = null;
        if (clickHouseFormat == null) {
            // Format not configured; for POJOs it could be derived from the payload
            // (e.g. RowBinary or RowBinaryWithDefaults).
        } else {
            format = clickHouseFormat;
        }
        try {
            CompletableFuture<InsertResponse> response = chClient.insert(tableName, out -> {
                for (ClickHousePayload requestEntry : requestEntries) {
                    byte[] payload = requestEntry.getPayload();
                    totalSizeSent.addAndGet(payload.length);
                    out.write(payload);
                }
                LOG.info("Sending {} bytes across {} records to ClickHouse.", totalSizeSent.get(), requestEntries.size());
                out.close();
                // .setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true").serverSetting(ServerSettings.WAIT_END_OF_QUERY, "0")
            }, format, new InsertSettings().setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true"));
            response.whenComplete((insertResponse, throwable) -> {
                if (throwable != null) {
                    LOG.error("Insert failed: ", throwable);
                    // Fail the in-flight request so the AsyncSinkWriter can react.
                    resultHandler.completeExceptionally(new IOException("ClickHouse insert failed", throwable));
                } else {
                    handleSuccessfulRequest(resultHandler, insertResponse);
                }
            }).join();
        } catch (Exception e) {
            LOG.error("Error: ", e);
        }
        LOG.info("Finished submitting request entries.");
    }

    private void handleSuccessfulRequest(
            ResultHandler<ClickHousePayload> resultHandler, InsertResponse response) {
        resultHandler.complete();
        LOG.info("Successfully completed. Written rows: {}, server time: {}.",
                response.getWrittenRows(), response.getServerTime());
    }
}
ClickHouseClientConfig.java (45 additions & 0 deletions)
@@ -0,0 +1,45 @@
package org.apache.flink.connector.clickhouse.sink;

import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.ClientConfigProperties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;

/** Serializable connection settings used to build a ClickHouse client on the task managers. */
public class ClickHouseClientConfig implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(ClickHouseClientConfig.class);

    private final String url;
    private final String username;
    private final String password;
    private final String database;
    private final String tableName;

    public ClickHouseClientConfig(String url, String username, String password, String database, String tableName) {
        this.url = url;
        this.username = username;
        this.password = password;
        this.database = database;
        this.tableName = tableName;
    }

    public Client createClient(String database) {
        return new Client.Builder()
                .addEndpoint(url)
                .setUsername(username)
                .setPassword(password)
                .setDefaultDatabase(database)
                .setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true")
                .build();
    }

    public Client createClient() {
        return createClient(this.database);
    }

    public String getTableName() {
        return tableName;
    }
}
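A short sketch of using the config directly; endpoint and credentials are placeholders, and only calls already present in this class are used:

    ClickHouseClientConfig cfg = new ClickHouseClientConfig(
            "https://my-clickhouse:8443",  // placeholder endpoint
            "default", "secret",           // placeholder credentials
            "default", "events");          // default database and target table

    Client client = cfg.createClient();                // client bound to the configured default database
    Client analytics = cfg.createClient("analytics");  // same credentials, different default database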
