@@ -36,13 +36,13 @@ def started_cluster():
         user_configs=[],
         image="clickhouse/integration-test-with-unity-catalog",
         with_installed_binary=False,
-        tag=os.environ.get("DOCKER_BASE_WITH_UNITY_CATALOG_TAG", "latest"),
+        tag=os.environ.get("DOCKER_BASE_WITH_UNITY_CATALOG_TAG", "latest")
     )

     logging.info("Starting cluster...")
     cluster.start()

-    start_unity_catalog(cluster.instances["node1"])
+    start_unity_catalog(cluster.instances['node1'])

     yield cluster

@@ -68,156 +68,65 @@ def execute_spark_query(node, query_text, ignore_exit_code=False):
     --conf "spark.sql.defaultCatalog=unity" \\
     -S -e "{query_text}" | grep -v 'loading settings'
 """,
-        ],
-        nothrow=ignore_exit_code,
+        ], nothrow=ignore_exit_code
     )

-
 def execute_multiple_spark_queries(node, queries_list, ignore_exit_code=False):
-    return execute_spark_query(node, ";".join(queries_list), ignore_exit_code)
-
+    return execute_spark_query(node, ';'.join(queries_list), ignore_exit_code)

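For context, the two helpers above compose like this — a minimal usage sketch assuming a started `node1` instance (the schema and table names here are hypothetical, not part of the suite):

```python
# Run a single statement through spark-sql on the node.
execute_spark_query(node1, "SHOW SCHEMAS")

# Join several statements with ';' and run them in one spark-sql
# invocation; ignore_exit_code maps to nothrow, tolerating a non-zero exit.
execute_multiple_spark_queries(
    node1,
    [
        "CREATE SCHEMA demo",
        "CREATE TABLE demo.t (x int) using Delta location '/tmp/demo/t'",
    ],
    ignore_exit_code=True,
)
```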
 def test_embedded_database_and_tables(started_cluster):
-    node1 = started_cluster.instances["node1"]
-    node1.query(
-        "create database unity_test engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') settings warehouse = 'unity', catalog_type='unity', vended_credentials=false",
-        settings={"allow_experimental_database_unity_catalog": "1"},
-    )
-    default_tables = list(
-        sorted(
-            node1.query(
-                "SHOW TABLES FROM unity_test LIKE 'default%'",
-                settings={"use_hive_partitioning": "0"},
-            )
-            .strip()
-            .split("\n")
-        )
-    )
+    node1 = started_cluster.instances['node1']
+    node1.query("create database unity_test engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') settings warehouse = 'unity', catalog_type='unity', vended_credentials=false", settings={"allow_experimental_database_unity_catalog": "1"})
+    default_tables = list(sorted(node1.query("SHOW TABLES FROM unity_test LIKE 'default%'", settings={'use_hive_partitioning':'0'}).strip().split('\n')))
     print("Default tables", default_tables)
-    assert default_tables == [
-        "default.marksheet",
-        "default.marksheet_uniform",
-        "default.numbers",
-        "default.user_countries",
-    ]
+    assert default_tables == ['default.marksheet', 'default.marksheet_uniform', 'default.numbers', 'default.user_countries']

     for table in default_tables:
         if table == "default.marksheet_uniform":
             continue
         assert "DeltaLake" in node1.query(f"show create table unity_test.`{table}`")
-        if table in ("default.marksheet", "default.user_countries"):
-            data_clickhouse = TSV(
-                node1.query(f"SELECT * FROM unity_test.`{table}` ORDER BY 1,2,3")
-            )
-            data_spark = TSV(
-                execute_spark_query(
-                    node1, f"SELECT * FROM unity.{table} ORDER BY 1,2,3"
-                )
-            )
+        if table in ('default.marksheet', 'default.user_countries'):
+            data_clickhouse = TSV(node1.query(f"SELECT * FROM unity_test.`{table}` ORDER BY 1,2,3"))
+            data_spark = TSV(execute_spark_query(node1, f"SELECT * FROM unity.{table} ORDER BY 1,2,3"))
             print("Data ClickHouse\n", data_clickhouse)
             print("Data Spark\n", data_spark)
             assert data_clickhouse == data_spark


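The `create database ... engine DataLakeCatalog(...)` DDL recurs verbatim in every test here; a sketch of the pattern factored into a helper (the helper itself is illustrative — the suite inlines the statement):

```python
def create_unity_database(node, name):
    # Same DDL the tests issue: attach a ClickHouse database backed by the
    # Unity Catalog REST endpoint. vended_credentials=false presumably makes
    # ClickHouse access table data with its own storage configuration rather
    # than temporary credentials vended by the catalog.
    node.query(
        f"create database {name} engine "
        "DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') "
        "settings warehouse = 'unity', catalog_type='unity', vended_credentials=false",
        settings={"allow_experimental_database_unity_catalog": "1"},
    )
```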
 def test_multiple_schemes_tables(started_cluster):
-    node1 = started_cluster.instances["node1"]
-    execute_multiple_spark_queries(
-        node1, [f"CREATE SCHEMA test_schema{i}" for i in range(10)], True
-    )
-    execute_multiple_spark_queries(
-        node1,
-        [
-            f"CREATE TABLE test_schema{i}.test_table{i} (col1 int, col2 double) using Delta location '/tmp/test_schema{i}/test_table{i}'"
-            for i in range(10)
-        ],
-        True,
-    )
-    execute_multiple_spark_queries(
-        node1,
-        [
-            f"INSERT INTO test_schema{i}.test_table{i} VALUES ({i}, {i}.0)"
-            for i in range(10)
-        ],
-        True,
-    )
+    node1 = started_cluster.instances['node1']
+    execute_multiple_spark_queries(node1, [f'CREATE SCHEMA test_schema{i}' for i in range(10)], True)
+    execute_multiple_spark_queries(node1, [f'CREATE TABLE test_schema{i}.test_table{i} (col1 int, col2 double) using Delta location \'/tmp/test_schema{i}/test_table{i}\'' for i in range(10)], True)
+    execute_multiple_spark_queries(node1, [f'INSERT INTO test_schema{i}.test_table{i} VALUES ({i}, {i}.0)' for i in range(10)], True)

-    node1.query(
-        "create database multi_schema_test engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') settings warehouse = 'unity', catalog_type='unity', vended_credentials=false",
-        settings={"allow_experimental_database_unity_catalog": "1"},
-    )
-    multi_schema_tables = list(
-        sorted(
-            node1.query(
-                "SHOW TABLES FROM multi_schema_test LIKE 'test_schema%'",
-                settings={"use_hive_partitioning": "0"},
-            )
-            .strip()
-            .split("\n")
-        )
-    )
+    node1.query("create database multi_schema_test engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') settings warehouse = 'unity', catalog_type='unity', vended_credentials=false", settings={"allow_experimental_database_unity_catalog": "1"})
+    multi_schema_tables = list(sorted(node1.query("SHOW TABLES FROM multi_schema_test LIKE 'test_schema%'", settings={'use_hive_partitioning':'0'}).strip().split('\n')))
     print(multi_schema_tables)

     for i, table in enumerate(multi_schema_tables):
-        assert node1.query(
-            f"SELECT col1 FROM multi_schema_test.`{table}`"
-        ).strip() == str(i)
-        assert (
-            int(node1.query(f"SELECT col2 FROM multi_schema_test.`{table}`").strip())
-            == i
-        )
+        assert node1.query(f"SELECT col1 FROM multi_schema_test.`{table}`").strip() == str(i)
+        assert int(node1.query(f"SELECT col2 FROM multi_schema_test.`{table}`").strip()) == i


-@pytest.mark.parametrize("use_delta_kernel", ["1", "0"])
-def test_complex_table_schema(started_cluster, use_delta_kernel):
-    node1 = started_cluster.instances["node1"]
-    execute_spark_query(
-        node1, "CREATE SCHEMA schema_with_complex_tables", ignore_exit_code=True
-    )
+def test_complex_table_schema(started_cluster):
+    node1 = started_cluster.instances['node1']
+    execute_spark_query(node1, "CREATE SCHEMA schema_with_complex_tables", ignore_exit_code=True)
     schema = "event_date DATE, event_time TIMESTAMP, hits ARRAY<integer>, ids MAP<int, string>, really_complex STRUCT<f1:int,f2:string>"
     create_query = f"CREATE TABLE schema_with_complex_tables.complex_table ({schema}) using Delta location '/tmp/complex_schema/complex_table'"
     execute_spark_query(node1, create_query, ignore_exit_code=True)
-    execute_spark_query(
-        node1,
-        "insert into schema_with_complex_tables.complex_table SELECT to_date('2024-10-01', 'yyyy-MM-dd'), to_timestamp('2024-10-01 00:12:00'), array(42, 123, 77), map(7, 'v7', 5, 'v5'), named_struct(\\\"f1\\\", 34, \\\"f2\\\", 'hello')",
-        ignore_exit_code=True,
-    )
+    execute_spark_query(node1, "insert into schema_with_complex_tables.complex_table SELECT to_date('2024-10-01', 'yyyy-MM-dd'), to_timestamp('2024-10-01 00:12:00'), array(42, 123, 77), map(7, 'v7', 5, 'v5'), named_struct(\\\"f1\\\", 34, \\\"f2\\\", 'hello')", ignore_exit_code=True)

-    node1.query(
-        "create database complex_schema engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') settings warehouse = 'unity', catalog_type='unity', vended_credentials=false, allow_experimental_delta_kernel_rs=1",
-        settings={"allow_experimental_database_unity_catalog": "1"},
-    )
+    node1.query("create database complex_schema engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') settings warehouse = 'unity', catalog_type='unity', vended_credentials=false", settings={"allow_experimental_database_unity_catalog": "1"})

-    complex_schema_tables = list(
-        sorted(
-            node1.query(
-                "SHOW TABLES FROM complex_schema LIKE 'schema_with_complex_tables%'",
-                settings={"use_hive_partitioning": "0"},
-            )
-            .strip()
-            .split("\n")
-        )
-    )
+    complex_schema_tables = list(sorted(node1.query("SHOW TABLES FROM complex_schema LIKE 'schema_with_complex_tables%'", settings={'use_hive_partitioning':'0'}).strip().split('\n')))

     assert len(complex_schema_tables) == 1

-    print(
-        node1.query(
-            "SHOW CREATE TABLE complex_schema.`schema_with_complex_tables.complex_table`"
-        )
-    )
-    complex_data = (
-        node1.query(
-            "SELECT * FROM complex_schema.`schema_with_complex_tables.complex_table`"
-        )
-        .strip()
-        .split("\t")
-    )
+    print(node1.query("SHOW CREATE TABLE complex_schema.`schema_with_complex_tables.complex_table`"))
+    complex_data = node1.query("SELECT * FROM complex_schema.`schema_with_complex_tables.complex_table`").strip().split('\t')
     print(complex_data)
     assert complex_data[0] == "2024-10-01"
     assert complex_data[1] == "2024-10-01 00:12:00.000000"
     assert complex_data[2] == "[42,123,77]"
     assert complex_data[3] == "{7:'v7',5:'v5'}"
     assert complex_data[4] == "(34,'hello')"
-
-    if use_delta_kernel:
-        assert node1.contains_in_log(f"DeltaLakeMetadata: Initializing snapshot")
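The diff drops the `use_delta_kernel` parametrization along with the trailing log assertion (which, as written, ran for both values: the non-empty strings "1" and "0" are both truthy). For reference, the removed pattern generates one test per parameter value — a generic sketch, not the suite's code:

```python
import pytest

@pytest.mark.parametrize("use_delta_kernel", ["1", "0"])
def test_example(use_delta_kernel):
    # pytest collects this twice, as test_example[1] and test_example[0],
    # passing each string in turn; note `if use_delta_kernel:` would be
    # True in both runs, since "0" is a truthy string.
    assert use_delta_kernel in ("1", "0")
```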