11# -*- coding: utf-8 -*-
2- import posixpath
32import ydb
43import basic_example_data
54
6- # Table path prefix allows to put the working tables into the specific directory
7- # inside the YDB database. Putting `PRAGMA TablePathPrefix("some/path")`
8- # at the beginning of the query allows to reference the tables through
9- # their names "under" the specified directory.
10- #
11- # TablePathPrefix has to be defined as an absolute path, which has to be started
12- # with the current database location.
13- #
14- # https://ydb.tech/ru/docs/yql/reference/syntax/pragma#table-path-prefix
15-
16- DropTablesQuery = """PRAGMA TablePathPrefix("{}");
5+
6+ DropTablesQuery = """
177DROP TABLE IF EXISTS series;
188DROP TABLE IF EXISTS seasons;
199DROP TABLE IF EXISTS episodes;
2010"""
2111
22- FillDataQuery = """PRAGMA TablePathPrefix("{}");
23-
12+ FillDataQuery = """
2413DECLARE $seriesData AS List<Struct<
2514 series_id: Int64,
2615 title: Utf8,
6958"""
7059
7160
72- def fill_tables_with_data (pool : ydb .QuerySessionPool , path : str ):
61+ def fill_tables_with_data (pool : ydb .QuerySessionPool ):
7362 print ("\n Filling tables with data..." )
7463
75- query = FillDataQuery .format (path )
76-
7764 pool .execute_with_retries (
78- query ,
65+ FillDataQuery ,
7966 {
8067 "$seriesData" : (basic_example_data .get_series_data (), basic_example_data .get_series_data_type ()),
8168 "$seasonsData" : (basic_example_data .get_seasons_data (), basic_example_data .get_seasons_data_type ()),
@@ -84,11 +71,10 @@ def fill_tables_with_data(pool: ydb.QuerySessionPool, path: str):
8471 )
8572
8673
87- def select_simple (pool : ydb .QuerySessionPool , path : str ):
74+ def select_simple (pool : ydb .QuerySessionPool ):
8875 print ("\n Check series table..." )
8976 result_sets = pool .execute_with_retries (
90- f"""
91- PRAGMA TablePathPrefix("{ path } ");
77+ """
9278 SELECT
9379 series_id,
9480 title,
@@ -111,21 +97,23 @@ def select_simple(pool: ydb.QuerySessionPool, path: str):
11197 return first_set
11298
11399
114- def upsert_simple (pool : ydb .QuerySessionPool , path : str ):
100+ def upsert_simple (pool : ydb .QuerySessionPool ):
115101 print ("\n Performing UPSERT into episodes..." )
116102
117103 pool .execute_with_retries (
118- f"""
119- PRAGMA TablePathPrefix("{ path } ");
104+ """
120105 UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD");
121106 """
122107 )
123108
124109
125- def select_with_parameters (pool : ydb .QuerySessionPool , path : str , series_id , season_id , episode_id ):
110+ def select_with_parameters (pool : ydb .QuerySessionPool , series_id , season_id , episode_id ):
126111 result_sets = pool .execute_with_retries (
127- f"""
128- PRAGMA TablePathPrefix("{ path } ");
112+ """
113+ DECLARE $seriesId AS Int64;
114+ DECLARE $seasonId AS Int64;
115+ DECLARE $episodeId AS Int64;
116+
129117 SELECT
130118 title,
131119 air_date
@@ -151,10 +139,13 @@ def select_with_parameters(pool: ydb.QuerySessionPool, path: str, series_id, sea
151139# In most cases it's better to use transaction control settings in session.transaction
152140# calls instead to avoid additional hops to YDB cluster and allow more efficient
153141# execution of queries.
154- def explicit_transaction_control (pool : ydb .QuerySessionPool , path : str , series_id , season_id , episode_id ):
142+ def explicit_transaction_control (pool : ydb .QuerySessionPool , series_id , season_id , episode_id ):
155143 def callee (session : ydb .QuerySessionSync ):
156- query = f"""
157- PRAGMA TablePathPrefix("{ path } ");
144+ query = """
145+ DECLARE $seriesId AS Int64;
146+ DECLARE $seasonId AS Int64;
147+ DECLARE $episodeId AS Int64;
148+
158149 UPDATE episodes
159150 SET air_date = CurrentUtcDate()
160151 WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId;
@@ -183,16 +174,31 @@ def callee(session: ydb.QuerySessionSync):
183174 return pool .retry_operation_sync (callee )
184175
185176
186- def drop_tables (pool : ydb .QuerySessionPool , path : str ):
177+ def huge_select (pool : ydb .QuerySessionPool ):
178+ def callee (session : ydb .QuerySessionSync ):
179+ query = """SELECT * from episodes;"""
180+
181+ with session .transaction ().execute (
182+ query ,
183+ commit_tx = True ,
184+ ) as result_sets :
185+ print ("\n > Huge SELECT call" )
186+ for result_set in result_sets :
187+ for row in result_set .rows :
188+ print ("episode title:" , row .title , ", air date:" , row .air_date )
189+
190+ return pool .retry_operation_sync (callee )
191+
192+
193+ def drop_tables (pool : ydb .QuerySessionPool ):
187194 print ("\n Cleaning up existing tables..." )
188- pool .execute_with_retries (DropTablesQuery . format ( path ) )
195+ pool .execute_with_retries (DropTablesQuery )
189196
190197
191- def create_tables (pool : ydb .QuerySessionPool , path : str ):
198+ def create_tables (pool : ydb .QuerySessionPool ):
192199 print ("\n Creating table series..." )
193200 pool .execute_with_retries (
194- f"""
195- PRAGMA TablePathPrefix("{ path } ");
201+ """
196202 CREATE table `series` (
197203 `series_id` Int64,
198204 `title` Utf8,
@@ -205,8 +211,7 @@ def create_tables(pool: ydb.QuerySessionPool, path: str):
205211
206212 print ("\n Creating table seasons..." )
207213 pool .execute_with_retries (
208- f"""
209- PRAGMA TablePathPrefix("{ path } ");
214+ """
210215 CREATE table `seasons` (
211216 `series_id` Int64,
212217 `season_id` Int64,
@@ -220,8 +225,7 @@ def create_tables(pool: ydb.QuerySessionPool, path: str):
220225
221226 print ("\n Creating table episodes..." )
222227 pool .execute_with_retries (
223- f"""
224- PRAGMA TablePathPrefix("{ path } ");
228+ """
225229 CREATE table `episodes` (
226230 `series_id` Int64,
227231 `season_id` Int64,
@@ -234,29 +238,7 @@ def create_tables(pool: ydb.QuerySessionPool, path: str):
234238 )
235239
236240
237- def is_directory_exists (driver : ydb .Driver , path : str ):
238- try :
239- return driver .scheme_client .describe_path (path ).is_directory ()
240- except ydb .SchemeError :
241- return False
242-
243-
244- def ensure_path_exists (driver , database , path ):
245- paths_to_create = list ()
246- path = path .rstrip ("/" )
247- while path not in ("" , database ):
248- full_path = posixpath .join (database , path )
249- if is_directory_exists (driver , full_path ):
250- break
251- paths_to_create .append (full_path )
252- path = posixpath .dirname (path ).rstrip ("/" )
253-
254- while len (paths_to_create ) > 0 :
255- full_path = paths_to_create .pop (- 1 )
256- driver .scheme_client .make_directory (full_path )
257-
258-
259- def run (endpoint , database , path ):
241+ def run (endpoint , database ):
260242 with ydb .Driver (
261243 endpoint = endpoint ,
262244 database = database ,
@@ -265,25 +247,19 @@ def run(endpoint, database, path):
265247 driver .wait (timeout = 5 , fail_fast = True )
266248
267249 with ydb .QuerySessionPool (driver ) as pool :
250+ drop_tables (pool )
268251
269- ensure_path_exists (driver , database , path )
270-
271- # absolute path - prefix to the table's names,
272- # including the database location
273- full_path = posixpath .join (database , path )
274-
275- drop_tables (pool , full_path )
276-
277- create_tables (pool , full_path )
252+ create_tables (pool )
278253
279- fill_tables_with_data (pool , full_path )
254+ fill_tables_with_data (pool )
280255
281- select_simple (pool , full_path )
256+ select_simple (pool )
282257
283- upsert_simple (pool , full_path )
258+ upsert_simple (pool )
284259
285- select_with_parameters (pool , full_path , 2 , 3 , 7 )
286- select_with_parameters (pool , full_path , 2 , 3 , 8 )
260+ select_with_parameters (pool , 2 , 3 , 7 )
261+ select_with_parameters (pool , 2 , 3 , 8 )
287262
288- explicit_transaction_control (pool , full_path , 2 , 6 , 1 )
289- select_with_parameters (pool , full_path , 2 , 6 , 1 )
263+ explicit_transaction_control (pool , 2 , 6 , 1 )
264+ select_with_parameters (pool , 2 , 6 , 1 )
265+ huge_select (pool )
0 commit comments