Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci-pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,22 @@ jobs:
name: Flink ${{ matrix.flink }}
runs-on: ubuntu-latest
strategy:
fail-fast: true
fail-fast: false
max-parallel: 3
matrix:
flink: ['1.20','1.19','1.18','1.17','1.16','1.15']
flink: ['2.1','2.0']
env:
CUSTOM_MVN: mvn -B -ntp
steps:
- name: Checkout
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: '8'
java-version: '11'
cache: 'maven'
- name: Machine Info
run: |
Expand Down
4 changes: 3 additions & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ flink_minor_version=$1
check_flink_version_supported $flink_minor_version
flink_version="$(get_flink_version $flink_minor_version)"
kafka_connector_version="$(get_kafka_connector_version $flink_minor_version)"
flink_shaded_guava_version="$(get_flink_shaded_guava_version $flink_minor_version)"

# control whether to run tests (default: skip tests)
skip_tests=true
Expand All @@ -54,7 +55,8 @@ fi
${MVN_CMD} clean package ${mvn_skip_flag} \
-Dflink.minor.version=${flink_minor_version} \
-Dflink.version=${flink_version} \
-Dkafka.connector.version=${kafka_connector_version}
-Dkafka.connector.version=${kafka_connector_version} \
-Dflink.shaded.guava.version=${flink_shaded_guava_version}

echo "*********************************************************************"
echo "Successfully build Flink StarRocks Connector for Flink $flink_minor_version"
Expand Down
27 changes: 24 additions & 3 deletions common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ if ! ${MVN_CMD} --version; then
fi
export MVN_CMD

SUPPORTED_MINOR_VERSION=("1.15" "1.16" "1.17" "1.18" "1.19" "1.20")
# version formats are different among flink versions
SUPPORTED_KAFKA_CONNECTOR_VERSION=("1.15.0" "1.16.0" "1.17.0" "3.0.1-1.18" "3.2.0-1.19" "3.4.0-1.20")
SUPPORTED_MINOR_VERSION=("2.0" "2.1")
SUPPORTED_KAFKA_CONNECTOR_VERSION=("4.0.1-2.0" "4.0.1-2.0")
SUPPORTED_FLINK_SHADED_VERSION=("32.1.3-jre-19.0" "33.4.0-jre-20.0")

VERSION_MESSAGE=$(IFS=, ; echo "${SUPPORTED_MINOR_VERSION[*]}")

function check_flink_version_supported() {
Expand Down Expand Up @@ -70,3 +71,23 @@ function get_kafka_connector_version() {
exit 1
fi
}

# Map a supported Flink minor version (e.g. "2.1") to its flink-shaded-guava
# version by positional lookup: SUPPORTED_FLINK_SHADED_VERSION[i] corresponds
# to SUPPORTED_MINOR_VERSION[i].
# Arguments: $1 - Flink minor version string
# Outputs:   the matching flink-shaded-guava version on stdout
# Returns:   exits non-zero (diagnostic on stderr) if the version is unknown
function get_flink_shaded_guava_version() {
  local FLINK_MINOR_VERSION=$1
  local index=-1
  for ((i = 0; i < ${#SUPPORTED_MINOR_VERSION[@]}; i++)); do
    if [ "${SUPPORTED_MINOR_VERSION[i]}" = "$FLINK_MINOR_VERSION" ]; then
      index=$i
      break
    fi
  done

  if [ "$index" -ge 0 ]; then
    printf '%s\n' "${SUPPORTED_FLINK_SHADED_VERSION[index]}"
  else
    # Diagnostic goes to stderr: callers capture stdout via $(...), and an
    # error message on stdout would be mistaken for a version string.
    echo "Can't find flink shaded guava version for flink-${FLINK_MINOR_VERSION}" >&2
    # NOTE(review): when invoked inside $(...) this exit only terminates the
    # subshell; callers should still check for an empty result.
    exit 1
  fi
}
4 changes: 2 additions & 2 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ limitations under the License.
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<file_encoding>UTF-8</file_encoding>
<flink.minor.version>1.17</flink.minor.version>
<flink.version>1.17.0</flink.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.legacy.api.TableSchema;

/**
* This example will show how to load records to StarRocks table using Flink DataStream.
Expand Down
21 changes: 11 additions & 10 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,20 @@ limitations under the License.

<properties>
<srfc.version>1.2.13</srfc.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<file_encoding>UTF-8</file_encoding>
<maven-enforcer-plugin.version>3.0.0-M3</maven-enforcer-plugin.version>
<maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
<flink.minor.version>1.20</flink.minor.version>
<flink.version>1.20.0</flink.version>
<kafka.connector.version>3.4.0-1.20</kafka.connector.version>
<flink.minor.version>2.1</flink.minor.version>
<flink.version>2.1.1</flink.version>
<kafka.connector.version>4.0.1-2.0</kafka.connector.version>
<flink.shaded.guava.version>33.4.0-jre-20.0</flink.shaded.guava.version>
<arrow.version>5.0.0</arrow.version>
<kafka.version>2.8.1</kafka.version>
<scala.binary.version>2.12</scala.binary.version>
<guava.version>32.0.1-jre</guava.version>
<guava.version>33.4.0-jre</guava.version>
<shading.prefix>com.starrocks.shade</shading.prefix>
<alibaba-fastjson.version>1.2.83</alibaba-fastjson.version>
<jmockit.version>1.49</jmockit.version>
Expand Down Expand Up @@ -329,7 +330,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>31.1-jre-17.0</version>
<version>${flink.shaded.guava.version}</version>
<scope>test</scope>
</dependency>

Expand Down Expand Up @@ -368,8 +369,8 @@ limitations under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<source>11</source>
<target>11</target>
<encoding>${file_encoding}</encoding>
</configuration>
</plugin>
Expand Down Expand Up @@ -629,7 +630,7 @@ limitations under the License.
</property>
</activation>
<properties>
<target.java.version>1.8</target.java.version>
<target.java.version>11</target.java.version>
</properties>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import com.starrocks.connector.flink.row.sink.StarRocksSinkRowBuilder;
import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.table.legacy.api.TableSchema;

public class StarRocksSink {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

import com.starrocks.connector.flink.table.source.StarRocksDynamicSourceFunction;
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.legacy.api.TableSchema;


public class StarRocksSource {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,12 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
properties.putAll(getSourceConfig(tablePath.getDatabaseName(), tablePath.getObjectName()));
properties.putAll(getSinkConfig(tablePath.getDatabaseName(), tablePath.getObjectName()));

return CatalogTable.of(flinkSchema, starRocksTable.getComment().orElse(null), new ArrayList<>(), properties);
return CatalogTable.newBuilder()
.schema(flinkSchema)
.comment(starRocksTable.getComment().orElse(null))
.partitionKeys(new ArrayList<>())
.options(properties)
.build();
} catch (StarRocksCatalogException e) {
throw new CatalogException(
String.format("Failed to get table %s in catalog %s", tablePath.getFullName(), getName()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
package com.starrocks.connector.flink.catalog;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/com/starrocks/connector/flink/cdc/DatabaseSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
Expand Down Expand Up @@ -122,7 +122,7 @@ public void build() throws Exception {
OutputTag<String> recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table);
DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);

int sinkParallel = sinkConfig.getInteger(StarRocksSinkOptions.SINK_PARALLELISM, sideOutput.getParallelism());
int sinkParallel = sinkConfig.get(StarRocksSinkOptions.SINK_PARALLELISM, sideOutput.getParallelism());

StarRocksSinkOptions starRocksSinkOptions = getStarRocksSinkOptions(table);
SinkFunction<String> starRockSink = StarRocksSink.sink(starRocksSinkOptions);
Expand All @@ -137,8 +137,8 @@ private boolean checkFastSchemaEvolution() {
}

private DebeziumJsonSerializer getSerializer(String table) {
String user = sinkConfig.getString(StarRocksSinkOptions.USERNAME);
String passwd = sinkConfig.getString(StarRocksSinkOptions.PASSWORD, "");
String user = sinkConfig.get(StarRocksSinkOptions.USERNAME);
String passwd = sinkConfig.get(StarRocksSinkOptions.PASSWORD, "");
String jdbcUrl = sinkConfig.get(StarRocksSinkOptions.JDBC_URL);

StarRocksOptions.Builder starRocksBuilder = StarRocksOptions.builder();
Expand All @@ -159,9 +159,9 @@ private DebeziumJsonSerializer getSerializer(String table) {
private StarRocksSinkOptions getStarRocksSinkOptions(String table) {
String jdbcUrl = sinkConfig.get(StarRocksSinkOptions.JDBC_URL);
String loadUrl = String.join(";", sinkConfig.get(StarRocksSinkOptions.LOAD_URL));
String user = sinkConfig.getString(StarRocksSinkOptions.USERNAME);
String passwd = sinkConfig.getString(StarRocksSinkOptions.PASSWORD, "");
String labelPrefix = sinkConfig.getString(StarRocksSinkOptions.SINK_LABEL_PREFIX);
String user = sinkConfig.get(StarRocksSinkOptions.USERNAME);
String passwd = sinkConfig.get(StarRocksSinkOptions.PASSWORD, "");
String labelPrefix = sinkConfig.get(StarRocksSinkOptions.SINK_LABEL_PREFIX);

StarRocksSinkOptions options = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
Expand All @@ -184,9 +184,9 @@ private StarRocksSinkOptions getStarRocksSinkOptions(String table) {
}

private StarRocksJdbcConnectionOptions getStarRocksConnectionOptions() {
String user = sinkConfig.getString(StarRocksSinkOptions.USERNAME);
String passwd = sinkConfig.getString(StarRocksSinkOptions.PASSWORD, "");
String jdbcUrl = sinkConfig.getString(StarRocksSinkOptions.JDBC_URL);
String user = sinkConfig.get(StarRocksSinkOptions.USERNAME);
String passwd = sinkConfig.get(StarRocksSinkOptions.PASSWORD, "");
String jdbcUrl = sinkConfig.get(StarRocksSinkOptions.JDBC_URL);
Preconditions.checkNotNull(user, "username is empty in sink-conf");
Preconditions.checkNotNull(jdbcUrl, "jdbcurl is empty in sink-conf");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package com.starrocks.connector.flink.cdc;

import com.starrocks.connector.flink.cdc.mysql.MysqlDatabaseSync;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.MultipleParameterTool;
import org.apache.flink.util.StringUtils;

import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.starrocks.connector.flink.cdc.DatabaseSync;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
Expand All @@ -40,7 +40,7 @@ public ParsingProcessFunction(DatabaseSync.TableNameConverter converter) {
}

@Override
public void open(Configuration parameters) throws Exception {
public void open(OpenContext context) throws Exception {
recordOutputTags = new HashMap<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.legacy.api.TableColumn;
import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.legacy.api.constraints.UniqueConstraint;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
import com.starrocks.connector.flink.table.StarRocksDataType;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.legacy.api.TableColumn;
import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.legacy.api.constraints.UniqueConstraint;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.starrocks.connector.flink.table.StarRocksDataType;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.data.RowData;

import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.starrocks.connector.flink.table.StarRocksDataType;
import com.starrocks.connector.flink.tools.JsonWrapper;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.legacy.api.TableSchema;

import java.io.Serializable;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.starrocks.connector.flink.tools.JsonWrapper;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package com.starrocks.connector.flink.table.sink;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.data.RowData;

import com.starrocks.connector.flink.manager.StarRocksSinkTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
import com.starrocks.connector.flink.tools.EnvUtils;
import com.starrocks.connector.flink.tools.JsonWrapper;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
Expand Down Expand Up @@ -79,8 +79,8 @@ public StarRocksDynamicSinkFunction(StarRocksSinkOptions sinkOptions) {
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
public void open(OpenContext context) throws Exception {
super.open(context);
this.jsonWrapper = new JsonWrapper();
if (serializer != null) {
this.serializer.open(new StarRocksISerializer.SerializerContext(jsonWrapper));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;

/** Base class for all sink implementations. */
public abstract class StarRocksDynamicSinkFunctionBase<T> extends RichSinkFunction<T>
Expand Down
Loading