1
1
import pytest
2
2
import math
3
3
4
+ from decimal import Decimal
4
5
from datetime import datetime , timedelta
5
6
from ydb .tests .library .compatibility .fixtures import RestartToAnotherVersionFixture
6
7
from ydb .tests .oss .ydb_sdk_import import ydb
7
8
from ydb .tests .datashard .lib .create_table import create_table_sql_request
8
- from ydb .tests .datashard .lib .types_of_variables import pk_types , non_pk_types , cleanup_type_name , format_sql_value , types_not_supported_yet_in_columnshard
9
+ from ydb .tests .datashard .lib .types_of_variables import (
10
+ pk_types ,
11
+ non_pk_types ,
12
+ cleanup_type_name ,
13
+ format_sql_value ,
14
+ types_not_supported_yet_in_columnshard ,
15
+ non_comparable_types ,
16
+ primitive_type ,
17
+ )
9
18
10
19
11
20
class TestDataType (RestartToAnotherVersionFixture ):
@@ -159,6 +168,90 @@ def create_table(self):
159
168
for query in querys :
160
169
session_pool .execute_with_retries (query )
161
170
171
+ def create_typed_value (self , type_name , value ):
172
+ if "Decimal" in type_name :
173
+ prec , scale = type_name .replace ("Decimal(" , "" ).replace (")" , "" ).split ("," )
174
+ return ydb .TypedValue (Decimal (value ), ydb .DecimalType (int (prec ), int (scale )))
175
+ if type_name == "String" or type_name == "Yson" :
176
+ return ydb .TypedValue (value .encode (), primitive_type [type_name ])
177
+ if type_name == "DyNumber" :
178
+ return ydb .TypedValue (str (value ), primitive_type [type_name ])
179
+ if type_name == "Datetime64" or type_name == "Datetime" :
180
+ return ydb .TypedValue (int (value .timestamp ()), primitive_type [type_name ])
181
+ return ydb .TypedValue (value , primitive_type [type_name ])
182
+
183
+ def parametrized_write_data (self ):
184
+ queries_with_parameters = []
185
+
186
+ for i in range (self .count_table ):
187
+ query = f"""
188
+ { ";" .join ([f"DECLARE $pk_{ cleanup_type_name (name )} AS { name } " for name in self .pk_types [i ].keys ()])} ;
189
+ { ";" .join ([f"DECLARE $col_{ cleanup_type_name (name )} AS { name } " for name in self .all_types .keys ()])} ;
190
+
191
+ UPSERT INTO { self .table_names [i ]} (
192
+ { ", " .join ([f"pk_{ cleanup_type_name (name )} " for name in self .pk_types [i ].keys ()])} ,
193
+ { ", " .join ([f"col_{ cleanup_type_name (name )} " for name in self .all_types .keys ()])}
194
+ )
195
+ VALUES (
196
+ { ", " .join ([f"$pk_{ cleanup_type_name (name )} " for name in self .pk_types [i ].keys ()])} ,
197
+ { ", " .join ([f"$col_{ cleanup_type_name (name )} " for name in self .all_types .keys ()])}
198
+ );
199
+ """
200
+
201
+ for row in range (1 , self .count_rows + 1 ):
202
+ parameters = {}
203
+ for type_name , lamb in self .pk_types [i ].items ():
204
+ parameters [f"$pk_{ cleanup_type_name (type_name )} " ] = self .create_typed_value (type_name , lamb (row ))
205
+ for type_name , lamb in self .all_types .items ():
206
+ parameters [f"$col_{ cleanup_type_name (type_name )} " ] = self .create_typed_value (type_name , lamb (row ))
207
+ queries_with_parameters .append ((query , parameters ))
208
+
209
+ with ydb .QuerySessionPool (self .driver ) as session_pool :
210
+ for query , parameters in queries_with_parameters :
211
+ session_pool .execute_with_retries (query , parameters )
212
+
213
+ def parametrized_check_table (self ):
214
+ queries_with_parameters = []
215
+ comparable_types = [(type_name , lamb ) for type_name , lamb in self .all_types .items () if type_name not in non_comparable_types ]
216
+
217
+ for i in range (self .count_table ):
218
+ query = f"""
219
+ { ";" .join ([f"DECLARE $pk_{ cleanup_type_name (name )} AS { name } " for name in self .pk_types [i ].keys ()])} ;
220
+ { ";" .join ([f"DECLARE $col_{ cleanup_type_name (name )} AS { name } " for name , _ in comparable_types ])} ;
221
+
222
+ SELECT * FROM { self .table_names [i ]} WHERE
223
+ { " AND " .join ([f"pk_{ cleanup_type_name (name )} = $pk_{ cleanup_type_name (name )} " for name in self .pk_types [i ].keys ()])}
224
+ { " AND " if len (comparable_types ) > 0 else "" }
225
+ { " AND " .join ([f"col_{ cleanup_type_name (name )} = $col_{ cleanup_type_name (name )} " for name , _ in comparable_types ])}
226
+ """
227
+
228
+ for row in range (1 , self .count_rows + 1 ):
229
+ parameters = {}
230
+ for type_name , lamb in self .pk_types [i ].items ():
231
+ parameters [f"$pk_{ cleanup_type_name (type_name )} " ] = self .create_typed_value (type_name , lamb (row ))
232
+ for type_name , lamb in comparable_types :
233
+ parameters [f"$col_{ cleanup_type_name (type_name )} " ] = self .create_typed_value (type_name , lamb (row ))
234
+ queries_with_parameters .append ((query , parameters ))
235
+
236
+ with ydb .QuerySessionPool (self .driver ) as session_pool :
237
+ for query_index , (query , parameters ) in enumerate (queries_with_parameters ):
238
+ table_index = query_index // self .count_rows
239
+ row_num = (query_index % self .count_rows ) + 1
240
+
241
+ result_rows = []
242
+
243
+ result_sets = session_pool .execute_with_retries (query , parameters )
244
+ for result_set in result_sets :
245
+ for row in result_set .rows :
246
+ result_rows .append (row )
247
+
248
+ assert len (result_rows ) == 1
249
+
250
+ for row in result_rows :
251
+ for prefix in self .columns [table_index ].keys ():
252
+ for type_name in self .columns [table_index ][prefix ]:
253
+ self .assert_type (type_name , row_num , row [f"{ prefix } { cleanup_type_name (type_name )} " ])
254
+
162
255
@pytest .mark .parametrize ("store_type" , ["ROW" , "COLUMN" ], indirect = True )
163
256
def test_data_type (self ):
164
257
if any ("Decimal" in type_name for type_name in self .all_types .keys ()) and self .store_type == "COLUMN" :
@@ -175,3 +268,20 @@ def test_data_type(self):
175
268
self .check_table ()
176
269
self .write_data ()
177
270
self .check_table ()
271
+
272
+ @pytest .mark .parametrize ("store_type" , ["ROW" , "COLUMN" ], indirect = True )
273
+ def test_parametrized_data_type (self ):
274
+ if any ("Decimal" in type_name for type_name in self .all_types .keys ()) and self .store_type == "COLUMN" :
275
+ if (min (self .versions ) < (25 , 1 )):
276
+ pytest .skip ("Decimal types are not supported in columnshard in this version" )
277
+
278
+ self .create_table ()
279
+
280
+ self .parametrized_write_data ()
281
+ self .parametrized_check_table ()
282
+
283
+ self .change_cluster_version ()
284
+
285
+ self .parametrized_check_table ()
286
+ self .parametrized_write_data ()
287
+ self .parametrized_check_table ()
0 commit comments