README.md (136 changes: 133 additions & 3 deletions)

@@ -13,37 +13,167 @@ Table of Contents
* [Table API](#table-api)
* [Artifacts](#artifacts-1)
* [Examples](#examples-1)
* [Supported ClickHouse Types](#supported-clickhouse-types)
* [Configuration Options](#configuration-options)
* [Client Configuration](#client-configuration)
* [Sink Configuration](#sink-configuration)
* [Limitations](#limitations)
* [Contributing](#contributing)

## About The Project

This repo contains the official ClickHouse Apache Flink connector, supported by the ClickHouse team.
The connector supports two main Apache Flink APIs:
- DataStream API
- Table API (not implemented yet; planned for a future release)

## Supported Flink Versions

| Version | Dependency | ClickHouse Client Version |
|---------|----------------------------------|---------------------------|
| latest | flink-connector-clickhouse-2.0.0 | 0.9.1 |
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.1 |
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.1 |
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.1 |
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.1 |
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.1 |

## Installation

### For Flink 2.0.0+

Maven

```xml
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-2.0.0</artifactId>
<version>0.0.1</version>
<type>pom</type>
</dependency>
```

### For Flink 1.17+

Maven

```xml
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-1.17</artifactId>
<version>0.0.1</version>
<type>pom</type>
</dependency>
```

## DataStream API

### Snippet

Configure the ClickHouse client:

```java
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);
```
If you plan to insert raw CSV data as-is, create an `ElementConverter` for `String` records:

```java
ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
```

Create the sink and set the insert format using `setClickHouseFormat`:

```java
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
convertorString,
MAX_BATCH_SIZE,
MAX_IN_FLIGHT_REQUESTS,
MAX_BUFFERED_REQUESTS,
MAX_BATCH_SIZE_IN_BYTES,
MAX_TIME_IN_BUFFER_MS,
MAX_RECORD_SIZE_IN_BYTES,
clickHouseClientConfig
);

csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
```

Finally, connect your DataStream to the sink.

```java
data.sinkTo(csvSink);
```
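Putting the snippets above together, a minimal end-to-end job might look like the sketch below. The tuning constants, table name, and credentials are placeholders, and the connector import paths are assumed from the test sources; treat this as an illustration rather than a verbatim recipe.

```java
// Connector imports are assumed from the test sources:
// org.apache.flink.connector.clickhouse.sink.*
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CsvToClickHouseJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Where and how to connect (all values are placeholders)
        ClickHouseClientConfig config = new ClickHouseClientConfig(
                "http://localhost:8123", "default", "password", "default", "csv_table");

        // Pass each String record through unchanged
        ElementConverter<String, ClickHousePayload> convertor =
                new ClickHouseConvertor<>(String.class);

        ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
                convertor,
                1000,             // maxBatchSize
                4,                // maxInFlightRequests
                10_000,           // maxBufferedRequests
                8 * 1024 * 1024,  // maxBatchSizeInBytes
                5_000,            // maxTimeInBufferMS
                1024 * 1024,      // maxRecordSizeInBytes
                config);
        csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

        // Flink 2.0 uses fromData; older versions use fromElements
        DataStream<String> data = env.fromData("1,alice", "2,bob");
        data.sinkTo(csvSink);
        env.execute("csv-to-clickhouse");
    }
}
```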

More examples and snippets can be found in our tests: [flink-connector-clickhouse-1.17](flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink) and [flink-connector-clickhouse-2.0.0](flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink).

### Example

We have created Maven-based examples for an easy start with the ClickHouse sink, covering different Flink versions:

- [Flink 1.17+](examples/maven/flink-v1.7/covid)
- [Flink 2.0.0+](examples/maven/flink-v2/covid)

For more detailed instructions, see the [Example Guide](examples#readme).

## Table API

The Table API is planned for a future release. This section will be updated once it is available.

## Supported ClickHouse Types

| Java Type       | ClickHouse Type | Supported | Serialize Method       |
|-----------------|-----------------|-----------|------------------------|
| byte/Byte       | Int8            | ✅        | Serialize.writeInt8    |
| short/Short     | Int16           | ✅        | Serialize.writeInt16   |
| int/Integer     | Int32           | ✅        | Serialize.writeInt32   |
| long/Long       | Int64           | ✅        | Serialize.writeInt64   |
| float/Float     | Float32         | ✅        | Serialize.writeFloat32 |
| double/Double   | Float64         | ✅        | Serialize.writeFloat64 |
| boolean/Boolean | Bool            | ✅        | Serialize.writeBoolean |
| String          | String          | ✅        | Serialize.writeString  |
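Under the hood, the `Serialize.write*` helpers emit ClickHouse's RowBinary encoding: fixed-width integers and floats are written little-endian, and `String` is a LEB128 varint length followed by UTF-8 bytes. The following dependency-free sketch illustrates that wire format; the helper names here are hypothetical and are not the connector's API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class RowBinarySketch {
    // Int32 column value: 4 bytes, little-endian
    static void writeInt32(OutputStream out, int v) throws IOException {
        for (int i = 0; i < 4; i++) {
            out.write((v >>> (8 * i)) & 0xFF);
        }
    }

    // String column value: LEB128 varint byte length, then UTF-8 bytes
    static void writeString(OutputStream out, String s) throws IOException {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        int len = bytes.length;
        while ((len & ~0x7F) != 0) {
            out.write((len & 0x7F) | 0x80);
            len >>>= 7;
        }
        out.write(len);
        out.write(bytes);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeInt32(out, 1);        // Int32 column
        writeString(out, "alice"); // String column
        // 4 bytes for the Int32, 1 length byte, 5 UTF-8 bytes
        System.out.println(out.toByteArray().length); // 10
    }
}
```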

## Configuration Options

### Client Configuration

> **Reviewer:** we can set some defaults to work with a localhost instance
>
> **Author:** Currently, we have one constructor for all params, a constructor without a URL parameter that is set to `http://localhost:8123`. Do you think that can somehow help the users?
>
> **Reviewer:** yes, from the table below it's unclear that the connector has some defaults

| Parameters | Description                  | Default Value |
|------------|------------------------------|---------------|
| url        | Fully qualified URL          | N/A           |
| username   | ClickHouse database username | N/A           |
| password   | ClickHouse database password | N/A           |
| database   | ClickHouse database name     | N/A           |
| table      | ClickHouse table name        | N/A           |
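Based on the review thread above, a configuration sketch might look like the following. The no-URL constructor defaulting to `http://localhost:8123` is mentioned by the author but not yet documented, so treat its existence and signature as an assumption; all values are placeholders.

```java
// Fully specified configuration (all values are placeholders)
ClickHouseClientConfig config = new ClickHouseClientConfig(
        "http://localhost:8123", "default", "password", "default", "events");

// Reported convenience constructor that defaults the URL to http://localhost:8123
ClickHouseClientConfig localConfig = new ClickHouseClientConfig(
        "default", "password", "default", "events");
```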

### Sink Configuration

Our sink is built on top of Flink's `AsyncSinkBase`.

| Parameters           | Description                                                                                        | Default Value |
|----------------------|----------------------------------------------------------------------------------------------------|---------------|
| maxBatchSize         | Maximum number of records inserted in a single batch                                               | N/A           |
| maxInFlightRequests  | Maximum number of in-flight requests allowed before the sink applies backpressure                  | N/A           |
| maxBufferedRequests  | Maximum number of records that may be buffered in the sink before backpressure is applied          | N/A           |
| maxBatchSizeInBytes  | Maximum size (in bytes) a batch may reach; all batches sent are smaller than or equal to this size | N/A           |
| maxTimeInBufferMS    | Maximum time (in ms) a record may stay in the sink before being flushed                            | N/A           |
| maxRecordSizeInBytes | Maximum record size the sink will accept; larger records are automatically rejected                | N/A           |
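For reference, these parameters map positionally onto the `ClickHouseAsyncSink` constructor used in the DataStream snippet; the concrete values below are illustrative placeholders, not recommended defaults:

```java
// Placeholder tuning values -- adjust for your workload and cluster.
ClickHouseAsyncSink<String> sink = new ClickHouseAsyncSink<>(
        convertor,
        1000,             // maxBatchSize: records per insert batch
        4,                // maxInFlightRequests: concurrent inserts before backpressure
        10_000,           // maxBufferedRequests: buffered records before backpressure
        8 * 1024 * 1024,  // maxBatchSizeInBytes: upper bound on a batch's byte size
        5_000,            // maxTimeInBufferMS: flush partially filled batches after 5 s
        1024 * 1024,      // maxRecordSizeInBytes: larger records are rejected
        clickHouseClientConfig);
```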

## Limitations

* Currently, the sink does not support exactly-once semantics.


## Compatibility

- All projects in this repo are tested with all [active LTS versions](https://github.com/ClickHouse/ClickHouse/pulls?q=is%3Aopen+is%3Apr+label%3Arelease) of ClickHouse.
- [Support policy](https://github.com/ClickHouse/ClickHouse/blob/master/SECURITY.md#security-change-log-and-support)
- We recommend upgrading the connector continuously to not miss security fixes and new improvements
- If you have an issue with migration, create an issue and we will respond!

## Contributing
@@ -210,6 +210,9 @@ void SimplePOJODataTest() throws Exception {
"floatObject Float," +
"doublePrimitive Double," +
"doubleObject Double," +
"booleanPrimitive Boolean," +
"booleanObject Boolean," +
"str String," +
") " +
"ENGINE = MergeTree " +
"ORDER BY (longPrimitive); ";
@@ -468,6 +471,9 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
"floatObject Float," +
"doublePrimitive Double," +
"doubleObject Double," +
"booleanPrimitive Boolean," +
"booleanObject Boolean," +
"str String," +
") " +
"ENGINE = MergeTree " +
"ORDER BY (longPrimitive) " +
@@ -28,5 +28,11 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {

Serialize.writeFloat64(out, input.getDoublePrimitive(), false, false, ClickHouseDataType.Float64, false, "doublePrimitive");
Serialize.writeFloat64(out, input.getDoubleObject(), false, false, ClickHouseDataType.Float64, false, "doubleObject");

Serialize.writeBoolean(out, input.isBooleanPrimitive(), false, false, ClickHouseDataType.Bool, false, "booleanPrimitive");
Serialize.writeBoolean(out, input.getBooleanObject(), false, false, ClickHouseDataType.Bool, false, "booleanObject");

Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "str");

}
}
@@ -20,6 +20,11 @@ public class SimplePOJO {
private double doublePrimitive;
private Double doubleObject;

private boolean booleanPrimitive;
private Boolean booleanObject;

private String str;

public SimplePOJO(int index) {
this.bytePrimitive = Byte.MIN_VALUE;
this.byteObject = Byte.MAX_VALUE;
@@ -38,6 +43,11 @@ public SimplePOJO(int index) {

this.doublePrimitive = Double.MIN_VALUE;
this.doubleObject = Double.MAX_VALUE;

this.booleanPrimitive = true;
this.booleanObject = Boolean.FALSE;

this.str = "str" + longPrimitive;
}

public byte getBytePrimitive() {
@@ -87,4 +97,10 @@ public double getDoublePrimitive() {
public Double getDoubleObject() {
return doubleObject;
}

public boolean isBooleanPrimitive() { return booleanPrimitive; }

public Boolean getBooleanObject() { return booleanObject; }

public String getStr() { return str; }
}
@@ -182,4 +182,11 @@ public static void writeFloat64(OutputStream out, Double value, boolean defaults
}
}

// Boolean
public static void writeBoolean(OutputStream out, Boolean value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
BinaryStreamUtils.writeBoolean(out, value);
}
}

}