Skip to content

Commit 65fccc5

Browse files
authored
Merge pull request #238 fix topic examples
2 parents fa3693a + 1f94f7b commit 65fccc5

File tree

4 files changed

+99
-113
lines changed

4 files changed

+99
-113
lines changed

examples/topic/reader_async_example.py

Lines changed: 37 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -10,66 +10,57 @@ async def connect():
1010
connection_string="grpc://localhost:2135?database=/local",
1111
credentials=ydb.credentials.AnonymousCredentials(),
1212
)
13-
reader = ydb.TopicClientAsyncIO(db).reader("/local/topic", consumer="consumer")
13+
reader = db.topic_client.reader("/local/topic", consumer="consumer")
1414
return reader
1515

1616

1717
async def create_reader_and_close_with_context_manager(db: ydb.aio.Driver):
18-
with ydb.TopicClientAsyncIO(db).reader(
18+
async with db.topic_client.reader(
1919
"/database/topic/path", consumer="consumer"
20-
) as reader:
21-
async for message in reader.messages():
22-
pass
20+
) as reader: # noqa
21+
...
2322

2423

2524
async def print_message_content(reader: ydb.TopicReaderAsyncIO):
26-
async for message in reader.messages():
25+
while True:
26+
message = await reader.receive_message()
2727
print("text", message.data.read().decode("utf-8"))
2828
# await and async_commit need only for sync commit mode - to wait ack from server
2929
await reader.commit(message)
3030

3131

32-
async def process_messages_batch_explicit_commit(reader: ydb.TopicReaderAsyncIO):
32+
async def process_messages_batch_with_commit(reader: ydb.TopicReaderAsyncIO):
3333
# Explicit commit example
34-
async for batch in reader.batches(max_messages=100, timeout=2):
35-
async with asyncio.TaskGroup() as tg:
36-
for message in batch.messages:
37-
tg.create_task(_process(message))
38-
39-
# wait complete of process all messages from batch be taskgroup context manager
40-
# and commit complete batch
34+
while True:
35+
batch = await reader.receive_batch()
36+
...
4137
await reader.commit(batch)
4238

4339

44-
async def process_messages_batch_context_manager_commit(reader: ydb.TopicReaderAsyncIO):
45-
# Commit with context manager
46-
async for batch in reader.batches():
47-
async with reader.commit_on_exit(batch), asyncio.TaskGroup() as tg:
48-
for message in batch.messages:
49-
tg.create_task(_process(message))
50-
51-
5240
async def get_message_with_timeout(reader: ydb.TopicReaderAsyncIO):
5341
try:
5442
message = await asyncio.wait_for(reader.receive_message(), timeout=1)
55-
except TimeoutError:
43+
except asyncio.TimeoutError:
5644
print("Have no new messages in a second")
5745
return
5846

5947
print("mess", message.data)
6048

6149

6250
async def get_all_messages_with_small_wait(reader: ydb.TopicReaderAsyncIO):
63-
async for message in reader.messages(timeout=1):
64-
await _process(message)
65-
print("Have no new messages in a second")
51+
while True:
52+
try:
53+
message = await reader.receive_message()
54+
await _process(message)
55+
except asyncio.TimeoutError:
56+
print("Have no new messages in a second")
6657

6758

6859
async def get_a_message_from_external_loop(reader: ydb.TopicReaderAsyncIO):
6960
for i in range(10):
7061
try:
7162
message = await asyncio.wait_for(reader.receive_message(), timeout=1)
72-
except TimeoutError:
63+
except asyncio.TimeoutError:
7364
return
7465
await _process(message)
7566

@@ -78,7 +69,7 @@ async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO)
7869
for i in range(10):
7970
try:
8071
batch = await asyncio.wait_for(reader.receive_batch(), timeout=2)
81-
except TimeoutError:
72+
except asyncio.TimeoutError:
8273
return
8374

8475
for message in batch.messages:
@@ -89,30 +80,23 @@ async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO)
8980
async def auto_deserialize_message(db: ydb.aio.Driver):
9081
# async, batch work similar to this
9182

92-
async with ydb.TopicClientAsyncIO(db).reader(
83+
async with db.topic_client.reader(
9384
"/database/topic/path", consumer="asd", deserializer=json.loads
9485
) as reader:
95-
async for message in reader.messages():
86+
while True:
87+
message = await reader.receive_message()
9688
print(
9789
message.data.Name
9890
) # message.data replaces by json.loads(message.data) of raw message
9991
reader.commit(message)
10092

10193

102-
async def commit_batch_with_context(reader: ydb.TopicReaderAsyncIO):
103-
async for batch in reader.batches():
104-
async with reader.commit_on_exit(batch):
105-
for message in batch.messages:
106-
if not batch.is_alive:
107-
break
108-
await _process(message)
109-
110-
11194
async def handle_partition_stop(reader: ydb.TopicReaderAsyncIO):
112-
async for message in reader.messages():
113-
time.sleep(1) # some work
95+
while True:
96+
message = await reader.receive_message()
97+
time.sleep(123) # some work
11498
if message.is_alive:
115-
time.sleep(123) # some other work
99+
time.sleep(1) # some other work
116100
await reader.commit(message)
117101

118102

@@ -126,38 +110,34 @@ def process_batch(batch):
126110
_process(message)
127111
reader.commit(batch)
128112

129-
async for batch in reader.batches():
113+
while True:
114+
batch = await reader.receive_batch()
130115
process_batch(batch)
131116

132117

133118
async def connect_and_read_few_topics(db: ydb.aio.Driver):
134-
with ydb.TopicClientAsyncIO(db).reader(
119+
async with db.topic_client.reader(
135120
[
136121
"/database/topic/path",
137122
ydb.TopicSelector("/database/second-topic", partitions=3),
138123
]
139124
) as reader:
140-
async for message in reader.messages():
125+
while True:
126+
message = await reader.receive_message()
141127
await _process(message)
142128
await reader.commit(message)
143129

144130

145-
async def handle_partition_graceful_stop_batch(reader: ydb.TopicReaderAsyncIO):
146-
# no special handle, but batch will contain less than prefer count messages
147-
async for batch in reader.batches():
148-
await _process(batch)
149-
reader.commit(batch)
150-
151-
152131
async def advanced_commit_notify(db: ydb.aio.Driver):
153132
def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
154133
print(event.topic)
155134
print(event.offset)
156135

157-
async with ydb.TopicClientAsyncIO(db).reader(
136+
async with db.topic_client.reader(
158137
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
159138
) as reader:
160-
async for message in reader.messages():
139+
while True:
140+
message = await reader.receive_message()
161141
await _process(message)
162142
await reader.commit(message)
163143

@@ -171,12 +151,13 @@ async def on_get_partition_start_offset(
171151
resp.start_offset = 123
172152
return resp
173153

174-
async with ydb.TopicClient(db).reader(
154+
async with db.topic_client.reader(
175155
"/local/test",
176156
consumer="consumer",
177157
on_get_partition_start_offset=on_get_partition_start_offset,
178158
) as reader:
179-
async for mess in reader.messages():
159+
while True:
160+
mess = await reader.receive_message()
180161
await _process(mess)
181162
# save progress to own database
182163

examples/topic/reader_example.py

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,37 @@ def connect():
99
connection_string="grpc://localhost:2135?database=/local",
1010
credentials=ydb.credentials.AnonymousCredentials(),
1111
)
12-
reader = ydb.TopicClient(db).reader("/local/topic", consumer="consumer")
12+
reader = db.topic_client.reader("/local/topic", consumer="consumer")
1313
return reader
1414

1515

1616
def create_reader_and_close_with_context_manager(db: ydb.Driver):
17-
with ydb.TopicClient(db).reader(
17+
with db.topic_client.reader(
1818
"/database/topic/path", consumer="consumer", buffer_size_bytes=123
1919
) as reader:
20-
for message in reader:
20+
while True:
21+
message = reader.receive_message() # noqa
2122
pass
2223

2324

2425
def print_message_content(reader: ydb.TopicReader):
25-
for message in reader.messages():
26+
while True:
27+
message = reader.receive_message()
2628
print("text", message.data.read().decode("utf-8"))
2729
reader.commit(message)
2830

2931

3032
def process_messages_batch_explicit_commit(reader: ydb.TopicReader):
31-
for batch in reader.batches(max_messages=100, timeout=2):
33+
while True:
34+
batch = reader.receive_batch()
3235
for message in batch.messages:
3336
_process(message)
3437
reader.commit(batch)
3538

3639

3740
def process_messages_batch_context_manager_commit(reader: ydb.TopicReader):
38-
for batch in reader.batches(max_messages=100, timeout=2):
41+
while True:
42+
batch = reader.receive_batch()
3943
with reader.commit_on_exit(batch):
4044
for message in batch.messages:
4145
_process(message)
@@ -52,9 +56,12 @@ def get_message_with_timeout(reader: ydb.TopicReader):
5256

5357

5458
def get_all_messages_with_small_wait(reader: ydb.TopicReader):
55-
for message in reader.messages(timeout=1):
56-
_process(message)
57-
print("Have no new messages in a second")
59+
while True:
60+
try:
61+
message = reader.receive_message(timeout=1)
62+
_process(message)
63+
except TimeoutError:
64+
print("Have no new messages in a second")
5865

5966

6067
def get_a_message_from_external_loop(reader: ydb.TopicReader):
@@ -81,30 +88,23 @@ def get_one_batch_from_external_loop(reader: ydb.TopicReader):
8188
def auto_deserialize_message(db: ydb.Driver):
8289
# async, batch work similar to this
8390

84-
reader = ydb.TopicClient(db).reader(
91+
reader = db.topic_client.reader(
8592
"/database/topic/path", consumer="asd", deserializer=json.loads
8693
)
87-
for message in reader.messages():
94+
while True:
95+
message = reader.receive_message()
8896
print(
8997
message.data.Name
9098
) # message.data replaces by json.loads(message.data) of raw message
9199
reader.commit(message)
92100

93101

94-
def commit_batch_with_context(reader: ydb.TopicReader):
95-
for batch in reader.batches():
96-
with reader.commit_on_exit(batch):
97-
for message in batch.messages:
98-
if not batch.is_alive:
99-
break
100-
_process(message)
101-
102-
103102
def handle_partition_stop(reader: ydb.TopicReader):
104-
for message in reader.messages():
105-
time.sleep(1) # some work
103+
while True:
104+
message = reader.receive_message()
105+
time.sleep(123) # some work
106106
if message.is_alive:
107-
time.sleep(123) # some other work
107+
time.sleep(1) # some other work
108108
reader.commit(message)
109109

110110

@@ -118,25 +118,28 @@ def process_batch(batch):
118118
_process(message)
119119
reader.commit(batch)
120120

121-
for batch in reader.batches():
121+
while True:
122+
batch = reader.receive_batch()
122123
process_batch(batch)
123124

124125

125126
def connect_and_read_few_topics(db: ydb.Driver):
126-
with ydb.TopicClient(db).reader(
127+
with db.topic_client.reader(
127128
[
128129
"/database/topic/path",
129130
ydb.TopicSelector("/database/second-topic", partitions=3),
130131
]
131132
) as reader:
132-
for message in reader:
133+
while True:
134+
message = reader.receive_message()
133135
_process(message)
134136
reader.commit(message)
135137

136138

137139
def handle_partition_graceful_stop_batch(reader: ydb.TopicReader):
138140
# no special handle, but batch will contain less than prefer count messages
139-
for batch in reader.batches():
141+
while True:
142+
batch = reader.receive_batch()
140143
_process(batch)
141144
reader.commit(batch)
142145

@@ -146,10 +149,11 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
146149
print(event.topic)
147150
print(event.offset)
148151

149-
with ydb.TopicClient(db).reader(
152+
with db.topic_client.reader(
150153
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
151154
) as reader:
152-
for message in reader:
155+
while True:
156+
message = reader.receive_message()
153157
with reader.commit_on_exit(message):
154158
_process(message)
155159

@@ -164,12 +168,13 @@ def on_get_partition_start_offset(
164168
resp.start_offset = 123
165169
return resp
166170

167-
with ydb.TopicClient(db).reader(
171+
with db.topic_client.reader(
168172
"/local/test",
169173
consumer="consumer",
170174
on_get_partition_start_offset=on_get_partition_start_offset,
171175
) as reader:
172-
for mess in reader:
176+
while True:
177+
mess = reader.receive_message()
173178
_process(mess)
174179
# save progress to own database
175180

0 commit comments

Comments
 (0)