Skip to content

Commit 918d1b2

Browse files
authored
Merge pull request #242: clear interface
2 parents 34dd869 + abb6876 commit 918d1b2

File tree

12 files changed

+44
-375
lines changed

12 files changed

+44
-375
lines changed

examples/topic/reader_async_example.py

Lines changed: 2 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import asyncio
2-
import json
32
import time
43

54
import ydb
@@ -77,33 +76,19 @@ async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO)
7776
await reader.commit(batch)
7877

7978

80-
async def auto_deserialize_message(db: ydb.aio.Driver):
81-
# async, batch work similar to this
82-
83-
async with db.topic_client.reader(
84-
"/database/topic/path", consumer="asd", deserializer=json.loads
85-
) as reader:
86-
while True:
87-
message = await reader.receive_message()
88-
print(
89-
message.data.Name
90-
) # message.data replaces by json.loads(message.data) of raw message
91-
reader.commit(message)
92-
93-
9479
async def handle_partition_stop(reader: ydb.TopicReaderAsyncIO):
9580
while True:
9681
message = await reader.receive_message()
9782
time.sleep(123) # some work
98-
if message.is_alive:
83+
if message.alive:
9984
time.sleep(1) # some other work
10085
await reader.commit(message)
10186

10287

10388
async def handle_partition_stop_batch(reader: ydb.TopicReaderAsyncIO):
10489
def process_batch(batch):
10590
for message in batch.messages:
106-
if not batch.is_alive:
91+
if not batch.alive:
10792
# no reason work with expired batch
10893
# go read next - good batch
10994
return
@@ -115,55 +100,5 @@ def process_batch(batch):
115100
process_batch(batch)
116101

117102

118-
async def connect_and_read_few_topics(db: ydb.aio.Driver):
119-
with db.topic_client.reader(
120-
[
121-
"/database/topic/path",
122-
ydb.TopicSelector("/database/second-topic", partitions=3),
123-
]
124-
) as reader:
125-
while True:
126-
message = await reader.receive_message()
127-
await _process(message)
128-
await reader.commit(message)
129-
130-
131-
async def advanced_commit_notify(db: ydb.aio.Driver):
132-
def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
133-
print(event.topic)
134-
print(event.offset)
135-
136-
async with db.topic_client.reader(
137-
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
138-
) as reader:
139-
while True:
140-
message = await reader.receive_message()
141-
await _process(message)
142-
await reader.commit(message)
143-
144-
145-
async def advanced_read_with_own_progress_storage(db: ydb.TopicReaderAsyncIO):
146-
async def on_get_partition_start_offset(
147-
req: ydb.TopicReaderEvents.OnPartitionGetStartOffsetRequest,
148-
) -> ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse:
149-
# read current progress from database
150-
resp = ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse()
151-
resp.start_offset = 123
152-
return resp
153-
154-
async with db.topic_client.reader(
155-
"/local/test",
156-
consumer="consumer",
157-
on_get_partition_start_offset=on_get_partition_start_offset,
158-
) as reader:
159-
while True:
160-
mess = reader.receive_message()
161-
await _process(mess)
162-
# save progress to own database
163-
164-
# no commit progress to topic service
165-
# reader.commit(mess)
166-
167-
168103
async def _process(msg):
169104
raise NotImplementedError()

examples/topic/reader_example.py

Lines changed: 2 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
import time
32

43
import ydb
@@ -37,14 +36,6 @@ def process_messages_batch_explicit_commit(reader: ydb.TopicReader):
3736
reader.commit(batch)
3837

3938

40-
def process_messages_batch_context_manager_commit(reader: ydb.TopicReader):
41-
while True:
42-
batch = reader.receive_batch()
43-
with reader.commit_on_exit(batch):
44-
for message in batch.messages:
45-
_process(message)
46-
47-
4839
def get_message_with_timeout(reader: ydb.TopicReader):
4940
try:
5041
message = reader.receive_message(timeout=1)
@@ -85,33 +76,19 @@ def get_one_batch_from_external_loop(reader: ydb.TopicReader):
8576
reader.commit(batch)
8677

8778

88-
def auto_deserialize_message(db: ydb.Driver):
89-
# async, batch work similar to this
90-
91-
reader = db.topic_client.reader(
92-
"/database/topic/path", consumer="asd", deserializer=json.loads
93-
)
94-
while True:
95-
message = reader.receive_message()
96-
print(
97-
message.data.Name
98-
) # message.data replaces by json.loads(message.data) of raw message
99-
reader.commit(message)
100-
101-
10279
def handle_partition_stop(reader: ydb.TopicReader):
10380
while True:
10481
message = reader.receive_message()
10582
time.sleep(123) # some work
106-
if message.is_alive:
83+
if message.alive:
10784
time.sleep(1) # some other work
10885
reader.commit(message)
10986

11087

11188
def handle_partition_stop_batch(reader: ydb.TopicReader):
11289
def process_batch(batch):
11390
for message in batch.messages:
114-
if not batch.is_alive:
91+
if not batch.alive:
11592
# no reason work with expired batch
11693
# go read next - good batch
11794
return
@@ -123,19 +100,6 @@ def process_batch(batch):
123100
process_batch(batch)
124101

125102

126-
def connect_and_read_few_topics(db: ydb.Driver):
127-
with db.topic_client.reader(
128-
[
129-
"/database/topic/path",
130-
ydb.TopicSelector("/database/second-topic", partitions=3),
131-
]
132-
) as reader:
133-
while True:
134-
message = reader.receive_message()
135-
_process(message)
136-
reader.commit(message)
137-
138-
139103
def handle_partition_graceful_stop_batch(reader: ydb.TopicReader):
140104
# no special handle, but batch will contain less than prefer count messages
141105
while True:
@@ -144,54 +108,5 @@ def handle_partition_graceful_stop_batch(reader: ydb.TopicReader):
144108
reader.commit(batch)
145109

146110

147-
def advanced_commit_notify(db: ydb.Driver):
148-
def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
149-
print(event.topic)
150-
print(event.offset)
151-
152-
with db.topic_client.reader(
153-
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
154-
) as reader:
155-
while True:
156-
message = reader.receive_message()
157-
with reader.commit_on_exit(message):
158-
_process(message)
159-
160-
161-
def advanced_read_with_own_progress_storage(db: ydb.TopicReader):
162-
def on_get_partition_start_offset(
163-
req: ydb.TopicReaderEvents.OnPartitionGetStartOffsetRequest,
164-
) -> ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse:
165-
166-
# read current progress from database
167-
resp = ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse()
168-
resp.start_offset = 123
169-
return resp
170-
171-
with db.topic_client.reader(
172-
"/local/test",
173-
consumer="consumer",
174-
on_get_partition_start_offset=on_get_partition_start_offset,
175-
) as reader:
176-
while True:
177-
mess = reader.receive_message()
178-
_process(mess)
179-
# save progress to own database
180-
181-
# no commit progress to topic service
182-
# reader.commit(mess)
183-
184-
185-
def get_current_statistics(reader: ydb.TopicReader):
186-
# sync
187-
stat = reader.sessions_stat()
188-
print(stat)
189-
190-
# with feature
191-
f = reader.async_sessions_stat()
192-
stat = f.result()
193-
print(stat)
194-
195-
196111
def _process(msg):
197112
raise NotImplementedError()

examples/topic/writer_async_example.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import asyncio
22
import datetime
3-
import json
43
from typing import Dict, List
54

65
import ydb
@@ -84,13 +83,6 @@ async def send_messages_with_wait_ack(writer: ydb.TopicWriterAsyncIO):
8483
await writer.flush()
8584

8685

87-
async def send_json_message(db: ydb.aio.Driver):
88-
async with db.topic_client.writer(
89-
"/database/path/topic", serializer=json.dumps
90-
) as writer:
91-
await writer.write({"a": 123})
92-
93-
9486
async def send_messages_and_wait_all_commit_with_flush(writer: ydb.TopicWriterAsyncIO):
9587
for i in range(10):
9688
await writer.write(ydb.TopicWriterMessage("%s" % i))
@@ -127,8 +119,3 @@ async def switch_messages_with_many_producers(
127119

128120
# all ok, explicit return - for better
129121
return
130-
131-
132-
async def get_current_statistics(reader: ydb.TopicReaderAsyncIO):
133-
stat = await reader.sessions_stat()
134-
print(stat)

examples/topic/writer_example.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import concurrent.futures
22
import datetime
3-
import json
43
from typing import Dict, List
54
from concurrent.futures import Future, wait
65

@@ -85,7 +84,10 @@ def send_messages_with_manual_seqno(writer: ydb.TopicWriter):
8584
def send_messages_with_wait_ack(writer: ydb.TopicWriter):
8685
# Explicit future wait
8786
writer.async_write_with_ack(
88-
ydb.TopicWriterMessage("mess", seqno=1), ydb.TopicWriterMessage("mess", seqno=2)
87+
[
88+
ydb.TopicWriterMessage("mess", seqno=1),
89+
ydb.TopicWriterMessage("mess", seqno=2),
90+
]
8991
).result()
9092

9193
# implicit, by sync call
@@ -102,13 +104,6 @@ def send_messages_with_wait_ack(writer: ydb.TopicWriter):
102104
writer.flush()
103105

104106

105-
def send_json_message(db: ydb.Driver):
106-
with db.topic_client.writer(
107-
"/database/path/topic", serializer=json.dumps
108-
) as writer:
109-
writer.write({"a": 123})
110-
111-
112107
def send_messages_and_wait_all_commit_with_flush(writer: ydb.TopicWriter):
113108
for i in range(10):
114109
content = "%s" % i

ydb/_topic_reader/datatypes.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from collections import deque
88
from dataclasses import dataclass, field
99
import datetime
10-
from typing import Mapping, Union, Any, List, Dict, Deque, Optional
10+
from typing import Union, Any, List, Dict, Deque, Optional
1111

1212
from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange, Codec
1313
from ydb._topic_reader import topic_reader_asyncio
@@ -26,7 +26,7 @@ def _commit_get_offsets_range(self) -> OffsetsRange:
2626
class ISessionAlive(abc.ABC):
2727
@property
2828
@abc.abstractmethod
29-
def is_alive(self) -> bool:
29+
def alive(self) -> bool:
3030
pass
3131

3232

@@ -54,8 +54,8 @@ def _commit_get_offsets_range(self) -> OffsetsRange:
5454

5555
# ISessionAlive implementation
5656
@property
57-
def is_alive(self) -> bool:
58-
raise NotImplementedError()
57+
def alive(self) -> bool:
58+
return not self._partition_session.closed
5959

6060

6161
@dataclass
@@ -127,16 +127,18 @@ def ack_notify(self, offset: int):
127127
break
128128

129129
def close(self):
130-
try:
131-
self._ensure_not_closed()
132-
except topic_reader_asyncio.TopicReaderCommitToExpiredPartition:
130+
if self.closed:
133131
return
134132

135133
self.state = PartitionSession.State.Stopped
136134
exception = topic_reader_asyncio.TopicReaderCommitToExpiredPartition()
137135
for waiter in self._ack_waiters:
138136
waiter._finish_error(exception)
139137

138+
@property
139+
def closed(self):
140+
return self.state == PartitionSession.State.Stopped
141+
140142
def _ensure_not_closed(self):
141143
if self.state == PartitionSession.State.Stopped:
142144
raise topic_reader_asyncio.TopicReaderCommitToExpiredPartition()
@@ -164,7 +166,6 @@ def _finish_error(self, error: Exception):
164166

165167
@dataclass
166168
class PublicBatch(ICommittable, ISessionAlive):
167-
session_metadata: Mapping[str, str]
168169
messages: List[PublicMessage]
169170
_partition_session: PartitionSession
170171
_bytes_size: int
@@ -184,12 +185,8 @@ def empty(self) -> bool:
184185

185186
# ISessionAlive implementation
186187
@property
187-
def is_alive(self) -> bool:
188-
state = self._partition_session.state
189-
return (
190-
state == PartitionSession.State.Active
191-
or state == PartitionSession.State.GracefulShutdown
192-
)
188+
def alive(self) -> bool:
189+
return not self._partition_session.closed
193190

194191
def pop_message(self) -> PublicMessage:
195192
return self.messages.pop(0)

ydb/_topic_reader/topic_reader.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,6 @@ class PublicReaderSettings:
3636

3737
# decoder_executor, must be set for handle non raw messages
3838
decoder_executor: Optional[concurrent.futures.Executor] = None
39-
40-
# on_commit: Callable[["Events.OnCommit"], None] = None
41-
# on_get_partition_start_offset: Callable[
42-
# ["Events.OnPartitionGetStartOffsetRequest"],
43-
# "Events.OnPartitionGetStartOffsetResponse",
44-
# ] = None
45-
# on_partition_session_start: Callable[["StubEvent"], None] = None
46-
# on_partition_session_stop: Callable[["StubEvent"], None] = None
47-
# on_partition_session_close: Callable[["StubEvent"], None] = None # todo?
48-
# deserializer: Union[Callable[[bytes], Any], None] = None
49-
# one_attempt_connection_timeout: Union[float, None] = 1
50-
# connection_timeout: Union[float, None] = None
51-
# retry_policy: Union["RetryPolicy", None] = None
5239
update_token_interval: Union[int, float] = 3600
5340

5441
def _init_message(self) -> StreamReadMessage.InitRequest:

0 commit comments

Comments (0)