Skip to content

Commit 86b2ab2

Browse files
committed
draft: bump Flink dependency to 2.0
Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> # Conflicts: # pom.xml
1 parent 18b56fa commit 86b2ab2

44 files changed

Lines changed: 258 additions & 173 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci-pipeline.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,22 +36,22 @@ jobs:
3636
name: Flink ${{ matrix.flink }}
3737
runs-on: ubuntu-latest
3838
strategy:
39-
fail-fast: true
39+
fail-fast: false
4040
max-parallel: 3
4141
matrix:
42-
flink: ['1.20','1.19','1.18','1.17','1.16','1.15']
42+
flink: ['2.1','2.0']
4343
env:
4444
CUSTOM_MVN: mvn -B -ntp
4545
steps:
4646
- name: Checkout
4747
uses: actions/checkout@v2
4848
with:
4949
fetch-depth: 0
50-
- name: Set up JDK 8
50+
- name: Set up JDK 11
5151
uses: actions/setup-java@v3
5252
with:
5353
distribution: 'zulu'
54-
java-version: '8'
54+
java-version: '11'
5555
cache: 'maven'
5656
- name: Machine Info
5757
run: |

build.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ flink_minor_version=$1
3434
check_flink_version_supported $flink_minor_version
3535
flink_version="$(get_flink_version $flink_minor_version)"
3636
kafka_connector_version="$(get_kafka_connector_version $flink_minor_version)"
37+
flink_shaded_guava_version="$(get_flink_shaded_guava_version $flink_minor_version)"
3738

3839
# control whether to run tests (default: skip tests)
3940
skip_tests=true
@@ -54,7 +55,8 @@ fi
5455
${MVN_CMD} clean package ${mvn_skip_flag} \
5556
-Dflink.minor.version=${flink_minor_version} \
5657
-Dflink.version=${flink_version} \
57-
-Dkafka.connector.version=${kafka_connector_version}
58+
-Dkafka.connector.version=${kafka_connector_version} \
59+
-Dflink.shaded.guava.version=${flink_shaded_guava_version}
5860

5961
echo "*********************************************************************"
6062
echo "Successfully build Flink StarRocks Connector for Flink $flink_minor_version"

common.sh

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@ if ! ${MVN_CMD} --version; then
3232
fi
3333
export MVN_CMD
3434

35-
SUPPORTED_MINOR_VERSION=("1.15" "1.16" "1.17" "1.18" "1.19" "1.20")
36-
# version formats are different among flink versions
37-
SUPPORTED_KAFKA_CONNECTOR_VERSION=("1.15.0" "1.16.0" "1.17.0" "3.0.1-1.18" "3.2.0-1.19" "3.4.0-1.20")
35+
SUPPORTED_MINOR_VERSION=("2.0" "2.1")
36+
SUPPORTED_KAFKA_CONNECTOR_VERSION=("4.0.1-2.0" "4.0.1-2.0")
37+
SUPPORTED_FLINK_SHADED_VERSION=("32.1.3-jre-19.0" "33.4.0-jre-20.0")
38+
3839
VERSION_MESSAGE=$(IFS=, ; echo "${SUPPORTED_MINOR_VERSION[*]}")
3940

4041
function check_flink_version_supported() {
@@ -70,3 +71,23 @@ function get_kafka_connector_version() {
7071
exit 1
7172
fi
7273
}
74+
75+
function get_flink_shaded_guava_version() {
76+
local FLINK_MINOR_VERSION=$1
77+
local index=-1
78+
for ((i=0; i<${#SUPPORTED_MINOR_VERSION[@]}; i++)); do
79+
if [ "${SUPPORTED_MINOR_VERSION[i]}" = "$FLINK_MINOR_VERSION" ]; then
80+
index=$i
81+
break
82+
fi
83+
done
84+
85+
if [ "$index" != -1 ];
86+
then
87+
local FLINK_SHADED_VERSION="${SUPPORTED_FLINK_SHADED_VERSION[index]}"
88+
echo $FLINK_SHADED_VERSION
89+
else
90+
echo "Can't find flink shaded guava version for flink-${FLINK_MINOR_VERSION}"
91+
exit 1
92+
fi
93+
}

examples/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ limitations under the License.
2222
<version>1.0-SNAPSHOT</version>
2323

2424
<properties>
25-
<maven.compiler.source>1.8</maven.compiler.source>
26-
<maven.compiler.target>1.8</maven.compiler.target>
25+
<maven.compiler.source>11</maven.compiler.source>
26+
<maven.compiler.target>11</maven.compiler.target>
2727
<file_encoding>UTF-8</file_encoding>
2828
<flink.minor.version>1.17</flink.minor.version>
2929
<flink.version>1.17.0</flink.version>

examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCustomJavaRecords.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3030
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
3131
import org.apache.flink.table.api.DataTypes;
32-
import org.apache.flink.table.api.TableSchema;
32+
import org.apache.flink.table.legacy.api.TableSchema;
3333

3434
/**
3535
* This example will show how to load records to StarRocks table using Flink DataStream.

pom.xml

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,20 @@ limitations under the License.
4747

4848
<properties>
4949
<srfc.version>1.2.13</srfc.version>
50-
<maven.compiler.source>1.8</maven.compiler.source>
51-
<maven.compiler.target>1.8</maven.compiler.target>
50+
<maven.compiler.source>11</maven.compiler.source>
51+
<maven.compiler.target>11</maven.compiler.target>
5252
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
5353
<file_encoding>UTF-8</file_encoding>
5454
<maven-enforcer-plugin.version>3.0.0-M3</maven-enforcer-plugin.version>
5555
<maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
56-
<flink.minor.version>1.20</flink.minor.version>
57-
<flink.version>1.20.0</flink.version>
58-
<kafka.connector.version>3.4.0-1.20</kafka.connector.version>
56+
<flink.minor.version>2.1</flink.minor.version>
57+
<flink.version>2.1.1</flink.version>
58+
<kafka.connector.version>4.0.1-2.0</kafka.connector.version>
59+
<flink.shaded.guava.version>33.4.0-jre-20.0</flink.shaded.guava.version>
5960
<arrow.version>5.0.0</arrow.version>
6061
<kafka.version>2.8.1</kafka.version>
6162
<scala.binary.version>2.12</scala.binary.version>
62-
<guava.version>32.0.1-jre</guava.version>
63+
<guava.version>33.4.0-jre</guava.version>
6364
<shading.prefix>com.starrocks.shade</shading.prefix>
6465
<alibaba-fastjson.version>1.2.83</alibaba-fastjson.version>
6566
<jmockit.version>1.49</jmockit.version>
@@ -329,7 +330,7 @@ limitations under the License.
329330
<dependency>
330331
<groupId>org.apache.flink</groupId>
331332
<artifactId>flink-shaded-guava</artifactId>
332-
<version>31.1-jre-17.0</version>
333+
<version>${flink.shaded.guava.version}</version>
333334
<scope>test</scope>
334335
</dependency>
335336

@@ -368,8 +369,8 @@ limitations under the License.
368369
<groupId>org.apache.maven.plugins</groupId>
369370
<artifactId>maven-compiler-plugin</artifactId>
370371
<configuration>
371-
<source>1.8</source>
372-
<target>1.8</target>
372+
<source>11</source>
373+
<target>11</target>
373374
<encoding>${file_encoding}</encoding>
374375
</configuration>
375376
</plugin>
@@ -629,7 +630,7 @@ limitations under the License.
629630
</property>
630631
</activation>
631632
<properties>
632-
<target.java.version>1.8</target.java.version>
633+
<target.java.version>11</target.java.version>
633634
</properties>
634635
<build>
635636
<plugins>

src/main/java/com/starrocks/connector/flink/StarRocksSink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import com.starrocks.connector.flink.row.sink.StarRocksSinkRowBuilder;
2020
import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
2121
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
22-
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
23-
import org.apache.flink.table.api.TableSchema;
22+
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
23+
import org.apache.flink.table.legacy.api.TableSchema;
2424

2525
public class StarRocksSink {
2626

src/main/java/com/starrocks/connector/flink/StarRocksSource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616

1717
import com.starrocks.connector.flink.table.source.StarRocksDynamicSourceFunction;
1818
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
19-
20-
import org.apache.flink.table.api.TableSchema;
19+
import org.apache.flink.table.legacy.api.TableSchema;
2120

2221

2322
public class StarRocksSource {

src/main/java/com/starrocks/connector/flink/catalog/FlinkCatalog.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,12 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
278278
properties.putAll(getSourceConfig(tablePath.getDatabaseName(), tablePath.getObjectName()));
279279
properties.putAll(getSinkConfig(tablePath.getDatabaseName(), tablePath.getObjectName()));
280280

281-
return CatalogTable.of(flinkSchema, starRocksTable.getComment().orElse(null), new ArrayList<>(), properties);
281+
return CatalogTable.newBuilder()
282+
.schema(flinkSchema)
283+
.comment(starRocksTable.getComment().orElse(null))
284+
.partitionKeys(new ArrayList<>())
285+
.options(properties)
286+
.build();
282287
} catch (StarRocksCatalogException e) {
283288
throw new CatalogException(
284289
String.format("Failed to get table %s in catalog %s", tablePath.getFullName(), getName()), e);

src/main/java/com/starrocks/connector/flink/catalog/StarRocksUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
package com.starrocks.connector.flink.catalog;
2222

2323
import org.apache.flink.configuration.Configuration;
24-
import org.apache.flink.table.api.TableSchema;
24+
import org.apache.flink.table.legacy.api.TableSchema;
2525
import org.apache.flink.table.catalog.CatalogBaseTable;
2626
import org.apache.flink.table.catalog.ObjectPath;
2727
import org.apache.flink.table.catalog.exceptions.CatalogException;

0 commit comments

Comments
 (0)