---
title: Spark (PySpark)
pcx_content_type: example
---

Below is an example of using [PySpark](https://spark.apache.org/docs/latest/api/python/index.html) to connect to R2 Data Catalog.

## Prerequisites

- Sign up for a [Cloudflare account](https://dash.cloudflare.com/sign-up/workers-and-pages).
- [Create an R2 bucket](/r2/buckets/create-buckets/) and [enable the data catalog](/r2/data-catalog/manage-catalogs/#enable-r2-data-catalog-on-a-bucket).
- [Create an R2 API token](/r2/api/tokens/) with both [R2 and data catalog permissions](/r2/api/tokens/#permissions).
- Install the [PySpark](https://spark.apache.org/docs/latest/api/python/getting_started/install.html) library (for example, with `pip`, as shown below).
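
PySpark is distributed on PyPI. The example below pins `iceberg-spark-runtime-3.5_2.12`, an artifact built for Spark 3.5 and Scala 2.12, so install a matching 3.5.x release of PySpark (adjust both versions together if you use a different Spark release):

```sh
pip install "pyspark==3.5.*"
```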

## Example usage

```py
from pyspark.sql import SparkSession

# Define catalog connection details (replace variables)
WAREHOUSE = "<WAREHOUSE>"
TOKEN = "<TOKEN>"
CATALOG_URI = "<CATALOG_URI>"

# Build Spark session with Iceberg configurations
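# "vended-credentials" asks the catalog to hand out temporary credentials
# for the underlying R2 storage; with credentials vended this way, remote
# request signing is not needed and is turned off below.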
spark = SparkSession.builder \
    .appName("R2DataCatalogExample") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.my_catalog.type", "rest") \
    .config("spark.sql.catalog.my_catalog.uri", CATALOG_URI) \
    .config("spark.sql.catalog.my_catalog.warehouse", WAREHOUSE) \
    .config("spark.sql.catalog.my_catalog.token", TOKEN) \
    .config("spark.sql.catalog.my_catalog.header.X-Iceberg-Access-Delegation", "vended-credentials") \
    .config("spark.sql.catalog.my_catalog.s3.remote-signing-enabled", "false") \
    .config("spark.sql.defaultCatalog", "my_catalog") \
    .getOrCreate()
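
# Make the new catalog the active one for SQL statements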
spark.sql("USE my_catalog")

# Create namespace if it does not exist
spark.sql("CREATE NAMESPACE IF NOT EXISTS default")

# Create a table in the namespace using Iceberg
spark.sql("""
    CREATE TABLE IF NOT EXISTS default.my_table (
        id BIGINT,
        name STRING
    )
    USING iceberg
""")

# Create a simple DataFrame
df = spark.createDataFrame(
    [(1, "Alice"), (2, "Bob"), (3, "Charlie")],
    ["id", "name"]
)

# Write the DataFrame to the Iceberg table
df.write \
    .format("iceberg") \
    .mode("append") \
    .save("default.my_table")

# Read the data back from the Iceberg table
result_df = spark.read \
    .format("iceberg") \
    .load("default.my_table")

result_df.show()
```
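
Because `my_catalog` is configured as the session's default catalog, the same table can also be queried with plain SQL. For example, as a quick sanity check (the `ORDER BY` keeps the row order deterministic):

```py
spark.sql("SELECT * FROM default.my_table ORDER BY id").show()
```

This should print output like:

```
+---+-------+
| id|   name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
+---+-------+
```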