diff --git a/.github/workflows/ci-pipeline.yml b/.github/workflows/ci-pipeline.yml
index 6c307bfd8..eba62b789 100644
--- a/.github/workflows/ci-pipeline.yml
+++ b/.github/workflows/ci-pipeline.yml
@@ -36,10 +36,10 @@ jobs:
name: Flink ${{ matrix.flink }}
runs-on: ubuntu-latest
strategy:
- fail-fast: true
+ fail-fast: false
max-parallel: 3
matrix:
- flink: ['1.20','1.19','1.18','1.17','1.16','1.15']
+ flink: ['2.1','2.0']
env:
CUSTOM_MVN: mvn -B -ntp
steps:
@@ -47,11 +47,11 @@ jobs:
uses: actions/checkout@v2
with:
fetch-depth: 0
- - name: Set up JDK 8
+ - name: Set up JDK 11
uses: actions/setup-java@v3
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
cache: 'maven'
- name: Machine Info
run: |
diff --git a/build.sh b/build.sh
index dfe888563..cc9874ec4 100755
--- a/build.sh
+++ b/build.sh
@@ -34,6 +34,7 @@ flink_minor_version=$1
check_flink_version_supported $flink_minor_version
flink_version="$(get_flink_version $flink_minor_version)"
kafka_connector_version="$(get_kafka_connector_version $flink_minor_version)"
+flink_shaded_guava_version="$(get_flink_shaded_guava_version $flink_minor_version)"
# control whether to run tests (default: skip tests)
skip_tests=true
@@ -54,7 +55,8 @@ fi
${MVN_CMD} clean package ${mvn_skip_flag} \
-Dflink.minor.version=${flink_minor_version} \
-Dflink.version=${flink_version} \
- -Dkafka.connector.version=${kafka_connector_version}
+ -Dkafka.connector.version=${kafka_connector_version} \
+ -Dflink.shaded.guava.version=${flink_shaded_guava_version}
echo "*********************************************************************"
echo "Successfully build Flink StarRocks Connector for Flink $flink_minor_version"
diff --git a/common.sh b/common.sh
index 117bf14d5..49d001c91 100644
--- a/common.sh
+++ b/common.sh
@@ -32,9 +32,10 @@ if ! ${MVN_CMD} --version; then
fi
export MVN_CMD
-SUPPORTED_MINOR_VERSION=("1.15" "1.16" "1.17" "1.18" "1.19" "1.20")
-# version formats are different among flink versions
-SUPPORTED_KAFKA_CONNECTOR_VERSION=("1.15.0" "1.16.0" "1.17.0" "3.0.1-1.18" "3.2.0-1.19" "3.4.0-1.20")
+SUPPORTED_MINOR_VERSION=("2.0" "2.1")
+SUPPORTED_KAFKA_CONNECTOR_VERSION=("4.0.1-2.0" "4.0.1-2.0")
+SUPPORTED_FLINK_SHADED_VERSION=("32.1.3-jre-19.0" "33.4.0-jre-20.0")
+
VERSION_MESSAGE=$(IFS=, ; echo "${SUPPORTED_MINOR_VERSION[*]}")
function check_flink_version_supported() {
@@ -70,3 +71,23 @@ function get_kafka_connector_version() {
exit 1
fi
}
+
+function get_flink_shaded_guava_version() {
+ local FLINK_MINOR_VERSION=$1
+ local index=-1
+ for ((i=0; i<${#SUPPORTED_MINOR_VERSION[@]}; i++)); do
+ if [ "${SUPPORTED_MINOR_VERSION[i]}" = "$FLINK_MINOR_VERSION" ]; then
+ index=$i
+ break
+ fi
+ done
+
+ if [ "$index" != -1 ];
+ then
+ local FLINK_SHADED_VERSION="${SUPPORTED_FLINK_SHADED_VERSION[index]}"
+ echo "$FLINK_SHADED_VERSION"
+ else
+ echo "Can't find flink shaded guava version for flink-${FLINK_MINOR_VERSION}" >&2
+ exit 1
+ fi
+}
diff --git a/examples/pom.xml b/examples/pom.xml
index ba789af61..cd0144432 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,8 +22,8 @@ limitations under the License.
1.0-SNAPSHOT
- 1.8
- 1.8
+ 11
+ 11
UTF-8
1.17
1.17.0
diff --git a/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCustomJavaRecords.java b/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCustomJavaRecords.java
index 783e5888f..67e5509fe 100644
--- a/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCustomJavaRecords.java
+++ b/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCustomJavaRecords.java
@@ -29,7 +29,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
/**
* This example will show how to load records to StarRocks table using Flink DataStream.
diff --git a/pom.xml b/pom.xml
index 160eed5c9..a48d5c979 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,19 +47,20 @@ limitations under the License.
1.2.13
- 1.8
- 1.8
+ 11
+ 11
UTF-8
UTF-8
3.0.0-M3
3.0.0-M4
- 1.20
- 1.20.0
- 3.4.0-1.20
+ 2.1
+ 2.1.1
+ 4.0.1-2.0
+ 33.4.0-jre-20.0
5.0.0
2.8.1
2.12
- 32.0.1-jre
+ 33.4.0-jre
com.starrocks.shade
1.2.83
1.49
@@ -329,7 +330,7 @@ limitations under the License.
org.apache.flink
flink-shaded-guava
- 31.1-jre-17.0
+ ${flink.shaded.guava.version}
test
@@ -368,8 +369,8 @@ limitations under the License.
org.apache.maven.plugins
maven-compiler-plugin
- 1.8
- 1.8
+ 11
+ 11
${file_encoding}
@@ -629,7 +630,7 @@ limitations under the License.
- 1.8
+ 11
diff --git a/src/main/java/com/starrocks/connector/flink/StarRocksSink.java b/src/main/java/com/starrocks/connector/flink/StarRocksSink.java
index 6bc7ecfcf..8d0c06965 100644
--- a/src/main/java/com/starrocks/connector/flink/StarRocksSink.java
+++ b/src/main/java/com/starrocks/connector/flink/StarRocksSink.java
@@ -19,8 +19,8 @@
import com.starrocks.connector.flink.row.sink.StarRocksSinkRowBuilder;
import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.table.legacy.api.TableSchema;
public class StarRocksSink {
diff --git a/src/main/java/com/starrocks/connector/flink/StarRocksSource.java b/src/main/java/com/starrocks/connector/flink/StarRocksSource.java
index 619655dd8..9c2a47530 100644
--- a/src/main/java/com/starrocks/connector/flink/StarRocksSource.java
+++ b/src/main/java/com/starrocks/connector/flink/StarRocksSource.java
@@ -16,8 +16,7 @@
import com.starrocks.connector.flink.table.source.StarRocksDynamicSourceFunction;
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
-
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
public class StarRocksSource {
diff --git a/src/main/java/com/starrocks/connector/flink/catalog/FlinkCatalog.java b/src/main/java/com/starrocks/connector/flink/catalog/FlinkCatalog.java
index 6fdb5ffff..726843e28 100644
--- a/src/main/java/com/starrocks/connector/flink/catalog/FlinkCatalog.java
+++ b/src/main/java/com/starrocks/connector/flink/catalog/FlinkCatalog.java
@@ -278,7 +278,12 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
properties.putAll(getSourceConfig(tablePath.getDatabaseName(), tablePath.getObjectName()));
properties.putAll(getSinkConfig(tablePath.getDatabaseName(), tablePath.getObjectName()));
- return CatalogTable.of(flinkSchema, starRocksTable.getComment().orElse(null), new ArrayList<>(), properties);
+ return CatalogTable.newBuilder()
+ .schema(flinkSchema)
+ .comment(starRocksTable.getComment().orElse(null))
+ .partitionKeys(new ArrayList<>())
+ .options(properties)
+ .build();
} catch (StarRocksCatalogException e) {
throw new CatalogException(
String.format("Failed to get table %s in catalog %s", tablePath.getFullName(), getName()), e);
diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksUtils.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksUtils.java
index e59231e0e..936d9453b 100644
--- a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksUtils.java
+++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksUtils.java
@@ -21,7 +21,7 @@
package com.starrocks.connector.flink.catalog;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
diff --git a/src/main/java/com/starrocks/connector/flink/cdc/DatabaseSync.java b/src/main/java/com/starrocks/connector/flink/cdc/DatabaseSync.java
index dc89ff8d4..63f569fbc 100755
--- a/src/main/java/com/starrocks/connector/flink/cdc/DatabaseSync.java
+++ b/src/main/java/com/starrocks/connector/flink/cdc/DatabaseSync.java
@@ -34,7 +34,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -122,7 +122,7 @@ public void build() throws Exception {
OutputTag recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table);
DataStream sideOutput = parsedStream.getSideOutput(recordOutputTag);
- int sinkParallel = sinkConfig.getInteger(StarRocksSinkOptions.SINK_PARALLELISM, sideOutput.getParallelism());
+ int sinkParallel = sinkConfig.get(StarRocksSinkOptions.SINK_PARALLELISM, sideOutput.getParallelism());
StarRocksSinkOptions starRocksSinkOptions = getStarRocksSinkOptions(table);
SinkFunction starRockSink = StarRocksSink.sink(starRocksSinkOptions);
@@ -137,8 +137,8 @@ private boolean checkFastSchemaEvolution() {
}
private DebeziumJsonSerializer getSerializer(String table) {
- String user = sinkConfig.getString(StarRocksSinkOptions.USERNAME);
- String passwd = sinkConfig.getString(StarRocksSinkOptions.PASSWORD, "");
+ String user = sinkConfig.get(StarRocksSinkOptions.USERNAME);
+ String passwd = sinkConfig.get(StarRocksSinkOptions.PASSWORD, "");
String jdbcUrl = sinkConfig.get(StarRocksSinkOptions.JDBC_URL);
StarRocksOptions.Builder starRocksBuilder = StarRocksOptions.builder();
@@ -159,9 +159,9 @@ private DebeziumJsonSerializer getSerializer(String table) {
private StarRocksSinkOptions getStarRocksSinkOptions(String table) {
String jdbcUrl = sinkConfig.get(StarRocksSinkOptions.JDBC_URL);
String loadUrl = String.join(";", sinkConfig.get(StarRocksSinkOptions.LOAD_URL));
- String user = sinkConfig.getString(StarRocksSinkOptions.USERNAME);
- String passwd = sinkConfig.getString(StarRocksSinkOptions.PASSWORD, "");
- String labelPrefix = sinkConfig.getString(StarRocksSinkOptions.SINK_LABEL_PREFIX);
+ String user = sinkConfig.get(StarRocksSinkOptions.USERNAME);
+ String passwd = sinkConfig.get(StarRocksSinkOptions.PASSWORD, "");
+ String labelPrefix = sinkConfig.get(StarRocksSinkOptions.SINK_LABEL_PREFIX);
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
@@ -184,9 +184,9 @@ private StarRocksSinkOptions getStarRocksSinkOptions(String table) {
}
private StarRocksJdbcConnectionOptions getStarRocksConnectionOptions() {
- String user = sinkConfig.getString(StarRocksSinkOptions.USERNAME);
- String passwd = sinkConfig.getString(StarRocksSinkOptions.PASSWORD, "");
- String jdbcUrl = sinkConfig.getString(StarRocksSinkOptions.JDBC_URL);
+ String user = sinkConfig.get(StarRocksSinkOptions.USERNAME);
+ String passwd = sinkConfig.get(StarRocksSinkOptions.PASSWORD, "");
+ String jdbcUrl = sinkConfig.get(StarRocksSinkOptions.JDBC_URL);
Preconditions.checkNotNull(user, "username is empty in sink-conf");
Preconditions.checkNotNull(jdbcUrl, "jdbcurl is empty in sink-conf");
diff --git a/src/main/java/com/starrocks/connector/flink/cdc/StarRocksCdcTools.java b/src/main/java/com/starrocks/connector/flink/cdc/StarRocksCdcTools.java
index 1dc637b49..5f80d557b 100755
--- a/src/main/java/com/starrocks/connector/flink/cdc/StarRocksCdcTools.java
+++ b/src/main/java/com/starrocks/connector/flink/cdc/StarRocksCdcTools.java
@@ -20,9 +20,9 @@
package com.starrocks.connector.flink.cdc;
import com.starrocks.connector.flink.cdc.mysql.MysqlDatabaseSync;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.MultipleParameterTool;
import org.apache.flink.util.StringUtils;
import java.util.Arrays;
diff --git a/src/main/java/com/starrocks/connector/flink/cdc/mysql/ParsingProcessFunction.java b/src/main/java/com/starrocks/connector/flink/cdc/mysql/ParsingProcessFunction.java
index aa8de2aa5..7f365359c 100755
--- a/src/main/java/com/starrocks/connector/flink/cdc/mysql/ParsingProcessFunction.java
+++ b/src/main/java/com/starrocks/connector/flink/cdc/mysql/ParsingProcessFunction.java
@@ -22,7 +22,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.starrocks.connector.flink.cdc.DatabaseSync;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
@@ -40,7 +40,7 @@ public ParsingProcessFunction(DatabaseSync.TableNameConverter converter) {
}
@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext context) throws Exception {
recordOutputTags = new HashMap<>();
}
diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java
index bfa887fa3..a25953ec4 100644
--- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java
+++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java
@@ -23,9 +23,9 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.legacy.api.TableColumn;
+import org.apache.flink.table.legacy.api.TableSchema;
+import org.apache.flink.table.legacy.api.constraints.UniqueConstraint;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java
index 1064952ee..cf9277f6f 100644
--- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java
+++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java
@@ -25,9 +25,9 @@
import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
import com.starrocks.connector.flink.table.StarRocksDataType;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.legacy.api.TableColumn;
+import org.apache.flink.table.legacy.api.TableSchema;
+import org.apache.flink.table.legacy.api.constraints.UniqueConstraint;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import java.util.ArrayList;
diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java
index 423fea0ad..ac0b69ce5 100644
--- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java
+++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java
@@ -17,7 +17,7 @@
import com.starrocks.connector.flink.table.StarRocksDataType;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.data.RowData;
import java.util.Map;
diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksIRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksIRowTransformer.java
index 6dd9a4a46..8296a4870 100644
--- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksIRowTransformer.java
+++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksIRowTransformer.java
@@ -17,7 +17,7 @@
import com.starrocks.connector.flink.table.StarRocksDataType;
import com.starrocks.connector.flink.tools.JsonWrapper;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
import java.io.Serializable;
import java.util.Map;
diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java
index 5818b4fd1..ccedd397a 100644
--- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java
+++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java
@@ -20,7 +20,7 @@
import com.starrocks.connector.flink.tools.JsonWrapper;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java b/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java
index 522374779..0b8c90d96 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java
@@ -18,7 +18,7 @@
package com.starrocks.connector.flink.table.sink;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.data.RowData;
import com.starrocks.connector.flink.manager.StarRocksSinkTable;
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
index 201502f5b..1465d6ba1 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
@@ -25,15 +25,15 @@
import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
import com.starrocks.connector.flink.tools.EnvUtils;
import com.starrocks.connector.flink.tools.JsonWrapper;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
@@ -79,8 +79,8 @@ public StarRocksDynamicSinkFunction(StarRocksSinkOptions sinkOptions) {
}
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
+ public void open(OpenContext context) throws Exception {
+ super.open(context);
this.jsonWrapper = new JsonWrapper();
if (serializer != null) {
this.serializer.open(new StarRocksISerializer.SerializerContext(jsonWrapper));
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionBase.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionBase.java
index a054e738b..c749fb8b5 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionBase.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionBase.java
@@ -20,7 +20,7 @@
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
/** Base class for all sink implementations. */
public abstract class StarRocksDynamicSinkFunctionBase extends RichSinkFunction
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java
index d5628c777..ef1d60b7b 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java
@@ -33,16 +33,16 @@
import com.starrocks.data.load.stream.LabelGeneratorFactory;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.v2.StreamLoadManagerV2;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
@@ -175,7 +175,7 @@ public void invoke(T value, Context context) throws Exception {
}
@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(OpenContext context) throws Exception {
totalReceivedRows = 0;
if (serializer != null) {
this.serializer.open(new StarRocksISerializer.SerializerContext(getOrCreateJsonWrapper()));
@@ -193,8 +193,8 @@ public void open(Configuration parameters) throws Exception {
} else {
this.exactlyOnceLabelFactory = new ExactlyOnceLabelGeneratorFactory(
labelPrefix,
- getRuntimeContext().getNumberOfParallelSubtasks(),
- getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
+ getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
restoredCheckpointId);
exactlyOnceLabelFactory.restore(restoredGeneratorSnapshots);
labelGeneratorFactory = exactlyOnceLabelFactory;
@@ -219,7 +219,7 @@ private void openForExactlyOnce() throws Exception {
LingeringTransactionAborter aborter = new LingeringTransactionAborter(
sinkOptions.getLabelPrefix(),
restoredCheckpointId,
- getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
sinkOptions.getAbortCheckNumTxns(),
sinkOptions.getDbTables(),
restoredGeneratorSnapshots,
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java
index 41c80227b..a09048411 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java
@@ -17,13 +17,13 @@
import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
-
+import org.apache.flink.table.legacy.api.TableSchema;
+
public class StarRocksDynamicTableSink implements DynamicTableSink {
private transient TableSchema flinkSchema;
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java
index 4706c431c..276114928 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java
@@ -16,7 +16,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/DefaultStarRocksSinkContext.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/DefaultStarRocksSinkContext.java
index 28a239fff..88f29d890 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/v2/DefaultStarRocksSinkContext.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/DefaultStarRocksSinkContext.java
@@ -20,23 +20,22 @@
package com.starrocks.connector.flink.table.sink.v2;
-import org.apache.flink.api.connector.sink2.Sink;
-
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
/** Default implementation for {@link StarRocksSinkContext}. */
public class DefaultStarRocksSinkContext implements StarRocksSinkContext {
- private final Sink.InitContext initContext;
+ private final WriterInitContext initContext;
private final StarRocksSinkOptions sinkOptions;
- public DefaultStarRocksSinkContext(Sink.InitContext initContext, StarRocksSinkOptions sinkOptions) {
+ public DefaultStarRocksSinkContext(WriterInitContext initContext, StarRocksSinkOptions sinkOptions) {
this.initContext = initContext;
this.sinkOptions = sinkOptions;
}
@Override
- public Sink.InitContext getInitContext() {
+ public WriterInitContext getInitContext() {
return initContext;
}
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java
index e992cb667..22e744c44 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java
@@ -21,8 +21,11 @@
package com.starrocks.connector.flink.table.sink.v2;
import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
@@ -35,7 +38,9 @@
import java.util.Collections;
public class StarRocksSink
- implements StatefulSink, TwoPhaseCommittingSink {
+ implements Sink,
+ SupportsWriterState,
+ SupportsCommitter {
private static final long serialVersionUID = 1L;
@@ -55,12 +60,12 @@ public StarRocksSink(
}
@Override
- public StarRocksWriter createWriter(InitContext context) throws IOException {
+ public StarRocksWriter createWriter(WriterInitContext context) throws IOException {
return restoreWriter(context, Collections.emptyList());
}
@Override
- public StarRocksWriter restoreWriter(InitContext context, Collection recoveredState)
+ public StarRocksWriter restoreWriter(WriterInitContext context, Collection recoveredState)
throws IOException {
try {
return new StarRocksWriter<>(
@@ -81,7 +86,7 @@ public SimpleVersionedSerializer getWriterStateSerializer(
}
@Override
- public Committer createCommitter() throws IOException {
+ public Committer createCommitter(CommitterInitContext context) throws IOException {
return new StarRocksCommitter(sinkOptions, streamLoadProperties);
}
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSinkContext.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSinkContext.java
index 5b2fe6f50..9b82c1f86 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSinkContext.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSinkContext.java
@@ -20,15 +20,14 @@
package com.starrocks.connector.flink.table.sink.v2;
-import org.apache.flink.api.connector.sink2.Sink;
-
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
/** This context provides information for {@link RecordSerializationSchema}. */
public interface StarRocksSinkContext {
/** Returns the current sink's init context. */
- Sink.InitContext getInitContext();
+ WriterInitContext getInitContext();
/** Returns the sink options . */
StarRocksSinkOptions getSinkOptions();
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java
index 240888be6..2609c76d1 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java
@@ -21,9 +21,9 @@
package com.starrocks.connector.flink.table.sink.v2;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
@@ -48,8 +48,8 @@
import java.util.List;
public class StarRocksWriter
- implements StatefulSink.StatefulSinkWriter,
- TwoPhaseCommittingSink.PrecommittingSinkWriter {
+ implements StatefulSinkWriter,
+ CommittingSinkWriter {
private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriter.class);
@@ -62,7 +62,7 @@ public class StarRocksWriter
public StarRocksWriter(
StarRocksSinkOptions sinkOptions,
- Sink.InitContext initContext,
+ WriterInitContext initContext,
SerializationSchema.InitializationContext schemaContext,
RecordSerializationSchema serializationSchema,
StreamLoadProperties streamLoadProperties,
@@ -86,8 +86,8 @@ public StarRocksWriter(
} else {
this.labelGeneratorFactory = new ExactlyOnceLabelGeneratorFactory(
labelPrefix,
- initContext.getNumberOfParallelSubtasks(),
- initContext.getSubtaskId(),
+ initContext.getTaskInfo().getNumberOfParallelSubtasks(),
+ initContext.getTaskInfo().getIndexOfThisSubtask(),
restoredCheckpointId);
}
@@ -113,7 +113,7 @@ public StarRocksWriter(
LingeringTransactionAborter aborter = new LingeringTransactionAborter(
sinkOptions.getLabelPrefix(),
restoredCheckpointId,
- initContext.getSubtaskId(),
+ initContext.getTaskInfo().getIndexOfThisSubtask(),
sinkOptions.getAbortCheckNumTxns(),
sinkOptions.getDbTables(),
restoredGeneratorSnapshots,
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java
index 8ecebf633..87f77fe9f 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java
@@ -14,13 +14,12 @@
package com.starrocks.connector.flink.table.source;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.table.data.RowData;
import com.google.common.base.Strings;
@@ -29,6 +28,7 @@
import com.starrocks.connector.flink.table.source.struct.QueryInfo;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
import com.starrocks.connector.flink.tools.EnvUtils;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,19 +136,19 @@ private String genSQL(StarRocksSourceQueryType queryType, SelectColumn[] selectC
}
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
+ public void open(OpenContext context) throws Exception {
+ super.open(context);
this.dataReaderClosed = new AtomicBoolean(false);
this.counterTotalScannedRows = getRuntimeContext().getMetricGroup().counter(TOTAL_SCANNED_ROWS);
- int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+ int subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
if (this.queryType == StarRocksSourceQueryType.QueryCount) {
if (subTaskId == 0) {
StarRocksSourceTrickReader reader = new StarRocksSourceTrickReader(this.dataCount);
this.dataReaderList.add(reader);
}
} else {
- List<List<QueryBeXTablets>> lists = StarRocksSourceCommonFunc.splitQueryBeXTablets(getRuntimeContext().getNumberOfParallelSubtasks(), queryInfo);
+ List<List<QueryBeXTablets>> lists = StarRocksSourceCommonFunc.splitQueryBeXTablets(getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(), queryInfo);
lists.get(subTaskId).forEach(beXTablets -> {
StarRocksSourceBeReader beReader = new StarRocksSourceBeReader(beXTablets.getBeNode(), columnRichInfos, selectColumns, sourceOptions);
this.dataReaderList.add(beReader);
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java
index e8ea05830..71f28e432 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java
@@ -14,14 +14,14 @@
package com.starrocks.connector.flink.table.source;
+import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.legacy.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
@@ -30,6 +30,7 @@
import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
+import org.apache.flink.table.types.DataType;
import java.util.ArrayList;
import java.util.Arrays;
@@ -109,7 +110,7 @@ public boolean supportsNestedProjection() {
}
@Override
- public void applyProjection(int[][] projectedFields) {
+ public void applyProjection(int[][] projectedFields, DataType dataType) {
// if columns = "*", this func will not be called, so 'selectColumns' will be null
int[] curProjectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray();
if (curProjectedFields.length == 0 ) {
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java
index e340b2cd8..7d6fd913f 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java
@@ -17,7 +17,7 @@
import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceCommonFunc.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceCommonFunc.java
index eb2db84af..a47a0e874 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceCommonFunc.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceCommonFunc.java
@@ -23,8 +23,8 @@
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableColumn;
+import org.apache.flink.table.legacy.api.TableSchema;
import java.io.IOException;
import java.util.ArrayList;
diff --git a/src/main/java/com/starrocks/connector/flink/tools/ExecuteSQL.java b/src/main/java/com/starrocks/connector/flink/tools/ExecuteSQL.java
index 6e093e8de..4c3ca196e 100644
--- a/src/main/java/com/starrocks/connector/flink/tools/ExecuteSQL.java
+++ b/src/main/java/com/starrocks/connector/flink/tools/ExecuteSQL.java
@@ -15,7 +15,7 @@
package com.starrocks.connector.flink.tools;
import com.google.common.base.Strings;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.util.MultipleParameterTool;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
diff --git a/src/test/java/com/starrocks/connector/flink/StarRocksSinkBaseTest.java b/src/test/java/com/starrocks/connector/flink/StarRocksSinkBaseTest.java
index 097c459be..f674d055b 100644
--- a/src/test/java/com/starrocks/connector/flink/StarRocksSinkBaseTest.java
+++ b/src/test/java/com/starrocks/connector/flink/StarRocksSinkBaseTest.java
@@ -27,7 +27,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.alibaba.fastjson.JSON;
@@ -38,14 +37,12 @@
import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.TableSchema.Builder;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.junit.After;
import org.junit.Before;
import mockit.Mocked;
-import mockit.Tested;
import mockit.Expectations;
public abstract class StarRocksSinkBaseTest {
@@ -94,7 +91,7 @@ public void initializeTableSchema() {
TABLE_SCHEMA = builder.build();
}
- protected Builder createTableSchemaBuilder() {
+ protected TableSchema.Builder createTableSchemaBuilder() {
return TableSchema.builder()
.field("k1", DataTypes.TINYINT())
.field("k2", DataTypes.VARCHAR(16))
diff --git a/src/test/java/com/starrocks/connector/flink/it/catalog/FlinkCatalogTest.java b/src/test/java/com/starrocks/connector/flink/it/catalog/FlinkCatalogTest.java
index a484034f4..dafa66ec2 100644
--- a/src/test/java/com/starrocks/connector/flink/it/catalog/FlinkCatalogTest.java
+++ b/src/test/java/com/starrocks/connector/flink/it/catalog/FlinkCatalogTest.java
@@ -26,17 +26,18 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalogTable;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.legacy.api.TableColumn;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CollectionUtil;
@@ -46,7 +47,10 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import static com.starrocks.connector.flink.catalog.TypeUtils.MAX_VARCHAR_SIZE;
import static com.starrocks.connector.flink.catalog.TypeUtils.STRING_SIZE;
@@ -189,7 +193,7 @@ public void testCreateTable() throws Exception {
String tbl = "tbl_" + genRandomUuid();
ObjectPath objectPath = new ObjectPath(db, tbl);
TableSchema flinkSchema = createAllTypesFlinkSchema();
- CatalogBaseTable catalogTable = new CatalogTableImpl(flinkSchema, Collections.emptyMap(), null);
+ CatalogBaseTable catalogTable = new MockedCatalogTable(flinkSchema, Collections.emptyMap(), null);
assertFalse(catalog.tableExists(objectPath));
catalog.createTable(objectPath, catalogTable, false);
assertTrue(catalog.tableExists(objectPath));
@@ -383,4 +387,45 @@ private String createPkTable(String tablePrefix) throws Exception {
executeSrSQL(createStarRocksTable);
return tableName;
}
+
+ static class MockedCatalogTable extends AbstractCatalogTable {
+
+ public MockedCatalogTable(
+ TableSchema tableSchema, Map<String, String> properties, String comment) {
+ this(tableSchema, new ArrayList<>(), properties, comment);
+ }
+
+ public MockedCatalogTable(
+ TableSchema tableSchema,
+ List<String> partitionKeys,
+ Map<String, String> properties,
+ String comment) {
+ super(tableSchema, partitionKeys, properties, comment);
+ }
+
+ @Override
+ public CatalogBaseTable copy() {
+ return new MockedCatalogTable(
+ getSchema().copy(),
+ new ArrayList<>(getPartitionKeys()),
+ new HashMap<>(getOptions()),
+ getComment());
+ }
+
+ @Override
+ public CatalogTable copy(Map options) {
+ return new MockedCatalogTable(getSchema(), getPartitionKeys(), options, getComment());
+ }
+
+ @Override
+ public Optional<String> getDescription() {
+ return Optional.of(getComment());
+ }
+
+ @Override
+ public Optional<String> getDetailedDescription() {
+ return Optional.of("This is a catalog table in an in-memory catalog");
+ }
+
+ }
}
diff --git a/src/test/java/com/starrocks/connector/flink/it/container/StarRocksITTest.java b/src/test/java/com/starrocks/connector/flink/it/container/StarRocksITTest.java
index 499ba1580..3df2eb58d 100644
--- a/src/test/java/com/starrocks/connector/flink/it/container/StarRocksITTest.java
+++ b/src/test/java/com/starrocks/connector/flink/it/container/StarRocksITTest.java
@@ -28,10 +28,9 @@
import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -41,10 +40,11 @@
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
@@ -57,6 +57,7 @@
import java.sql.Date;
import java.sql.Timestamp;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -65,6 +66,7 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import static org.apache.flink.configuration.CoreOptions.CHECK_LEAKED_CLASSLOADER;
import static org.apache.flink.table.api.Expressions.$;
import static org.junit.Assert.assertNull;
@@ -115,7 +117,7 @@ public void testSink() throws Exception {
STARROCKS_CLUSTER.executeMysqlCommand(createTable);
Configuration conf = new Configuration();
- conf.setBoolean("classloader.check-leaked-classloader", false);
+ conf.set(CHECK_LEAKED_CLASSLOADER, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
StreamTableEnvironment tEnv;
@@ -189,7 +191,7 @@ public void testSource() {
STARROCKS_CLUSTER.executeMysqlCommand(inserIntoData);
Configuration conf = new Configuration();
- conf.setBoolean("classloader.check-leaked-classloader", false);
+ conf.set(CHECK_LEAKED_CLASSLOADER, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
StreamTableEnvironment tEnv;
@@ -268,7 +270,7 @@ public void testLookup() {
STARROCKS_CLUSTER.executeMysqlCommand(inserIntoData);
Configuration conf = new Configuration();
- conf.setBoolean("classloader.check-leaked-classloader", false);
+ conf.set(CHECK_LEAKED_CLASSLOADER, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
StreamTableEnvironment tEnv;
@@ -355,7 +357,7 @@ public void testTableAPIWithRestartCluster() throws Exception {
STARROCKS_CLUSTER.executeMysqlCommand(createTable);
Configuration conf = new Configuration();
- conf.setBoolean("classloader.check-leaked-classloader", false);
+ conf.set(CHECK_LEAKED_CLASSLOADER, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
StreamTableEnvironment tEnv;
@@ -474,8 +476,8 @@ public ClusterKillingMapper(
}
@Override
- public void open(Configuration parameters) {
- failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+ public void open(OpenContext context) {
+ failer = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0;
}
@Override
@@ -588,7 +590,9 @@ public void testDataStreamAPIWithRestartCluster() throws Exception {
Thread.sleep(10000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, Time.seconds(40)));
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(
+ env, 30, Duration.ofSeconds(40)
+ );
env.addSource(StarRocksSource.source(tableSchema, sourceOptions)).map(new ClusterKillingMapper(STARROCKS_CLUSTER,2)).addSink(StarRocksSink.sink(tableSchema, sinkOptions, new StarRocksSinkRowBuilder() {
@Override
public void accept(Object[] objects, RowData rowData) {
diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksGenericSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksGenericSinkITTest.java
index 7a45633a6..92c129776 100644
--- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksGenericSinkITTest.java
+++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksGenericSinkITTest.java
@@ -17,10 +17,10 @@
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.StarRocksSinkBaseTest;
import mockit.Expectations;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.junit.Ignore;
import org.junit.Test;
@@ -78,7 +78,7 @@ public void testBatchSink() {
mockStarRocksVersion(null);
mockSuccessResponse();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+ RestartStrategyUtils.configureNoRestartStrategy(env);
env.setParallelism(1);
env.fromElements(TEST_DATA)
.addSink(StarRocksSink.sink(
diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java
index 2025b9a81..3353757bd 100644
--- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java
+++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java
@@ -21,9 +21,9 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
@@ -54,6 +54,7 @@
import static com.starrocks.connector.flink.it.sink.StarRocksTableUtils.scanTable;
import static com.starrocks.connector.flink.it.sink.StarRocksTableUtils.verifyResult;
+import static org.apache.flink.configuration.CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assume.assumeTrue;
@@ -430,9 +431,10 @@ public void testExactlyOnce() throws Exception {
testConfigurationBase(options,
env -> {
env.enableCheckpointing(1000);
- env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
Configuration config = new Configuration();
- config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+ config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+ config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+ config.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.configure(config);
return null;
}
@@ -450,9 +452,10 @@ public void testEnableExactlyOnceLabelGen() throws Exception {
testConfigurationBase(options,
env -> {
env.enableCheckpointing(1000);
- env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
Configuration config = new Configuration();
- config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+ config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+ config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+ config.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.configure(config);
return null;
}
@@ -469,9 +472,10 @@ public void testAbortLingeringTransactions() throws Exception {
testConfigurationBase(options,
env -> {
env.enableCheckpointing(1000);
- env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
Configuration config = new Configuration();
- config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+ config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+ config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+ config.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.configure(config);
return null;
}
@@ -489,9 +493,10 @@ public void testAbortLingeringTransactionsWithCheckNum() throws Exception {
testConfigurationBase(options,
env -> {
env.enableCheckpointing(1000);
- env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
Configuration config = new Configuration();
- config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+ config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+ config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+ config.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.configure(config);
return null;
}
@@ -546,9 +551,10 @@ private void testPreparedTimeoutBase(String sinkSemantic, @Nullable String confi
testConfigurationBase(options,
env -> {
env.enableCheckpointing(1000);
- env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
Configuration config = new Configuration();
- config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+ config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+ config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
+ config.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.configure(config);
return null;
}
diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestBase.java b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestBase.java
index 5787e712d..a52d18ddb 100644
--- a/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestBase.java
+++ b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestBase.java
@@ -18,8 +18,8 @@
package com.starrocks.connector.flink.it.sink.kafka;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
@@ -161,7 +161,7 @@ public static void tearDown() throws Exception {
public void setup() {
env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
- env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ RestartStrategyUtils.configureNoRestartStrategy(env);
// Probe Kafka broker status per 30 seconds
scheduleTimeoutLogger(
diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaToStarRocksITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaToStarRocksITTest.java
index fc557dcf0..dd4ab500e 100644
--- a/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaToStarRocksITTest.java
+++ b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaToStarRocksITTest.java
@@ -25,7 +25,6 @@
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableResult;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.Before;
@@ -166,7 +165,7 @@ private void testPrimaryKeyBase(String topic, String dataFile, Map lines) throws Exception {
DataStreamSource<String> stream = env.fromCollection(lines);
SerializationSchema<String> serSchema = new SimpleStringSchema();
- FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
+ FlinkFixedPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
// the producer must not produce duplicates
Properties producerProperties = getStandardProps();
@@ -182,7 +181,8 @@ private void writeRecordsToKafka(String topic, List lines) throws Except
.setValueSerializationSchema(serSchema)
.setPartitioner(partitioner)
.build())
- .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+ .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+ .setTransactionalIdPrefix("tid")
.build());
env.execute("Write sequence");
}
diff --git a/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceBaseTest.java b/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceBaseTest.java
index d69abfd62..5f9a62e27 100644
--- a/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceBaseTest.java
+++ b/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceBaseTest.java
@@ -15,7 +15,6 @@
package com.starrocks.connector.flink.it.source;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
import com.alibaba.fastjson.JSONObject;
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
@@ -29,6 +28,7 @@
import com.starrocks.shade.org.apache.thrift.transport.TTransportException;
import com.starrocks.shade.org.apache.thrift.transport.TTransportFactory;
import com.starrocks.thrift.TStarrocksExternalService;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.junit.After;
import org.junit.Before;
diff --git a/src/test/java/com/starrocks/connector/flink/manager/sink/StarRocksSinkManagerTest.java b/src/test/java/com/starrocks/connector/flink/manager/sink/StarRocksSinkManagerTest.java
index f26d654ec..86ebfe427 100644
--- a/src/test/java/com/starrocks/connector/flink/manager/sink/StarRocksSinkManagerTest.java
+++ b/src/test/java/com/starrocks/connector/flink/manager/sink/StarRocksSinkManagerTest.java
@@ -21,7 +21,7 @@
import mockit.Mock;
import mockit.MockUp;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema.Builder;
+import org.apache.flink.table.legacy.api.TableSchema;
import org.junit.Ignore;
import org.junit.Test;
@@ -84,7 +84,7 @@ public void testValidateTableStructure() {
assertTrue(exMsg.length() > 0);
- Builder schemaBuilder = createTableSchemaBuilder();
+ TableSchema.Builder schemaBuilder = createTableSchemaBuilder();
schemaBuilder.field("v6", DataTypes.VARCHAR(20));
try {
new StarRocksSinkManager(OPTIONS, schemaBuilder.build());
diff --git a/src/test/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactoryTest.java b/src/test/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactoryTest.java
index 304782810..eaffc8377 100644
--- a/src/test/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactoryTest.java
+++ b/src/test/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactoryTest.java
@@ -27,6 +27,7 @@
import org.junit.Test;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@@ -47,12 +48,12 @@ public void testIsStarRocksSupportTransactionLoad() throws Exception {
try (MockFeHttpServer httpServer = new MockFeHttpServer()) {
httpServer.start();
Configuration conf = new Configuration();
- conf.setString(StarRocksSinkOptions.TABLE_NAME, "test");
- conf.setString(StarRocksSinkOptions.DATABASE_NAME, "test");
- conf.setString(StarRocksSinkOptions.LOAD_URL.key(), "127.0.0.1:" + httpServer.getListenPort());
- conf.setString(StarRocksSinkOptions.JDBC_URL, "jdbc://127.0.0.1:1234");
- conf.setString(StarRocksSinkOptions.USERNAME, "root");
- conf.setString(StarRocksSinkOptions.PASSWORD, "");
+ conf.set(StarRocksSinkOptions.TABLE_NAME, "test");
+ conf.set(StarRocksSinkOptions.DATABASE_NAME, "test");
+ conf.set(StarRocksSinkOptions.LOAD_URL, Collections.singletonList("127.0.0.1:" + httpServer.getListenPort()));
+ conf.set(StarRocksSinkOptions.JDBC_URL, "jdbc://127.0.0.1:1234");
+ conf.set(StarRocksSinkOptions.USERNAME, "root");
+ conf.set(StarRocksSinkOptions.PASSWORD, "");
StarRocksSinkOptions sinkOptions = new StarRocksSinkOptions(conf, new HashMap<>());
{
@@ -170,13 +171,13 @@ private void testChooseSinkVersionAutomaticallyBase(
public void testGetSinkVersion() {
Configuration conf = new Configuration();
{
- conf.setString(StarRocksSinkOptions.SINK_VERSION, "V1");
+ conf.set(StarRocksSinkOptions.SINK_VERSION, "V1");
StarRocksSinkOptions sinkOptions = new StarRocksSinkOptions(conf, new HashMap<>());
assertEquals(SinkFunctionFactory.SinkVersion.V1, SinkFunctionFactory.getSinkVersion(sinkOptions));
}
{
- conf.setString(StarRocksSinkOptions.SINK_VERSION, "V2");
+ conf.set(StarRocksSinkOptions.SINK_VERSION, "V2");
StarRocksSinkOptions sinkOptions = new StarRocksSinkOptions(conf, new HashMap<>());
assertEquals(SinkFunctionFactory.SinkVersion.V2, SinkFunctionFactory.getSinkVersion(sinkOptions));
}
@@ -189,7 +190,7 @@ public SinkFunctionFactory.SinkVersion chooseSinkVersionAutomatically(StarRocksS
return autoVersion.get();
}
};
- conf.setString(StarRocksSinkOptions.SINK_VERSION, "AUTO");
+ conf.set(StarRocksSinkOptions.SINK_VERSION, "AUTO");
StarRocksSinkOptions sinkOptions = new StarRocksSinkOptions(conf, new HashMap<>());
autoVersion.set(SinkFunctionFactory.SinkVersion.V1);
@@ -200,7 +201,7 @@ public SinkFunctionFactory.SinkVersion chooseSinkVersionAutomatically(StarRocksS
}
{
- conf.setString(StarRocksSinkOptions.SINK_VERSION, "UNKNOWN");
+ conf.set(StarRocksSinkOptions.SINK_VERSION, "UNKNOWN");
StarRocksSinkOptions sinkOptions = new StarRocksSinkOptions(conf, new HashMap<>());
try {
SinkFunctionFactory.getSinkVersion(sinkOptions);
diff --git a/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java b/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
index 41124a10e..0574a8bc9 100644
--- a/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
+++ b/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
@@ -52,7 +52,7 @@ public void init() {
@Test
public void testApplyProjection() {
- dynamicTableSource.applyProjection(PROJECTION_ARRAY);
+ dynamicTableSource.applyProjection(PROJECTION_ARRAY, null);
for (int i = 0; i < SELECT_COLUMNS.length; i ++) {
assertEquals(SELECT_COLUMNS[i].getColumnIndexInFlinkTable(), pushDownHolder.getSelectColumns()[i].getColumnIndexInFlinkTable());
@@ -60,7 +60,7 @@ public void testApplyProjection() {
}
assertEquals(StarRocksSourceQueryType.QuerySomeColumns, pushDownHolder.getQueryType());
- dynamicTableSource.applyProjection(PROJECTION_ARRAY_NULL);
+ dynamicTableSource.applyProjection(PROJECTION_ARRAY_NULL, null);
assertEquals(StarRocksSourceQueryType.QueryCount, pushDownHolder.getQueryType());
}
@@ -70,9 +70,9 @@ public void testFilter() {
String filter;
ResolvedExpression c9Ref = new FieldReferenceExpression("c6", DataTypes.STRING(), 0, 2);
- ResolvedExpression c9CharLength = new CallExpression(BuiltInFunctionDefinitions.CHAR_LENGTH, Collections.singletonList(c9Ref), DataTypes.INT());
+ ResolvedExpression c9CharLength = CallExpression.anonymous(BuiltInFunctionDefinitions.CHAR_LENGTH, Collections.singletonList(c9Ref), DataTypes.INT());
ResolvedExpression c9Exp =
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.LESS_THAN,
Arrays.asList(c9CharLength, valueLiteral(10)),
DataTypes.BOOLEAN());
@@ -82,7 +82,7 @@ public void testFilter() {
ResolvedExpression c5Ref = new FieldReferenceExpression("c5", DataTypes.TIMESTAMP(), 0, 2);
ResolvedExpression c5Exp =
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.EQUALS,
Arrays.asList(c5Ref, valueLiteral("2022-1-22 00:00:00")),
DataTypes.BOOLEAN());
@@ -92,7 +92,7 @@ public void testFilter() {
ResolvedExpression c4Ref = new FieldReferenceExpression("c4", DataTypes.DATE(), 0, 2);
ResolvedExpression c4Exp =
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.EQUALS,
Arrays.asList(c4Ref, valueLiteral("2022-1-22")),
DataTypes.BOOLEAN());
@@ -102,7 +102,7 @@ public void testFilter() {
ResolvedExpression c3Ref = new FieldReferenceExpression("c3", DataTypes.BOOLEAN(), 0, 2);
ResolvedExpression c3Exp =
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.EQUALS,
Arrays.asList(c3Ref, valueLiteral(true)),
DataTypes.BOOLEAN());
@@ -112,36 +112,36 @@ public void testFilter() {
ResolvedExpression c2Ref = new FieldReferenceExpression("c2", DataTypes.INT(), 0, 2);
ResolvedExpression c2Exp =
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.EQUALS,
Arrays.asList(c2Ref, valueLiteral(2)),
DataTypes.BOOLEAN());
ResolvedExpression c1Ref = new FieldReferenceExpression("c1", DataTypes.INT(), 0, 2);
ResolvedExpression c1Exp =
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.EQUALS,
Arrays.asList(c1Ref, valueLiteral(1)),
DataTypes.BOOLEAN());
dynamicTableSource.applyFilters(Arrays.asList(c1Exp,
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.NOT_EQUALS,
Arrays.asList(c1Ref, valueLiteral(1)),
DataTypes.BOOLEAN()),
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.GREATER_THAN,
Arrays.asList(c1Ref, valueLiteral(1)),
DataTypes.BOOLEAN()),
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
Arrays.asList(c1Ref, valueLiteral(1)),
DataTypes.BOOLEAN()),
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.LESS_THAN,
Arrays.asList(c1Ref, valueLiteral(1)),
DataTypes.BOOLEAN()),
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
Arrays.asList(c1Ref, valueLiteral(1)),
DataTypes.BOOLEAN())
@@ -154,13 +154,13 @@ public void testFilter() {
assertEquals("(c1 = 1) and (c2 = 2)", filter);
- dynamicTableSource.applyFilters(Arrays.asList(new CallExpression(BuiltInFunctionDefinitions.OR, Arrays.asList(c1Exp, c3Exp), DataTypes.BOOLEAN())));
+ dynamicTableSource.applyFilters(Arrays.asList(CallExpression.anonymous(BuiltInFunctionDefinitions.OR, Arrays.asList(c1Exp, c3Exp), DataTypes.BOOLEAN())));
filter = pushDownHolder.getFilter();
assertEquals("((c1 = 1) or (c3 = true))", filter);
ResolvedExpression c6Exp =
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.LIKE,
Arrays.asList(c1Ref, valueLiteral(1)),
DataTypes.BOOLEAN());
@@ -172,7 +172,7 @@ public void testFilter() {
}
ResolvedExpression c7Exp =
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.IN,
Arrays.asList(c1Ref, valueLiteral(1)),
DataTypes.BOOLEAN());
@@ -184,7 +184,7 @@ public void testFilter() {
}
ResolvedExpression c8Exp =
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.BETWEEN,
Arrays.asList(c1Ref, valueLiteral(1)),
DataTypes.BOOLEAN());
@@ -232,7 +232,7 @@ public void testDynamicTableSourceDeepCopy() {
ResolvedExpression c5Ref = new FieldReferenceExpression("c5", DataTypes.TIMESTAMP(), 0, 2);
ResolvedExpression c5Exp =
- new CallExpression(
+ CallExpression.anonymous(
BuiltInFunctionDefinitions.EQUALS,
Arrays.asList(c5Ref, valueLiteral("2022-1-22 00:00:00")),
DataTypes.BOOLEAN());