8 changes: 6 additions & 2 deletions .github/workflows/ci-template.yaml
@@ -36,7 +36,7 @@ jobs:
strategy:
fail-fast: false
matrix:
module: [ core, flink, spark3, lake ]
module: [ core, flink, spark3, spark4, lake ]
name: "${{ matrix.module }}"
steps:
- name: Checkout code
@@ -48,7 +48,11 @@
distribution: 'temurin'
- name: Build
run: |
mvn -T 1C -B clean install -DskipTests ${{ inputs.maven-parameters }}
if [ "${{ matrix.module }}" == "spark4" ]; then
mvn -T 1C -B clean install -DskipTests -Pspark4 ${{ inputs.maven-parameters }}
else
mvn -T 1C -B clean install -DskipTests ${{ inputs.maven-parameters }}
fi
- name: Test
timeout-minutes: 60
run: |
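The branching in the build step above boils down to appending `-Pspark4` for the `spark4` matrix entry. A standalone sketch of that logic in plain bash, with `MODULE` standing in for the workflow's `${{ matrix.module }}` expression:

```shell
# Sketch of the CI build step's branching: pick Maven flags per matrix module.
# Only "spark4" gets the extra -Pspark4 profile flag; all other modules use
# the base flags unchanged.
build_flags_for_module() {
  local module="$1"
  local flags="-T 1C -B clean install -DskipTests"
  if [ "${module}" = "spark4" ]; then
    flags="${flags} -Pspark4"
  fi
  echo "${flags}"
}

build_flags_for_module spark4   # -T 1C -B clean install -DskipTests -Pspark4
```

In the real workflow the chosen flags are followed by `${{ inputs.maven-parameters }}`, which this sketch omits.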
13 changes: 13 additions & 0 deletions .github/workflows/stage.sh
@@ -20,6 +20,7 @@
STAGE_CORE="core"
STAGE_FLINK="flink"
STAGE_SPARK="spark3"
STAGE_SPARK4="spark4"
STAGE_LAKE="lake"

MODULES_FLINK="\
@@ -43,6 +44,13 @@ fluss-spark/fluss-spark-3.5,\
fluss-spark/fluss-spark-3.4,\
"

MODULES_SPARK4="\
fluss-spark,\
fluss-spark/fluss-spark-common,\
fluss-spark/fluss-spark-ut,\
fluss-spark/fluss-spark-4.1,\
"

# we move Flink legacy version tests to "lake" module for balancing testing time
MODULES_LAKE="\
fluss-flink/fluss-flink-1.19,\
@@ -64,6 +72,8 @@ function get_test_modules_for_stage() {
local negated_lake=\!${MODULES_LAKE//,/,\!}
local modules_core="$negated_flink,$negated_spark,$negated_lake"

local modules_spark4=$MODULES_SPARK4

case ${stage} in
(${STAGE_CORE})
echo "-pl $modules_core"
@@ -74,6 +84,9 @@
(${STAGE_SPARK})
echo "-Pspark3 -pl fluss-test-coverage,$modules_spark3"
;;
(${STAGE_SPARK4})
echo "-Pspark4 -pl fluss-test-coverage,$modules_spark4"
;;
(${STAGE_LAKE})
echo "-pl fluss-test-coverage,$modules_lake"
;;
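The new `spark4` branch in `get_test_modules_for_stage` can be sketched in isolation. This is a simplified reimplementation covering only the two Spark stages, with the module lists shortened for readability (the real `MODULES_SPARK4` also includes `fluss-spark`, `fluss-spark-common`, and `fluss-spark-ut`):

```shell
# Simplified sketch of stage.sh's stage-to-Maven-args dispatch.
# Module lists are abbreviated; see stage.sh for the full values.
MODULES_SPARK3="fluss-spark/fluss-spark-3.5,fluss-spark/fluss-spark-3.4"
MODULES_SPARK4="fluss-spark/fluss-spark-4.1"

get_test_modules_for_stage() {
  local stage="$1"
  case "${stage}" in
    spark3) echo "-Pspark3 -pl fluss-test-coverage,${MODULES_SPARK3}" ;;
    spark4) echo "-Pspark4 -pl fluss-test-coverage,${MODULES_SPARK4}" ;;
    *)      echo "" ;;  # core/flink/lake stages omitted in this sketch
  esac
}

get_test_modules_for_stage spark4   # -Pspark4 -pl fluss-test-coverage,fluss-spark/fluss-spark-4.1
```

The key design point is that each stage bakes its Maven profile (`-Pspark3` or `-Pspark4`) into the same string as its `-pl` module list, so CI activates the matching reactor modules and tests in one invocation.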
1 change: 0 additions & 1 deletion .mvn/wrapper/maven-wrapper.properties
@@ -17,4 +17,3 @@
wrapperVersion=3.3.2
distributionType=only-script
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.6/apache-maven-3.8.6-bin.zip
distributionSha256Sum=ccf20a80e75a17ffc34d47c5c95c98c39d426ca17d670f09cd91e877072a9309
330 changes: 330 additions & 0 deletions fluss-docgen/website/docs/_configs/_partial_config.mdx

Large diffs are not rendered by default.

129 changes: 129 additions & 0 deletions fluss-spark/fluss-spark-4.1/pom.xml
@@ -0,0 +1,129 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-spark</artifactId>
<version>0.10-SNAPSHOT</version>
</parent>

<artifactId>fluss-spark-4.1_${scala.binary.version}</artifactId>
<name>Fluss : Engine Spark : 4.1</name>

<properties>
<spark.version>4.1.1</spark.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-spark-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-spark-ut_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<!-- compilation of main sources -->
<skipMain>${skip.on.java8}</skipMain>
<!-- compilation of test sources -->
<skip>${skip.on.java8}</skip>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
<executions>
<!-- Tests ending with ITCase are e2e tests in this module -->
<execution>
<id>integration-tests</id>
<phase>integration-test</phase>
<inherited>false</inherited>
<goals>
<goal>test</goal>
</goals>
<configuration>
<skip>${skip.on.java8}</skip>
<includes>
<include>**/*ITCase.*</include>
</includes>
<!-- e2e tests run against a flink/zookeeper cluster, so we set forkCount=1 -->
<forkCount>1</forkCount>
</configuration>
</execution>
<!-- other unit tests -->
<execution>
<id>default-test</id>
<phase>test</phase>
<inherited>false</inherited>
<goals>
<goal>test</goal>
</goals>
<configuration>
<skip>${skip.on.java8}</skip>
<excludes>
<exclude>**/*ITCase.*</exclude>
</excludes>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-fluss</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes combine.children="append">
<include>org.apache.fluss:fluss-spark-common_${scala.binary.version}</include>
<include>org.apache.fluss:fluss-client</include>
</includes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
@@ -77,21 +77,21 @@ class DataConverterTest extends AnyFunSuite {
val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)

assertThat(sparkDecimal).isInstanceOf(classOf[SparkDecimal])
assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(123.45)
assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(123.45)
}

test("toSparkDecimal: negative decimal") {
val flussDecimal = FlussDecimal.fromBigDecimal(new java.math.BigDecimal("-999.99"), 5, 2)
val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)

assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(-999.99)
assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(-999.99)
}

test("toSparkDecimal: zero") {
val flussDecimal = FlussDecimal.fromBigDecimal(java.math.BigDecimal.ZERO, 5, 2)
val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)

assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(0.0)
assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(0.0)
}

test("toSparkUTF8String: ASCII string") {
@@ -212,9 +212,9 @@ class FlussAsSparkArrayTest extends AnyFunSuite {
val flussArray = new GenericArray(decimals)
val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)

assertThat(sparkArray.getDecimal(0, 10, 2).toBigDecimal.doubleValue()).isEqualTo(10.50)
assertThat(sparkArray.getDecimal(1, 10, 2).toBigDecimal.doubleValue()).isEqualTo(20.75)
assertThat(sparkArray.getDecimal(2, 10, 2).toBigDecimal.doubleValue()).isEqualTo(30.99)
assertThat(sparkArray.getDecimal(0, 10, 2).toBigDecimal.doubleValue).isEqualTo(10.50)
assertThat(sparkArray.getDecimal(1, 10, 2).toBigDecimal.doubleValue).isEqualTo(20.75)
assertThat(sparkArray.getDecimal(2, 10, 2).toBigDecimal.doubleValue).isEqualTo(30.99)
}

test("getUTF8String: read string array") {
@@ -230,7 +230,7 @@ class FlussAsSparkRowTest extends AnyFunSuite {
val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)

val sparkDecimal = sparkRow.getDecimal(0, 10, 2)
assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(123.45)
assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(123.45)
}

test("getUTF8String: read string values") {
@@ -479,6 +479,6 @@ class FlussAsSparkRowTest extends AnyFunSuite {
assertThat(sparkRow.getFloat(5)).isEqualTo(3.14f)
assertThat(sparkRow.getDouble(6)).isEqualTo(2.718)
assertThat(sparkRow.getUTF8String(7).toString).isEqualTo("test")
assertThat(sparkRow.getDecimal(8, 10, 2).toBigDecimal.doubleValue()).isEqualTo(99.99)
assertThat(sparkRow.getDecimal(8, 10, 2).toBigDecimal.doubleValue).isEqualTo(99.99)
}
}
26 changes: 24 additions & 2 deletions fluss-spark/pom.xml
@@ -34,10 +34,32 @@
<modules>
<module>fluss-spark-common</module>
<module>fluss-spark-ut</module>
<module>fluss-spark-3.5</module>
<module>fluss-spark-3.4</module>
</modules>

<profiles>
<profile>
<id>spark3</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<modules>
<module>fluss-spark-3.5</module>
<module>fluss-spark-3.4</module>
</modules>
</profile>
<profile>
<!-- Spark 4.x requires Scala 2.13; activate with -Pspark4 -->
<id>spark4</id>
<properties>
<scala.binary.version>2.13</scala.binary.version>
<scala.version>${scala213.version}</scala.version>
</properties>
<modules>
<module>fluss-spark-4.1</module>
</modules>
</profile>
</profiles>

<dependencies>
<dependency>
<groupId>org.apache.fluss</groupId>
38 changes: 38 additions & 0 deletions fluss-test-coverage/pom.xml
@@ -226,6 +226,44 @@
</build>
</profile>

<profile>
<id>test-spark4</id>
<build>
<plugins>
<!-- required for the jacoco check goal to work -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-class-files</id>
<phase>generate-resources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<overwrite>false</overwrite>
<resources>
<resource>
<directory>${project.basedir}/../</directory>
<includes>
<include>fluss-spark/**/target/classes/**</include>
</includes>
<excludes>
<exclude>fluss-test-coverage/**</exclude>
<exclude>fluss-test-utils/**</exclude>
</excludes>
</resource>
</resources>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>

<profile>
<id>test-lake</id>
<build>
4 changes: 2 additions & 2 deletions website/docs/_configs/_partial_config.mdx
@@ -78,7 +78,7 @@
| `client.lookup.batch-timeout` | `0 s` | Duration | The maximum time to wait for the lookup batch to be full; if this timeout is reached, the lookup batch will be closed and sent. |
| `client.lookup.max-retries` | `2147483647` | Integer | Setting a value greater than zero will cause the client to resend any lookup request that fails with a potentially transient error. |
| `client.scanner.remote-log.prefetch-num` | `4` | Integer | The number of remote log segments, downloaded from remote storage, to keep in local temp files for LogScanner. The default setting is 4. |
| `client.scanner.io.tmpdir` | `/var/folders/bp/v2l48kz51mx86d743qv0zhzh0000gn/T//fluss` | String | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily |
| `client.scanner.io.tmpdir` | `/var/folders/40/2pbg9z414hd04xpyr5136hl00000gn/T//fluss` | String | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily |
| `client.remote-file.download-thread-num` | `3` | Integer | The number of threads the client uses to download remote files. |
| `client.filesystem.security.token.renewal.backoff` | `1 hours` | Duration | The time period how long to wait before retrying to obtain new security tokens for filesystem after a failure. |
| `client.filesystem.security.token.renewal.time-ratio` | `0.75` | Double | Ratio of the token's expiration time when new credentials for access filesystem should be re-obtained. |
@@ -284,7 +284,7 @@
| `table.auto-partition.enabled` | `false` | Boolean | Whether enable auto partition for the table. Disable by default. When auto partition is enabled, the partitions of the table will be created automatically. |
| `table.auto-partition.key` | `none` | String | This configuration defines the time-based partition key to be used for auto-partitioning when a table is partitioned with multiple keys. Auto-partitioning utilizes a time-based partition key to handle partitions automatically, including creating new ones and removing outdated ones, by comparing the time value of the partition with the current system time. In the case of a table using multiple partition keys (such as a composite partitioning strategy), this feature determines which key should serve as the primary time dimension for making auto-partitioning decisions. If the table has only one partition key, this config is not necessary; otherwise, it must be specified. |
| `table.auto-partition.time-unit` | `DAY` | AutoPartitionTimeUnit | The time granularity for auto created partitions. The default value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If the value is `DAY`, the partition format for auto created is yyyyMMdd. If the value is `MONTH`, the partition format for auto created is yyyyMM. If the value is `QUARTER`, the partition format for auto created is yyyyQ. If the value is `YEAR`, the partition format for auto created is yyyy. |
| `table.auto-partition.time-zone` | `Europe/Paris` | String | The time zone for auto partitions, which is by default the same as the system time zone. |
| `table.auto-partition.time-zone` | `Europe/Budapest` | String | The time zone for auto partitions, which is by default the same as the system time zone. |
| `table.auto-partition.num-precreate` | `2` | Integer | The number of partitions to pre-create for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 20241112, 20241113 will be pre-created. If any one partition exists, it'll skip creating the partition. The default value is 2, which means 2 partitions will be pre-created. If the `table.auto-partition.time-unit` is `DAY` (default), one precreated partition is for today and another one is for tomorrow. For a partitioned table with multiple partition keys, pre-creation is unsupported and will be set to 0 automatically at table creation if not explicitly specified. |
| `table.auto-partition.num-retention` | `7` | Integer | The number of history partitions to retain for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained. The partitions earlier than 20241108 will be deleted. The default value is 7. |
| `table.log.ttl` | `168 hours` | Duration | The time to live for log segments. The configuration controls the maximum time we will retain a log before we will delete old segments to free up space. If set to -1, the log will not be deleted. |
18 changes: 17 additions & 1 deletion website/docs/engine-spark/getting-started.md
@@ -9,7 +9,11 @@ sidebar_position: 1
## Supported Spark Versions
| Fluss Connector Versions | Supported Spark Versions |
|--------------------------|--------------------------|
| $FLUSS_VERSION_SHORT$ | 3.4, 3.5 |
| $FLUSS_VERSION_SHORT$ | 3.4, 3.5, 4.1 |

:::note
Spark 4.x requires Scala 2.13. The Fluss connector for Spark 4.1 is built with Scala 2.13, while the connectors for Spark 3.x are built with Scala 2.12.
:::


## Feature Support
@@ -40,18 +44,30 @@ Fluss supports Apache Spark's SQL API and Spark Structured Streaming.

Spark runs on all UNIX-like environments, e.g., Linux and macOS. You can download the binary release of Spark from the [Apache Spark Downloads](https://spark.apache.org/downloads.html) page, then extract the archive:

For Spark 3.5:
```shell
tar -xzf spark-3.5.7-bin-hadoop3.tgz
```

For Spark 4.1:
```shell
tar -xzf spark-4.1.1-bin-hadoop3.tgz
```

- **Copy Fluss Spark Bundled Jar**

Download [Fluss Spark Bundled jar](/downloads) and copy to the `jars` directory of your Spark home.

For Spark 3.5 (Scala 2.12):
```shell
cp fluss-spark-3.5_2.12-$FLUSS_VERSION$.jar <SPARK_HOME>/jars/
```

For Spark 4.1 (Scala 2.13):
```shell
cp fluss-spark-4.1_2.13-$FLUSS_VERSION$.jar <SPARK_HOME>/jars/
```
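Since the Scala suffix differs between the Spark 3.x and 4.x connectors, a small helper can pick the matching jar name. This is a sketch only: the `0.10.0` version literal is illustrative, and in practice you would substitute your actual Fluss version.

```shell
# Sketch: map a Spark version to the matching Fluss connector jar name.
# Spark 3.x connectors are built with Scala 2.12; the Spark 4.1 connector
# is built with Scala 2.13. The Fluss version below is illustrative.
connector_jar_for_spark() {
  local fluss_version="0.10.0"  # substitute your actual Fluss version
  case "$1" in
    3.4) echo "fluss-spark-3.4_2.12-${fluss_version}.jar" ;;
    3.5) echo "fluss-spark-3.5_2.12-${fluss_version}.jar" ;;
    4.1) echo "fluss-spark-4.1_2.13-${fluss_version}.jar" ;;
    *)   echo "unsupported Spark version: $1" >&2; return 1 ;;
  esac
}

connector_jar_for_spark 4.1   # fluss-spark-4.1_2.13-0.10.0.jar
```

Copying a 2.12 jar into a Spark 4.x installation (or vice versa) leads to Scala binary-compatibility errors at runtime, which is why the suffix matters.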

- **Start Spark SQL**

To quickly start the Spark SQL CLI, you can use the provided script: