Skip to content

Commit 1f94f7b

Browse files
committed
fix examples
1 parent 677edfd commit 1f94f7b

File tree

4 files changed

+71
-65
lines changed

4 files changed

+71
-65
lines changed

examples/topic/reader_async_example.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ async def connect():
1010
connection_string="grpc://localhost:2135?database=/local",
1111
credentials=ydb.credentials.AnonymousCredentials(),
1212
)
13-
reader = ydb.TopicClientAsyncIO(db).reader("/local/topic", consumer="consumer")
13+
reader = db.topic_client.reader("/local/topic", consumer="consumer")
1414
return reader
1515

1616

1717
async def create_reader_and_close_with_context_manager(db: ydb.aio.Driver):
18-
async with ydb.TopicClientAsyncIO(db).reader(
18+
async with db.topic_client.reader(
1919
"/database/topic/path", consumer="consumer"
20-
) as reader:
20+
) as reader: # noqa
2121
...
2222

2323

@@ -80,7 +80,7 @@ async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO)
8080
async def auto_deserialize_message(db: ydb.aio.Driver):
8181
# async, batch work similar to this
8282

83-
async with ydb.TopicClientAsyncIO(db).reader(
83+
async with db.topic_client.reader(
8484
"/database/topic/path", consumer="asd", deserializer=json.loads
8585
) as reader:
8686
while True:
@@ -116,7 +116,7 @@ def process_batch(batch):
116116

117117

118118
async def connect_and_read_few_topics(db: ydb.aio.Driver):
119-
with ydb.TopicClientAsyncIO(db).reader(
119+
with db.topic_client.reader(
120120
[
121121
"/database/topic/path",
122122
ydb.TopicSelector("/database/second-topic", partitions=3),
@@ -133,7 +133,7 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
133133
print(event.topic)
134134
print(event.offset)
135135

136-
async with ydb.TopicClientAsyncIO(db).reader(
136+
async with db.topic_client.reader(
137137
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
138138
) as reader:
139139
while True:
@@ -151,12 +151,13 @@ async def on_get_partition_start_offset(
151151
resp.start_offset = 123
152152
return resp
153153

154-
async with ydb.TopicClient(db).reader(
154+
async with db.topic_client.reader(
155155
"/local/test",
156156
consumer="consumer",
157157
on_get_partition_start_offset=on_get_partition_start_offset,
158158
) as reader:
159-
async for mess in reader.messages():
159+
while True:
160+
mess = reader.receive_message()
160161
await _process(mess)
161162
# save progress to own database
162163

examples/topic/reader_example.py

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,37 @@ def connect():
99
connection_string="grpc://localhost:2135?database=/local",
1010
credentials=ydb.credentials.AnonymousCredentials(),
1111
)
12-
reader = ydb.TopicClient(db).reader("/local/topic", consumer="consumer")
12+
reader = db.topic_client.reader("/local/topic", consumer="consumer")
1313
return reader
1414

1515

1616
def create_reader_and_close_with_context_manager(db: ydb.Driver):
17-
with ydb.TopicClient(db).reader(
17+
with db.topic_client.reader(
1818
"/database/topic/path", consumer="consumer", buffer_size_bytes=123
1919
) as reader:
20-
for message in reader:
20+
while True:
21+
message = reader.receive_message() # noqa
2122
pass
2223

2324

2425
def print_message_content(reader: ydb.TopicReader):
25-
for message in reader.messages():
26+
while True:
27+
message = reader.receive_message()
2628
print("text", message.data.read().decode("utf-8"))
2729
reader.commit(message)
2830

2931

3032
def process_messages_batch_explicit_commit(reader: ydb.TopicReader):
31-
for batch in reader.batches(max_messages=100, timeout=2):
33+
while True:
34+
batch = reader.receive_batch()
3235
for message in batch.messages:
3336
_process(message)
3437
reader.commit(batch)
3538

3639

3740
def process_messages_batch_context_manager_commit(reader: ydb.TopicReader):
38-
for batch in reader.batches(max_messages=100, timeout=2):
41+
while True:
42+
batch = reader.receive_batch()
3943
with reader.commit_on_exit(batch):
4044
for message in batch.messages:
4145
_process(message)
@@ -52,9 +56,12 @@ def get_message_with_timeout(reader: ydb.TopicReader):
5256

5357

5458
def get_all_messages_with_small_wait(reader: ydb.TopicReader):
55-
for message in reader.messages(timeout=1):
56-
_process(message)
57-
print("Have no new messages in a second")
59+
while True:
60+
try:
61+
message = reader.receive_message(timeout=1)
62+
_process(message)
63+
except TimeoutError:
64+
print("Have no new messages in a second")
5865

5966

6067
def get_a_message_from_external_loop(reader: ydb.TopicReader):
@@ -81,30 +88,23 @@ def get_one_batch_from_external_loop(reader: ydb.TopicReader):
8188
def auto_deserialize_message(db: ydb.Driver):
8289
# async, batch work similar to this
8390

84-
reader = ydb.TopicClient(db).reader(
91+
reader = db.topic_client.reader(
8592
"/database/topic/path", consumer="asd", deserializer=json.loads
8693
)
87-
for message in reader.messages():
94+
while True:
95+
message = reader.receive_message()
8896
print(
8997
message.data.Name
9098
) # message.data replaces by json.loads(message.data) of raw message
9199
reader.commit(message)
92100

93101

94-
def commit_batch_with_context(reader: ydb.TopicReader):
95-
for batch in reader.batches():
96-
with reader.commit_on_exit(batch):
97-
for message in batch.messages:
98-
if not batch.is_alive:
99-
break
100-
_process(message)
101-
102-
103102
def handle_partition_stop(reader: ydb.TopicReader):
104-
for message in reader.messages():
105-
time.sleep(1) # some work
103+
while True:
104+
message = reader.receive_message()
105+
time.sleep(123) # some work
106106
if message.is_alive:
107-
time.sleep(123) # some other work
107+
time.sleep(1) # some other work
108108
reader.commit(message)
109109

110110

@@ -118,25 +118,28 @@ def process_batch(batch):
118118
_process(message)
119119
reader.commit(batch)
120120

121-
for batch in reader.batches():
121+
while True:
122+
batch = reader.receive_batch()
122123
process_batch(batch)
123124

124125

125126
def connect_and_read_few_topics(db: ydb.Driver):
126-
with ydb.TopicClient(db).reader(
127+
with db.topic_client.reader(
127128
[
128129
"/database/topic/path",
129130
ydb.TopicSelector("/database/second-topic", partitions=3),
130131
]
131132
) as reader:
132-
for message in reader:
133+
while True:
134+
message = reader.receive_message()
133135
_process(message)
134136
reader.commit(message)
135137

136138

137139
def handle_partition_graceful_stop_batch(reader: ydb.TopicReader):
138140
# no special handle, but batch will contain less than prefer count messages
139-
for batch in reader.batches():
141+
while True:
142+
batch = reader.receive_batch()
140143
_process(batch)
141144
reader.commit(batch)
142145

@@ -146,10 +149,11 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
146149
print(event.topic)
147150
print(event.offset)
148151

149-
with ydb.TopicClient(db).reader(
152+
with db.topic_client.reader(
150153
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
151154
) as reader:
152-
for message in reader:
155+
while True:
156+
message = reader.receive_message()
153157
with reader.commit_on_exit(message):
154158
_process(message)
155159

@@ -164,12 +168,13 @@ def on_get_partition_start_offset(
164168
resp.start_offset = 123
165169
return resp
166170

167-
with ydb.TopicClient(db).reader(
171+
with db.topic_client.reader(
168172
"/local/test",
169173
consumer="consumer",
170174
on_get_partition_start_offset=on_get_partition_start_offset,
171175
) as reader:
172-
for mess in reader:
176+
while True:
177+
mess = reader.receive_message()
173178
_process(mess)
174179
# save progress to own database
175180

examples/topic/writer_async_example.py

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,31 @@
11
import asyncio
2+
import datetime
23
import json
3-
import time
44
from typing import Dict, List
55

66
import ydb
77
from ydb import TopicWriterMessage
88

99

1010
async def create_writer(db: ydb.aio.Driver):
11-
async with ydb.TopicClientAsyncIO(db).writer(
11+
async with db.topic_client.writer(
1212
"/database/topic/path",
1313
producer_id="producer-id",
1414
) as writer:
1515
await writer.write(TopicWriterMessage("asd"))
1616

1717

1818
async def connect_and_wait(db: ydb.aio.Driver):
19-
async with ydb.TopicClientAsyncIO(db).writer(
19+
async with db.topic_client.writer(
2020
"/database/topic/path",
2121
producer_id="producer-id",
2222
) as writer:
23-
writer.wait_init()
23+
info = await writer.wait_init() # noqa
24+
...
2425

2526

2627
async def connect_without_context_manager(db: ydb.aio.Driver):
27-
writer = ydb.TopicClientAsyncIO(db).writer(
28+
writer = db.topic_client.writer(
2829
"/database/topic/path",
2930
producer_id="producer-id",
3031
)
@@ -49,7 +50,7 @@ async def send_messages(writer: ydb.TopicWriterAsyncIO):
4950

5051
# with meta
5152
await writer.write(
52-
ydb.TopicWriterMessage("asd", seqno=123, created_at_ns=time.time_ns())
53+
ydb.TopicWriterMessage("asd", seqno=123, created_at=datetime.datetime.now())
5354
)
5455

5556

@@ -71,7 +72,7 @@ async def send_messages_with_manual_seqno(writer: ydb.TopicWriter):
7172

7273
async def send_messages_with_wait_ack(writer: ydb.TopicWriterAsyncIO):
7374
# future wait
74-
await writer.write_with_result(
75+
await writer.write_with_ack(
7576
[
7677
ydb.TopicWriterMessage("mess", seqno=1),
7778
ydb.TopicWriterMessage("mess", seqno=2),
@@ -84,10 +85,10 @@ async def send_messages_with_wait_ack(writer: ydb.TopicWriterAsyncIO):
8485

8586

8687
async def send_json_message(db: ydb.aio.Driver):
87-
async with ydb.TopicClientAsyncIO(db).writer(
88+
async with db.topic_client.writer(
8889
"/database/path/topic", serializer=json.dumps
8990
) as writer:
90-
writer.write({"a": 123})
91+
await writer.write({"a": 123})
9192

9293

9394
async def send_messages_and_wait_all_commit_with_flush(writer: ydb.TopicWriterAsyncIO):
@@ -99,14 +100,11 @@ async def send_messages_and_wait_all_commit_with_flush(writer: ydb.TopicWriterAs
99100
async def send_messages_and_wait_all_commit_with_results(
100101
writer: ydb.TopicWriterAsyncIO,
101102
):
102-
last_future = None
103103
for i in range(10):
104104
content = "%s" % i
105-
last_future = await writer.write_with_ack(content)
105+
await writer.write(content)
106106

107-
await asyncio.wait(last_future)
108-
if last_future.exception() is not None:
109-
raise last_future.exception()
107+
await writer.flush()
110108

111109

112110
async def switch_messages_with_many_producers(
@@ -118,7 +116,7 @@ async def switch_messages_with_many_producers(
118116
# select writer for the msg
119117
writer_idx = msg[:1]
120118
writer = writers[writer_idx]
121-
future = await writer.write_with_ack(msg)
119+
future = await writer.write_with_ack_future(msg)
122120
futures.append(future)
123121

124122
# wait acks from all writes

examples/topic/writer_example.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,43 @@
11
import concurrent.futures
2+
import datetime
23
import json
3-
import time
44
from typing import Dict, List
55
from concurrent.futures import Future, wait
66

77
import ydb
88
from ydb import TopicWriterMessage
99

1010

11-
async def connect():
12-
db = ydb.aio.Driver(
11+
def connect():
12+
db = ydb.Driver(
1313
connection_string="grpc://localhost:2135?database=/local",
1414
credentials=ydb.credentials.AnonymousCredentials(),
1515
)
16-
writer = ydb.TopicClientAsyncIO(db).writer(
16+
writer = db.topic_client.writer(
1717
"/local/topic",
1818
producer_id="producer-id",
1919
)
20-
await writer.write(TopicWriterMessage("asd"))
20+
writer.write(TopicWriterMessage("asd"))
2121

2222

2323
def create_writer(db: ydb.Driver):
24-
with ydb.TopicClient(db).writer(
24+
with db.topic_client.writer(
2525
"/database/topic/path",
2626
producer_id="producer-id",
2727
) as writer:
2828
writer.write(TopicWriterMessage("asd"))
2929

3030

3131
def connect_and_wait(db: ydb.Driver):
32-
with ydb.TopicClient(db).writer(
32+
with db.topic_client.writer(
3333
"/database/topic/path",
3434
producer_id="producer-id",
3535
) as writer:
36-
writer.wait()
36+
info = writer.wait_init() # noqa
3737

3838

3939
def connect_without_context_manager(db: ydb.Driver):
40-
writer = ydb.TopicClient(db).writer(
40+
writer = db.topic_client.writer(
4141
"/database/topic/path",
4242
producer_id="producer-id",
4343
)
@@ -61,7 +61,9 @@ def send_messages(writer: ydb.TopicWriter):
6161
) # send few messages by one call
6262

6363
# with meta
64-
writer.write(ydb.TopicWriterMessage("asd", seqno=123, created_at_ns=time.time_ns()))
64+
writer.write(
65+
ydb.TopicWriterMessage("asd", seqno=123, created_at=datetime.datetime.now())
66+
)
6567

6668

6769
def send_message_without_block_if_internal_buffer_is_full(
@@ -101,7 +103,7 @@ def send_messages_with_wait_ack(writer: ydb.TopicWriter):
101103

102104

103105
def send_json_message(db: ydb.Driver):
104-
with ydb.TopicClient(db).writer(
106+
with db.topic_client.writer(
105107
"/database/path/topic", serializer=json.dumps
106108
) as writer:
107109
writer.write({"a": 123})

0 commit comments

Comments
 (0)