Skip to content

Commit 59e46a3

Browse files
committed
No Consumer Reader
1 parent d980334 commit 59e46a3

File tree

5 files changed

+104
-6
lines changed

5 files changed

+104
-6
lines changed

tests/topics/test_topic_reader.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,3 +251,54 @@ async def wait(fut):
251251

252252
await reader0.close()
253253
await reader1.close()
254+
255+
256+
@pytest.mark.asyncio
257+
class TestTopicNoConsumerReaderAsyncIO:
258+
async def test_read_message(self, driver, topic_with_messages):
259+
reader = driver.topic_client.no_consumer_reader(topic_with_messages, [0], lambda x: None)
260+
await reader.wait_message()
261+
msg = await reader.receive_message()
262+
263+
assert msg is not None
264+
assert msg.seqno
265+
266+
await reader.close()
267+
268+
async def test_read_with_offset(self, driver, topic_with_messages):
269+
reader = driver.topic_client.no_consumer_reader(topic_with_messages, [0], lambda x: 1)
270+
await reader.wait_message()
271+
msg = await reader.receive_message()
272+
273+
assert msg is not None
274+
assert msg.seqno == 2
275+
276+
await reader.close()
277+
278+
async def test_offsets_updated_after_reconnect(self, driver, topic_with_messages):
279+
current_offset = 0
280+
281+
def get_start_offset_lambda(partition_id: int) -> int:
282+
nonlocal current_offset
283+
return current_offset
284+
285+
reader = driver.topic_client.no_consumer_reader(topic_with_messages, [0], get_start_offset_lambda)
286+
await reader.wait_message()
287+
msg = await reader.receive_message()
288+
289+
assert msg is not None
290+
assert msg.seqno == current_offset + 1
291+
292+
current_offset += 2
293+
294+
reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))
295+
296+
await asyncio.sleep(0)
297+
298+
await reader.wait_message()
299+
msg = await reader.receive_message()
300+
301+
assert msg is not None
302+
assert msg.seqno == current_offset + 1
303+
304+
await reader.close()

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,12 +439,13 @@ def from_proto(
439439
@dataclass
440440
class InitRequest(IToProto):
441441
topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"]
442-
consumer: str
442+
consumer: Optional[str]
443443
auto_partitioning_support: bool
444444

445445
def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
446446
res = ydb_topic_pb2.StreamReadMessage.InitRequest()
447-
res.consumer = self.consumer
447+
if self.consumer is not None:
448+
res.consumer = self.consumer
448449
for settings in self.topics_read_settings:
449450
res.topics_read_settings.append(settings.to_proto())
450451
res.auto_partitioning_support = self.auto_partitioning_support

ydb/_topic_reader/topic_reader.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSett
4242

4343
@dataclass
4444
class PublicReaderSettings:
45-
consumer: str
45+
consumer: Optional[str]
4646
topic: TopicSelectorTypes
4747
buffer_size_bytes: int = 50 * 1024 * 1024
4848
auto_partitioning_support: bool = True
@@ -54,12 +54,15 @@ class PublicReaderSettings:
5454
decoder_executor: Optional[concurrent.futures.Executor] = None
5555
update_token_interval: Union[int, float] = 3600
5656

57+
partition_ids: Optional[List[int]] = None
58+
get_start_offset_lambda: Optional[Callable[[int], int]] = None
59+
5760
def __post_init__(self):
5861
# check possible create init message
5962
_ = self._init_message()
6063

6164
def _init_message(self) -> StreamReadMessage.InitRequest:
62-
if not isinstance(self.consumer, str):
65+
if self.consumer is not None and not isinstance(self.consumer, str):
6366
raise TypeError("Unsupported type for customer field: '%s'" % type(self.consumer))
6467

6568
if isinstance(self.topic, list):
@@ -69,7 +72,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest:
6972

7073
for index, selector in enumerate(selectors):
7174
if isinstance(selector, str):
72-
selectors[index] = PublicTopicSelector(path=selector)
75+
selectors[index] = PublicTopicSelector(path=selector, partitions=self.partition_ids)
7376
elif isinstance(selector, PublicTopicSelector):
7477
pass
7578
else:

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,14 @@ async def close(self, flush: bool = True):
182182
await self._reconnector.close(flush)
183183

184184

185+
class PublicAsyncIONoConsumerReader(PublicAsyncIOReader):
186+
def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
187+
raise issues.Error("Commit operations are not supported for topic reader without consumer.")
188+
189+
async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
190+
raise issues.Error("Commit operations are not supported for topic reader without consumer.")
191+
192+
185193
class ReaderReconnector:
186194
_static_reader_reconnector_counter = AtomicCounter()
187195

@@ -393,6 +401,7 @@ class ReaderStream:
393401
_update_token_interval: Union[int, float]
394402
_update_token_event: asyncio.Event
395403
_get_token_function: Callable[[], str]
404+
_settings: topic_reader.PublicReaderSettings
396405

397406
def __init__(
398407
self,
@@ -425,6 +434,8 @@ def __init__(
425434
self._get_token_function = get_token_function
426435
self._update_token_event = asyncio.Event()
427436

437+
self._settings = settings
438+
428439
@staticmethod
429440
async def create(
430441
reader_reconnector_id: int,
@@ -676,11 +687,16 @@ def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionS
676687
reader_reconnector_id=self._reader_reconnector_id,
677688
reader_stream_id=self._id,
678689
)
690+
691+
read_offset = None
692+
if self._settings.get_start_offset_lambda is not None:
693+
read_offset = self._settings.get_start_offset_lambda(message.partition_session.partition_id)
694+
679695
self._stream.write(
680696
StreamReadMessage.FromClient(
681697
client_message=StreamReadMessage.StartPartitionSessionResponse(
682698
partition_session_id=message.partition_session.partition_session_id,
683-
read_offset=None,
699+
read_offset=read_offset,
684700
commit_offset=None,
685701
)
686702
),

ydb/topic.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"TopicError",
1515
"TopicMeteringMode",
1616
"TopicReader",
17+
"TopicNoConsumerReaderAsyncIO",
1718
"TopicReaderAsyncIO",
1819
"TopicReaderBatch",
1920
"TopicReaderMessage",
@@ -56,6 +57,7 @@
5657

5758
from ._topic_reader.topic_reader_asyncio import (
5859
PublicAsyncIOReader as TopicReaderAsyncIO,
60+
PublicAsyncIONoConsumerReader as TopicNoConsumerReaderAsyncIO,
5961
PublicTopicReaderPartitionExpiredError as TopicReaderPartitionExpiredError,
6062
PublicTopicReaderUnexpectedCodecError as TopicReaderUnexpectedCodecError,
6163
)
@@ -261,6 +263,31 @@ def reader(
261263

262264
return TopicReaderAsyncIO(self._driver, settings, _parent=self)
263265

266+
def no_consumer_reader(
267+
self,
268+
topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]],
269+
partition_ids: List[int],
270+
get_start_offset_lambda: Callable[[int], int],
271+
buffer_size_bytes: int = 50 * 1024 * 1024,
272+
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
273+
# the func will be called from multiply threads in parallel
274+
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
275+
# custom decoder executor for call builtin and custom decoders. If None - use shared executor pool.
276+
# if max_worker in the executor is 1 - then decoders will be called from the thread without parallel
277+
decoder_executor: Optional[concurrent.futures.Executor] = None,
278+
auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True.
279+
) -> TopicNoConsumerReaderAsyncIO:
280+
if not decoder_executor:
281+
decoder_executor = self._executor
282+
283+
args = locals().copy()
284+
del args["self"]
285+
args["consumer"] = None
286+
287+
settings = TopicReaderSettings(**args)
288+
289+
return TopicNoConsumerReaderAsyncIO(self._driver, settings, _parent=self)
290+
264291
def writer(
265292
self,
266293
topic,

0 commit comments

Comments
 (0)