---
sidebar_label: Getting Started
title: Getting Started with Spark
sidebar_position: 1
---

Getting Started with Spark Engine

Supported Spark Versions

| Fluss Connector Versions | Supported Spark Versions |
|--------------------------|--------------------------|
| $FLUSS_VERSION_SHORT$    | 3.4, 3.5, 4.1            |

:::note Spark 4.x requires Scala 2.13. The Fluss connector for Spark 4.1 is built with Scala 2.13, while the connectors for Spark 3.x are built with Scala 2.12. :::

Feature Support

Fluss supports Apache Spark's SQL API and Spark Structured Streaming.

| Feature Support            | Spark | Notes                           |
|----------------------------|-------|---------------------------------|
| SQL Create Catalog         | ✔️    |                                 |
| SQL Create Database        | ✔️    |                                 |
| SQL Drop Database          | ✔️    |                                 |
| SQL Create Table           | ✔️    |                                 |
| SQL Drop Table             | ✔️    |                                 |
| SQL Describe Table         | ✔️    |                                 |
| SQL Show Tables            | ✔️    |                                 |
| SQL Alter Table            | ✔️    | SET/UNSET TBLPROPERTIES         |
| SQL Show Partitions        | ✔️    |                                 |
| SQL Add Partition          | ✔️    |                                 |
| SQL Drop Partition         | ✔️    |                                 |
| SQL Select (Batch)         | ✔️    | Log table and primary-key table |
| SQL Insert Into            | ✔️    | Log table and primary-key table |
| Structured Streaming Read  | ✔️    | Log table and primary-key table |
| Structured Streaming Write | ✔️    | Log table and primary-key table |

Preparation when using Spark SQL

  • Download Spark

Spark runs on UNIX-like environments such as Linux and macOS. Download a binary release of Spark from the Apache Spark Downloads page, then extract the archive:

For Spark 3.5:

tar -xzf spark-3.5.7-bin-hadoop3.tgz

For Spark 4.1:

tar -xzf spark-4.1.1-bin-hadoop3.tgz
  • Copy Fluss Spark Bundled Jar

Download the Fluss Spark bundled JAR and copy it to the jars directory of your Spark home.

For Spark 3.5 (Scala 2.12):

cp fluss-spark-3.5_2.12-$FLUSS_VERSION$.jar <SPARK_HOME>/jars/

For Spark 4.1 (Scala 2.13):

cp fluss-spark-4.1_2.13-$FLUSS_VERSION$.jar <SPARK_HOME>/jars/
  • Start Spark SQL

To quickly start the Spark SQL CLI, you can use the provided script:

<SPARK_HOME>/bin/spark-sql \
  --conf spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog \
  --conf spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123 \
  --conf spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions

Or start Spark Shell:

<SPARK_HOME>/bin/spark-shell \
  --conf spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog \
  --conf spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123 \
  --conf spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions

Creating a Catalog

The Fluss catalog can be configured in spark-defaults.conf or passed as command-line arguments.

Using spark-defaults.conf:

spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog
spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123
spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions

Or configure it programmatically, for example in Scala:

import org.apache.spark.sql.SparkSession

// Register the Fluss catalog and SQL extensions on a new session
val spark = SparkSession.builder()
  .config("spark.sql.catalog.fluss_catalog", "org.apache.fluss.spark.SparkCatalog")
  .config("spark.sql.catalog.fluss_catalog.bootstrap.servers", "localhost:9123")
  .config("spark.sql.extensions", "org.apache.fluss.spark.FlussSparkSessionExtensions")
  .getOrCreate()
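The same three settings can also be applied from PySpark. The following is a minimal sketch, assuming `pyspark` is installed and the Fluss bundled JAR is already in the Spark jars directory. The helper names `fluss_catalog_conf` and `build_session` are hypothetical and not part of Fluss or Spark.

```python
def fluss_catalog_conf(catalog="fluss_catalog", bootstrap_servers="localhost:9123"):
    """Return the Spark settings from this page as a dict (hypothetical helper)."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.fluss.spark.SparkCatalog",
        f"{prefix}.bootstrap.servers": bootstrap_servers,
        "spark.sql.extensions": "org.apache.fluss.spark.FlussSparkSessionExtensions",
    }


def build_session(conf):
    """Create (or reuse) a SparkSession carrying the given settings."""
    # Imported lazily so fluss_catalog_conf() can be used without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Calling `build_session(fluss_catalog_conf())` should yield a session in which `spark.sql("USE fluss_catalog")` resolves the catalog, provided a Fluss cluster is reachable at the configured address.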

:::note

  1. spark.sql.catalog.fluss_catalog.bootstrap.servers specifies the Fluss server address. Start the Fluss cluster before configuring it; see Deploying Fluss for how to set up a cluster. The examples here assume a Fluss cluster running on your local machine with the CoordinatorServer listening on port 9123.
  2. The bootstrap.servers setting is used to discover all nodes in the Fluss cluster. It accepts one or more (up to three) Fluss server addresses, either CoordinatorServer or TabletServer, separated by commas. :::
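To illustrate the comma-separated format described in the note, here is a small sketch that builds a `bootstrap.servers` value from a list of addresses. The function name and the host names in the usage line are hypothetical, and the one-to-three limit simply mirrors the note above rather than any check Fluss itself is known to perform.

```python
def join_bootstrap_servers(addresses):
    """Join one to three 'host:port' addresses into a bootstrap.servers value."""
    cleaned = [a.strip() for a in addresses if a.strip()]
    if not 1 <= len(cleaned) <= 3:
        raise ValueError("expected between one and three server addresses")
    for addr in cleaned:
        # Split on the last colon so the port is always the final component.
        host, sep, port = addr.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"not a host:port address: {addr!r}")
    return ",".join(cleaned)
```

For example, `join_bootstrap_servers(["coordinator:9123", "tablet-1:9124"])` produces a value that can be passed as `spark.sql.catalog.fluss_catalog.bootstrap.servers`.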

Creating a Database

USE fluss_catalog;
CREATE DATABASE fluss_db;
USE fluss_db;

Creating a Table

CREATE TABLE pk_table (
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT
) TBLPROPERTIES (
  'primary.key' = 'shop_id,user_id',
  'bucket.num' = '4'
);
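The statement above creates a primary-key table. The feature list also mentions log tables; the sketch below generates both kinds of DDL from Python, on the assumption that a table created without the 'primary.key' property is a log table. That assumption, the helper name `create_table_sql`, and the `log_table` columns are illustrative and not taken from this page.

```python
def create_table_sql(name, columns, primary_key=None, bucket_num=None):
    """Build a CREATE TABLE statement in the style used on this page."""
    cols = ",\n".join(f"  {col} {typ}" for col, typ in columns)
    props = {}
    if primary_key:
        props["primary.key"] = ",".join(primary_key)
    if bucket_num is not None:
        props["bucket.num"] = str(bucket_num)
    sql = f"CREATE TABLE {name} (\n{cols}\n)"
    if props:
        body = ",\n".join(f"  '{k}' = '{v}'" for k, v in props.items())
        sql += f" TBLPROPERTIES (\n{body}\n)"
    return sql


# Primary-key table, equivalent to the statement shown above.
pk_sql = create_table_sql(
    "pk_table",
    [("shop_id", "BIGINT"), ("user_id", "BIGINT"),
     ("num_orders", "INT"), ("total_amount", "INT")],
    primary_key=["shop_id", "user_id"],
    bucket_num=4,
)

# Assumed log table: same shape of statement, but no 'primary.key' property.
log_sql = create_table_sql(
    "log_table", [("order_id", "BIGINT"), ("item", "STRING")], bucket_num=4
)
```

Either string can then be executed with `spark.sql(...)` in a session configured as described earlier.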

Data Writing

To append new data to a table, you can use INSERT INTO:

INSERT INTO pk_table VALUES
  (1234, 1234, 1, 1),
  (12345, 12345, 2, 2),
  (123456, 123456, 3, 3);

Data Reading

To retrieve data, you can use a SELECT statement:

SELECT * FROM pk_table ORDER BY shop_id;

To preview a subset of the data with a projection and a filter:

SELECT shop_id, total_amount FROM pk_table WHERE num_orders > 1;
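Beyond batch queries, the feature list states that Structured Streaming reads are supported. The sketch below uses Spark's generic `readStream.table` API and the built-in console sink. Whether Fluss tables need additional read options is not covered on this page, so treat the approach as an assumption; the helper names are hypothetical.

```python
def qualified_name(table, database="fluss_db", catalog="fluss_catalog"):
    """Return the three-part name Spark uses to address a catalog table."""
    return f"{catalog}.{database}.{table}"


def stream_to_console(spark, table):
    """Start a streaming query that prints new rows of the table to the console.

    `spark` is an existing SparkSession configured with the Fluss catalog.
    """
    df = spark.readStream.table(qualified_name(table))
    return (
        df.writeStream
        .format("console")      # built-in sink intended for debugging
        .outputMode("append")
        .start()
    )
```

A caller would typically run `query = stream_to_console(spark, "pk_table")` and later stop it with `query.stop()`.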

Type Conversion

Fluss's integration for Spark automatically converts between Spark and Fluss types.

Fluss -> Apache Spark

| Fluss         | Spark            |
|---------------|------------------|
| BOOLEAN       | BooleanType      |
| TINYINT       | ByteType         |
| SMALLINT      | ShortType        |
| INT           | IntegerType      |
| BIGINT        | LongType         |
| FLOAT         | FloatType        |
| DOUBLE        | DoubleType       |
| CHAR          | CharType         |
| STRING        | StringType       |
| DECIMAL       | DecimalType      |
| DATE          | DateType         |
| TIMESTAMP     | TimestampNTZType |
| TIMESTAMP_LTZ | TimestampType    |
| BYTES         | BinaryType       |
| ARRAY         | ArrayType        |
| MAP           | MapType          |
| ROW           | StructType       |

:::note The MAP type is currently supported for table creation and schema mapping, but read and write operations on MAP columns are not yet supported. Full MAP type read/write support will be available soon. :::

Apache Spark -> Fluss

| Spark            | Fluss                                |
|------------------|--------------------------------------|
| BooleanType      | BOOLEAN                              |
| ByteType         | TINYINT                              |
| ShortType        | SMALLINT                             |
| IntegerType      | INT                                  |
| LongType         | BIGINT                               |
| FloatType        | FLOAT                                |
| DoubleType       | DOUBLE                               |
| CharType         | CHAR                                 |
| StringType       | STRING                               |
| VarcharType      | STRING                               |
| DecimalType      | DECIMAL                              |
| DateType         | DATE                                 |
| TimestampType    | TIMESTAMP_LTZ                        |
| TimestampNTZType | TIMESTAMP                            |
| BinaryType       | BYTES                                |
| ArrayType        | ARRAY                                |
| MapType          | MAP (read/write not yet supported)   |
| StructType       | ROW                                  |
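As a quick way to apply the Spark to Fluss mapping, the sketch below encodes the table as a Python dictionary and flags MAP columns, which cannot yet be read or written. It works on plain type-name strings rather than real Spark schema objects, and the names `SPARK_TO_FLUSS`, `to_fluss_type`, and `map_columns` are hypothetical.

```python
# Spark type name -> Fluss type, copied from the table above.
SPARK_TO_FLUSS = {
    "BooleanType": "BOOLEAN",
    "ByteType": "TINYINT",
    "ShortType": "SMALLINT",
    "IntegerType": "INT",
    "LongType": "BIGINT",
    "FloatType": "FLOAT",
    "DoubleType": "DOUBLE",
    "CharType": "CHAR",
    "StringType": "STRING",
    "VarcharType": "STRING",
    "DecimalType": "DECIMAL",
    "DateType": "DATE",
    "TimestampType": "TIMESTAMP_LTZ",
    "TimestampNTZType": "TIMESTAMP",
    "BinaryType": "BYTES",
    "ArrayType": "ARRAY",
    "MapType": "MAP",
    "StructType": "ROW",
}


def to_fluss_type(spark_type_name):
    """Look up the Fluss type listed for a Spark type name."""
    try:
        return SPARK_TO_FLUSS[spark_type_name]
    except KeyError:
        raise ValueError(f"no Fluss mapping listed for {spark_type_name}") from None


def map_columns(columns):
    """Return names of columns typed MapType, given (name, spark_type_name) pairs."""
    return [name for name, spark_type in columns if spark_type == "MapType"]
```

Running `map_columns` over a planned schema before creating a table gives an early warning about columns that can be declared but not yet queried.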