Marcinthecloud/r2dc_pyspark_examples

PySpark Examples and Utilities for R2 Data Catalog

A collection of Python scripts (examples) for managing Apache Iceberg tables on Cloudflare R2 Data Catalog using PySpark.

Features

  • Create: Create namespaces and tables from SQL files
  • Insert: Insert data from SQL files, CSV, or Parquet
  • JSON to Iceberg: Read JSON files from R2 and convert them to partitioned Iceberg tables with schema inference
  • Delete: Delete data with optional cleanup of unreferenced files
  • Drop: Drop tables or namespaces with optional purge of all files from R2
  • Orphan File Removal: Clean up old snapshots and orphan files for a single table
  • File Analysis: Analyze Iceberg table metadata to determine if compaction or re-partitioning is needed
  • Spark Config Example: Centralized Spark config example

Prerequisites

  • Python 3.8+
  • Java 8, 11, or 17 (required by PySpark)
  • Cloudflare R2 Data Catalog account

Installation

1. Install Java (if not already installed)

# macOS with Homebrew
brew install openjdk@17

# Add to PATH (add to ~/.zshrc or ~/.bash_profile)
export PATH="/opt/homebrew/opt/openjdk@17/bin:$PATH"

2. Install Python dependencies

pip install -r requirements.txt

Configuration

Rename r2dc_spark_config.py.example to r2dc_spark_config.py and update the credentials:

WAREHOUSE = "your-warehouse-path"
TOKEN = "your-token"
ENDPOINT = "your-catalog-uri"

NOTE: if you want to clean up or remove files, you'll need to configure S3-style access in the config:

S3_ACCESS_KEY_ID = "key"  
S3_SECRET_ACCESS_KEY = "secret"  
S3_ENDPOINT = "https://<account_id>.r2.cloudflarestorage.com/" 
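To see how such a config typically gets consumed, here is a minimal sketch of wiring these values into Spark session settings for Iceberg's REST catalog. The catalog name "r2dc" is a hypothetical placeholder, and the exact keys used by these scripts may differ; the keys below are the standard Iceberg-on-Spark settings.

```python
# Sketch: turning a shared config like r2dc_spark_config.py into Spark
# session settings for an Iceberg REST catalog. Values are placeholders.
WAREHOUSE = "your-warehouse-path"
TOKEN = "your-token"
ENDPOINT = "your-catalog-uri"
CATALOG_NAME = "r2dc"  # hypothetical catalog name


def build_spark_conf(catalog=CATALOG_NAME):
    """Return Spark config entries wiring an Iceberg REST catalog."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.rest.RESTCatalog",
        f"{prefix}.uri": ENDPOINT,
        f"{prefix}.warehouse": WAREHOUSE,
        f"{prefix}.token": TOKEN,
    }
```

Each key/value pair would then be applied via SparkSession.builder.config(key, value) when the scripts start a session.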

Usage

Create Operations

Create a namespace:

python r2dc_create.py --namespace my_namespace

Create a table from SQL file:

python r2dc_create.py --sql-file create_table.sql

Example create_table.sql:

CREATE TABLE IF NOT EXISTS my_namespace.users (
    id INT,
    name STRING,
    email STRING,
    created_at TIMESTAMP
) PARTITIONED BY (days(created_at))
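Since spark.sql() executes one statement at a time, a SQL file with several statements has to be split first. The helper below is an illustrative sketch, not the actual implementation in r2dc_create.py; the naive semicolon split is fine for simple DDL files but would break on semicolons inside string literals.

```python
# Sketch: splitting a SQL file into statements runnable with spark.sql().
def split_sql_statements(text):
    """Split SQL text on semicolons, dropping blanks and line comments."""
    lines = [ln for ln in text.splitlines() if not ln.strip().startswith("--")]
    statements = [s.strip() for s in "\n".join(lines).split(";")]
    return [s for s in statements if s]

# Each returned statement could then be executed with spark.sql(statement).
```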

List all namespaces:

python r2dc_create.py --list-namespaces

List tables in a namespace:

python r2dc_create.py --list-tables my_namespace

Describe a table:

python r2dc_create.py --describe my_namespace.users

Insert Operations

Insert from SQL file:

python r2dc_insert.py --sql-file insert_data.sql

Example insert_data.sql:

INSERT INTO my_namespace.users VALUES
(1, 'Alice', 'alice@example.com', TIMESTAMP '2024-01-01 10:00:00'),
(2, 'Bob', 'bob@example.com', TIMESTAMP '2024-01-02 11:00:00')

Insert from CSV:

python r2dc_insert.py --csv-file data.csv --table my_namespace.users

Insert from Parquet:

python r2dc_insert.py --parquet-file data.parquet --table my_namespace.users

Insert with overwrite mode:

python r2dc_insert.py --csv-file data.csv --table my_namespace.users --mode overwrite

Show table data:

python r2dc_insert.py --show my_namespace.users --limit 20

JSON to Iceberg

Reads JSON files from an R2 bucket, infers the schema, and writes a partitioned Iceberg table. By default, tables are partitioned by days(__ingest_ts) for R2 SQL compatibility.

Basic usage — read all JSON from a bucket:

python r2dc_json_to_iceberg.py --bucket my-data --namespace analytics --table events

Read from a specific prefix (folder):

python r2dc_json_to_iceberg.py --bucket my-data --prefix logs/2026/03 --namespace analytics --table march_logs

Use an existing timestamp column for the ingest timestamp:

python r2dc_json_to_iceberg.py --bucket my-data --namespace analytics --table events --timestamp-col event_time

Custom partition key — identity partition by a column:

python r2dc_json_to_iceberg.py --bucket my-data --namespace analytics --table events --partition-by category

Custom partition key — partition by month instead of day:

python r2dc_json_to_iceberg.py --bucket my-data --namespace analytics --table events --partition-by "months(__ingest_ts)"

Multiple partition keys:

python r2dc_json_to_iceberg.py --bucket my-data --namespace analytics --table events \
  --partition-by "days(__ingest_ts)" --partition-by category

Hash bucket partition (good for high-cardinality columns):

python r2dc_json_to_iceberg.py --bucket my-data --namespace analytics --table events --partition-by "bucket(16, user_id)"

Partition by a nested JSON field (e.g. {"metadata": {"region": "us-east"}}):

python r2dc_json_to_iceberg.py --bucket my-data --namespace analytics --table events --partition-by metadata.region

Nested fields are automatically extracted to top-level columns (metadata.region → metadata_region).
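The renaming rule above can be sketched as a one-liner. This is illustrative only; the real extraction in r2dc_json_to_iceberg.py also has to select the nested value itself (e.g. F.col("metadata.region").alias("metadata_region") in PySpark).

```python
# Sketch of the documented renaming rule: dotted JSON paths become
# underscore-separated top-level column names.
def flattened_name(path):
    """Map a dotted JSON path to the flattened Iceberg column name."""
    return path.replace(".", "_")
```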

Append to an existing table:

python r2dc_json_to_iceberg.py --bucket my-data --namespace analytics --table events --mode append

Multi-line JSON files:

python r2dc_json_to_iceberg.py --bucket my-data --namespace analytics --table events --multiline

Supported partition expressions:

Expression        Description
days(col)         Time-based partition by day (R2 SQL compatible)
hours(col)        Time-based partition by hour
months(col)       Time-based partition by month
years(col)        Time-based partition by year
bucket(n, col)    Hash partition into n buckets
truncate(n, col)  Truncate partition (width n)
col               Identity partition (exact column value)
parent.child      Nested field (extracted to parent_child)
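The grammar in the table above is small enough to parse with a single regular expression. The sketch below mirrors the documented forms; it is not the script's actual parser.

```python
import re

# Sketch: parse a --partition-by expression into (transform, n, column).
_TRANSFORM_RE = re.compile(
    r"^(days|hours|months|years)\((\w+)\)$"      # time transforms
    r"|^(bucket|truncate)\((\d+),\s*(\w+)\)$"    # n-ary transforms
)


def parse_partition_expr(expr):
    """Identity and nested-field expressions get transform None."""
    m = _TRANSFORM_RE.match(expr.strip())
    if m is None:
        return (None, None, expr.strip())        # identity or parent.child
    if m.group(1):                               # days/hours/months/years
        return (m.group(1), None, m.group(2))
    return (m.group(3), int(m.group(4)), m.group(5))
```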

Delete Operations

Delete all records:

python r2dc_delete.py --table my_namespace.users

Delete with WHERE clause:

python r2dc_delete.py --table my_namespace.users --where "id > 100"

Delete with cleanup (recommended):

python r2dc_delete.py --table my_namespace.users --cleanup

This will:

  1. Delete the data
  2. Expire old snapshots (default: 7 days)
  3. Remove orphan files (default: 3 days)
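Steps 2 and 3 map onto standard Iceberg Spark maintenance procedures. The sketch below builds the corresponding CALL statements; the catalog name "r2dc" and the exact invocation are assumptions — consult the script itself for the real calls.

```python
from datetime import datetime, timedelta, timezone

# Sketch: build Iceberg maintenance CALL statements for snapshot expiry
# and orphan-file removal, with cutoffs computed from retention days.
def cleanup_statements(table, expire_days=7, orphan_days=3,
                       catalog="r2dc", now=None):
    now = now or datetime.now(timezone.utc)

    def cutoff(days):
        return (now - timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S")

    return [
        f"CALL {catalog}.system.expire_snapshots("
        f"table => '{table}', older_than => TIMESTAMP '{cutoff(expire_days)}')",
        f"CALL {catalog}.system.remove_orphan_files("
        f"table => '{table}', older_than => TIMESTAMP '{cutoff(orphan_days)}')",
    ]
```

Each statement would then be run with spark.sql() after the DELETE completes.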

Delete with custom cleanup settings:

python r2dc_delete.py --table my_namespace.users --cleanup --expire-days 5 --orphan-days 2

Drop Operations

Drop table (removes from catalog, keeps files in storage):

python r2dc_drop.py --table my_namespace.users

Drop table and DELETE ALL FILES from storage:

python r2dc_drop.py --table my_namespace.users --purge

Drop namespace (must be empty):

python r2dc_drop.py --namespace my_namespace

Drop namespace and all tables in it:

python r2dc_drop.py --namespace my_namespace --cascade

Drop namespace, all tables, and DELETE ALL FILES:

python r2dc_drop.py --namespace my_namespace --cascade --purge-tables

Skip confirmation prompts (use with caution):

python r2dc_drop.py --table my_namespace.users --purge --force

IMPORTANT NOTES:

  • --purge operations DELETE ALL FILES from storage and CANNOT BE UNDONE
  • --purge requires S3 credentials to be configured in r2dc_spark_config.py
  • Without --purge, the table is removed from the catalog but files remain in storage
  • --purge-tables only works with --cascade for namespace operations
  • By default, you will be prompted to confirm destructive operations

Orphan File Removal Operations

The orphan file removal script performs maintenance on a single table by expiring old snapshots and removing orphan files that are no longer referenced by any snapshot.

Clean up a table with default settings:

python r2dc_orphan_file_removal.py pokegraph.pokemon

This will:

  1. Expire snapshots older than 7 days
  2. Remove orphan files older than 3 days

Clean up with custom retention periods:

python r2dc_orphan_file_removal.py pokegraph.pokemon 5 2

This will:

  1. Expire snapshots older than 5 days
  2. Remove orphan files older than 2 days

Usage:

python r2dc_orphan_file_removal.py <namespace.table> [expire_days] [orphan_days]

Parameters:

  • namespace.table: Required. Fully qualified table name
  • expire_days: Optional. Default 7. Expire snapshots older than N days
  • orphan_days: Optional. Default 3. Remove orphan files older than N days

IMPORTANT NOTES:

  • The script only removes snapshots and files older than the specified number of days; nothing within the retention window is touched
  • For example, with expire_days=7, only snapshots created MORE than 7 days ago are expired
  • All snapshots and files within the retention window are kept safe
  • This operation requires S3 credentials to be configured in r2dc_spark_config.py
  • Run this periodically to keep storage costs down by removing unused files

File Analysis Operations

The file analysis script inspects Iceberg table metadata (snapshots, manifests, data files, partitions) and produces a compaction health report with GREEN/YELLOW/RED scores and actionable recommendations.

Analyze a single table:

python r2dc_file_analysis.py my_namespace.my_table

This will output:

  • Table overview (file count, data size, records, partitions, snapshots, manifests)
  • File size distribution (min, p10, median, avg, p90, max, std dev)
  • Files per partition breakdown
  • Small file analysis distinguishing compactable vs non-compactable files
  • Over-partitioning detection
  • Metadata health (snapshot count, manifest stats)
  • Compaction scoreboard with GREEN/YELLOW/RED ratings
  • Actionable recommendations
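The report's small-file scoring can be sketched in a few lines. The 50%/20% ratio cutoffs and the "small means under half the target size" rule below are illustrative assumptions, not the script's actual heuristics.

```python
import statistics

# Sketch: rate compaction need from the fraction of undersized files.
def small_file_score(file_sizes_mb, target_mb=128):
    small = [s for s in file_sizes_mb if s < target_mb / 2]
    ratio = len(small) / len(file_sizes_mb)
    if ratio > 0.5:
        return "RED"
    if ratio > 0.2:
        return "YELLOW"
    return "GREEN"


# Sketch: the kind of summary shown in the size-distribution section.
def size_distribution(file_sizes_mb):
    return {
        "min": min(file_sizes_mb),
        "median": statistics.median(file_sizes_mb),
        "avg": statistics.mean(file_sizes_mb),
        "max": max(file_sizes_mb),
    }
```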

Analyze and save detailed results to JSON:

python r2dc_file_analysis.py my_namespace.my_table --json analysis_results.json

Analyze all tables in a namespace:

python r2dc_file_analysis.py --all my_namespace

Specify a custom target file size (default: 128 MB):

python r2dc_file_analysis.py my_namespace.my_table --target-file-size 64

Disable colored output (for piping or logging):

python r2dc_file_analysis.py my_namespace.my_table --no-color

Usage:

python r2dc_file_analysis.py <namespace.table> [--json PATH] [--target-file-size MB] [--no-color]
python r2dc_file_analysis.py --all <namespace> [--target-file-size MB] [--no-color]

What the scores mean:

Score   Data Compaction              Metadata Compaction
GREEN   No compaction needed         Metadata is healthy
YELLOW  Consider running compaction  Consider expiring snapshots
RED     Compaction recommended       Expire snapshots and rewrite manifests

File Structure

pyspark/
├── r2dc_spark_config.py          # Shared Spark configuration
├── r2dc_create.py                # Create namespaces and tables
├── r2dc_insert.py                # Insert operations
├── r2dc_json_to_iceberg.py       # Convert JSON files in R2 to Iceberg tables
├── r2dc_delete.py                # Delete operations with cleanup
├── r2dc_drop.py                  # Drop tables/namespaces with optional purge of files
├── r2dc_orphan_file_removal.py   # Orphan file cleanup for single table
├── r2dc_file_analysis.py         # Iceberg file analysis & compaction advisor
├── requirements.txt              # Python dependencies
└── README.md                     # This file

Write Modes

When inserting data, you can specify different modes:

  • append (default): Add new data to existing table
  • overwrite: Replace all existing data
  • error: Throw error if table exists
  • ignore: Silently ignore if table exists
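These four modes correspond to Spark's DataFrameWriter save modes (mode("error") is also spelled "errorifexists"). Validating the flag up front gives a clearer error than letting Spark reject the string later; this helper is illustrative, not part of r2dc_insert.py.

```python
# Sketch: validate a --mode flag before handing it to Spark.
VALID_MODES = {"append", "overwrite", "error", "ignore"}


def validate_mode(mode):
    mode = mode.lower()
    if mode not in VALID_MODES:
        raise ValueError(
            f"mode must be one of {sorted(VALID_MODES)}, got {mode!r}")
    return mode

# e.g. df.write.format("iceberg").mode(validate_mode(args.mode)).save(table)
```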

Cleanup Operations

The delete script includes several cleanup operations:

  1. Expire Snapshots: Marks old snapshots for deletion
  2. Remove Orphan Files: Removes data files no longer referenced by any snapshot
  3. Cleanup Metadata: Removes old metadata files

Examples

Complete Workflow Example

# 1. Create namespace
python r2dc_create.py --namespace sales_data

# 2. Create table from SQL file
python r2dc_create.py --sql-file tables/sales.sql

# 3. Insert data from CSV
python r2dc_insert.py --csv-file data/sales_2024.csv --table sales_data.sales

# 4. Verify data
python r2dc_insert.py --show sales_data.sales --limit 10

# 5. Delete old records with cleanup
python r2dc_delete.py --table sales_data.sales --where "date < '2024-01-01'" --cleanup

# 6. Check if table needs compaction or maintenance
python r2dc_file_analysis.py sales_data.sales

# 7. Perform maintenance to clean up old snapshots and orphan files
python r2dc_orphan_file_removal.py sales_data.sales

# 8. Drop table and remove all files from storage (when done)
python r2dc_drop.py --table sales_data.sales --purge
