@@ -182,6 +182,14 @@ async def close(self, flush: bool = True):
182182 await self ._reconnector .close (flush )
183183
184184
185+ class PublicAsyncIONoConsumerReader (PublicAsyncIOReader ):
186+ def commit (self , batch : typing .Union [datatypes .PublicMessage , datatypes .PublicBatch ]):
187+ raise issues .Error ("Commit operations are not supported for topic reader without consumer." )
188+
189+ async def commit_with_ack (self , batch : typing .Union [datatypes .PublicMessage , datatypes .PublicBatch ]):
190+ raise issues .Error ("Commit operations are not supported for topic reader without consumer." )
191+
192+
185193class ReaderReconnector :
186194 _static_reader_reconnector_counter = AtomicCounter ()
187195
@@ -393,6 +401,7 @@ class ReaderStream:
393401 _update_token_interval : Union [int , float ]
394402 _update_token_event : asyncio .Event
395403 _get_token_function : Callable [[], str ]
404+ _settings : topic_reader .PublicReaderSettings
396405
397406 def __init__ (
398407 self ,
@@ -425,6 +434,8 @@ def __init__(
425434 self ._get_token_function = get_token_function
426435 self ._update_token_event = asyncio .Event ()
427436
437+ self ._settings = settings
438+
428439 @staticmethod
429440 async def create (
430441 reader_reconnector_id : int ,
@@ -615,7 +626,7 @@ async def _read_messages_loop(self):
615626 message .server_message ,
616627 StreamReadMessage .StartPartitionSessionRequest ,
617628 ):
618- self ._on_start_partition_session (message .server_message )
629+ await self ._on_start_partition_session (message .server_message )
619630
620631 elif isinstance (
621632 message .server_message ,
@@ -660,7 +671,7 @@ async def _update_token(self, token: str):
660671 finally :
661672 self ._update_token_event .clear ()
662673
663- def _on_start_partition_session (self , message : StreamReadMessage .StartPartitionSessionRequest ):
674+ async def _on_start_partition_session (self , message : StreamReadMessage .StartPartitionSessionRequest ):
664675 try :
665676 if message .partition_session .partition_session_id in self ._partition_sessions :
666677 raise TopicReaderError (
@@ -676,11 +687,19 @@ def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionS
676687 reader_reconnector_id = self ._reader_reconnector_id ,
677688 reader_stream_id = self ._id ,
678689 )
690+
691+ read_offset = None
692+ callee = self ._settings .get_start_offset_lambda
693+ if callee is not None :
694+ read_offset = callee (message .partition_session .partition_id )
695+ if asyncio .iscoroutinefunction (callee ):
696+ read_offset = await read_offset
697+
679698 self ._stream .write (
680699 StreamReadMessage .FromClient (
681700 client_message = StreamReadMessage .StartPartitionSessionResponse (
682701 partition_session_id = message .partition_session .partition_session_id ,
683- read_offset = None ,
702+ read_offset = read_offset ,
684703 commit_offset = None ,
685704 )
686705 ),
0 commit comments