Skip to content

Commit 252ee4a

Browse files
authored
Merge pull request #277 custom topic settings
2 parents 86ddb39 + a8e84ba commit 252ee4a

File tree

13 files changed

+144
-47
lines changed

13 files changed

+144
-47
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added support to set many topics and topic reader settings for read in one reader
2+
13
## 3.2.2 ##
24
* Fixed set keep_in_cache algorithm
35

tests/__init__.py

Whitespace-only changes.

tests/conftest.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,36 @@ async def topic_path(driver, topic_consumer, database) -> str:
160160

161161
@pytest.fixture()
162162
@pytest.mark.asyncio()
163-
async def topic_with_messages(driver, topic_path):
163+
async def topic2_path(driver, topic_consumer, database) -> str:
164+
topic_path = database + "/test-topic2"
165+
166+
try:
167+
await driver.topic_client.drop_topic(topic_path)
168+
except issues.SchemeError:
169+
pass
170+
171+
await driver.topic_client.create_topic(
172+
path=topic_path,
173+
consumers=[topic_consumer],
174+
)
175+
176+
return topic_path
177+
178+
179+
@pytest.fixture()
180+
@pytest.mark.asyncio()
181+
async def topic_with_messages(driver, topic_consumer, database):
182+
topic_path = database + "/test-topic-with-messages"
183+
try:
184+
await driver.topic_client.drop_topic(topic_path)
185+
except issues.SchemeError:
186+
pass
187+
188+
await driver.topic_client.create_topic(
189+
path=topic_path,
190+
consumers=[topic_consumer],
191+
)
192+
164193
writer = driver.topic_client.writer(topic_path, producer_id="fixture-producer-id", codec=ydb.TopicCodec.RAW)
165194
await writer.write_with_ack(
166195
[
@@ -175,6 +204,7 @@ async def topic_with_messages(driver, topic_path):
175204
]
176205
)
177206
await writer.close()
207+
return topic_path
178208

179209

180210
@pytest.fixture()

tests/ssl/__init__.py

Whitespace-only changes.

tests/table/__init__.py

Whitespace-only changes.

tests/topics/__init__.py

Whitespace-only changes.

tests/topics/test_topic_reader.py

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55

66
@pytest.mark.asyncio
77
class TestTopicReaderAsyncIO:
8-
async def test_read_batch(self, driver, topic_path, topic_with_messages, topic_consumer):
9-
reader = driver.topic_client.reader(topic_path, topic_consumer)
8+
async def test_read_batch(self, driver, topic_with_messages, topic_consumer):
9+
reader = driver.topic_client.reader(topic_with_messages, topic_consumer)
1010
batch = await reader.receive_batch()
1111

1212
assert batch is not None
@@ -18,30 +18,30 @@ async def test_link_to_client(self, driver, topic_path, topic_consumer):
1818
reader = driver.topic_client.reader(topic_path, topic_consumer)
1919
assert reader._parent is driver.topic_client
2020

21-
async def test_read_message(self, driver, topic_path, topic_with_messages, topic_consumer):
22-
reader = driver.topic_client.reader(topic_path, topic_consumer)
21+
async def test_read_message(self, driver, topic_with_messages, topic_consumer):
22+
reader = driver.topic_client.reader(topic_with_messages, topic_consumer)
2323
msg = await reader.receive_message()
2424

2525
assert msg is not None
2626
assert msg.seqno
2727

2828
await reader.close()
2929

30-
async def test_read_and_commit_with_close_reader(self, driver, topic_path, topic_with_messages, topic_consumer):
31-
async with driver.topic_client.reader(topic_path, topic_consumer) as reader:
30+
async def test_read_and_commit_with_close_reader(self, driver, topic_with_messages, topic_consumer):
31+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
3232
message = await reader.receive_message()
3333
reader.commit(message)
3434

35-
async with driver.topic_client.reader(topic_path, topic_consumer) as reader:
35+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
3636
message2 = await reader.receive_message()
3737
assert message != message2
3838

39-
async def test_read_and_commit_with_ack(self, driver, topic_path, topic_with_messages, topic_consumer):
40-
reader = driver.topic_client.reader(topic_path, topic_consumer)
39+
async def test_read_and_commit_with_ack(self, driver, topic_with_messages, topic_consumer):
40+
reader = driver.topic_client.reader(topic_with_messages, topic_consumer)
4141
batch = await reader.receive_batch()
4242
await reader.commit_with_ack(batch)
4343

44-
reader = driver.topic_client.reader(topic_path, topic_consumer)
44+
reader = driver.topic_client.reader(topic_with_messages, topic_consumer)
4545
batch2 = await reader.receive_batch()
4646
assert batch.messages[0] != batch2.messages[0]
4747

@@ -71,10 +71,35 @@ def decode(b: bytes):
7171
batch = await reader.receive_batch()
7272
assert batch.messages[0].data.decode() == "123"
7373

74+
async def test_read_from_two_topics(self, driver, topic_path, topic2_path, topic_consumer):
75+
async with driver.topic_client.writer(topic_path) as writer:
76+
await writer.write("1")
77+
await writer.flush()
78+
79+
async with driver.topic_client.writer(topic2_path) as writer:
80+
await writer.write("2")
81+
await writer.flush()
82+
83+
messages = []
84+
async with driver.topic_client.reader(
85+
[
86+
topic_path,
87+
ydb.TopicReaderSelector(path=topic2_path),
88+
],
89+
consumer=topic_consumer,
90+
) as reader:
91+
for _ in range(2):
92+
message = await reader.receive_message()
93+
messages.append(message)
94+
95+
messages = [message.data.decode() for message in messages]
96+
messages.sort()
97+
assert messages == ["1", "2"]
98+
7499

75100
class TestTopicReaderSync:
76-
def test_read_batch(self, driver_sync, topic_path, topic_with_messages, topic_consumer):
77-
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
101+
def test_read_batch(self, driver_sync, topic_with_messages, topic_consumer):
102+
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)
78103
batch = reader.receive_batch()
79104

80105
assert batch is not None
@@ -86,30 +111,30 @@ def test_link_to_client(self, driver_sync, topic_path, topic_consumer):
86111
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
87112
assert reader._parent is driver_sync.topic_client
88113

89-
def test_read_message(self, driver_sync, topic_path, topic_with_messages, topic_consumer):
90-
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
114+
def test_read_message(self, driver_sync, topic_with_messages, topic_consumer):
115+
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)
91116
msg = reader.receive_message()
92117

93118
assert msg is not None
94119
assert msg.seqno
95120

96121
reader.close()
97122

98-
def test_read_and_commit_with_close_reader(self, driver_sync, topic_path, topic_with_messages, topic_consumer):
99-
with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader:
123+
def test_read_and_commit_with_close_reader(self, driver_sync, topic_with_messages, topic_consumer):
124+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
100125
message = reader.receive_message()
101126
reader.commit(message)
102127

103-
with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader:
128+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
104129
message2 = reader.receive_message()
105130
assert message != message2
106131

107-
def test_read_and_commit_with_ack(self, driver_sync, topic_path, topic_with_messages, topic_consumer):
108-
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
132+
def test_read_and_commit_with_ack(self, driver_sync, topic_with_messages, topic_consumer):
133+
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)
109134
batch = reader.receive_batch()
110135
reader.commit_with_ack(batch)
111136

112-
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
137+
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)
113138
batch2 = reader.receive_batch()
114139
assert batch.messages[0] != batch2.messages[0]
115140

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,14 +274,15 @@ async def to_thread(func, /, *args, **kwargs):
274274
return await loop.run_in_executor(None, func_call)
275275

276276

277-
def proto_duration_from_timedelta(t: Optional[datetime.timedelta]) -> ProtoDuration:
277+
def proto_duration_from_timedelta(t: Optional[datetime.timedelta]) -> Optional[ProtoDuration]:
278278
if t is None:
279279
return None
280+
280281
res = ProtoDuration()
281282
res.FromTimedelta(t)
282283

283284

284-
def proto_timestamp_from_datetime(t: Optional[datetime.datetime]) -> ProtoTimeStamp:
285+
def proto_timestamp_from_datetime(t: Optional[datetime.datetime]) -> Optional[ProtoTimeStamp]:
285286
if t is None:
286287
return None
287288

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -410,17 +410,23 @@ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
410410
class TopicReadSettings(IToProto):
411411
path: str
412412
partition_ids: List[int] = field(default_factory=list)
413-
max_lag_seconds: Union[datetime.timedelta, None] = None
414-
read_from: Union[int, float, datetime.datetime, None] = None
413+
max_lag: Optional[datetime.timedelta] = None
414+
read_from: Optional[datetime.datetime] = None
415415

416416
def to_proto(
417417
self,
418418
) -> ydb_topic_pb2.StreamReadMessage.InitRequest.TopicReadSettings:
419419
res = ydb_topic_pb2.StreamReadMessage.InitRequest.TopicReadSettings()
420420
res.path = self.path
421421
res.partition_ids.extend(self.partition_ids)
422-
if self.max_lag_seconds is not None:
423-
res.max_lag = proto_duration_from_timedelta(self.max_lag_seconds)
422+
max_lag = proto_duration_from_timedelta(self.max_lag)
423+
if max_lag is not None:
424+
res.max_lag = max_lag
425+
426+
read_from = proto_timestamp_from_datetime(self.read_from)
427+
if read_from is not None:
428+
res.read_from = read_from
429+
424430
return res
425431

426432
@dataclass

ydb/_topic_reader/topic_reader.py

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,36 @@
1414
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange
1515

1616

17-
class Selector:
17+
@dataclass
18+
class PublicTopicSelector:
1819
path: str
19-
partitions: Union[None, int, List[int]]
20-
read_from_timestamp_ms: Optional[int]
21-
max_time_lag_ms: Optional[int]
20+
partitions: Optional[Union[int, List[int]]] = None
21+
read_from: Optional[datetime.datetime] = None
22+
max_lag: Optional[datetime.timedelta] = None
23+
24+
def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSettings:
25+
partitions = self.partitions
26+
if partitions is None:
27+
partitions = []
28+
29+
elif not isinstance(partitions, list):
30+
partitions = [partitions]
31+
32+
return StreamReadMessage.InitRequest.TopicReadSettings(
33+
path=self.path,
34+
partition_ids=partitions,
35+
max_lag=self.max_lag,
36+
read_from=self.read_from,
37+
)
38+
2239

23-
def __init__(self, path, *, partitions: Union[None, int, List[int]] = None):
24-
self.path = path
25-
self.partitions = partitions
40+
TopicSelectorTypes = Union[str, PublicTopicSelector, List[Union[str, PublicTopicSelector]]]
2641

2742

2843
@dataclass
2944
class PublicReaderSettings:
3045
consumer: str
31-
topic: str
46+
topic: TopicSelectorTypes
3247
buffer_size_bytes: int = 50 * 1024 * 1024
3348

3449
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
@@ -43,12 +58,24 @@ def __post_init__(self):
4358
_ = self._init_message()
4459

4560
def _init_message(self) -> StreamReadMessage.InitRequest:
61+
if not isinstance(self.consumer, str):
62+
raise TypeError("Unsupported type for customer field: '%s'" % type(self.consumer))
63+
64+
if isinstance(self.topic, list):
65+
selectors = self.topic
66+
else:
67+
selectors = [self.topic]
68+
69+
for index, selector in enumerate(selectors):
70+
if isinstance(selector, str):
71+
selectors[index] = PublicTopicSelector(path=selector)
72+
elif isinstance(selector, PublicTopicSelector):
73+
pass
74+
else:
75+
raise TypeError("Unsupported type for topic field: '%s'" % type(selector))
76+
4677
return StreamReadMessage.InitRequest(
47-
topics_read_settings=[
48-
StreamReadMessage.InitRequest.TopicReadSettings(
49-
path=self.topic,
50-
)
51-
],
78+
topics_read_settings=list(map(PublicTopicSelector._to_topic_read_settings, selectors)), # type: ignore
5279
consumer=self.consumer,
5380
)
5481

0 commit comments

Comments
 (0)