# r2dc_spark_config.py.example — 61 lines (52 loc) · 2.86 KB
"""
R2 Data Catalog Spark Configuration
This module provides shared Spark session configuration for Apache Iceberg operations.
"""
from pyspark.sql import SparkSession
import os
# User Configuration - Update these values with your R2 Data Catalog credentials
WAREHOUSE = "<your-account-id>_<your-warehouse-name>"  # R2 Data Catalog warehouse identifier
TOKEN = "<your-r2-catalog-token>"  # bearer token passed to the Iceberg REST catalog
ENDPOINT = "https://catalog.cloudflarestorage.com/<your-account-id>/<your-warehouse-name>"  # REST catalog URI
# Optional S3 Configuration (for orphan file cleanup and direct S3 access)
# If not provided, vended credentials will be used (some operations like orphan file removal may not work)
S3_ACCESS_KEY_ID = None  # optional R2 S3-compatible access key id
S3_SECRET_ACCESS_KEY = None  # optional R2 S3-compatible secret key
S3_ENDPOINT = None  # e.g. "https://<account-id>.r2.cloudflarestorage.com"
# SSL Certificate Configuration (for Cloudflare WARP users)
def get_spark_session(app_name="R2DataCatalogApp"):
    """
    Create and return a Spark session configured for R2 Data Catalog with Apache Iceberg.

    Registers an Iceberg REST catalog named ``r2dc`` pointing at the
    module-level ``ENDPOINT``/``WAREHOUSE``/``TOKEN`` settings, makes it the
    default catalog, and selects it with ``USE r2dc``.

    Args:
        app_name (str): Name of the Spark application.

    Returns:
        SparkSession: Configured Spark session with the ``r2dc`` catalog active.
    """
    builder = (
        SparkSession.builder
        .appName(app_name)
        # Iceberg Spark runtime + AWS bundle, plus hadoop-aws/aws-sdk so the
        # s3a:// filesystem is available for direct object-storage access.
        .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262')
        # Route both s3:// and s3a:// URIs through the S3A filesystem implementation.
        .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        # "r2dc" catalog: Iceberg REST catalog backed by R2 Data Catalog.
        .config("spark.sql.catalog.r2dc", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.r2dc.type", "rest")
        .config("spark.sql.catalog.r2dc.uri", ENDPOINT)
        .config("spark.sql.catalog.r2dc.warehouse", WAREHOUSE)
        .config("spark.sql.catalog.r2dc.token", TOKEN)
        # Ask the catalog to vend short-lived storage credentials instead of
        # remote request signing.
        .config("spark.sql.catalog.r2dc.header.X-Iceberg-Access-Delegation", "vended-credentials")
        .config("spark.sql.catalog.r2dc.s3.remote-signing-enabled", "false")
        .config("spark.sql.defaultCatalog", "r2dc")
    )
    # Configure S3 credentials if provided (enables orphan file cleanup)
    if S3_ACCESS_KEY_ID and S3_SECRET_ACCESS_KEY:
        builder = builder \
            .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY_ID) \
            .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_ACCESS_KEY)
        # NOTE(review): endpoint override is only meaningful alongside explicit
        # keys, so it is nested under the credentials branch — confirm against
        # the upstream example if behavior differs.
        if S3_ENDPOINT:
            builder = builder \
                .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \
                .config("spark.hadoop.fs.s3a.path.style.access", "true")
    else:
        print("S3 credentials not configured - using vended credentials (orphan file cleanup may not work)")
    spark = builder.getOrCreate()
    spark.sql("USE r2dc")
    return spark