Skip to content

Commit ce138f8

Browse files
authored
Merge pull request #412 added topic basic example
2 parents 4c40e2e + dc52058 commit ce138f8

File tree

1 file changed

+82
-0
lines changed

1 file changed

+82
-0
lines changed

examples/topic/basic_example.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import argparse
2+
import asyncio
3+
import logging
4+
5+
import ydb
6+
7+
8+
async def connect(endpoint: str, database: str) -> ydb.aio.Driver:
9+
config = ydb.DriverConfig(endpoint=endpoint, database=database)
10+
config.credentials = ydb.credentials_from_env_variables()
11+
driver = ydb.aio.Driver(config)
12+
await driver.wait(15)
13+
return driver
14+
15+
16+
async def create_topic(driver: ydb.aio.Driver, topic: str, consumer: str):
17+
try:
18+
await driver.topic_client.drop_topic(topic)
19+
except ydb.SchemeError:
20+
pass
21+
22+
await driver.topic_client.create_topic(topic, consumers=[consumer])
23+
24+
25+
async def write_messages(driver: ydb.aio.Driver, topic: str):
26+
async with driver.topic_client.writer(topic) as writer:
27+
for i in range(10):
28+
await writer.write(f"mess-{i}")
29+
await asyncio.sleep(1)
30+
31+
32+
async def read_messages(driver: ydb.aio.Driver, topic: str, consumer: str):
33+
async with driver.topic_client.reader(topic, consumer) as reader:
34+
while True:
35+
try:
36+
mess = await asyncio.wait_for(reader.receive_message(), 5)
37+
print()
38+
print(mess.seqno)
39+
print(mess.created_at)
40+
print(mess.data.decode())
41+
reader.commit(mess)
42+
except asyncio.TimeoutError:
43+
return
44+
45+
46+
async def main():
47+
parser = argparse.ArgumentParser(
48+
formatter_class=argparse.RawDescriptionHelpFormatter,
49+
description="""YDB topic basic example.\n""",
50+
)
51+
parser.add_argument("-d", "--database", default="/local", help="Name of the database to use")
52+
parser.add_argument("-e", "--endpoint", default="grpc://localhost:2136", help="Endpoint url to use")
53+
parser.add_argument("-p", "--path", default="test-topic", help="Topic name")
54+
parser.add_argument("-c", "--consumer", default="consumer", help="Consumer name")
55+
parser.add_argument("-v", "--verbose", default=False, action="store_true")
56+
parser.add_argument(
57+
"-s",
58+
"--skip-drop-and-create-topic",
59+
default=False,
60+
action="store_true",
61+
help="Use existed topic, skip remove it and re-create",
62+
)
63+
64+
args = parser.parse_args()
65+
66+
if args.verbose:
67+
logger = logging.getLogger("topicexample")
68+
logger.setLevel(logging.DEBUG)
69+
logger.addHandler(logging.StreamHandler())
70+
71+
driver = await connect(args.endpoint, args.database)
72+
if not args.skip_drop_and_create_topic:
73+
await create_topic(driver, args.path, args.consumer)
74+
75+
await asyncio.gather(
76+
write_messages(driver, args.path),
77+
read_messages(driver, args.path, args.consumer),
78+
)
79+
80+
81+
if __name__ == "__main__":
82+
asyncio.run(main())

0 commit comments

Comments
 (0)