1- import asyncio
21import argparse
32import logging
43import ydb
54
65
7- async def connect (endpoint : str , database : str ) -> ydb . aio .Driver :
6+ def connect (endpoint : str , database : str ) -> ydb .Driver :
87 config = ydb .DriverConfig (endpoint = endpoint , database = database )
98 config .credentials = ydb .credentials_from_env_variables ()
10- driver = ydb .aio . Driver (config )
11- await driver .wait (5 , fail_fast = True )
9+ driver = ydb .Driver (config )
10+ driver .wait (5 , fail_fast = True )
1211 return driver
1312
1413
15- async def create_topic (driver : ydb . aio .Driver , topic : str , consumer : str ):
14+ def create_topic (driver : ydb .Driver , topic : str , consumer : str ):
1615 try :
17- await driver .topic_client .drop_topic (topic )
16+ driver .topic_client .drop_topic (topic )
1817 except ydb .SchemeError :
1918 pass
2019
21- await driver .topic_client .create_topic (topic , consumers = [consumer ])
20+ driver .topic_client .create_topic (topic , consumers = [consumer ])
2221
2322
24- async def write_with_tx_example (driver : ydb . aio .Driver , topic : str , message_count : int = 10 ):
25- async with ydb . aio .QuerySessionPool (driver ) as session_pool :
23+ def write_with_tx_example (driver : ydb .Driver , topic : str , message_count : int = 10 ):
24+ with ydb .QuerySessionPool (driver ) as session_pool :
2625
27- async def callee (tx : ydb .aio .QueryTxContext ):
28- print (f"TX ID: { tx .tx_id } " )
29- print (f"TX STATE: { tx ._tx_state ._state .value } " )
30- tx_writer : ydb .TopicTxWriterAsyncIO = driver .topic_client .tx_writer (tx , topic )
31- print (f"TX ID: { tx .tx_id } " )
32- print (f"TX STATE: { tx ._tx_state ._state .value } " )
26+ def callee (tx : ydb .QueryTxContext ):
27+ tx_writer : ydb .TopicTxWriter = driver .topic_client .tx_writer (tx , topic )
3328 for i in range (message_count ):
34- result_stream = await tx .execute (query = f"select { i } as res" )
35- messages = [result_set .rows [0 ]["res" ] async for result_set in result_stream ]
29+ result_stream = tx .execute (query = f"select { i } as res" )
30+ messages = [result_set .rows [0 ]["res" ] for result_set in result_stream ]
3631
37- await tx_writer .write ([ydb .TopicWriterMessage (data = str (message )) for message in messages ])
32+ tx_writer .write ([ydb .TopicWriterMessage (data = str (message )) for message in messages ])
3833
3934 print (f"Messages { messages } were written with tx." )
4035
41- await session_pool .retry_tx_async (callee )
36+ session_pool .retry_tx_sync (callee )
4237
4338
44- async def read_with_tx_example (driver : ydb . aio .Driver , topic : str , consumer : str , message_count : int = 10 ):
45- async with driver .topic_client .reader (topic , consumer ) as reader :
46- async with ydb . aio .QuerySessionPool (driver ) as session_pool :
39+ def read_with_tx_example (driver : ydb .Driver , topic : str , consumer : str , message_count : int = 10 ):
40+ with driver .topic_client .reader (topic , consumer ) as reader :
41+ with ydb .QuerySessionPool (driver ) as session_pool :
4742 for _ in range (message_count ):
4843
49- async def callee (tx : ydb . aio .QueryTxContext ):
50- batch = await reader .receive_batch_with_tx (tx , max_messages = 1 )
51- print (f"Messages { batch .messages [0 ].data } were read with tx." )
44+ def callee (tx : ydb .QueryTxContext ):
45+ batch = reader .receive_batch_with_tx (tx , max_messages = 1 )
46+ print (f"Messages [ { batch .messages [0 ].data . decode () } ] were read with tx." )
5247
53- await session_pool .retry_tx_async (callee )
48+ session_pool .retry_tx_sync (callee )
5449
5550
56- async def main ():
51+ def main ():
5752 parser = argparse .ArgumentParser (
5853 formatter_class = argparse .RawDescriptionHelpFormatter ,
5954 description = """YDB topic basic example.\n """ ,
@@ -78,13 +73,13 @@ async def main():
7873 logger .setLevel (logging .DEBUG )
7974 logger .addHandler (logging .StreamHandler ())
8075
81- driver = await connect (args .endpoint , args .database )
82- if not args .skip_drop_and_create_topic :
83- await create_topic (driver , args .path , args .consumer )
76+ with connect (args .endpoint , args .database ) as driver :
77+ if not args .skip_drop_and_create_topic :
78+ create_topic (driver , args .path , args .consumer )
8479
85- await write_with_tx_example (driver , args .path )
86- await read_with_tx_example (driver , args .path , args .consumer )
80+ write_with_tx_example (driver , args .path )
81+ read_with_tx_example (driver , args .path , args .consumer )
8782
8883
8984if __name__ == "__main__" :
90- asyncio . run ( main () )
85+ main ()
0 commit comments