Commit aa59971
ENH: Implement pandas.read_iceberg
1 parent cf6de58

16 files changed: +332 −1 lines

doc/source/getting_started/install.rst

Lines changed: 2 additions & 1 deletion
@@ -299,7 +299,7 @@ Dependency Minimum Versi
 Other data sources
 ^^^^^^^^^^^^^^^^^^

-Installable with ``pip install "pandas[hdf5, parquet, feather, spss, excel]"``
+Installable with ``pip install "pandas[hdf5, parquet, iceberg, feather, spss, excel]"``

 ====================================================== ================== ================ ==========================================================
 Dependency                                             Minimum Version    pip extra        Notes
@@ -309,6 +309,7 @@ Dependency Minimum Version pip ex
 `zlib <https://github.com/madler/zlib>`__                                 hdf5             Compression for HDF5
 `fastparquet <https://github.com/dask/fastparquet>`__  2024.2.0           -                Parquet reading / writing (pyarrow is default)
 `pyarrow <https://github.com/apache/arrow>`__          10.0.1             parquet, feather Parquet, ORC, and feather reading / writing
+`PyIceberg <https://py.iceberg.apache.org/>`__         0.6.1              iceberg          Apache Iceberg reading
 `pyreadstat <https://github.com/Roche/pyreadstat>`__   1.2.6              spss             SPSS files (.sav) reading
 `odfpy <https://github.com/eea/odfpy>`__               1.4.1              excel            Open document format (.odf, .ods, .odt) reading / writing
 ====================================================== ================== ================ ==========================================================

doc/source/reference/io.rst

Lines changed: 6 additions & 0 deletions
@@ -156,6 +156,12 @@ Parquet
     read_parquet
     DataFrame.to_parquet

+Iceberg
+~~~~~~~
+.. autosummary::
+   :toctree: api/
+
+   read_iceberg
+
 ORC
 ~~~
 .. autosummary::

doc/source/user_guide/io.rst

Lines changed: 97 additions & 0 deletions
@@ -29,6 +29,7 @@ The pandas I/O API is a set of top level ``reader`` functions accessed like
 binary,`HDF5 Format <https://support.hdfgroup.org/HDF5/whatishdf5.html>`__, :ref:`read_hdf<io.hdf5>`, :ref:`to_hdf<io.hdf5>`
 binary,`Feather Format <https://github.com/wesm/feather>`__, :ref:`read_feather<io.feather>`, :ref:`to_feather<io.feather>`
 binary,`Parquet Format <https://parquet.apache.org/>`__, :ref:`read_parquet<io.parquet>`, :ref:`to_parquet<io.parquet>`
+binary,`Apache Iceberg <https://iceberg.apache.org/>`__, :ref:`read_iceberg<io.iceberg>` , NA
 binary,`ORC Format <https://orc.apache.org/>`__, :ref:`read_orc<io.orc>`, :ref:`to_orc<io.orc>`
 binary,`Stata <https://en.wikipedia.org/wiki/Stata>`__, :ref:`read_stata<io.stata_reader>`, :ref:`to_stata<io.stata_writer>`
 binary,`SAS <https://en.wikipedia.org/wiki/SAS_(software)>`__, :ref:`read_sas<io.sas_reader>` , NA
@@ -5403,6 +5404,102 @@ The above example creates a partitioned dataset that may look like:
     except OSError:
         pass

+.. _io.iceberg:
+
+Iceberg
+-------
+
+.. versionadded:: 3.0.0
+
+Apache Iceberg is a high-performance open-source format for large analytic tables.
+Iceberg enables the use of SQL tables for big data while making it possible for different
+engines to safely work with the same tables at the same time.
+
+Iceberg supports predicate pushdown and column pruning, which are available to pandas
+users via the ``row_filter`` and ``selected_fields`` parameters of the
+:func:`~pandas.read_iceberg` function. This makes it convenient to extract from a large
+table a subset that fits in memory as a pandas ``DataFrame``.
+
+Internally, pandas uses PyIceberg_ to query Iceberg.
+
+.. _PyIceberg: https://py.iceberg.apache.org/
+
+A simple example loading all data from an Iceberg table ``my_table`` defined in the
+``my_catalog`` catalog:
+
+.. code-block:: python
+
+    df = pd.read_iceberg("my_table", catalog_name="my_catalog")
+
+Catalogs must be defined in the ``.pyiceberg.yaml`` file, usually in the home directory.
+It is possible to change properties of the catalog definition with the
+``catalog_properties`` parameter:
+
+.. code-block:: python
+
+    df = pd.read_iceberg(
+        "my_table",
+        catalog_name="my_catalog",
+        catalog_properties={"s3.secret-access-key": "my_secret"},
+    )
+
+It is also possible to fully specify the catalog in ``catalog_properties`` and not provide
+a ``catalog_name``:
+
+.. code-block:: python
+
+    df = pd.read_iceberg(
+        "my_table",
+        catalog_properties={
+            "uri": "http://127.0.0.1:8181",
+            "s3.endpoint": "http://127.0.0.1:9000",
+        },
+    )
+
+To create the ``DataFrame`` with only a subset of the columns:
+
+.. code-block:: python
+
+    df = pd.read_iceberg(
+        "my_table",
+        catalog_name="my_catalog",
+        selected_fields=["my_column_3", "my_column_7"],
+    )
+
+This executes faster, since the other columns are not read, and it also saves memory,
+since data from the other columns is never loaded into the underlying memory of the
+``DataFrame``.
+
+To fetch only a subset of the rows, use the ``limit`` parameter:
+
+.. code-block:: python
+
+    df = pd.read_iceberg(
+        "my_table",
+        catalog_name="my_catalog",
+        limit=100,
+    )
+
+This creates a ``DataFrame`` with 100 rows, assuming the table contains at least that
+many rows.
+
+To fetch a subset of the rows based on a condition, use the ``row_filter`` parameter:
+
+.. code-block:: python
+
+    df = pd.read_iceberg(
+        "my_table",
+        catalog_name="my_catalog",
+        row_filter="distance > 10.0",
+    )
+
+Reading a particular snapshot is also possible by passing its ID to the ``snapshot_id``
+parameter.
+
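+For example, with a snapshot ID previously obtained from the table's metadata (the ID
+below is a placeholder):
+
+.. code-block:: python
+
+    df = pd.read_iceberg(
+        "my_table",
+        catalog_name="my_catalog",
+        snapshot_id=8892334215937163086,  # placeholder snapshot ID
+    )
+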
+More information about the Iceberg format can be found on the `Apache Iceberg official
+page <https://iceberg.apache.org/>`__.
+
 .. _io.orc:

 ORC

pandas/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -164,6 +164,7 @@
     read_stata,
     read_sas,
     read_spss,
+    read_iceberg,
 )

 from pandas.io.json._normalize import json_normalize
@@ -319,6 +320,7 @@
     "read_fwf",
     "read_hdf",
     "read_html",
+    "read_iceberg",
     "read_json",
     "read_orc",
     "read_parquet",

pandas/io/api.py

Lines changed: 2 additions & 0 deletions
@@ -10,6 +10,7 @@
 )
 from pandas.io.feather_format import read_feather
 from pandas.io.html import read_html
+from pandas.io.iceberg import read_iceberg
 from pandas.io.json import read_json
 from pandas.io.orc import read_orc
 from pandas.io.parquet import read_parquet
@@ -47,6 +48,7 @@
     "read_fwf",
     "read_hdf",
     "read_html",
+    "read_iceberg",
     "read_json",
     "read_orc",
     "read_parquet",

pandas/io/iceberg.py

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+from typing import (
+    Any,
+)
+
+from pandas.compat._optional import import_optional_dependency
+
+from pandas import DataFrame
+
+
+def read_iceberg(
+    table_identifier: str,
+    catalog_name: str | None = None,
+    catalog_properties: dict[str, Any] | None = None,
+    row_filter: str | None = None,
+    selected_fields: tuple[str, ...] | None = None,
+    case_sensitive: bool = True,
+    snapshot_id: int | None = None,
+    limit: int | None = None,
+    scan_properties: dict[str, Any] | None = None,
+) -> DataFrame:
+    """
+    Read an Apache Iceberg table into a pandas DataFrame.
+
+    Parameters
+    ----------
+    table_identifier : str
+        Table identifier.
+    catalog_name : str, optional
+        The name of the catalog.
+    catalog_properties : dict of {str: str}, optional
+        The properties that are used next to the catalog configuration.
+    row_filter : str, optional
+        A string that describes the desired rows.
+    selected_fields : tuple of str, optional
+        A tuple of strings representing the column names to return in the output
+        dataframe.
+    case_sensitive : bool, default True
+        If True, column matching is case sensitive.
+    snapshot_id : int, optional
+        Snapshot ID to time travel to. By default the table will be scanned as of the
+        current snapshot ID.
+    limit : int, optional
+        An integer representing the number of rows to return in the scan result.
+        By default all matching rows will be fetched.
+    scan_properties : dict of {str: obj}, optional
+        Additional Table properties as a dictionary of string key value pairs to use
+        for this scan.
+
+    Returns
+    -------
+    DataFrame
+        DataFrame based on the Iceberg table.
+
+    See Also
+    --------
+    read_parquet : Read a Parquet file.
+
+    Examples
+    --------
+    >>> df = pd.read_iceberg(
+    ...     "my_table",
+    ...     catalog_name="my_catalog",
+    ...     catalog_properties={"s3.secret-access-key": "my-secret"},
+    ...     row_filter="trip_distance >= 10.0",
+    ...     selected_fields=("VendorID", "tpep_pickup_datetime"),
+    ... )  # doctest: +SKIP
+    """
+    pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
+    pyiceberg_expressions = import_optional_dependency("pyiceberg.expressions")
+
+    if catalog_properties is None:
+        catalog_properties = {}
+    catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
+    table = catalog.load_table(table_identifier)
+    if row_filter is None:
+        # no filter provided, scan all rows
+        row_filter = pyiceberg_expressions.AlwaysTrue()
+    if selected_fields is None:
+        # no column pruning, return all columns
+        selected_fields = ("*",)
+    if scan_properties is None:
+        scan_properties = {}
+    result = table.scan(
+        row_filter=row_filter,
+        selected_fields=selected_fields,
+        case_sensitive=case_sensitive,
+        snapshot_id=snapshot_id,
+        options=scan_properties,
+        limit=limit,
+    )
+    return result.to_pandas()
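
A minimal usage sketch of the new function, assuming a REST catalog reachable at
placeholder endpoints; the table identifier, endpoints, and column names below are
hypothetical and mirror the user-guide examples above:

    import pandas as pd

    df = pd.read_iceberg(
        "default.my_table",  # placeholder table identifier
        catalog_properties={
            "uri": "http://127.0.0.1:8181",  # placeholder REST catalog endpoint
            "s3.endpoint": "http://127.0.0.1:9000",  # placeholder object store
        },
        row_filter="A > 0",  # predicate pushdown
        selected_fields=("A", "B"),  # column pruning
        limit=10,
    )
    print(df.head())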

pandas/tests/api/test_api.py

Lines changed: 1 addition & 0 deletions
@@ -168,6 +168,7 @@ class TestPDApi(Base):
     "read_parquet",
     "read_orc",
     "read_spss",
+    "read_iceberg",
 ]

 # top-level json funcs
(new binary test data file, 20 KB, not shown)

(new Iceberg table metadata file)
@@ -0,0 +1 @@
+{"location":"file:///home/mgarcia/src/pandas/pandas/tests/io/data/iceberg/default.db/simple","table-uuid":"b991389a-a555-4af4-a26a-260eba47eca9","last-updated-ms":1746040267249,"last-column-id":2,"schemas":[{"type":"struct","fields":[{"id":1,"name":"A","type":"long","required":false},{"id":2,"name":"B","type":"string","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":999,"properties":{},"snapshots":[],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"statistics":[],"format-version":2,"last-sequence-number":0}
