1+ import argparse
2+ import asyncio
3+ import logging
4+
5+ import ydb
6+
7+
8+ async def connect (endpoint : str , database : str ) -> ydb .aio .Driver :
9+ config = ydb .DriverConfig (endpoint = endpoint , database = database )
10+ config .credentials = ydb .credentials_from_env_variables ()
11+ driver = ydb .aio .Driver (config )
12+ await driver .wait (15 )
13+ return driver
14+
15+
16+ async def create_topic (driver : ydb .aio .Driver , topic : str , consumer : str ):
17+ try :
18+ await driver .topic_client .drop_topic (topic )
19+ except ydb .SchemeError :
20+ pass
21+
22+ await driver .topic_client .create_topic (topic , consumers = [consumer ])
23+
24+
25+ async def write_messages (driver : ydb .aio .Driver , topic : str ):
26+ async with driver .topic_client .writer (topic ) as writer :
27+ for i in range (10 ):
28+ await writer .write (f"mess-{ i } " )
29+ await asyncio .sleep (1 )
30+
31+
32+ async def read_messages (driver : ydb .aio .Driver , topic : str , consumer : str ):
33+ async with driver .topic_client .reader (topic , consumer ) as reader :
34+ while True :
35+ try :
36+ mess = await asyncio .wait_for (reader .receive_message (), 5 )
37+ print ()
38+ print (mess .seqno )
39+ print (mess .created_at )
40+ print (mess .data .decode ())
41+ reader .commit (mess )
42+ except asyncio .TimeoutError :
43+ return
44+
45+
46+ async def main ():
47+ parser = argparse .ArgumentParser (
48+ formatter_class = argparse .RawDescriptionHelpFormatter ,
49+ description = """YDB topic basic example.\n """ ,
50+ )
51+ parser .add_argument ("-d" , "--database" , default = "/local" , help = "Name of the database to use" )
52+ parser .add_argument ("-e" , "--endpoint" , default = "grpc://localhost:2136" , help = "Endpoint url to use" )
53+ parser .add_argument ("-p" , "--path" , default = "test-topic" , help = "Topic name" )
54+ parser .add_argument ("-c" , "--consumer" , default = "consumer" , help = "Consumer name" )
55+ parser .add_argument ("-v" , "--verbose" , default = False , action = "store_true" )
56+ parser .add_argument ("-s" , "--skip-drop-and-create-topic" , default = False , action = "store_true" , help = "Use existed topic, skip remove it and re-create" )
57+
58+ args = parser .parse_args ()
59+
60+ if args .verbose :
61+ logger = logging .getLogger ("topicexample" )
62+ logger .setLevel (logging .DEBUG )
63+ logger .addHandler (logging .StreamHandler ())
64+
65+ driver = await connect (args .endpoint , args .database )
66+ if not args .skip_drop_and_create_topic :
67+ await create_topic (driver , args .path , args .consumer )
68+
69+ await asyncio .gather (
70+ write_messages (driver , args .path ),
71+ read_messages (driver , args .path , args .consumer ),
72+ )
73+
74+
75+ if __name__ == '__main__' :
76+ asyncio .run (main ())
0 commit comments