2
2
import trino
3
3
import argparse
4
4
import sys
5
+ import re
5
6
6
7
if not sys .warnoptions :
7
8
import warnings
8
9
warnings .simplefilter ("ignore" )
9
10
10
11
11
12
def get_connection (username , password , namespace ):
12
- host = 'trino-coordinator-default-0.trino-coordinator-default.' + namespace + '.svc.cluster.local'
13
+ host = (
14
+ "trino-coordinator-default-0.trino-coordinator-default."
15
+ + namespace
16
+ + ".svc.cluster.local"
17
+ )
13
18
# If you want to debug this locally use
14
19
# kubectl -n kuttl-test-XXX port-forward svc/trino-coordinator-default 8443
15
20
# host = '127.0.0.1'
@@ -18,7 +23,7 @@ def get_connection(username, password, namespace):
18
23
host = host ,
19
24
port = 8443 ,
20
25
user = username ,
21
- http_scheme = ' https' ,
26
+ http_scheme = " https" ,
22
27
auth = trino .auth .BasicAuthentication (username , password ),
23
28
session_properties = {"query_max_execution_time" : "60s" },
24
29
)
@@ -33,34 +38,47 @@ def run_query(connection, query):
33
38
return cursor .fetchall ()
34
39
35
40
36
- if __name__ == ' __main__' :
41
+ if __name__ == " __main__" :
37
42
# Construct an argument parser
38
43
all_args = argparse .ArgumentParser ()
39
44
# Add arguments to the parser
40
- all_args .add_argument ("-n" , "--namespace" , required = True , help = "Namespace the test is running in" )
45
+ all_args .add_argument (
46
+ "-n" , "--namespace" , required = True , help = "Namespace the test is running in"
47
+ )
41
48
42
49
args = vars (all_args .parse_args ())
43
50
namespace = args ["namespace" ]
44
51
45
52
print ("Starting S3 tests..." )
46
53
connection = get_connection ("admin" , "admin" , namespace )
47
54
48
- trino_version = run_query (connection , "select node_version from system.runtime.nodes where coordinator = true and state = 'active'" )[0 ][0 ]
49
- print (f"[INFO] Testing against Trino version \" { trino_version } \" " )
55
+ trino_version = run_query (
56
+ connection ,
57
+ "select node_version from system.runtime.nodes where coordinator = true and state = 'active'" ,
58
+ )[0 ][0 ]
59
+ print (f'[INFO] Testing against Trino version "{ trino_version } "' )
50
60
51
- assert len (trino_version ) >= 3
52
- assert trino_version .isnumeric ()
61
+ # Strip SDP release suffix from the version string
62
+ trino_product_version = re .split (r"-stackable" , trino_version , maxsplit = 1 )[0 ]
63
+
64
+ assert len (trino_product_version ) >= 3
65
+ assert trino_product_version .isnumeric ()
53
66
assert trino_version == run_query (connection , "select version()" )[0 ][0 ]
54
67
55
- run_query (connection , "CREATE SCHEMA IF NOT EXISTS hive.minio WITH (location = 's3a://trino/')" )
68
+ run_query (
69
+ connection ,
70
+ "CREATE SCHEMA IF NOT EXISTS hive.minio WITH (location = 's3a://trino/')" ,
71
+ )
56
72
57
73
run_query (connection , "DROP TABLE IF EXISTS hive.minio.taxi_data" )
58
74
run_query (connection , "DROP TABLE IF EXISTS hive.minio.taxi_data_copy" )
59
75
run_query (connection , "DROP TABLE IF EXISTS hive.minio.taxi_data_transformed" )
60
76
run_query (connection , "DROP TABLE IF EXISTS hive.hdfs.taxi_data_copy" )
61
77
run_query (connection , "DROP TABLE IF EXISTS iceberg.minio.taxi_data_copy_iceberg" )
62
78
63
- run_query (connection , """
79
+ run_query (
80
+ connection ,
81
+ """
64
82
CREATE TABLE IF NOT EXISTS hive.minio.taxi_data (
65
83
vendor_id VARCHAR,
66
84
tpep_pickup_datetime VARCHAR,
@@ -73,13 +91,24 @@ def run_query(connection, query):
73
91
format = 'csv',
74
92
skip_header_line_count = 1
75
93
)
76
- """ )
77
- assert run_query (connection , "SELECT COUNT(*) FROM hive.minio.taxi_data" )[0 ][0 ] == 5000
78
- rows_written = run_query (connection , "CREATE TABLE IF NOT EXISTS hive.minio.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data" )[0 ][0 ]
94
+ """ ,
95
+ )
96
+ assert (
97
+ run_query (connection , "SELECT COUNT(*) FROM hive.minio.taxi_data" )[0 ][0 ] == 5000
98
+ )
99
+ rows_written = run_query (
100
+ connection ,
101
+ "CREATE TABLE IF NOT EXISTS hive.minio.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data" ,
102
+ )[0 ][0 ]
79
103
assert rows_written == 5000 or rows_written == 0
80
- assert run_query (connection , "SELECT COUNT(*) FROM hive.minio.taxi_data_copy" )[0 ][0 ] == 5000
104
+ assert (
105
+ run_query (connection , "SELECT COUNT(*) FROM hive.minio.taxi_data_copy" )[0 ][0 ]
106
+ == 5000
107
+ )
81
108
82
- rows_written = run_query (connection , """
109
+ rows_written = run_query (
110
+ connection ,
111
+ """
83
112
CREATE TABLE IF NOT EXISTS hive.minio.taxi_data_transformed AS
84
113
SELECT
85
114
CAST(vendor_id as BIGINT) as vendor_id,
@@ -89,61 +118,178 @@ def run_query(connection, query):
89
118
CAST(trip_distance as DOUBLE) as trip_distance,
90
119
CAST(ratecode_id as BIGINT) as ratecode_id
91
120
FROM hive.minio.taxi_data
92
- """ )[0 ][0 ]
121
+ """ ,
122
+ )[0 ][0 ]
93
123
assert rows_written == 5000 or rows_written == 0
94
- assert run_query (connection , "SELECT COUNT(*) FROM hive.minio.taxi_data_transformed" )[0 ][0 ] == 5000
124
+ assert (
125
+ run_query (connection , "SELECT COUNT(*) FROM hive.minio.taxi_data_transformed" )[
126
+ 0
127
+ ][0 ]
128
+ == 5000
129
+ )
95
130
96
131
print ("[INFO] Testing HDFS" )
97
132
98
- run_query (connection , "CREATE SCHEMA IF NOT EXISTS hive.hdfs WITH (location = 'hdfs://hdfs/trino/')" )
99
- rows_written = run_query (connection , "CREATE TABLE IF NOT EXISTS hive.hdfs.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data" )[0 ][0 ]
133
+ run_query (
134
+ connection ,
135
+ "CREATE SCHEMA IF NOT EXISTS hive.hdfs WITH (location = 'hdfs://hdfs/trino/')" ,
136
+ )
137
+ rows_written = run_query (
138
+ connection ,
139
+ "CREATE TABLE IF NOT EXISTS hive.hdfs.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data" ,
140
+ )[0 ][0 ]
100
141
assert rows_written == 5000 or rows_written == 0
101
- assert run_query (connection , "SELECT COUNT(*) FROM hive.hdfs.taxi_data_copy" )[0 ][0 ] == 5000
142
+ assert (
143
+ run_query (connection , "SELECT COUNT(*) FROM hive.hdfs.taxi_data_copy" )[0 ][0 ]
144
+ == 5000
145
+ )
102
146
103
147
print ("[INFO] Testing Iceberg" )
104
- run_query (connection , "DROP TABLE IF EXISTS iceberg.minio.taxi_data_copy_iceberg" ) # Clean up table to don't fail an second run
105
- assert run_query (connection , """
148
+ run_query (
149
+ connection , "DROP TABLE IF EXISTS iceberg.minio.taxi_data_copy_iceberg"
150
+ ) # Clean up table to don't fail an second run
151
+ assert (
152
+ run_query (
153
+ connection ,
154
+ """
106
155
CREATE TABLE IF NOT EXISTS iceberg.minio.taxi_data_copy_iceberg
107
156
WITH (partitioning = ARRAY['vendor_id', 'passenger_count'], format = 'parquet')
108
157
AS SELECT * FROM hive.minio.taxi_data
109
- """ )[0 ][0 ] == 5000
158
+ """ ,
159
+ )[0 ][0 ]
160
+ == 5000
161
+ )
110
162
# Check current count
111
- assert run_query (connection , "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg" )[0 ][0 ] == 5000
112
- assert run_query (connection , 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"' )[0 ][0 ] == 1
113
- assert run_query (connection , 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"' )[0 ][0 ] == 12
114
- assert run_query (connection , 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"' )[0 ][0 ] == 12
163
+ assert (
164
+ run_query (
165
+ connection , "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg"
166
+ )[0 ][0 ]
167
+ == 5000
168
+ )
169
+ assert (
170
+ run_query (
171
+ connection ,
172
+ 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"' ,
173
+ )[0 ][0 ]
174
+ == 1
175
+ )
176
+ assert (
177
+ run_query (
178
+ connection ,
179
+ 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"' ,
180
+ )[0 ][0 ]
181
+ == 12
182
+ )
183
+ assert (
184
+ run_query (
185
+ connection ,
186
+ 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"' ,
187
+ )[0 ][0 ]
188
+ == 12
189
+ )
115
190
116
- assert run_query (connection , "INSERT INTO iceberg.minio.taxi_data_copy_iceberg SELECT * FROM hive.minio.taxi_data" )[0 ][0 ] == 5000
191
+ assert (
192
+ run_query (
193
+ connection ,
194
+ "INSERT INTO iceberg.minio.taxi_data_copy_iceberg SELECT * FROM hive.minio.taxi_data" ,
195
+ )[0 ][0 ]
196
+ == 5000
197
+ )
117
198
118
199
# Check current count
119
- assert run_query (connection , "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg" )[0 ][0 ] == 10000
120
- assert run_query (connection , 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"' )[0 ][0 ] == 2
121
- assert run_query (connection , 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"' )[0 ][0 ] == 12
122
- assert run_query (connection , 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"' )[0 ][0 ] == 24
200
+ assert (
201
+ run_query (
202
+ connection , "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg"
203
+ )[0 ][0 ]
204
+ == 10000
205
+ )
206
+ assert (
207
+ run_query (
208
+ connection ,
209
+ 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"' ,
210
+ )[0 ][0 ]
211
+ == 2
212
+ )
213
+ assert (
214
+ run_query (
215
+ connection ,
216
+ 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"' ,
217
+ )[0 ][0 ]
218
+ == 12
219
+ )
220
+ assert (
221
+ run_query (
222
+ connection ,
223
+ 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"' ,
224
+ )[0 ][0 ]
225
+ == 24
226
+ )
123
227
124
- if trino_version == ' 377' :
228
+ if trino_version == " 377" :
125
229
# io.trino.spi.TrinoException: This connector [iceberg] does not support versioned tables
126
- print ("[INFO] Skipping the Iceberg tests reading versioned tables for trino version 377 as it does not support versioned tables" )
230
+ print (
231
+ "[INFO] Skipping the Iceberg tests reading versioned tables for trino version 377 as it does not support versioned tables"
232
+ )
127
233
else :
128
234
# Check count for first snapshot
129
- first_snapshot = run_query (connection , 'select snapshot_id from iceberg.minio."taxi_data_copy_iceberg$snapshots" order by committed_at limit 1' )[0 ][0 ]
130
- assert run_query (connection , f"SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg FOR VERSION AS OF { first_snapshot } " )[0 ][0 ] == 5000
235
+ first_snapshot = run_query (
236
+ connection ,
237
+ 'select snapshot_id from iceberg.minio."taxi_data_copy_iceberg$snapshots" order by committed_at limit 1' ,
238
+ )[0 ][0 ]
239
+ assert (
240
+ run_query (
241
+ connection ,
242
+ f"SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg FOR VERSION AS OF { first_snapshot } " ,
243
+ )[0 ][0 ]
244
+ == 5000
245
+ )
131
246
132
247
# Compact files
133
- run_query (connection , "ALTER TABLE iceberg.minio.taxi_data_copy_iceberg EXECUTE optimize" )
248
+ run_query (
249
+ connection , "ALTER TABLE iceberg.minio.taxi_data_copy_iceberg EXECUTE optimize"
250
+ )
134
251
135
252
# Check current count
136
- assert run_query (connection , "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg" )[0 ][0 ] == 10000
137
- assert run_query (connection , 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"' )[0 ][0 ] == 3
138
- assert run_query (connection , 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"' )[0 ][0 ] == 12
139
- assert run_query (connection , 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"' )[0 ][0 ] == 12 # Compaction yeah :)
253
+ assert (
254
+ run_query (
255
+ connection , "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg"
256
+ )[0 ][0 ]
257
+ == 10000
258
+ )
259
+ assert (
260
+ run_query (
261
+ connection ,
262
+ 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"' ,
263
+ )[0 ][0 ]
264
+ == 3
265
+ )
266
+ assert (
267
+ run_query (
268
+ connection ,
269
+ 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"' ,
270
+ )[0 ][0 ]
271
+ == 12
272
+ )
273
+ assert (
274
+ run_query (
275
+ connection ,
276
+ 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"' ,
277
+ )[0 ][0 ]
278
+ == 12
279
+ ) # Compaction yeah :)
140
280
141
281
# Test could be improved by also testing update and deletes
142
282
143
283
# Test postgres connection
144
- run_query (connection , 'SHOW SCHEMAS IN postgresgeneric' )
145
- run_query (connection , 'CREATE SCHEMA IF NOT EXISTS postgresgeneric.tpch' )
146
- run_query (connection , 'CREATE TABLE IF NOT EXISTS postgresgeneric.tpch.nation AS SELECT * FROM tpch.tiny.nation' )
147
- assert run_query (connection , "SELECT COUNT(*) FROM postgresgeneric.tpch.nation" )[0 ][0 ] == 25
284
+ run_query (connection , "SHOW SCHEMAS IN postgresgeneric" )
285
+ run_query (connection , "CREATE SCHEMA IF NOT EXISTS postgresgeneric.tpch" )
286
+ run_query (
287
+ connection ,
288
+ "CREATE TABLE IF NOT EXISTS postgresgeneric.tpch.nation AS SELECT * FROM tpch.tiny.nation" ,
289
+ )
290
+ assert (
291
+ run_query (connection , "SELECT COUNT(*) FROM postgresgeneric.tpch.nation" )[0 ][0 ]
292
+ == 25
293
+ )
148
294
149
295
print ("[SUCCESS] All tests in check-s3.py succeeded!" )
0 commit comments