2 changes: 1 addition & 1 deletion .gitignore
@@ -69,6 +69,7 @@ cmake-build-*
*.pyc
__pycache__
*.pytest_cache
.ipynb_checkpoints/

test.cpp
CPackConfig.cmake
@@ -161,4 +162,3 @@ website/package-lock.json
/tests/stream/.status
.gdb_history
report_*.html

15 changes: 15 additions & 0 deletions examples/iceberg/README.md
@@ -0,0 +1,15 @@

# Iceberg + Proton demo

This demo shows how to read and write Apache Iceberg tables with Proton, using MinIO for object storage, an Iceberg REST catalog, and a Jupyter notebook.

Quick start:
1. `cd examples/iceberg`
2. `docker compose up -d`
3. Open `http://localhost:8888` and run `notebooks/IcebergPythonTest.ipynb` to create and populate the Iceberg table.
4. Run the SQL script from your host:
`docker compose exec -T proton proton-client --multiquery --user proton --password 'proton@t+' < script/proton.sql`
5. Optional (interactive): `docker compose exec proton proton-client --user proton --password 'proton@t+'` (or use the Python client sketched after the notes below).
6. Re-run the notebook cell that reads the Iceberg table to see the new rows.

Notes:
- `storage_endpoint` stays `s3://warehouse/`: an HTTP endpoint would be rewritten to `s3://<host>/...`, turning the host into the bucket name and pointing at the wrong bucket.
- Jupyter binds to `127.0.0.1`; change the port mapping if you need remote access.
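
If you'd rather query from Python than from `proton-client`, here is a minimal sketch using the `proton-driver` package (an assumption: Proton's Python client, installed separately with `pip install proton-driver`):

```python
from proton_driver import client  # assumed: pip install proton-driver

# Connect over Proton's native protocol port mapped in docker-compose.yaml.
c = client.Client(host="127.0.0.1", port=8463, user="proton", password="proton@t+")

# Sanity check: the database created by script/proton.sql should show up here
# once step 4 has run.
for (name,) in c.execute("SHOW DATABASES"):
    print(name)
```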
3 changes: 3 additions & 0 deletions examples/iceberg/conf/proton-server.yaml
@@ -0,0 +1,3 @@
url_scheme_mappers:
  s3:
    to: http://minio:9000/{bucket}
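
For illustration, the mapper substitutes the bucket from an `s3://` URL into the `to` template, producing a path-style MinIO URL. A hypothetical Python sketch of that rewrite (not Proton's actual implementation):

```python
from urllib.parse import urlparse

def map_s3_url(url: str, template: str = "http://minio:9000/{bucket}") -> str:
    """Rewrite s3://bucket/key into a path-style HTTP URL via the mapper template."""
    parsed = urlparse(url)  # for s3:// URLs, netloc is the bucket name
    return template.format(bucket=parsed.netloc).rstrip("/") + parsed.path

print(map_s3_url("s3://warehouse/demo/events/data.parquet"))
# -> http://minio:9000/warehouse/demo/events/data.parquet
```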
81 changes: 81 additions & 0 deletions examples/iceberg/docker-compose.yaml
@@ -0,0 +1,81 @@
services:
  # Iceberg REST Catalog
  iceberg-rest:
    image: tabulario/iceberg-rest:1.6.0
    container_name: iceberg-rest
    ports:
      - "8181:8181"
    environment:
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
      - CATALOG_S3_ACCESS__KEY__ID=admin
      - CATALOG_S3_SECRET__ACCESS__KEY=Password!
      - CATALOG_S3_PATH__STYLE__ACCESS=true
      - AWS_REGION=us-east-1
      - CATALOG_S3_REGION=us-east-1
      - CATALOG_URI__MAPPING=s3://warehouse/:http://minio:9000/warehouse/
    depends_on:
      - minio

  # MinIO for storage
  minio:
    image: minio/minio:RELEASE.2025-09-07T16-13-09Z
    container_name: minio
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: Password!
    command: server /data --console-address ":9001"
    volumes:
      - minio-data:/data

  minio-setup:
    image: minio/mc:RELEASE.2025-08-13T08-35-41Z
    depends_on:
      - minio
    entrypoint: >
      /bin/sh -c "
      until /usr/bin/mc alias set myminio http://minio:9000 admin Password!; do
        echo 'Waiting for MinIO...';
        sleep 2;
      done;
      /usr/bin/mc mb --ignore-existing myminio/warehouse;
      "

  proton:
    image: d.timeplus.com/timeplus-io/proton:3.0.12
    ports:
      - 8002:8000
      - 8463:8463
      - 8123:8123
      - 3218:3218
    environment:
      - ENABLE_DATA_PREALLOCATE=false
    volumes:
      - proton_data:/var/lib/proton/
      - ./conf/proton-server.yaml:/etc/proton-server/config.d/proton-server.yaml:ro

  # Jupyter Notebook
  jupyter:
    image: jupyter/pyspark-notebook:python-3.11.6
    container_name: jupyter-iceberg
    ports:
      - "127.0.0.1:8888:8888"
    environment:
      - JUPYTER_ENABLE_LAB=yes
      - GRANT_SUDO=yes
    volumes:
      - ./notebooks:/home/jovyan/work
    command: start-notebook.sh --NotebookApp.token='' --NotebookApp.password=''
    depends_on:
      - iceberg-rest
      - minio
    user: root
    working_dir: /home/jovyan/work

volumes:
  minio-data:
  proton_data:
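
Before opening the notebook, it can help to confirm the services are reachable. A minimal sketch, assuming the host port mappings above, MinIO's standard liveness probe, and the Iceberg REST catalog's `GET /v1/config` endpoint:

```python
import time
import requests  # assumed: pip install requests

CHECKS = {
    "minio": "http://localhost:9000/minio/health/live",  # MinIO liveness probe
    "iceberg-rest": "http://localhost:8181/v1/config",   # REST catalog config endpoint
    "jupyter": "http://127.0.0.1:8888/",                 # bound to localhost only
}

for name, url in CHECKS.items():
    for _ in range(30):  # retry for up to ~60 seconds per service
        try:
            if requests.get(url, timeout=2).status_code < 500:
                print(f"{name}: up")
                break
        except requests.RequestException:
            time.sleep(2)
    else:
        print(f"{name}: not reachable at {url}")
```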
269 changes: 269 additions & 0 deletions examples/iceberg/notebooks/IcebergPythonTest.ipynb
@@ -0,0 +1,269 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "0a76426d-183d-404d-967f-4232388daa15",
"metadata": {},
"outputs": [],
"source": [
"!pip install pyiceberg[s3fs,pandas,pyarrow] boto3"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bbf55392-3e7d-446f-b0d3-8e2b832c53ba",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from pyiceberg.catalog import load_catalog\n",
"import pandas as pd\n",
"import pyarrow as pa\n",
"from datetime import datetime\n",
"\n",
"os.environ['PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE'] = 'true'"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "122dc45e-1d7c-4baf-93fd-020ab46af43d",
"metadata": {},
"outputs": [],
"source": [
"# Connect to Iceberg REST Catalog\n",
"catalog = load_catalog(\n",
" \"rest\",\n",
" **{\n",
" \"uri\": \"http://iceberg-rest:8181\",\n",
" \"s3.endpoint\": \"http://minio:9000\",\n",
" \"s3.access-key-id\": \"admin\",\n",
" \"s3.secret-access-key\": \"Password!\",\n",
" \"s3.path-style-access\": \"true\",\n",
" \"s3.region\": \"us-east-1\" \n",
" }\n",
")\n",
"\n",
"print(\"Connected to Iceberg REST Catalog!\")\n",
"print(f\"Catalog properties: {catalog.properties}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0a2c077b-e369-4843-b854-f2e7d0c5f059",
"metadata": {},
"outputs": [],
"source": [
"# Create Namespace (Database)\n",
"try:\n",
" catalog.create_namespace(\"demo\")\n",
" print(\"Created namespace: demo\")\n",
"except Exception as e:\n",
" print(f\"Namespace may already exist: {e}\")\n",
"\n",
"# List namespaces\n",
"print(\"\\nAvailable namespaces:\", catalog.list_namespaces())"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9dcfe97d-557d-47ad-a968-4d621fec4dab",
"metadata": {},
"outputs": [],
"source": [
"# Drop existing table\n",
"try:\n",
" catalog.drop_table(\"demo.events\")\n",
" print(\"Dropped existing table\")\n",
"except:\n",
" pass"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "58e22720-7276-465e-a9fe-9e92728c22ee",
"metadata": {},
"outputs": [],
"source": [
"# Create Table Schema\n",
"from pyiceberg.schema import Schema\n",
"from pyiceberg.types import (\n",
" NestedField,\n",
" StringType,\n",
" DoubleType,\n",
" TimestamptzType, # NOTE, timeplus use timestamptz for timestamp\n",
" LongType\n",
")\n",
"\n",
"schema = Schema(\n",
" NestedField(1, \"id\", LongType(), required=False), # \u2190 Changed to LongType and optional\n",
" NestedField(2, \"timestamp\", TimestamptzType(), required=False), # \u2190 Made optional\n",
" NestedField(3, \"user_id\", StringType(), required=False), # \u2190 Made optional\n",
" NestedField(4, \"event_type\", StringType(), required=False), # \u2190 Made optional\n",
" NestedField(5, \"value\", DoubleType(), required=False),\n",
")\n",
"\n",
"# Create table\n",
"try:\n",
" table = catalog.create_table(\n",
" identifier=\"demo.events\",\n",
" schema=schema,\n",
" )\n",
" print(\"Created table: demo.events\")\n",
"except Exception as e:\n",
" print(f\"Table may already exist: {e}\")\n",
" table = catalog.load_table(\"demo.events\")\n",
"\n",
"print(f\"\\nTable schema:\\n{table.schema()}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c29ce832-c14a-4ed6-b84a-1d90aaf20466",
"metadata": {},
"outputs": [],
"source": [
"# Write Data\n",
"\n",
"data = pd.DataFrame({\n",
" \"id\": [1, 2, 3, 4, 5],\n",
" \"timestamp\": pd.date_range(\"2024-01-01\", periods=5, freq=\"H\", tz='UTC'),\n",
" \"user_id\": [\"user_1\", \"user_2\", \"user_1\", \"user_3\", \"user_2\"],\n",
" \"event_type\": [\"login\", \"click\", \"purchase\", \"login\", \"click\"],\n",
" \"value\": [None, 10.5, 99.99, None, 25.0]\n",
"})\n",
"\n",
"# Convert to PyArrow table (PyArrow will handle the precision automatically)\n",
"arrow_table = pa.Table.from_pandas(data)\n",
"print(\"Writing data to Iceberg table...\")\n",
"table.append(arrow_table)\n",
"print(\"\u2713 Data written successfully!\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6323def1-1b7b-4db1-a86d-b42518604008",
"metadata": {},
"outputs": [],
"source": [
"# Read Data\n",
"\n",
"from pyiceberg.table import TableProperties\n",
"table = catalog.load_table(\"demo.events\")\n",
"\n",
"with table.transaction() as txn:\n",
" txn.set_properties(\n",
" **{TableProperties.DEFAULT_NAME_MAPPING: table.metadata.schema().name_mapping.model_dump_json()}\n",
" )\n",
"\n",
"print(f\"name-mapping: {table.metadata.name_mapping()}\")\n",
"\n",
"print(\"Reading data from table...\")\n",
"df = table.scan().to_pandas()\n",
"print(f\"\\nTable has {len(df)} rows:\\n\")\n",
"print(df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "123c47a4-c93c-4ab8-965f-85ee406788cb",
"metadata": {},
"outputs": [],
"source": [
"# Query with Filters\n",
"print(\"\\n--- Filtering: event_type = 'login' ---\")\n",
"df_filtered = table.scan(\n",
" row_filter=\"event_type == 'login'\"\n",
").to_pandas()\n",
"print(df_filtered)\n",
"\n",
"print(\"\\n--- Filtering: value > 20 ---\")\n",
"df_filtered2 = table.scan(\n",
" row_filter=\"value > 20\"\n",
").to_pandas()\n",
"print(df_filtered2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8720cd3e-a33d-44ab-b6ba-74babf526890",
"metadata": {},
"outputs": [],
"source": [
"# Append More Data\n",
"new_data = pd.DataFrame({\n",
" \"id\": [6, 7, 8],\n",
" \"timestamp\": pd.date_range(\"2024-01-01 05:00:00\", periods=3, freq=\"H\", tz='UTC'),\n",
" \"user_id\": [\"user_1\", \"user_4\", \"user_2\"],\n",
" \"event_type\": [\"logout\", \"login\", \"purchase\"],\n",
" \"value\": [None, None, 149.99]\n",
"})\n",
"\n",
"\n",
"# Convert to PyArrow table\n",
"arrow_new_data = pa.Table.from_pandas(new_data)\n",
"\n",
"print(\"Appending more data...\")\n",
"table.append(arrow_new_data)\n",
"print(\"\u2713 Data appended!\")\n",
"\n",
"# Read updated data\n",
"df_updated = table.scan().to_pandas()\n",
"print(f\"\\nTable now has {len(df_updated)} rows\")\n",
"print(df_updated)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "60253f68-a62a-40bc-9988-17b0fb72387b",
"metadata": {},
"outputs": [],
"source": [
"# Table History & Time Travel\n",
"print(\"--- Table History ---\")\n",
"snapshots = table.metadata.snapshots\n",
"for snapshot in snapshots:\n",
" print(f\"Snapshot ID: {snapshot.snapshot_id}, Timestamp: {snapshot.timestamp_ms}\")\n",
"\n",
"# Time travel - read data as of first snapshot\n",
"if len(snapshots) >= 2:\n",
" first_snapshot_id = snapshots[0].snapshot_id\n",
" print(f\"\\n--- Time Travel to Snapshot {first_snapshot_id} ---\")\n",
" df_historical = table.scan(snapshot_id=first_snapshot_id).to_pandas()\n",
" print(f\"Historical data ({len(df_historical)} rows):\")\n",
" print(df_historical)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}