27
27
"""
28
28
29
29
from contextlib import contextmanager
30
+ import importlib
30
31
import pathlib
31
32
import tempfile
32
33
43
44
44
45
45
46
@contextmanager
46
- def create_catalog ():
47
+ def create_catalog (catalog_name_in_pyiceberg_config = None ):
47
48
# the catalog stores the full path of data files, so the catalog needs to be
48
49
# created dynamically, and not saved in pandas/tests/io/data as other formats
49
- with tempfile .TemporaryDirectory ("pandas-iceberg.tmp" ) as catalog_path :
50
+ with tempfile .TemporaryDirectory ("- pandas-iceberg.tmp" ) as catalog_path :
50
51
uri = f"sqlite:///{ catalog_path } /catalog.sqlite"
52
+ warehouse = f"file://{ catalog_path } "
51
53
catalog = pyiceberg_catalog .load_catalog (
52
- "default" ,
54
+ catalog_name_in_pyiceberg_config or "default" ,
53
55
type = "sql" ,
54
56
uri = uri ,
55
- warehouse = f"file:// { catalog_path } " ,
57
+ warehouse = warehouse ,
56
58
)
57
- catalog .create_namespace ("default " )
59
+ catalog .create_namespace ("ns " )
58
60
59
61
df = pq .read_table (
60
62
pathlib .Path (__file__ ).parent / "data" / "parquet" / "simple.parquet"
61
63
)
62
- table = catalog .create_table ("default.simple " , schema = df .schema )
64
+ table = catalog .create_table ("ns.my_table " , schema = df .schema )
63
65
table .append (df )
64
66
65
- yield uri
67
+ if catalog_name_in_pyiceberg_config is not None :
68
+ config_path = pathlib .Path .home () / ".pyiceberg.yaml"
69
+ with open (config_path , "w" ) as f :
70
+ f .write (f"""\
71
+ catalog:
72
+ { catalog_name_in_pyiceberg_config } :
73
+ type: sql
74
+ uri: { uri }
75
+ warehouse: { warehouse } """ )
76
+ importlib .reload (pyiceberg_catalog ) # needed to reload the config file
77
+
78
+ try :
79
+ yield uri
80
+ finally :
81
+ if catalog_name_in_pyiceberg_config is not None :
82
+ config_path .unlink ()
66
83
67
84
68
85
class TestIceberg :
@@ -75,31 +92,25 @@ def test_read(self):
75
92
)
76
93
with create_catalog () as catalog_uri :
77
94
result = read_iceberg (
78
- "default.simple " ,
95
+ "ns.my_table " ,
79
96
catalog_properties = {"uri" : catalog_uri },
80
97
)
81
98
tm .assert_frame_equal (result , expected )
82
99
83
- def test_read_by_catalog_name (self ):
84
- config_path = pathlib .Path .home () / ".pyiceberg.yaml"
85
- with create_catalog () as catalog_uri :
86
- with open (config_path , "w" ) as f :
87
- f .write (f"""\
88
- catalog:
89
- pandas_tests_catalog:
90
- uri: { catalog_uri } """ )
91
- expected = pd .DataFrame (
92
- {
93
- "A" : [1 , 2 , 3 ],
94
- "B" : ["foo" , "foo" , "foo" ],
95
- }
96
- )
100
+ @pytest .mark .parametrize ("catalog_name" , ["default" , "pandas_tests" ])
101
+ def test_read_by_catalog_name (self , catalog_name ):
102
+ expected = pd .DataFrame (
103
+ {
104
+ "A" : [1 , 2 , 3 ],
105
+ "B" : ["foo" , "foo" , "foo" ],
106
+ }
107
+ )
108
+ with create_catalog (catalog_name_in_pyiceberg_config = catalog_name ):
97
109
result = read_iceberg (
98
- "default.simple " ,
99
- catalog_name = "pandas_tests_catalog" ,
110
+ "ns.my_table " ,
111
+ catalog_name = catalog_name ,
100
112
)
101
113
tm .assert_frame_equal (result , expected )
102
- # config_path.unlink()
103
114
104
115
def test_read_with_row_filter (self ):
105
116
expected = pd .DataFrame (
@@ -110,7 +121,7 @@ def test_read_with_row_filter(self):
110
121
)
111
122
with create_catalog () as catalog_uri :
112
123
result = read_iceberg (
113
- "default.simple " ,
124
+ "ns.my_table " ,
114
125
catalog_properties = {"uri" : catalog_uri },
115
126
row_filter = "A > 1" ,
116
127
)
@@ -124,7 +135,7 @@ def test_read_with_case_sensitive(self):
124
135
)
125
136
with create_catalog () as catalog_uri :
126
137
result = read_iceberg (
127
- "default.simple " ,
138
+ "ns.my_table " ,
128
139
catalog_properties = {"uri" : catalog_uri },
129
140
selected_fields = ["a" ],
130
141
case_sensitive = False ,
@@ -134,7 +145,7 @@ def test_read_with_case_sensitive(self):
134
145
with create_catalog () as catalog_uri :
135
146
with pytest .raises (ValueError , match = "^Could not find column" ):
136
147
read_iceberg (
137
- "default.simple " ,
148
+ "ns.my_table " ,
138
149
catalog_properties = {"uri" : catalog_uri },
139
150
selected_fields = ["a" ],
140
151
case_sensitive = True ,
@@ -149,7 +160,7 @@ def test_read_with_limit(self):
149
160
)
150
161
with create_catalog () as catalog_uri :
151
162
result = read_iceberg (
152
- "default.simple " ,
163
+ "ns.my_table " ,
153
164
catalog_properties = {"uri" : catalog_uri },
154
165
limit = 2 ,
155
166
)
0 commit comments