5
5
data used for Parquet tests (``pandas/tests/io/data/parquet/simple.parquet``).
6
6
"""
7
7
8
- from contextlib import contextmanager
8
+ import collections
9
9
import importlib
10
10
import pathlib
11
- import tempfile
12
11
13
12
import pytest
14
13
24
23
pq = pytest .importorskip ("pyarrow.parquet" )
25
24
26
25
27
- @contextmanager
28
- def create_catalog (catalog_name_in_pyiceberg_config = None ):
29
- # the catalog stores the full path of data files, so the catalog needs to be
30
- # created dynamically, and not saved in pandas/tests/io/data as other formats
31
- with tempfile .TemporaryDirectory ("-pandas-iceberg.tmp" ) as catalog_path :
32
- uri = f"sqlite:///{ catalog_path } /catalog.sqlite"
33
- warehouse = f"file://{ catalog_path } "
34
- catalog = pyiceberg_catalog .load_catalog (
35
- catalog_name_in_pyiceberg_config or "default" ,
36
- type = "sql" ,
37
- uri = uri ,
38
- warehouse = warehouse ,
39
- )
40
- catalog .create_namespace ("ns" )
26
+ Catalog = collections .namedtuple ("name" , "uri" )
41
27
42
- df = pq .read_table (
43
- pathlib .Path (__file__ ).parent / "data" / "parquet" / "simple.parquet"
44
- )
45
- table = catalog .create_table ("ns.my_table" , schema = df .schema )
46
- table .append (df )
47
28
48
- if catalog_name_in_pyiceberg_config is not None :
49
- config_path = pathlib .Path .home () / ".pyiceberg.yaml"
50
- with open (config_path , "w" , encoding = "utf-8" ) as f :
51
- f .write (f"""\
29
+ @pytest .fixture
30
+ def catalog (request , tmp_path , params = (None , "default" , "pandas_tests" )):
31
+ # the catalog stores the full path of data files, so the catalog needs to be
32
+ # created dynamically, and not saved in pandas/tests/io/data as other formats
33
+ catalog_path = tmp_path / "pandas-iceberg-catalog"
34
+ catalog_path .mkdir ()
35
+ catalog_name = request .param
36
+ uri = f"sqlite:///{ catalog_path } /catalog.sqlite"
37
+ warehouse = f"file://{ catalog_path } "
38
+ catalog = pyiceberg_catalog .load_catalog (
39
+ catalog_name ,
40
+ type = "sql" ,
41
+ uri = uri ,
42
+ warehouse = warehouse ,
43
+ )
44
+ catalog .create_namespace ("ns" )
45
+
46
+ df = pq .read_table (
47
+ pathlib .Path (__file__ ).parent / "data" / "parquet" / "simple.parquet"
48
+ )
49
+ table = catalog .create_table ("ns.my_table" , schema = df .schema )
50
+ table .append (df )
51
+
52
+ if catalog_name is not None :
53
+ config_path = pathlib .Path .home () / ".pyiceberg.yaml"
54
+ with open (config_path , "w" , encoding = "utf-8" ) as f :
55
+ f .write (f"""\
52
56
catalog:
53
- { catalog_name_in_pyiceberg_config } :
57
+ { catalog_name } :
54
58
type: sql
55
59
uri: { uri }
56
60
warehouse: { warehouse } """ )
57
- importlib .reload (pyiceberg_catalog ) # needed to reload the config file
58
61
59
- try :
60
- yield uri
61
- finally :
62
- if catalog_name_in_pyiceberg_config is not None :
63
- config_path .unlink ()
62
+ importlib .reload (pyiceberg_catalog ) # needed to reload the config file
63
+
64
+ yield Catalog (name = catalog_name , uri = uri )
65
+
66
+ if catalog_name is not None :
67
+ config_path .unlink ()
64
68
65
69
66
70
class TestIceberg :
67
- def test_read (self ):
71
+ def test_read (self , catalog ):
68
72
expected = pd .DataFrame (
69
73
{
70
74
"A" : [1 , 2 , 3 ],
71
75
"B" : ["foo" , "foo" , "foo" ],
72
76
}
73
77
)
74
- with create_catalog () as catalog_uri :
75
- result = read_iceberg (
76
- "ns.my_table" ,
77
- catalog_properties = {"uri" : catalog_uri },
78
- )
78
+ result = read_iceberg (
79
+ "ns.my_table" ,
80
+ catalog_properties = {"uri" : catalog .uri },
81
+ )
79
82
tm .assert_frame_equal (result , expected )
80
83
81
- @pytest .mark .parametrize ("catalog_name" , ["default" , "pandas_tests" ])
82
- def test_read_by_catalog_name (self , catalog_name ):
84
+ def test_read_by_catalog_name (self , catalog ):
83
85
expected = pd .DataFrame (
84
86
{
85
87
"A" : [1 , 2 , 3 ],
86
88
"B" : ["foo" , "foo" , "foo" ],
87
89
}
88
90
)
89
- with create_catalog (catalog_name_in_pyiceberg_config = catalog_name ):
90
- result = read_iceberg (
91
- "ns.my_table" ,
92
- catalog_name = catalog_name ,
93
- )
91
+ result = read_iceberg (
92
+ "ns.my_table" ,
93
+ catalog_name = catalog .name ,
94
+ )
94
95
tm .assert_frame_equal (result , expected )
95
96
96
97
def test_read_with_row_filter (self ):
@@ -100,37 +101,34 @@ def test_read_with_row_filter(self):
100
101
"B" : ["foo" , "foo" ],
101
102
}
102
103
)
103
- with create_catalog () as catalog_uri :
104
- result = read_iceberg (
105
- "ns.my_table" ,
106
- catalog_properties = {"uri" : catalog_uri },
107
- row_filter = "A > 1" ,
108
- )
104
+ result = read_iceberg (
105
+ "ns.my_table" ,
106
+ catalog_properties = {"uri" : catalog .uri },
107
+ row_filter = "A > 1" ,
108
+ )
109
109
tm .assert_frame_equal (result , expected )
110
110
111
- def test_read_with_case_sensitive (self ):
111
+ def test_read_with_case_sensitive (self , catalog ):
112
112
expected = pd .DataFrame (
113
113
{
114
114
"A" : [1 , 2 , 3 ],
115
115
}
116
116
)
117
- with create_catalog () as catalog_uri :
118
- result = read_iceberg (
117
+ result = read_iceberg (
118
+ "ns.my_table" ,
119
+ catalog_properties = {"uri" : catalog .uri },
120
+ selected_fields = ["a" ],
121
+ case_sensitive = False ,
122
+ )
123
+ tm .assert_frame_equal (result , expected )
124
+
125
+ with pytest .raises (ValueError , match = "^Could not find column" ):
126
+ read_iceberg (
119
127
"ns.my_table" ,
120
- catalog_properties = {"uri" : catalog_uri },
128
+ catalog_properties = {"uri" : catalog . uri },
121
129
selected_fields = ["a" ],
122
- case_sensitive = False ,
130
+ case_sensitive = True ,
123
131
)
124
- tm .assert_frame_equal (result , expected )
125
-
126
- with create_catalog () as catalog_uri :
127
- with pytest .raises (ValueError , match = "^Could not find column" ):
128
- read_iceberg (
129
- "ns.my_table" ,
130
- catalog_properties = {"uri" : catalog_uri },
131
- selected_fields = ["a" ],
132
- case_sensitive = True ,
133
- )
134
132
135
133
def test_read_with_limit (self ):
136
134
expected = pd .DataFrame (
@@ -139,10 +137,9 @@ def test_read_with_limit(self):
139
137
"B" : ["foo" , "foo" ],
140
138
}
141
139
)
142
- with create_catalog () as catalog_uri :
143
- result = read_iceberg (
144
- "ns.my_table" ,
145
- catalog_properties = {"uri" : catalog_uri },
146
- limit = 2 ,
147
- )
140
+ result = read_iceberg (
141
+ "ns.my_table" ,
142
+ catalog_properties = {"uri" : catalog .uri },
143
+ limit = 2 ,
144
+ )
148
145
tm .assert_frame_equal (result , expected )
0 commit comments