Skip to content

Commit a91e2f3

Browse files
gangtao and yokofly authored
Docs/iceberg example (#1077)
Co-authored-by: yokofly <[email protected]>
1 parent fd48285 commit a91e2f3

File tree

6 files changed

+415
-1
lines changed

6 files changed

+415
-1
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ cmake-build-*
6969
*.pyc
7070
__pycache__
7171
*.pytest_cache
72+
.ipynb_checkpoints/
7273

7374
test.cpp
7475
CPackConfig.cmake
@@ -161,4 +162,3 @@ website/package-lock.json
161162
/tests/stream/.status
162163
.gdb_history
163164
report_*.html
164-

examples/iceberg/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
2+
This demo shows how to read and write Iceberg tables using Proton.
3+
4+
Quick start:
5+
1. `cd examples/iceberg`
6+
2. `docker compose up -d`
7+
3. Open `http://localhost:8888` and run `notebooks/IcebergPythonTest.ipynb` to create and populate the Iceberg table.
8+
4. Run the SQL script from your host:
9+
`docker compose exec -T proton proton-client --multiquery --user proton --password 'proton@t+' < script/proton.sql`
10+
5. Optional (interactive): `docker compose exec proton proton-client --user proton --password 'proton@t+'`
11+
6. Re-run the notebook cell that reads the Iceberg table to see the new rows.
12+
13+
Notes:
14+
- `storage_endpoint` stays as `s3://warehouse/` because HTTP endpoints are rewritten to `s3://<host>/...` and would point at the wrong bucket.
15+
- Jupyter binds to `127.0.0.1`; change the port mapping if you need remote access.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
url_scheme_mappers:
2+
s3:
3+
to: http://minio:9000/{bucket}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
services:
2+
# Iceberg REST Catalog
3+
iceberg-rest:
4+
image: tabulario/iceberg-rest:1.6.0
5+
container_name: iceberg-rest
6+
ports:
7+
- "8181:8181"
8+
environment:
9+
- CATALOG_WAREHOUSE=s3://warehouse/
10+
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
11+
- CATALOG_S3_ENDPOINT=http://minio:9000
12+
- CATALOG_S3_ACCESS__KEY__ID=admin
13+
- CATALOG_S3_SECRET__ACCESS__KEY=Password!
14+
- CATALOG_S3_PATH__STYLE__ACCESS=true
15+
- AWS_REGION=us-east-1
16+
- CATALOG_S3_REGION=us-east-1
17+
- CATALOG_URI__MAPPING=s3://warehouse/:http://minio:9000/warehouse/
18+
depends_on:
19+
- minio
20+
21+
# MinIO for storage
22+
minio:
23+
image: minio/minio:RELEASE.2025-09-07T16-13-09Z
24+
container_name: minio
25+
ports:
26+
- "9000:9000"
27+
- "9001:9001"
28+
environment:
29+
MINIO_ROOT_USER: admin
30+
MINIO_ROOT_PASSWORD: Password!
31+
command: server /data --console-address ":9001"
32+
volumes:
33+
- minio-data:/data
34+
35+
minio-setup:
36+
image: minio/mc:RELEASE.2025-08-13T08-35-41Z
37+
depends_on:
38+
- minio
39+
entrypoint: >
40+
/bin/sh -c "
41+
until /usr/bin/mc alias set myminio http://minio:9000 admin Password!; do
42+
echo 'Waiting for MinIO...';
43+
sleep 2;
44+
done;
45+
/usr/bin/mc mb --ignore-existing myminio/warehouse;
46+
"
47+
48+
proton:
49+
image: d.timeplus.com/timeplus-io/proton:3.0.12
50+
ports:
51+
- 8002:8000
52+
- 8463:8463
53+
- 8123:8123
54+
- 3218:3218
55+
environment:
56+
- ENABLE_DATA_PREALLOCATE=false
57+
volumes:
58+
- proton_data:/var/lib/proton/
59+
- ./conf/proton-server.yaml:/etc/proton-server/config.d/proton-server.yaml:ro
60+
61+
# Jupyter Notebook
62+
jupyter:
63+
image: jupyter/pyspark-notebook:python-3.11.6
64+
container_name: jupyter-iceberg
65+
ports:
66+
- "127.0.0.1:8888:8888"
67+
environment:
68+
- JUPYTER_ENABLE_LAB=yes
69+
- GRANT_SUDO=yes
70+
volumes:
71+
- ./notebooks:/home/jovyan/work
72+
command: start-notebook.sh --NotebookApp.token='' --NotebookApp.password=''
73+
depends_on:
74+
- iceberg-rest
75+
- minio
76+
user: root
77+
working_dir: /home/jovyan/work
78+
79+
volumes:
80+
minio-data:
81+
proton_data:
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": null,
6+
"id": "0a76426d-183d-404d-967f-4232388daa15",
7+
"metadata": {},
8+
"outputs": [],
9+
"source": [
10+
"!pip install pyiceberg[s3fs,pandas,pyarrow] boto3"
11+
]
12+
},
13+
{
14+
"cell_type": "code",
15+
"execution_count": null,
16+
"id": "bbf55392-3e7d-446f-b0d3-8e2b832c53ba",
17+
"metadata": {},
18+
"outputs": [],
19+
"source": [
20+
"import os\n",
21+
"from pyiceberg.catalog import load_catalog\n",
22+
"import pandas as pd\n",
23+
"import pyarrow as pa\n",
24+
"from datetime import datetime\n",
25+
"\n",
26+
"os.environ['PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE'] = 'true'"
27+
]
28+
},
29+
{
30+
"cell_type": "code",
31+
"execution_count": null,
32+
"id": "122dc45e-1d7c-4baf-93fd-020ab46af43d",
33+
"metadata": {},
34+
"outputs": [],
35+
"source": [
36+
"# Connect to Iceberg REST Catalog\n",
37+
"catalog = load_catalog(\n",
38+
" \"rest\",\n",
39+
" **{\n",
40+
" \"uri\": \"http://iceberg-rest:8181\",\n",
41+
" \"s3.endpoint\": \"http://minio:9000\",\n",
42+
" \"s3.access-key-id\": \"admin\",\n",
43+
" \"s3.secret-access-key\": \"Password!\",\n",
44+
" \"s3.path-style-access\": \"true\",\n",
45+
" \"s3.region\": \"us-east-1\" \n",
46+
" }\n",
47+
")\n",
48+
"\n",
49+
"print(\"Connected to Iceberg REST Catalog!\")\n",
50+
"print(f\"Catalog properties: {catalog.properties}\")"
51+
]
52+
},
53+
{
54+
"cell_type": "code",
55+
"execution_count": null,
56+
"id": "0a2c077b-e369-4843-b854-f2e7d0c5f059",
57+
"metadata": {},
58+
"outputs": [],
59+
"source": [
60+
"# Create Namespace (Database)\n",
61+
"try:\n",
62+
" catalog.create_namespace(\"demo\")\n",
63+
" print(\"Created namespace: demo\")\n",
64+
"except Exception as e:\n",
65+
" print(f\"Namespace may already exist: {e}\")\n",
66+
"\n",
67+
"# List namespaces\n",
68+
"print(\"\\nAvailable namespaces:\", catalog.list_namespaces())"
69+
]
70+
},
71+
{
72+
"cell_type": "code",
73+
"execution_count": null,
74+
"id": "9dcfe97d-557d-47ad-a968-4d621fec4dab",
75+
"metadata": {},
76+
"outputs": [],
77+
"source": [
78+
"# Drop existing table\n",
79+
"try:\n",
80+
" catalog.drop_table(\"demo.events\")\n",
81+
" print(\"Dropped existing table\")\n",
82+
"except:\n",
83+
" pass"
84+
]
85+
},
86+
{
87+
"cell_type": "code",
88+
"execution_count": null,
89+
"id": "58e22720-7276-465e-a9fe-9e92728c22ee",
90+
"metadata": {},
91+
"outputs": [],
92+
"source": [
93+
"# Create Table Schema\n",
94+
"from pyiceberg.schema import Schema\n",
95+
"from pyiceberg.types import (\n",
96+
" NestedField,\n",
97+
" StringType,\n",
98+
" DoubleType,\n",
99+
" TimestamptzType, # NOTE, timeplus use timestamptz for timestamp\n",
100+
" LongType\n",
101+
")\n",
102+
"\n",
103+
"schema = Schema(\n",
104+
" NestedField(1, \"id\", LongType(), required=False), # \u2190 Changed to LongType and optional\n",
105+
" NestedField(2, \"timestamp\", TimestamptzType(), required=False), # \u2190 Made optional\n",
106+
" NestedField(3, \"user_id\", StringType(), required=False), # \u2190 Made optional\n",
107+
" NestedField(4, \"event_type\", StringType(), required=False), # \u2190 Made optional\n",
108+
" NestedField(5, \"value\", DoubleType(), required=False),\n",
109+
")\n",
110+
"\n",
111+
"# Create table\n",
112+
"try:\n",
113+
" table = catalog.create_table(\n",
114+
" identifier=\"demo.events\",\n",
115+
" schema=schema,\n",
116+
" )\n",
117+
" print(\"Created table: demo.events\")\n",
118+
"except Exception as e:\n",
119+
" print(f\"Table may already exist: {e}\")\n",
120+
" table = catalog.load_table(\"demo.events\")\n",
121+
"\n",
122+
"print(f\"\\nTable schema:\\n{table.schema()}\")"
123+
]
124+
},
125+
{
126+
"cell_type": "code",
127+
"execution_count": null,
128+
"id": "c29ce832-c14a-4ed6-b84a-1d90aaf20466",
129+
"metadata": {},
130+
"outputs": [],
131+
"source": [
132+
"# Write Data\n",
133+
"\n",
134+
"data = pd.DataFrame({\n",
135+
" \"id\": [1, 2, 3, 4, 5],\n",
136+
" \"timestamp\": pd.date_range(\"2024-01-01\", periods=5, freq=\"H\", tz='UTC'),\n",
137+
" \"user_id\": [\"user_1\", \"user_2\", \"user_1\", \"user_3\", \"user_2\"],\n",
138+
" \"event_type\": [\"login\", \"click\", \"purchase\", \"login\", \"click\"],\n",
139+
" \"value\": [None, 10.5, 99.99, None, 25.0]\n",
140+
"})\n",
141+
"\n",
142+
"# Convert to PyArrow table (PyArrow will handle the precision automatically)\n",
143+
"arrow_table = pa.Table.from_pandas(data)\n",
144+
"print(\"Writing data to Iceberg table...\")\n",
145+
"table.append(arrow_table)\n",
146+
"print(\"\u2713 Data written successfully!\")"
147+
]
148+
},
149+
{
150+
"cell_type": "code",
151+
"execution_count": null,
152+
"id": "6323def1-1b7b-4db1-a86d-b42518604008",
153+
"metadata": {},
154+
"outputs": [],
155+
"source": [
156+
"# Read Data\n",
157+
"\n",
158+
"from pyiceberg.table import TableProperties\n",
159+
"table = catalog.load_table(\"demo.events\")\n",
160+
"\n",
161+
"with table.transaction() as txn:\n",
162+
" txn.set_properties(\n",
163+
" **{TableProperties.DEFAULT_NAME_MAPPING: table.metadata.schema().name_mapping.model_dump_json()}\n",
164+
" )\n",
165+
"\n",
166+
"print(f\"name-mapping: {table.metadata.name_mapping()}\")\n",
167+
"\n",
168+
"print(\"Reading data from table...\")\n",
169+
"df = table.scan().to_pandas()\n",
170+
"print(f\"\\nTable has {len(df)} rows:\\n\")\n",
171+
"print(df)"
172+
]
173+
},
174+
{
175+
"cell_type": "code",
176+
"execution_count": null,
177+
"id": "123c47a4-c93c-4ab8-965f-85ee406788cb",
178+
"metadata": {},
179+
"outputs": [],
180+
"source": [
181+
"# Query with Filters\n",
182+
"print(\"\\n--- Filtering: event_type = 'login' ---\")\n",
183+
"df_filtered = table.scan(\n",
184+
" row_filter=\"event_type == 'login'\"\n",
185+
").to_pandas()\n",
186+
"print(df_filtered)\n",
187+
"\n",
188+
"print(\"\\n--- Filtering: value > 20 ---\")\n",
189+
"df_filtered2 = table.scan(\n",
190+
" row_filter=\"value > 20\"\n",
191+
").to_pandas()\n",
192+
"print(df_filtered2)"
193+
]
194+
},
195+
{
196+
"cell_type": "code",
197+
"execution_count": null,
198+
"id": "8720cd3e-a33d-44ab-b6ba-74babf526890",
199+
"metadata": {},
200+
"outputs": [],
201+
"source": [
202+
"# Append More Data\n",
203+
"new_data = pd.DataFrame({\n",
204+
" \"id\": [6, 7, 8],\n",
205+
" \"timestamp\": pd.date_range(\"2024-01-01 05:00:00\", periods=3, freq=\"H\", tz='UTC'),\n",
206+
" \"user_id\": [\"user_1\", \"user_4\", \"user_2\"],\n",
207+
" \"event_type\": [\"logout\", \"login\", \"purchase\"],\n",
208+
" \"value\": [None, None, 149.99]\n",
209+
"})\n",
210+
"\n",
211+
"\n",
212+
"# Convert to PyArrow table\n",
213+
"arrow_new_data = pa.Table.from_pandas(new_data)\n",
214+
"\n",
215+
"print(\"Appending more data...\")\n",
216+
"table.append(arrow_new_data)\n",
217+
"print(\"\u2713 Data appended!\")\n",
218+
"\n",
219+
"# Read updated data\n",
220+
"df_updated = table.scan().to_pandas()\n",
221+
"print(f\"\\nTable now has {len(df_updated)} rows\")\n",
222+
"print(df_updated)"
223+
]
224+
},
225+
{
226+
"cell_type": "code",
227+
"execution_count": null,
228+
"id": "60253f68-a62a-40bc-9988-17b0fb72387b",
229+
"metadata": {},
230+
"outputs": [],
231+
"source": [
232+
"# Table History & Time Travel\n",
233+
"print(\"--- Table History ---\")\n",
234+
"snapshots = table.metadata.snapshots\n",
235+
"for snapshot in snapshots:\n",
236+
" print(f\"Snapshot ID: {snapshot.snapshot_id}, Timestamp: {snapshot.timestamp_ms}\")\n",
237+
"\n",
238+
"# Time travel - read data as of first snapshot\n",
239+
"if len(snapshots) >= 2:\n",
240+
" first_snapshot_id = snapshots[0].snapshot_id\n",
241+
" print(f\"\\n--- Time Travel to Snapshot {first_snapshot_id} ---\")\n",
242+
" df_historical = table.scan(snapshot_id=first_snapshot_id).to_pandas()\n",
243+
" print(f\"Historical data ({len(df_historical)} rows):\")\n",
244+
" print(df_historical)"
245+
]
246+
}
247+
],
248+
"metadata": {
249+
"kernelspec": {
250+
"display_name": "Python 3 (ipykernel)",
251+
"language": "python",
252+
"name": "python3"
253+
},
254+
"language_info": {
255+
"codemirror_mode": {
256+
"name": "ipython",
257+
"version": 3
258+
},
259+
"file_extension": ".py",
260+
"mimetype": "text/x-python",
261+
"name": "python",
262+
"nbconvert_exporter": "python",
263+
"pygments_lexer": "ipython3",
264+
"version": "3.11.6"
265+
}
266+
},
267+
"nbformat": 4,
268+
"nbformat_minor": 5
269+
}

0 commit comments

Comments
 (0)