Skip to content

Commit 85b19b2

Browse files
committed
add autosplit example
1 parent f5099d3 commit 85b19b2

File tree

1 file changed

+90
-0
lines changed

1 file changed

+90
-0
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import argparse
2+
import asyncio
3+
import datetime
4+
import logging
5+
6+
import ydb
7+
8+
logger = logging.getLogger(__name__)
9+
logger.setLevel(logging.DEBUG)
10+
11+
12+
async def connect(endpoint: str, database: str) -> ydb.aio.Driver:
13+
config = ydb.DriverConfig(endpoint=endpoint, database=database)
14+
config.credentials = ydb.credentials_from_env_variables()
15+
driver = ydb.aio.Driver(config)
16+
await driver.wait(5, fail_fast=True)
17+
return driver
18+
19+
20+
async def recreate_topic(driver: ydb.aio.Driver, topic: str, consumer: str):
21+
try:
22+
await driver.topic_client.drop_topic(topic)
23+
except ydb.SchemeError:
24+
pass
25+
26+
await driver.topic_client.create_topic(
27+
topic,
28+
consumers=[consumer],
29+
max_active_partitions=100,
30+
auto_partitioning_settings=ydb.TopicAutoPartitioningSettings(
31+
strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP,
32+
up_utilization_percent=1,
33+
down_utilization_percent=1,
34+
stabilization_window=datetime.timedelta(seconds=1),
35+
),
36+
)
37+
38+
39+
async def write_messages(driver: ydb.aio.Driver, topic: str, id: int = 0):
40+
async with driver.topic_client.writer(topic) as writer:
41+
for i in range(100):
42+
mess = ydb.TopicWriterMessage(data=f"[{id}] mess-{i}", metadata_items={"index": f"{i}"})
43+
await writer.write(mess)
44+
await asyncio.sleep(0.01)
45+
46+
47+
async def read_messages(driver: ydb.aio.Driver, topic: str, consumer: str):
48+
async with driver.topic_client.reader(topic, consumer, auto_partitioning_support=True) as reader:
49+
count = 0
50+
while True:
51+
try:
52+
mess = await asyncio.wait_for(reader.receive_message(), 5)
53+
count += 1
54+
print(mess.data.decode())
55+
reader.commit(mess)
56+
except asyncio.TimeoutError:
57+
assert count == 200
58+
return
59+
60+
61+
62+
async def main():
63+
parser = argparse.ArgumentParser(
64+
formatter_class=argparse.RawDescriptionHelpFormatter,
65+
description="""YDB topic basic example.\n""",
66+
)
67+
parser.add_argument("-d", "--database", default="/local", help="Name of the database to use")
68+
parser.add_argument("-e", "--endpoint", default="grpc://localhost:2136", help="Endpoint url to use")
69+
parser.add_argument("-p", "--path", default="test-topic", help="Topic name")
70+
parser.add_argument("-c", "--consumer", default="consumer", help="Consumer name")
71+
parser.add_argument("-v", "--verbose", default=True, action="store_true")
72+
73+
74+
args = parser.parse_args()
75+
76+
if args.verbose:
77+
logger.addHandler(logging.StreamHandler())
78+
79+
driver = await connect(args.endpoint, args.database)
80+
81+
await recreate_topic(driver, args.path, args.consumer)
82+
83+
await asyncio.gather(
84+
write_messages(driver, args.path, 0),
85+
write_messages(driver, args.path, 1),
86+
read_messages(driver, args.path, args.consumer),
87+
)
88+
89+
if __name__ == "__main__":
90+
asyncio.run(main())

0 commit comments

Comments
 (0)