Commit 3854f04

initial topic writer

1 parent f9c5d13 commit 3854f04

28 files changed: +3020 -12 lines
.gitignore

Lines changed: 2 additions & 1 deletion
@@ -3,4 +3,5 @@ ydb.egg-info/
 /.idea
 /tox
 /venv
-/ydb_certs
+/ydb_certs
+/tmp

AUTHORS

Lines changed: 2 additions & 1 deletion
@@ -1,4 +1,5 @@
-The following authors have created the source code of "YDB Python SDK"
+The following authors have created the source code of "Yandex Database Python SDK"
 published and distributed by YANDEX LLC as the owner:

 Vitalii Gridnev <[email protected]>
+Timofey Koolin <[email protected]>

Lines changed: 172 additions & 0 deletions
@@ -0,0 +1,172 @@
import asyncio
import json
import time

import ydb


async def connect():
    db = ydb.aio.Driver(
        connection_string="grpc://localhost:2135?database=/local",
        credentials=ydb.credentials.AnonymousCredentials(),
    )
    reader = ydb.TopicClientAsyncIO(db).topic_reader("/local/topic", consumer="consumer")


async def create_reader_and_close_with_context_manager(db: ydb.aio.Driver):
    async with ydb.TopicClientAsyncIO(db).topic_reader("/database/topic/path", consumer="consumer") as reader:
        async for message in reader.messages():
            pass


async def print_message_content(reader: ydb.TopicReaderAsyncIO):
    async for message in reader.messages():
        print("text", message.data.read().decode("utf-8"))
        # await and async_commit are needed only for the sync commit mode - to wait for the ack from the server
        await reader.commit(message)


async def process_messages_batch_explicit_commit(reader: ydb.TopicReaderAsyncIO):
    # Explicit commit example
    async for batch in reader.batches(max_messages=100, timeout=2):
        async with asyncio.TaskGroup() as tg:
            for message in batch.messages:
                tg.create_task(_process(message))

        # the TaskGroup context manager waits until all messages from the batch are processed,
        # then the whole batch is committed
        await reader.commit(batch)


async def process_messages_batch_context_manager_commit(reader: ydb.TopicReaderAsyncIO):
    # Commit with context manager
    async for batch in reader.batches():
        async with reader.commit_on_exit(batch), asyncio.TaskGroup() as tg:
            for message in batch.messages:
                tg.create_task(_process(message))


async def get_message_with_timeout(reader: ydb.TopicReaderAsyncIO):
    try:
        message = await asyncio.wait_for(reader.receive_message(), timeout=1)
    except TimeoutError:
        print("Have no new messages in a second")
        return

    print("mess", message.data)


async def get_all_messages_with_small_wait(reader: ydb.TopicReaderAsyncIO):
    async for message in reader.messages(timeout=1):
        await _process(message)
    print("Have no new messages in a second")


async def get_a_message_from_external_loop(reader: ydb.TopicReaderAsyncIO):
    for i in range(10):
        try:
            message = await asyncio.wait_for(reader.receive_message(), timeout=1)
        except TimeoutError:
            return
        await _process(message)


async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO):
    for i in range(10):
        try:
            batch = await asyncio.wait_for(reader.receive_batch(), timeout=2)
        except TimeoutError:
            return

        for message in batch.messages:
            await _process(message)
        await reader.commit(batch)


async def auto_deserialize_message(db: ydb.aio.Driver):
    # async and batch versions work similarly to this

    async with ydb.TopicClientAsyncIO(db).topic_reader(
        "/database/topic/path", consumer="asd", deserializer=json.loads
    ) as reader:
        async for message in reader.messages():
            print(message.data.Name)  # message.data is the result of json.loads() applied to the raw message data
            reader.commit(message)


async def commit_batch_with_context(reader: ydb.TopicReaderAsyncIO):
    async for batch in reader.batches():
        async with reader.commit_on_exit(batch):
            for message in batch.messages:
                if not batch.is_alive:
                    break
                await _process(message)


async def handle_partition_stop(reader: ydb.TopicReaderAsyncIO):
    async for message in reader.messages():
        time.sleep(1)  # some work
        if message.is_alive:
            time.sleep(123)  # some other work
            await reader.commit(message)


async def handle_partition_stop_batch(reader: ydb.TopicReaderAsyncIO):
    async def process_batch(batch):
        for message in batch.messages:
            if not batch.is_alive:
                # there is no reason to process an expired batch -
                # go read the next (still alive) batch
                return
            await _process(message)
        await reader.commit(batch)

    async for batch in reader.batches():
        await process_batch(batch)


async def connect_and_read_few_topics(db: ydb.aio.Driver):
    async with ydb.TopicClientAsyncIO(db).topic_reader(
        ["/database/topic/path", ydb.TopicSelector("/database/second-topic", partitions=3)]
    ) as reader:
        async for message in reader.messages():
            await _process(message)
            await reader.commit(message)


async def handle_partition_graceful_stop_batch(reader: ydb.TopicReaderAsyncIO):
    # no special handling is needed, but the batch may contain fewer messages than the preferred count
    async for batch in reader.batches():
        await _process(batch)
        reader.commit(batch)


async def advanced_commit_notify(db: ydb.aio.Driver):
    def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
        print(event.topic)
        print(event.offset)

    async with ydb.TopicClientAsyncIO(db).topic_reader(
        "/local",
        consumer="consumer",
        commit_batch_time=4,
        on_commit=on_commit,
    ) as reader:
        async for message in reader.messages():
            await _process(message)
            await reader.commit(message)


async def advanced_read_with_own_progress_storage(db: ydb.aio.Driver):
    async def on_get_partition_start_offset(
        req: ydb.TopicReaderEvents.OnPartitionGetStartOffsetRequest,
    ) -> ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse:
        # read the current progress from your own database
        resp = ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse()
        resp.start_offset = 123
        return resp

    async with ydb.TopicClientAsyncIO(db).topic_reader(
        "/local/test",
        consumer="consumer",
        on_get_partition_start_offset=on_get_partition_start_offset,
    ) as reader:
        async for mess in reader.messages():
            await _process(mess)
            # save progress to your own database

            # no progress is committed to the topic service
            # reader.commit(mess)


async def _process(msg):
    raise NotImplementedError()
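The functions above are standalone fragments with no entry point. The following is a minimal, hypothetical sketch of how one of them could be wired into a runnable script; it assumes only the ydb.aio.Driver / ydb.TopicClientAsyncIO API exactly as used in connect() and print_message_content() above, and the main() function itself is not part of the commit.

import asyncio

import ydb


async def main():
    # hypothetical wiring, based only on the API sketched in the examples above
    db = ydb.aio.Driver(
        connection_string="grpc://localhost:2135?database=/local",
        credentials=ydb.credentials.AnonymousCredentials(),
    )
    async with ydb.TopicClientAsyncIO(db).topic_reader("/local/topic", consumer="consumer") as reader:
        async for message in reader.messages():
            print("text", message.data.read().decode("utf-8"))
            await reader.commit(message)


if __name__ == "__main__":
    asyncio.run(main())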

examples/topic/reader_example.py

Lines changed: 173 additions & 0 deletions
@@ -0,0 +1,173 @@
import json
import time

import ydb


def connect():
    db = ydb.Driver(
        connection_string="grpc://localhost:2135?database=/local",
        credentials=ydb.credentials.AnonymousCredentials(),
    )
    reader = ydb.TopicClient(db).topic_reader("/local/topic", consumer="consumer")


def create_reader_and_close_with_context_manager(db: ydb.Driver):
    with ydb.TopicClient(db).topic_reader("/database/topic/path", consumer="consumer", buffer_size_bytes=123) as reader:
        for message in reader:
            pass


def print_message_content(reader: ydb.TopicReader):
    for message in reader.messages():
        print("text", message.data.read().decode("utf-8"))
        reader.commit(message)


def process_messages_batch_explicit_commit(reader: ydb.TopicReader):
    for batch in reader.batches(max_messages=100, timeout=2):
        for message in batch.messages:
            _process(message)
        reader.commit(batch)


def process_messages_batch_context_manager_commit(reader: ydb.TopicReader):
    for batch in reader.batches(max_messages=100, timeout=2):
        with reader.commit_on_exit(batch):
            for message in batch.messages:
                _process(message)


def get_message_with_timeout(reader: ydb.TopicReader):
    try:
        message = reader.receive_message(timeout=1)
    except TimeoutError:
        print("Have no new messages in a second")
        return

    print("mess", message.data)


def get_all_messages_with_small_wait(reader: ydb.TopicReader):
    for message in reader.messages(timeout=1):
        _process(message)
    print("Have no new messages in a second")


def get_a_message_from_external_loop(reader: ydb.TopicReader):
    for i in range(10):
        try:
            message = reader.receive_message(timeout=1)
        except TimeoutError:
            return
        _process(message)


def get_one_batch_from_external_loop(reader: ydb.TopicReader):
    for i in range(10):
        try:
            batch = reader.receive_batch(timeout=2)
        except TimeoutError:
            return

        for message in batch.messages:
            _process(message)
        reader.commit(batch)


def auto_deserialize_message(db: ydb.Driver):
    # async and batch versions work similarly to this

    reader = ydb.TopicClient(db).topic_reader("/database/topic/path", consumer="asd", deserializer=json.loads)
    for message in reader.messages():
        print(message.data.Name)  # message.data is the result of json.loads() applied to the raw message data
        reader.commit(message)


def commit_batch_with_context(reader: ydb.TopicReader):
    for batch in reader.batches():
        with reader.commit_on_exit(batch):
            for message in batch.messages:
                if not batch.is_alive:
                    break
                _process(message)


def handle_partition_stop(reader: ydb.TopicReader):
    for message in reader.messages():
        time.sleep(1)  # some work
        if message.is_alive:
            time.sleep(123)  # some other work
            reader.commit(message)


def handle_partition_stop_batch(reader: ydb.TopicReader):
    def process_batch(batch):
        for message in batch.messages:
            if not batch.is_alive:
                # there is no reason to process an expired batch -
                # go read the next (still alive) batch
                return
            _process(message)
        reader.commit(batch)

    for batch in reader.batches():
        process_batch(batch)


def connect_and_read_few_topics(db: ydb.Driver):
    with ydb.TopicClient(db).topic_reader(
        ["/database/topic/path", ydb.TopicSelector("/database/second-topic", partitions=3)]
    ) as reader:
        for message in reader:
            _process(message)
            reader.commit(message)


def handle_partition_graceful_stop_batch(reader: ydb.TopicReader):
    # no special handling is needed, but the batch may contain fewer messages than the preferred count
    for batch in reader.batches():
        _process(batch)
        reader.commit(batch)


def advanced_commit_notify(db: ydb.Driver):
    def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
        print(event.topic)
        print(event.offset)

    with ydb.TopicClient(db).topic_reader("/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit) as reader:
        for message in reader:
            with reader.commit_on_exit(message):
                _process(message)


def advanced_read_with_own_progress_storage(db: ydb.Driver):
    def on_get_partition_start_offset(
        req: ydb.TopicReaderEvents.OnPartitionGetStartOffsetRequest,
    ) -> ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse:
        # read the current progress from your own database
        resp = ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse()
        resp.start_offset = 123
        return resp

    with ydb.TopicClient(db).topic_reader(
        "/local/test",
        consumer="consumer",
        on_get_partition_start_offset=on_get_partition_start_offset,
    ) as reader:
        for mess in reader:
            _process(mess)
            # save progress to your own database

            # no progress is committed to the topic service
            # reader.commit(mess)


def get_current_statistics(reader: ydb.TopicReader):
    # sync
    stat = reader.sessions_stat()
    print(stat)

    # with a future
    f = reader.async_sessions_stat()
    stat = f.result()
    print(stat)


def _process(msg):
    raise NotImplementedError()
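As with the asynchronous file, these are standalone fragments. A minimal, hypothetical sketch of a synchronous entry point follows, assuming only the ydb.Driver / ydb.TopicClient API used in connect() and print_message_content() above; the main() function is not part of the commit.

import ydb


def main():
    # hypothetical wiring, based only on the API sketched in the examples above
    db = ydb.Driver(
        connection_string="grpc://localhost:2135?database=/local",
        credentials=ydb.credentials.AnonymousCredentials(),
    )
    with ydb.TopicClient(db).topic_reader("/local/topic", consumer="consumer") as reader:
        for message in reader.messages():
            print("text", message.data.read().decode("utf-8"))
            reader.commit(message)


if __name__ == "__main__":
    main()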

0 commit comments