Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 170 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,205 @@ Table of Contents
* [Supported Flink Versions](#supported-flink-versions)
* [Installation](#installation)
* [DataStream API](#dataStream-api)
* [Snippets](#snippets)
* [Examples](#examples)
* [Snippets](#snippet)
* [Examples](#example)
* [Table API](#table-api)
* [Artifacts](#artifacts-1)
* [Examples](#examples-1)
* [Snippets](#snippet-1)
* [Examples](#example-1)
* [Supported ClickHouse Types](#supported-clickHouse-types)
* [Configuration Options](#configuration-options)
* [Client Configuration](#client-configuration)
* [Sink Configuration](#sink-configuration)
* [Limitations](#limitations)
* [Contributing](#contributing)

## About The Project

This is a repo of ClickHouse official Apache Flink Connector supported by the ClickHouse team.
The Connector supports to main Apache Flink API's
The connector supports two main Apache Flink APIs:
- DataStreamAPI
- Table API
- Table API (This feature is not implemented yet and is planned for a future release)

## Supported Flink Versions

| Version | Dependency | ClickHouse Client Version | Required Java |
|---------|----------------------------------|---------------------------|---------------|
| latest | flink-connector-clickhouse-2.0.0 | 0.9.1 | Java 17+ |
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.1 | Java 17+ |
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.1 | Java 11+ |
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.1 | Java 11+ |
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.1 | Java 11+ |
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.1 | Java 11+ |

## Installation

### For Flink 2.0.0+

Maven

```xml
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-2.0.0</artifactId>
<version>0.0.1</version>
<type>pom</type>
</dependency>
```

### For Flink 1.17+

Maven

```xml
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-1.17</artifactId>
<version>0.0.1</version>
<type>pom</type>
</dependency>
```

## DataStream API

### Snippet

Configure ClickHouseClient

```java
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);
```
If you are planning to insert RAW CSV data as is

Create an ElementConverter

```java
ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
```

Create the sink and set the format using `setClickHouseFormat`

```java
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
convertorString,
MAX_BATCH_SIZE,
MAX_IN_FLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
MAX_BATCH_SIZE_IN_BYTES,
MAX_TIME_IN_BUFFER_MS,
MAX_RECORD_SIZE_IN_BYTES,
clickHouseClientConfig
);

csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
```

Finally, connect your DataStream to the sink.

```java
data.sinkTo(csvSink);
```

More examples and snippets can be found in our tests [flink-connector-clickhouse-1.17](flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink) and [flink-connector-clickhouse-2.0.0](flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink)

### Example

We have created maven based example for easy start with ClickHouse Sink
Different versions for Flink

- [Flink 1.17+](examples/maven/flink-v1.7/covid)
- [Flink 2.0.0+](examples/maven/flink-v2/covid)

For more detailed instructions, see the [Example Guide](examples#readme)

## Table API

Table API is planned for a future release. This section will be updated once available.

### Snippet

Planned for a future release — this section will provide a usage snippet for configuring the Table API.

### Example

Planned for a future release — a complete end-to-end example will be added once the Table API becomes available.

## Supported ClickHouse Types

| Java Type | ClickHouse Type | Supported | Serialize Method |
|-----------------|-----------------|-----------|-------------------------|
| byte/Byte | Int8 | ✅ | Serialize.writeInt8 |
| short/Short | Int16 | ✅ | Serialize.writeInt16 |
| int/Integer | Int32 | ✅ | Serialize.writeInt32 |
| long/Long | Int64 | ✅ | Serialize.writeInt64 |
| BigInteger | Int128 | ✅ | Serialize.writeInt124 |
| BigInteger | Int256 | ✅ | Serialize.writeInt256 |
| byte/Byte | UInt8 | ❌ | N/A |
| short/Short | UInt16 | ❌ | N/A |
| int/Integer | UInt32 | ❌ | N/A |
| long/Long | UInt64 | ❌ | N/A |
| BigInteger | UInt128 | ❌ | N/A |
| BigInteger | UInt256 | ❌ | N/A |
| BigDecimal | Decimal | ❌ | N/A |
| BigDecimal | Decimal32 | ❌ | N/A |
| BigDecimal | Decimal64 | ❌ | N/A |
| BigDecimal | Decimal128 | ❌ | N/A |
| BigDecimal | Decimal256 | ❌ | N/A |
| float/Float | Float | ✅ | Serialize.writeFloat32 |
| double/Double | Double | ✅ | Serialize.writeFloat64 |
| boolean/Boolean | Boolean | ✅ | Serialize.writeBoolean |
| String | String | ✅ | Serialize.writeString |
| String | FixedString | ❌ | N/A |
| LocalDate | Date | ❌ | N/A |
| LocalDate | Date32 | ❌ | N/A |
| LocalDateTime | DateTime | ❌ | N/A |
| LocalDateTime | DateTime64 | ❌ | N/A |
| int/Integer | Time | ❌ | N/A |
| long/Long | Time64 | ❌ | N/A |
| byte/Byte | Enum8 | ✅ | Serialize.writeInt8 |
| int/Integer | Enum16 | ✅ | Serialize.writeInt16 |
| String | JSON | ❌ | N/A |
| Array<Type> | Array<Type> | ❌ | N/A |
| Map<K,V> | Map<K,V> | ❌ | N/A |
| Tuple<Type,..> | Map<T1,T2,..> | ❌ | N/A |
| Object | Variant | ❌ | N/A |

* For date operation need to provide ZoneId.

## Configuration Options

### Client configuration

| Parameters | Description | Default Value |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can set some defaults to work with a localhost instance

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we have one constructor for all params, a constructor without a URL parameter that is set to http://localhost:8123. Do you think that can somehow help the users?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, from the table below it's unclear that the connector has some defaults

|---------------|------------------------------|----------|
| url | fully qualified URL | N/A |
| username | ClickHouse database username | N/A |
| password | ClickHouse database password | N/A |
| database | ClickHouse database name | N/A |
| table | ClickHouse table name | N/A |

### Sink configuration

Our Sink is built on top of Flink’s `AsyncSinkBase`

| Parameters | Description | Default Value |
|---------------|---------------------------------------------------------------------------------------|----------|
| maxBatchSize | Maximum number of records inserted in a single batch | N/A |
| maxInFlightRequests | The maximum number of in flight requests allowed before the sink applies backpressure | N/A |
| maxBufferedRequests | The maximum number of records that may be buffered in the sink before backpressure is applied | N/A |
| maxBatchSizeInBytes | The maximum size (in bytes) a batch may become. All batches sent will be smaller than or equal to this size | N/A |
| maxTimeInBufferMS | The maximum time a record may stay in the sink before being flushed | N/A |
| maxRecordSizeInBytes | The maximum record size that the sink will accept, records larger than this will be automatically rejected | N/A |

## Limitations

* Currently the sink does not support exactly-once semantics


## Compatibility

- All projects in this repo are tested with all [active LTS versions](https://github.com/ClickHouse/ClickHouse/pulls?q=is%3Aopen+is%3Apr+label%3Arelease) of ClickHouse.
- [Support policy](https://github.com/ClickHouse/ClickHouse/blob/master/SECURITY.md#security-change-log-and-support)
- We recommend to upgrade connector continuously to not miss security fixes and new improvements
- We recommend upgrading the connector continuously to not miss security fixes and new improvements
- If you have an issue with migration - create and issue and we will respond!

## Contributing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,15 @@ void SimplePOJODataTest() throws Exception {
"integerObject Int32," +
"longPrimitive Int64," +
"longObject Int64," +
"bigInteger128 Int128," +
"bigInteger256 Int256," +
"floatPrimitive Float," +
"floatObject Float," +
"doublePrimitive Double," +
"doubleObject Double," +
"booleanPrimitive Boolean," +
"booleanObject Boolean," +
"str String," +
") " +
"ENGINE = MergeTree " +
"ORDER BY (longPrimitive); ";
Expand Down Expand Up @@ -464,10 +469,15 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
"integerObject Int32," +
"longPrimitive Int64," +
"longObject Int64," +
"bigInteger128 Int128," +
"bigInteger256 Int256," +
"floatPrimitive Float," +
"floatObject Float," +
"doublePrimitive Double," +
"doubleObject Double," +
"booleanPrimitive Boolean," +
"booleanObject Boolean," +
"str String," +
") " +
"ENGINE = MergeTree " +
"ORDER BY (longPrimitive) " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,19 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
Serialize.writeInt64(out, input.getLongPrimitive(), false, false, ClickHouseDataType.Int64, false, "longPrimitive");
Serialize.writeInt64(out, input.getLongObject(), false, false, ClickHouseDataType.Int64, false, "longObject");

Serialize.writeInt128(out, input.getBigInteger128(), false, false, ClickHouseDataType.Int128, false, "bigInteger128");
Serialize.writeInt256(out, input.getBigInteger256(), false, false, ClickHouseDataType.Int256, false, "bigInteger256");

Serialize.writeFloat32(out, input.getFloatPrimitive(), false, false, ClickHouseDataType.Float32, false, "floatPrimitive");
Serialize.writeFloat32(out, input.getFloatObject(), false, false, ClickHouseDataType.Float32, false, "floatObject");

Serialize.writeFloat64(out, input.getDoublePrimitive(), false, false, ClickHouseDataType.Float64, false, "doublePrimitive");
Serialize.writeFloat64(out, input.getDoubleObject(), false, false, ClickHouseDataType.Float64, false, "doubleObject");

Serialize.writeBoolean(out, input.isBooleanPrimitive(), false, false, ClickHouseDataType.Bool, false, "booleanPrimitive");
Serialize.writeBoolean(out, input.getBooleanObject(), false, false, ClickHouseDataType.Bool, false, "booleanObject");

Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "String");

}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.apache.flink.connector.clickhouse.sink.pojo;

import java.math.BigInteger;

public class SimplePOJO {

private byte bytePrimitive;
Expand All @@ -20,6 +22,14 @@ public class SimplePOJO {
private double doublePrimitive;
private Double doubleObject;

private boolean booleanPrimitive;
private Boolean booleanObject;

private String str;

private BigInteger bigInteger128;
private BigInteger bigInteger256;

public SimplePOJO(int index) {
this.bytePrimitive = Byte.MIN_VALUE;
this.byteObject = Byte.MAX_VALUE;
Expand All @@ -38,6 +48,14 @@ public SimplePOJO(int index) {

this.doublePrimitive = Double.MIN_VALUE;
this.doubleObject = Double.MAX_VALUE;

this.booleanPrimitive = true;
this.booleanObject = Boolean.FALSE;

this.str = "str" + longPrimitive;

this.bigInteger128 = BigInteger.valueOf(longPrimitive);
this.bigInteger256 = BigInteger.valueOf(longPrimitive);
}

public byte getBytePrimitive() {
Expand Down Expand Up @@ -87,4 +105,14 @@ public double getDoublePrimitive() {
public Double getDoubleObject() {
return doubleObject;
}

public boolean isBooleanPrimitive() { return booleanPrimitive; }

public Boolean getBooleanObject() { return booleanObject; }

public String getStr() { return str; }

public BigInteger getBigInteger128() { return bigInteger128; }

public BigInteger getBigInteger256() { return bigInteger256; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.math.BigInteger;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -168,6 +169,20 @@ public static void writeInt64(OutputStream out, Long value, boolean defaultsSupp
}
}

// Int128
public static void writeInt128(OutputStream out, BigInteger value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
BinaryStreamUtils.writeInt128(out, SerializerUtils.convertToBigInteger(value));
}
}

// Int256
public static void writeInt256(OutputStream out, BigInteger value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
BinaryStreamUtils.writeInt256(out, SerializerUtils.convertToBigInteger(value));
}
}

// Float32
public static void writeFloat32(OutputStream out, Float value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
Expand All @@ -182,4 +197,11 @@ public static void writeFloat64(OutputStream out, Double value, boolean defaults
}
}

// Boolean
public static void writeBoolean(OutputStream out, Boolean value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
BinaryStreamUtils.writeBoolean(out, value);
}
}

}
Loading