 # -*- coding: utf-8 -*-
 import boto3
-
-import os
-
+import time
+import pytest
+import logging
 import yatest
+import os
+import json
 from ydb.tests.library.harness.kikimr_runner import KiKiMR
 from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
 from ydb.tests.library.harness.param_constants import kikimr_driver_path
 from ydb.tests.library.common.types import Erasure
 from ydb.tests.oss.ydb_sdk_import import ydb
 
+from decimal import Decimal
+
+
+last_stable_binary_path = yatest.common.binary_path("ydb/tests/library/compatibility/ydbd-last-stable")
+current_binary_path = kikimr_driver_path()
+
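+# Each entry pairs the binaries the cluster starts on with the binaries it is
+# rolled onto later in the test; a nested inner list means the cluster runs a
+# mix of versions at once.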
+all_binary_combinations = [
+    [last_stable_binary_path, current_binary_path],
+    [last_stable_binary_path, [last_stable_binary_path, current_binary_path]],
+    [current_binary_path, last_stable_binary_path],
+    [current_binary_path, current_binary_path],
+]
+all_binary_combinations_ids = [
+    "last_stable_to_current",
+    "last_stable_to_current_mixed",
+    "current_to_last_stable",
+    "current_to_current",
+]
+
+logger = logging.getLogger(__name__)
+
 
 class TestCompatibility(object):
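+    # Autouse fixture replacing the former setup_class/teardown_class: it
+    # starts the cluster on the first binary set of each combination and
+    # stops it after every test.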
-    @classmethod
-    def setup_class(cls):
-        last_stable_path = yatest.common.binary_path("ydb/tests/library/compatibility/ydbd-last-stable")
-        binary_paths = [kikimr_driver_path(), last_stable_path]
-        cls.cluster = KiKiMR(KikimrConfigGenerator(erasure=Erasure.MIRROR_3_DC, binary_paths=binary_paths))
-        cls.cluster.start()
-        cls.endpoint = "%s:%s" % (
-            cls.cluster.nodes[1].host, cls.cluster.nodes[1].port
+    @pytest.fixture(autouse=True, params=all_binary_combinations, ids=all_binary_combinations_ids)
+    def setup(self, request):
+        self.all_binary_paths = request.param
+        self.config = KikimrConfigGenerator(
+            erasure=Erasure.MIRROR_3_DC,
+            binary_paths=[self.all_binary_paths[0]],
+            use_in_memory_pdisks=False,
+            extra_feature_flags={
+                "suppress_compatibility_check": True,
+                # "enable_table_datetime64": True,  # uncomment for 64-bit datetime in tpc-h/tpc-ds
+            },
+            column_shard_config={
+                'disabled_on_scheme_shard': False,
+            },
         )
-        cls.driver = ydb.Driver(
+
+        self.cluster = KiKiMR(self.config)
+        self.cluster.start()
+        self.endpoint = "grpc://%s:%s" % ('localhost', self.cluster.nodes[1].port)
+        output_path = yatest.common.test_output_path()
+        self.output_f = open(os.path.join(output_path, "out.log"), "w")
+        self.s3_config = self.setup_s3()
+
+        self.driver = ydb.Driver(
             ydb.DriverConfig(
                 database='/Root',
-                endpoint=cls.endpoint
+                endpoint=self.endpoint
             )
         )
-        cls.driver.wait()
-        output_path = yatest.common.test_output_path()
-        cls.output_f = open(os.path.join(output_path, "out.log"), "w")
-
-        cls.s3_config = cls.setup_s3()
-
-    @classmethod
-    def teardown_class(cls):
-        if hasattr(cls, 'driver'):
-            cls.driver.stop()
-
-        if hasattr(cls, 'cluster'):
-            cls.cluster.stop(kill=True)  # TODO fix
+        self.driver.wait()
+        yield
+        self.cluster.stop()
 
     @staticmethod
     def setup_s3():
@@ -56,48 +84,259 @@ def setup_s3():
 
         return s3_endpoint, s3_access_key, s3_secret_key, s3_bucket
 
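+    # Roll the cluster onto new binaries: update the config's binary paths,
+    # restart every node through the configurator, and check that the reported
+    # versions changed (or stayed the same when the paths are unchanged).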
-    def test_simple(self):
-        session = ydb.retry_operation_sync(lambda: self.driver.table_client.session().create())
+    def change_cluster_version(self, new_binary_paths):
+        binary_paths_before = self.config.get_binary_paths()
+        versions_before = self.get_nodes_version()
+        if isinstance(new_binary_paths, str):
+            new_binary_paths = [new_binary_paths]
+        elif not isinstance(new_binary_paths, list):
+            raise ValueError("binary_paths must be a string or a list of strings")
+        self.config.set_binary_paths(new_binary_paths)
+        self.cluster.update_nodes_configurator(self.config)
+        time.sleep(60)  # give restarted nodes time to come back and rejoin
+        versions_after = self.get_nodes_version()
+        if binary_paths_before != new_binary_paths:
+            assert versions_before != versions_after, f'Versions before and after should differ: {versions_before} {versions_after}'
+        else:
+            assert versions_before == versions_after, f'Versions before and after should be the same: {versions_before} {versions_after}'
+
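+    # Collect `select version()` from every node through the YDB CLI; assumes
+    # the YDB_CLI_BINARY environment variable points at a ydb CLI build.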
+    def get_nodes_version(self):
+        versions = []
+        for node_id, node in enumerate(self.cluster.nodes.values()):
+            node.get_config_version()
+            get_version_command = [
+                yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
+                "--verbose",
+                "--endpoint",
+                "grpc://localhost:%d" % node.grpc_port,
+                "--database=/Root",
+                "yql",
+                "--script",
+                f'select version() as node_{node_id}_version',
+                '--format',
+                'json-unicode'
+            ]
+            result = yatest.common.execute(get_version_command, wait=True)
+            result_data = json.loads(result.std_out.decode('utf-8'))
+            logger.debug(f'node_{node_id}_version: {result_data}')
+            node_version_key = f"node_{node_id}_version"
+            if node_version_key in result_data:
+                node_version = result_data[node_version_key]
+                versions.append(node_version)
+            else:
+                logger.warning(f"Key {node_version_key} not found in the result.")
+        return versions
+
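+    # Helper to probe for a table by describing it (currently unused here).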
+    @staticmethod
+    def check_table_exists(driver, table_path):
+        try:
+            driver.scheme_client.describe_table(table_path)
+            return True
+        except ydb.SchemeError as e:
+            if e.issue_code == ydb.IssueCode.SCHEME_ERROR_NO_SUCH_TABLE:
+                return False
+            else:
+                raise
+
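+    # Run a YQL script through the CLI against node 1, logging output to out.log.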
+    def exec_query(self, query: str):
+        command = [
+            yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
+            "--verbose",
+            "-e",
+            "grpc://localhost:%d" % self.cluster.nodes[1].port,
+            "-d",
+            "/Root",
+            "yql",
+            "--script",
+            query
+        ]
+        yatest.common.execute(command, wait=True, stdout=self.output_f)
+
+    def execute_scan_query(self, query_body):
+        query = ydb.ScanQuery(query_body, {})
+        it = self.driver.table_client.scan_query(query)
+        result_set = []
+
+        try:
+            while True:
+                result = next(it)
+                result_set.extend(result.result_set.rows)
+        except StopIteration:
+            pass
+
+        return result_set
+
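+    # Dump the whole scheme tree (`scheme ls -lR`) into out.log for debugging.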
+    def log_database_scheme(self):
+        get_scheme_command = [
+            yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
+            "--verbose",
+            "-e",
+            "grpc://localhost:%d" % self.cluster.nodes[1].port,
+            "-d",
+            "/Root",
+            "scheme",
+            "ls",
+            "-l",
+            "-R"
+        ]
+        yatest.common.execute(get_scheme_command, wait=True, stdout=self.output_f)
+
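+    # Scenario: fill a table on the starting binaries, roll the cluster to the
+    # target binaries, then check that old rows survived and new upserts land.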
+    @pytest.mark.parametrize("store_type", ["row", "column"])
+    def test_simple(self, store_type):
+        def read_update_data(self, iteration_count=1, start_index=0):
+            id_ = start_index
 
-        with ydb.SessionPool(self.driver, size=1) as pool:
-            with pool.checkout() as session:
-                session.execute_scheme(
-                    "create table `sample_table` (id Uint64, value Uint64, payload Utf8, PRIMARY KEY(id)) WITH (AUTO_PARTITIONING_BY_SIZE = ENABLED, AUTO_PARTITIONING_PARTITION_SIZE_MB = 1);"
+            upsert_count = 200
+            for i in range(iteration_count):
+                rows = []
+                for j in range(upsert_count):
+                    row = {}
+                    row["id"] = id_
+                    row["value"] = 1
+                    row["payload"] = "DEADBEEF" * 1024 * 16  # 128 KB
+                    row["income"] = Decimal("123.001").quantize(Decimal('0.000000000'))
+
+                    rows.append(row)
+                    id_ += 1
+
+                column_types = ydb.BulkUpsertColumns()
+                column_types.add_column("id", ydb.PrimitiveType.Uint64)
+                column_types.add_column("value", ydb.PrimitiveType.Uint64)
+                column_types.add_column("payload", ydb.PrimitiveType.Utf8)
+                column_types.add_column("income", ydb.DecimalType())
+                self.driver.table_client.bulk_upsert(
+                    "Root/sample_table", rows, column_types
                 )
-                id_ = 0
-
-                upsert_count = 200
-                iteration_count = 1
-                for i in range(iteration_count):
-                    rows = []
-                    for j in range(upsert_count):
-                        row = {}
-                        row["id"] = id_
-                        row["value"] = 1
-                        row["payload"] = "DEADBEEF" * 1024 * 16  # 128 KB
-                        rows.append(row)
-                        id_ += 1
-
-                    column_types = ydb.BulkUpsertColumns()
-                    column_types.add_column("id", ydb.PrimitiveType.Uint64)
-                    column_types.add_column("value", ydb.PrimitiveType.Uint64)
-                    column_types.add_column("payload", ydb.PrimitiveType.Utf8)
-                    self.driver.table_client.bulk_upsert(
-                        "Root/sample_table", rows, column_types
+
+            result_set = self.execute_scan_query("SELECT SUM(value) as sum_value from `sample_table`")
+
+            for row in result_set:
+                print(" ".join([str(x) for x in list(row.values())]))
+
+            assert len(result_set) == 1
+            assert len(result_set[0]) == 1
+            assert result_set[0]['sum_value'] == upsert_count * iteration_count + start_index
+
+        def create_table_column(self):
+            with ydb.SessionPool(self.driver, size=1) as pool:
+                with pool.checkout() as session:
+                    session.execute_scheme(
+                        """create table `sample_table` (
+                            id Uint64 NOT NULL, value Uint64,
+                            payload Utf8, income Decimal(22,9),
+                            PRIMARY KEY(id)
+                        ) WITH (
+                            STORE = COLUMN,
+                            AUTO_PARTITIONING_BY_SIZE = ENABLED,
+                            AUTO_PARTITIONING_PARTITION_SIZE_MB = 1);"""
                     )
 
-                query = "SELECT SUM(value) from sample_table"
-                result_sets = session.transaction().execute(
-                    query, commit_tx=True
-                )
-                for row in result_sets[0].rows:
-                    print(" ".join([str(x) for x in list(row.values())]))
-
-                assert len(result_sets) == 1
-                assert len(result_sets[0].rows) == 1
-                result = list(result_sets[0].rows[0].values())
-                assert len(result) == 1
-                assert result[0] == upsert_count * iteration_count
+        def create_table_row(self):
+            with ydb.SessionPool(self.driver, size=1) as pool:
+                with pool.checkout() as session:
+                    session.execute_scheme(
+                        """create table `sample_table` (
+                            id Uint64, value Uint64,
+                            payload Utf8, income Decimal(22,9),
+                            PRIMARY KEY(id)
+                        ) WITH (
+                            AUTO_PARTITIONING_BY_SIZE = ENABLED,
+                            AUTO_PARTITIONING_PARTITION_SIZE_MB = 1);"""
+                    )
+
+        if store_type == "row":
+            create_table_row(self)
+        else:
+            create_table_column(self)
+        read_update_data(self)
+        self.change_cluster_version(self.all_binary_paths[1])
+        assert self.execute_scan_query('select count(*) as row_count from `sample_table`')[0]['row_count'] == 200, 'Expected 200 rows after the version update'
+        read_update_data(self, iteration_count=2, start_index=100)
+        assert self.execute_scan_query('select count(*) as row_count from `sample_table`')[0]['row_count'] == 500, 'Expected 500 rows: 100 rows updated and 300 rows added'
+
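+    # Scenario: run the TPC-H workload on the starting binaries, roll the
+    # cluster to the target binaries, and rerun the same canonical query set.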
+    @pytest.mark.parametrize("store_type", ["row", "column"])
+    def test_tpch1(self, store_type):
+        result_json_path = os.path.join(yatest.common.test_output_path(), "result.json")
+        query_output_path = os.path.join(yatest.common.test_output_path(), "query_output.json")
+        init_command = [
+            yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
+            "--verbose",
+            "--endpoint",
+            "grpc://localhost:%d" % self.cluster.nodes[1].port,
+            "--database=/Root",
+            "workload",
+            "tpch",
+            "-p",
+            "tpch",
+            "init",
+            "--store={}".format(store_type),
+            "--datetime",  # use 32-bit dates instead of 64-bit (not supported in 24-4)
+            "--partition-size=25",
+        ]
+        import_command = [
+            yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
+            "--verbose",
+            "--endpoint",
+            "grpc://localhost:%d" % self.cluster.nodes[1].port,
+            "--database=/Root",
+            "workload",
+            "tpch",
+            "-p",
+            "tpch",
+            "import",
+            "generator",
+            "--scale=1",
+        ]
+        run_command = [
+            yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
+            "--verbose",
+            "--endpoint",
+            "grpc://localhost:%d" % self.cluster.nodes[1].port,
+            "--database=/Root",
+            "workload",
+            "tpch",
+            "-p",
+            "tpch",
+            "run",
+            "--scale=1",
+            "--exclude",
+            "17",  # not working for row tables
+            "--check-canonical",
+            "--retries",
+            "5",  # in row tables we have to retry queries by design
+            "--json",
+            result_json_path,
+            "--output",
+            query_output_path,
+        ]
+        clean_command = [
+            yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
+            "--verbose",
+            "--endpoint",
+            "grpc://localhost:%d" % self.cluster.nodes[1].port,
+            "--database=/Root",
+            "workload",
+            "tpch",
+            "-p",
+            "tpch",
+            "clean"
+        ]
+
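+        # Load and verify on the starting version, switch binaries, then rerun
+        # the identical run_command so results are checked on both versions.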
+        yatest.common.execute(init_command, wait=True, stdout=self.output_f)
+        yatest.common.execute(import_command, wait=True, stdout=self.output_f)
+        yatest.common.execute(run_command, wait=True, stdout=self.output_f)
+        self.change_cluster_version(self.all_binary_paths[1])
+        yatest.common.execute(run_command, wait=True, stdout=self.output_f)
+        yatest.common.execute(clean_command, wait=True, stdout=self.output_f)
 
     def test_export(self):
         s3_endpoint, s3_access_key, s3_secret_key, s3_bucket = self.s3_config