Skip to content

Commit 677edfd

Browse files
committed
sync
1 parent 3ebf21d commit 677edfd

File tree

1 file changed

+29
-49
lines changed

1 file changed

+29
-49
lines changed

examples/topic/reader_async_example.py

Lines changed: 29 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -15,61 +15,52 @@ async def connect():
1515

1616

1717
async def create_reader_and_close_with_context_manager(db: ydb.aio.Driver):
18-
with ydb.TopicClientAsyncIO(db).reader(
18+
async with ydb.TopicClientAsyncIO(db).reader(
1919
"/database/topic/path", consumer="consumer"
2020
) as reader:
21-
async for message in reader.messages():
22-
pass
21+
...
2322

2423

2524
async def print_message_content(reader: ydb.TopicReaderAsyncIO):
26-
async for message in reader.messages():
25+
while True:
26+
message = await reader.receive_message()
2727
print("text", message.data.read().decode("utf-8"))
2828
# await and async_commit need only for sync commit mode - for wait ack from servr
2929
await reader.commit(message)
3030

3131

32-
async def process_messages_batch_explicit_commit(reader: ydb.TopicReaderAsyncIO):
32+
async def process_messages_batch_with_commit(reader: ydb.TopicReaderAsyncIO):
3333
# Explicit commit example
34-
async for batch in reader.batches(max_messages=100, timeout=2):
35-
async with asyncio.TaskGroup() as tg:
36-
for message in batch.messages:
37-
tg.create_task(_process(message))
38-
39-
# wait complete of process all messages from batch be taskgroup context manager
40-
# and commit complete batch
34+
while True:
35+
batch = await reader.receive_batch()
36+
...
4137
await reader.commit(batch)
4238

4339

44-
async def process_messages_batch_context_manager_commit(reader: ydb.TopicReaderAsyncIO):
45-
# Commit with context manager
46-
async for batch in reader.batches():
47-
async with reader.commit_on_exit(batch), asyncio.TaskGroup() as tg:
48-
for message in batch.messages:
49-
tg.create_task(_process(message))
50-
51-
5240
async def get_message_with_timeout(reader: ydb.TopicReaderAsyncIO):
5341
try:
5442
message = await asyncio.wait_for(reader.receive_message(), timeout=1)
55-
except TimeoutError:
43+
except asyncio.TimeoutError:
5644
print("Have no new messages in a second")
5745
return
5846

5947
print("mess", message.data)
6048

6149

6250
async def get_all_messages_with_small_wait(reader: ydb.TopicReaderAsyncIO):
63-
async for message in reader.messages(timeout=1):
64-
await _process(message)
65-
print("Have no new messages in a second")
51+
while True:
52+
try:
53+
message = await reader.receive_message()
54+
await _process(message)
55+
except asyncio.TimeoutError:
56+
print("Have no new messages in a second")
6657

6758

6859
async def get_a_message_from_external_loop(reader: ydb.TopicReaderAsyncIO):
6960
for i in range(10):
7061
try:
7162
message = await asyncio.wait_for(reader.receive_message(), timeout=1)
72-
except TimeoutError:
63+
except asyncio.TimeoutError:
7364
return
7465
await _process(message)
7566

@@ -78,7 +69,7 @@ async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO)
7869
for i in range(10):
7970
try:
8071
batch = await asyncio.wait_for(reader.receive_batch(), timeout=2)
81-
except TimeoutError:
72+
except asyncio.TimeoutError:
8273
return
8374

8475
for message in batch.messages:
@@ -92,27 +83,20 @@ async def auto_deserialize_message(db: ydb.aio.Driver):
9283
async with ydb.TopicClientAsyncIO(db).reader(
9384
"/database/topic/path", consumer="asd", deserializer=json.loads
9485
) as reader:
95-
async for message in reader.messages():
86+
while True:
87+
message = await reader.receive_message()
9688
print(
9789
message.data.Name
9890
) # message.data replaces by json.loads(message.data) of raw message
9991
reader.commit(message)
10092

10193

102-
async def commit_batch_with_context(reader: ydb.TopicReaderAsyncIO):
103-
async for batch in reader.batches():
104-
async with reader.commit_on_exit(batch):
105-
for message in batch.messages:
106-
if not batch.is_alive:
107-
break
108-
await _process(message)
109-
110-
11194
async def handle_partition_stop(reader: ydb.TopicReaderAsyncIO):
112-
async for message in reader.messages():
113-
time.sleep(1) # some work
95+
while True:
96+
message = await reader.receive_message()
97+
time.sleep(123) # some work
11498
if message.is_alive:
115-
time.sleep(123) # some other work
99+
time.sleep(1) # some other work
116100
await reader.commit(message)
117101

118102

@@ -126,7 +110,8 @@ def process_batch(batch):
126110
_process(message)
127111
reader.commit(batch)
128112

129-
async for batch in reader.batches():
113+
while True:
114+
batch = await reader.receive_batch()
130115
process_batch(batch)
131116

132117

@@ -137,18 +122,12 @@ async def connect_and_read_few_topics(db: ydb.aio.Driver):
137122
ydb.TopicSelector("/database/second-topic", partitions=3),
138123
]
139124
) as reader:
140-
async for message in reader.messages():
125+
while True:
126+
message = await reader.receive_message()
141127
await _process(message)
142128
await reader.commit(message)
143129

144130

145-
async def handle_partition_graceful_stop_batch(reader: ydb.TopicReaderAsyncIO):
146-
# no special handle, but batch will contain less than prefer count messages
147-
async for batch in reader.batches():
148-
await _process(batch)
149-
reader.commit(batch)
150-
151-
152131
async def advanced_commit_notify(db: ydb.aio.Driver):
153132
def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
154133
print(event.topic)
@@ -157,7 +136,8 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
157136
async with ydb.TopicClientAsyncIO(db).reader(
158137
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
159138
) as reader:
160-
async for message in reader.messages():
139+
while True:
140+
message = await reader.receive_message()
161141
await _process(message)
162142
await reader.commit(message)
163143

0 commit comments

Comments
 (0)