|
14 | 14 | "TopicError", |
15 | 15 | "TopicMeteringMode", |
16 | 16 | "TopicReader", |
| 17 | + "TopicNoConsumerReaderAsyncIO", |
17 | 18 | "TopicReaderAsyncIO", |
18 | 19 | "TopicReaderBatch", |
19 | 20 | "TopicReaderMessage", |
|
56 | 57 |
|
57 | 58 | from ._topic_reader.topic_reader_asyncio import ( |
58 | 59 | PublicAsyncIOReader as TopicReaderAsyncIO, |
| 60 | + PublicAsyncIONoConsumerReader as TopicNoConsumerReaderAsyncIO, |
59 | 61 | PublicTopicReaderPartitionExpiredError as TopicReaderPartitionExpiredError, |
60 | 62 | PublicTopicReaderUnexpectedCodecError as TopicReaderUnexpectedCodecError, |
61 | 63 | ) |
@@ -261,6 +263,31 @@ def reader( |
261 | 263 |
|
262 | 264 | return TopicReaderAsyncIO(self._driver, settings, _parent=self) |
263 | 265 |
|
| 266 | + def no_consumer_reader( |
| 267 | + self, |
| 268 | + topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]], |
| 269 | + partition_ids: List[int], |
| 270 | + get_start_offset_lambda: Callable[[int], int], |
| 271 | + buffer_size_bytes: int = 50 * 1024 * 1024, |
| 272 | + # decoders: map[codec_code] func(encoded_bytes)->decoded_bytes |
| 273 | + # the func will be called from multiply threads in parallel |
| 274 | + decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None, |
| 275 | + # custom decoder executor for call builtin and custom decoders. If None - use shared executor pool. |
| 276 | + # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel |
| 277 | + decoder_executor: Optional[concurrent.futures.Executor] = None, |
| 278 | + auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. |
| 279 | + ) -> TopicNoConsumerReaderAsyncIO: |
| 280 | + if not decoder_executor: |
| 281 | + decoder_executor = self._executor |
| 282 | + |
| 283 | + args = locals().copy() |
| 284 | + del args["self"] |
| 285 | + args['consumer'] = None |
| 286 | + |
| 287 | + settings = TopicReaderSettings(**args) |
| 288 | + |
| 289 | + return TopicNoConsumerReaderAsyncIO(self._driver, settings, _parent=self) |
| 290 | + |
264 | 291 | def writer( |
265 | 292 | self, |
266 | 293 | topic, |
|
0 commit comments