Skip to content

Commit 91370db

Browse files
Creating catalog dynamically
1 parent 6081bfb commit 91370db

8 files changed

+77
-52
lines changed

pandas/io/iceberg.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def read_iceberg(
6363
... catalog_properties={"s3.secret-access-key": "my-secret"},
6464
... row_filter="trip_distance >= 10.0",
6565
... selected_fields=("VendorID", "tpep_pickup_datetime"),
66-
... )
66+
... ) # doctest: +SKIP
6767
"""
6868
pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
6969
pyiceberg_expressions = import_optional_dependency("pyiceberg.expressions")
-20 KB
Binary file not shown.

pandas/tests/io/data/iceberg/default.db/simple/metadata/00000-766b7dba-43b8-4a1a-b3db-e14bc9922afe.metadata.json

Lines changed: 0 additions & 1 deletion
This file was deleted.

pandas/tests/io/data/iceberg/default.db/simple/metadata/00001-e5484522-4908-47e1-8816-34ef160f22b7.metadata.json

Lines changed: 0 additions & 1 deletion
This file was deleted.

pandas/tests/io/test_iceberg.py

Lines changed: 76 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
```
2727
"""
2828

29+
from contextlib import contextmanager
2930
import pathlib
31+
import tempfile
3032

3133
import pytest
3234

@@ -36,94 +38,119 @@
3638
from pandas.io.iceberg import read_iceberg
3739

3840
pyiceberg = pytest.importorskip("pyiceberg")
41+
pyiceberg_catalog = pytest.importorskip("pyiceberg.catalog")
42+
pq = pytest.importorskip("pyarrow.parquet")
43+
44+
45+
@contextmanager
46+
def create_catalog():
47+
# the catalog stores the full path of data files, so the catalog needs to be
48+
# created dynamically, and not saved in pandas/tests/io/data as other formats
49+
with tempfile.TemporaryDirectory("pandas-iceberg.tmp") as catalog_path:
50+
uri = f"sqlite:///{catalog_path}/catalog.sqlite"
51+
catalog = pyiceberg_catalog.load_catalog(
52+
"default",
53+
type="sql",
54+
uri=uri,
55+
warehouse=f"file://{catalog_path}",
56+
)
57+
catalog.create_namespace("default")
3958

59+
df = pq.read_table(
60+
pathlib.Path(__file__).parent / "data" / "parquet" / "simple.parquet"
61+
)
62+
table = catalog.create_table("default.simple", schema=df.schema)
63+
table.append(df)
4064

41-
@pytest.fixture
42-
def catalog_uri(datapath):
43-
path = datapath("io", "data", "iceberg", "catalog.sqlite")
44-
return f"sqlite:///{path}"
65+
yield uri
4566

4667

4768
class TestIceberg:
48-
def test_read(self, catalog_uri):
69+
def test_read(self):
4970
expected = pd.DataFrame(
5071
{
5172
"A": [1, 2, 3],
5273
"B": ["foo", "foo", "foo"],
5374
}
5475
)
55-
result = read_iceberg(
56-
"default.simple",
57-
catalog_properties={"uri": catalog_uri},
58-
)
76+
with create_catalog() as catalog_uri:
77+
result = read_iceberg(
78+
"default.simple",
79+
catalog_properties={"uri": catalog_uri},
80+
)
5981
tm.assert_frame_equal(result, expected)
6082

61-
def test_read_by_catalog_name(self, catalog_uri):
83+
def test_read_by_catalog_name(self):
6284
config_path = pathlib.Path.home() / ".pyiceberg.yaml"
63-
with open(config_path, "w") as f:
64-
f.write(f"""\
65-
catalog:
66-
pandas_tests_catalog:
67-
uri: {catalog_uri}""")
68-
expected = pd.DataFrame(
69-
{
70-
"A": [1, 2, 3],
71-
"B": ["foo", "foo", "foo"],
72-
}
73-
)
74-
result = read_iceberg(
75-
"default.simple",
76-
catalog_name="pandas_tests_catalog",
77-
)
85+
with create_catalog() as catalog_uri:
86+
with open(config_path, "w") as f:
87+
f.write(f"""\
88+
catalog:
89+
pandas_tests_catalog:
90+
uri: {catalog_uri}""")
91+
expected = pd.DataFrame(
92+
{
93+
"A": [1, 2, 3],
94+
"B": ["foo", "foo", "foo"],
95+
}
96+
)
97+
result = read_iceberg(
98+
"default.simple",
99+
catalog_name="pandas_tests_catalog",
100+
)
78101
tm.assert_frame_equal(result, expected)
79102
# config_path.unlink()
80103

81-
def test_read_with_row_filter(self, catalog_uri):
104+
def test_read_with_row_filter(self):
82105
expected = pd.DataFrame(
83106
{
84107
"A": [2, 3],
85108
"B": ["foo", "foo"],
86109
}
87110
)
88-
result = read_iceberg(
89-
"default.simple",
90-
catalog_properties={"uri": catalog_uri},
91-
row_filter="A > 1",
92-
)
111+
with create_catalog() as catalog_uri:
112+
result = read_iceberg(
113+
"default.simple",
114+
catalog_properties={"uri": catalog_uri},
115+
row_filter="A > 1",
116+
)
93117
tm.assert_frame_equal(result, expected)
94118

95-
def test_read_with_case_sensitive(self, catalog_uri):
119+
def test_read_with_case_sensitive(self):
96120
expected = pd.DataFrame(
97121
{
98122
"A": [1, 2, 3],
99123
}
100124
)
101-
result = read_iceberg(
102-
"default.simple",
103-
catalog_properties={"uri": catalog_uri},
104-
selected_fields=["a"],
105-
case_sensitive=False,
106-
)
107-
tm.assert_frame_equal(result, expected)
108-
109-
with pytest.raises(ValueError, match="^Could not find column"):
110-
read_iceberg(
125+
with create_catalog() as catalog_uri:
126+
result = read_iceberg(
111127
"default.simple",
112128
catalog_properties={"uri": catalog_uri},
113129
selected_fields=["a"],
114-
case_sensitive=True,
130+
case_sensitive=False,
115131
)
132+
tm.assert_frame_equal(result, expected)
116133

117-
def test_read_with_limit(self, catalog_uri):
134+
with create_catalog() as catalog_uri:
135+
with pytest.raises(ValueError, match="^Could not find column"):
136+
read_iceberg(
137+
"default.simple",
138+
catalog_properties={"uri": catalog_uri},
139+
selected_fields=["a"],
140+
case_sensitive=True,
141+
)
142+
143+
def test_read_with_limit(self):
118144
expected = pd.DataFrame(
119145
{
120146
"A": [1, 2],
121147
"B": ["foo", "foo"],
122148
}
123149
)
124-
result = read_iceberg(
125-
"default.simple",
126-
catalog_properties={"uri": catalog_uri},
127-
limit=2,
128-
)
150+
with create_catalog() as catalog_uri:
151+
result = read_iceberg(
152+
"default.simple",
153+
catalog_properties={"uri": catalog_uri},
154+
limit=2,
155+
)
129156
tm.assert_frame_equal(result, expected)

0 commit comments

Comments
 (0)